import { eq, desc, asc, and, sql, like, or, lt, gte, lte, isNotNull, inArray, type SQL } from "drizzle-orm";
import { drizzle } from "drizzle-orm/postgres-js";
import {
  InsertUser,
  users,
  conversations,
  InsertConversation,
  Conversation,
  messages,
  InsertMessage,
  workflowNodes,
  InsertWorkflowNode,
  workflowEdges,
  InsertWorkflowEdge,
  workflowSuggestions,
  InsertWorkflowSuggestion,
  invitations,
  InsertInvitation,
  auditLogs,
  InsertAuditLog,
  passwordResetTokens,
  InsertPasswordResetToken,
  analyticsEvents,
  InsertAnalyticsEvent,
  dataSources,
  InsertDataSource,
  apiConnections,
  InsertApiConnection,
} from "../drizzle/schema";
import { ENV } from './_core/env';

// NOTE(review): the driver imported above is `drizzle-orm/postgres-js`, but several
// helpers below rely on MySQL-style APIs/result shapes (`onDuplicateKeyUpdate`,
// `insertId`, `affectedRows`). The schema module is not visible from here, so the
// calls are left untouched — confirm which dialect is intended and align them.

/** Lazily created Drizzle instance; stays `null` until `getDb` succeeds. */
let _db: ReturnType<typeof drizzle> | null = null;

/**
 * Returns the shared Drizzle instance, creating it on first use.
 *
 * @returns The database handle, or `null` when `DATABASE_URL` is unset or the
 *   driver could not be constructed (a warning is logged in the latter case).
 */
export async function getDb() {
  if (!_db && process.env.DATABASE_URL) {
    try {
      _db = drizzle(process.env.DATABASE_URL);
    } catch (error) {
      console.warn("[Database] Failed to connect:", error);
      _db = null;
    }
  }
  return _db;
}

/** Returns the database handle or throws when it is unavailable. */
async function requireDb() {
  const db = await getDb();
  if (!db) throw new Error("Database not available");
  return db;
}

/* ─── User helpers ─── */

/**
 * Inserts a user keyed by `openId`, or updates the supplied fields if the row exists.
 *
 * Only fields that are explicitly provided (not `undefined`) are written on update.
 * The owner (`ENV.ownerOpenId`) is promoted to `admin` when no role is given.
 *
 * @throws Error when `user.openId` is missing; database errors are logged and rethrown.
 */
export async function upsertUser(user: InsertUser): Promise<void> {
  if (!user.openId) {
    throw new Error("User openId is required for upsert");
  }

  const db = await getDb();
  if (!db) {
    console.warn("[Database] Cannot upsert user: database not available");
    return;
  }

  try {
    const values: InsertUser = { openId: user.openId };
    const updateSet: Record<string, unknown> = {};

    const textFields = ["name", "email", "loginMethod"] as const;
    type TextField = (typeof textFields)[number];

    // Copies a provided text field into both the insert and the update payloads.
    const assignNullable = (field: TextField) => {
      const value = user[field];
      if (value === undefined) return;
      const normalized = value ?? null;
      values[field] = normalized;
      updateSet[field] = normalized;
    };
    textFields.forEach(assignNullable);

    if (user.lastSignedIn !== undefined) {
      values.lastSignedIn = user.lastSignedIn;
      updateSet.lastSignedIn = user.lastSignedIn;
    }

    if (user.role !== undefined) {
      values.role = user.role;
      updateSet.role = user.role;
    } else if (user.openId === ENV.ownerOpenId) {
      values.role = 'admin';
      updateSet.role = 'admin';
    }

    if (!values.lastSignedIn) {
      values.lastSignedIn = new Date();
    }

    // An upsert needs at least one column to update; touch lastSignedIn as a fallback.
    if (Object.keys(updateSet).length === 0) {
      updateSet.lastSignedIn = new Date();
    }

    // NOTE(review): `onDuplicateKeyUpdate` is the MySQL upsert API — see the dialect note at the top.
    await db.insert(users).values(values).onDuplicateKeyUpdate({ set: updateSet });
  } catch (error) {
    console.error("[Database] Failed to upsert user:", error);
    throw error;
  }
}

/** Looks up a user by `openId`; returns `undefined` when not found or the database is unavailable. */
export async function getUserByOpenId(openId: string) {
  const db = await getDb();
  if (!db) {
    console.warn("[Database] Cannot get user: database not available");
    return undefined;
  }
  const result = await db.select().from(users).where(eq(users.openId, openId)).limit(1);
  return result[0];
}

/* ─── User management helpers ─── */

/** Lists all users (selected columns only), most recently signed-in first. */
export async function getAllUsers() {
  const db = await getDb();
  if (!db) return [];
  return db
    .select({
      id: users.id,
      openId: users.openId,
      name: users.name,
      email: users.email,
      role: users.role,
      createdAt: users.createdAt,
      lastSignedIn: users.lastSignedIn,
    })
    .from(users)
    .orderBy(desc(users.lastSignedIn));
}

/**
 * Sets a user's role and returns the re-read row.
 *
 * @throws Error when the database is unavailable.
 */
export async function updateUserRole(userId: number, role: "user" | "agent" | "admin") {
  const db = await requireDb();
  await db.update(users).set({ role }).where(eq(users.id, userId));
  const result = await db.select().from(users).where(eq(users.id, userId)).limit(1);
  return result[0];
}

/** Looks up a user by primary key; `undefined` when not found or the database is unavailable. */
export async function getUserById(userId: number) {
  const db = await getDb();
  if (!db) return undefined;
  const result = await db.select().from(users).where(eq(users.id, userId)).limit(1);
  return result[0];
}

/**
 * Deletes a user.
 *
 * @returns The row as it was before deletion, or `null` when no such user exists.
 * @throws Error when the database is unavailable.
 */
export async function deleteUser(userId: number) {
  const db = await requireDb();
  // Get user before deleting
  const user = await getUserById(userId);
  if (!user) return null;
  await db.delete(users).where(eq(users.id, userId));
  return user;
}

/** Looks up a user by email; `undefined` when not found or the database is unavailable. */
export async function getUserByEmail(email: string) {
  const db = await getDb();
  if (!db) return undefined;
  const result = await db.select().from(users).where(eq(users.email, email)).limit(1);
  return result[0];
}

/** Looks up a user by email for credential checks; the full row (including `passwordHash`) is returned. */
export async function getUserByEmailWithPassword(email: string) {
  const db = await getDb();
  if (!db) return undefined;
  const result = await db.select().from(users).where(eq(users.email, email)).limit(1);
  return result[0]; // includes passwordHash
}

/**
 * Creates a locally-authenticated user and returns the stored row.
 *
 * @throws Error when the database is unavailable.
 */
export async function createUserWithPassword(data: {
  email: string;
  name: string;
  passwordHash: string;
  role?: "user" | "agent" | "admin";
}) {
  const db = await requireDb();
  const openId = `local_${data.email}`; // local users get a synthetic openId
  await db.insert(users).values({
    openId,
    email: data.email,
    name: data.name,
    passwordHash: data.passwordHash,
    loginMethod: "email",
    role: data.role ?? "user",
    lastSignedIn: new Date(),
  });
  const result = await db.select().from(users).where(eq(users.openId, openId)).limit(1);
  return result[0];
}

/**
 * Replaces a user's password hash.
 *
 * @throws Error when the database is unavailable.
 */
export async function updateUserPassword(userId: number, passwordHash: string) {
  const db = await requireDb();
  await db.update(users).set({ passwordHash }).where(eq(users.id, userId));
}

/* ─── Password reset token helpers ─── */

/**
 * Stores a password reset token and returns the stored row (re-read by token value).
 *
 * @throws Error when the database is unavailable.
 */
export async function createPasswordResetToken(data: InsertPasswordResetToken) {
  const db = await requireDb();
  await db.insert(passwordResetTokens).values(data);
  const result = await db
    .select()
    .from(passwordResetTokens)
    .where(eq(passwordResetTokens.token, data.token))
    .limit(1);
  return result[0];
}

/** Looks up a password reset token by its value; `undefined` when not found or the database is unavailable. */
export async function getPasswordResetToken(token: string) {
  const db = await getDb();
  if (!db) return undefined;
  const result = await db
    .select()
    .from(passwordResetTokens)
    .where(eq(passwordResetTokens.token, token))
    .limit(1);
  return result[0];
}

/**
 * Stamps a password reset token with the current time as `usedAt`.
 *
 * @throws Error when the database is unavailable.
 */
export async function markPasswordResetTokenUsed(tokenId: number) {
  const db = await requireDb();
  await db
    .update(passwordResetTokens)
    .set({ usedAt: new Date() })
    .where(eq(passwordResetTokens.id, tokenId));
}

/* ─── Invitation helpers ─── */

/**
 * Stores an invitation and returns the stored row (re-read by token value).
 *
 * @throws Error when the database is unavailable.
 */
export async function createInvitation(data: InsertInvitation) {
  const db = await requireDb();
  await db.insert(invitations).values(data);
  const result = await db.select().from(invitations).where(eq(invitations.token, data.token)).limit(1);
  return result[0];
}

/** Lists all invitations, newest first. */
export async function getAllInvitations() {
  const db = await getDb();
  if (!db) return [];
  return db.select().from(invitations).orderBy(desc(invitations.createdAt));
}

/** Looks up an invitation by token; `undefined` when not found or the database is unavailable. */
export async function getInvitationByToken(token: string) {
  const db = await getDb();
  if (!db) return undefined;
  const result = await db.select().from(invitations).where(eq(invitations.token, token)).limit(1);
  return result[0];
}

/** Lists every invitation sent to an email address, newest first. */
export async function getInvitationByEmail(email: string) {
  const db = await getDb();
  if (!db) return [];
  return db
    .select()
    .from(invitations)
    .where(eq(invitations.email, email))
    .orderBy(desc(invitations.createdAt));
}

/**
 * Updates an invitation's status and returns the re-read row.
 *
 * When the new status is `"accepted"`, `acceptedAt` is set to now and
 * `acceptedByUserId` is recorded if provided.
 *
 * @throws Error when the database is unavailable.
 */
export async function updateInvitationStatus(
  id: number,
  status: "pending" | "accepted" | "expired" | "revoked",
  acceptedByUserId?: number,
) {
  const db = await requireDb();
  const updateData: Record<string, unknown> = { status };
  if (status === "accepted") {
    updateData.acceptedAt = new Date();
    if (acceptedByUserId !== undefined) updateData.acceptedByUserId = acceptedByUserId;
  }
  await db.update(invitations).set(updateData).where(eq(invitations.id, id));
  const result = await db.select().from(invitations).where(eq(invitations.id, id)).limit(1);
  return result[0];
}

/**
 * Marks every pending invitation whose `expiresAt` is in the past as expired.
 *
 * @returns The affected-row count reported by the driver, or 0 when unavailable.
 */
export async function expireOldInvitations() {
  const db = await getDb();
  if (!db) return 0;
  const result = await db
    .update(invitations)
    .set({ status: "expired" })
    .where(and(eq(invitations.status, "pending"), lt(invitations.expiresAt, new Date())));
  // NOTE(review): `result[0].affectedRows` is the MySQL result shape — see the dialect note at the top.
  return result[0]?.affectedRows || 0;
}

/* ─── Audit log helpers ─── */

/** Appends an audit log entry; logs a warning and does nothing when the database is unavailable. */
export async function createAuditLog(data: InsertAuditLog) {
  const db = await getDb();
  if (!db) {
    console.warn("[Database] Cannot create audit log: database not available");
    return;
  }
  await db.insert(auditLogs).values(data);
}

/** Returns the most recent audit log entries, newest first, capped at `limit` (default 50). */
export async function getAuditLogs(limit = 50) {
  const db = await getDb();
  if (!db) return [];
  return db.select().from(auditLogs).orderBy(desc(auditLogs.createdAt)).limit(limit);
}

/* ─── Conversation helpers ─── */

/**
 * Stores a conversation and returns the stored row (re-read by `sessionId`).
 *
 * @throws Error when the database is unavailable.
 */
export async function createConversation(data: InsertConversation) {
  const db = await requireDb();
  await db.insert(conversations).values(data);
  const result = await db
    .select()
    .from(conversations)
    .where(eq(conversations.sessionId, data.sessionId))
    .limit(1);
  return result[0];
}

/** Lists conversations, most recently updated first, optionally restricted to one status. */
export async function getConversations(status?: string) {
  const db = await getDb();
  if (!db) return [];
  return db
    .select()
    .from(conversations)
    .where(status ? eq(conversations.status, status as any) : undefined)
    .orderBy(desc(conversations.updatedAt));
}

/**
 * Advanced conversation query with pagination, search, agent filter, date range, and sorting.
 *
 * @param params.page 1-based page number (default 1; values below 1 are clamped to 1).
 * @param params.pageSize Rows per page (default 20; values below 1 are clamped to 1).
 * @param params.search Substring matched with LIKE against visitor name/email,
 *   session id, customer id and sales rep.
 * @param params.dateFrom Inclusive lower bound on `createdAt`, parsed with `new Date`.
 * @param params.dateTo Upper bound on `createdAt`; extended to the end of that day.
 * @param params.sortBy One of `created`, `visitor`, `status`, `customerId`, `salesRep`,
 *   `agent`; anything else sorts by `updatedAt`.
 * @param params.sortOrder `"asc"` for ascending; any other value sorts descending.
 * @returns The requested page (each row carrying the joined `agentName`) plus paging totals.
 */
export async function getConversationsAdvanced(params: {
  page?: number;
  pageSize?: number;
  status?: string;
  search?: string;
  agentId?: number;
  dateFrom?: string;
  dateTo?: string;
  sortBy?: string;
  sortOrder?: "asc" | "desc";
}) {
  const db = await getDb();
  if (!db) return { conversations: [], total: 0, page: 1, pageSize: 20, totalPages: 0 };

  // Clamp to sane positive integers so a negative/fractional input cannot yield an invalid OFFSET/LIMIT.
  const page = Math.max(1, Math.floor(params.page || 1));
  const pageSize = Math.max(1, Math.floor(params.pageSize || 20));
  const offset = (page - 1) * pageSize;

  // Build conditions
  const conditions: SQL[] = [];
  if (params.status) {
    conditions.push(eq(conversations.status, params.status as any));
  }
  if (params.agentId) {
    conditions.push(eq(conversations.assignedAgentId, params.agentId));
  }
  if (params.search) {
    const searchTerm = `%${params.search}%`;
    const searchFilter = or(
      like(conversations.visitorName, searchTerm),
      like(conversations.visitorEmail, searchTerm),
      like(conversations.sessionId, searchTerm),
      like(conversations.customerId, searchTerm),
      like(conversations.salesRep, searchTerm),
    );
    if (searchFilter) conditions.push(searchFilter);
  }
  if (params.dateFrom) {
    conditions.push(gte(conversations.createdAt, new Date(params.dateFrom)));
  }
  if (params.dateTo) {
    const endDate = new Date(params.dateTo);
    // NOTE(review): setHours works in server-local time while a date-only ISO string
    // parses as UTC — confirm the intended timezone semantics for `dateTo`.
    endDate.setHours(23, 59, 59, 999);
    conditions.push(lte(conversations.createdAt, endDate));
  }
  const whereClause = conditions.length > 0 ? and(...conditions) : undefined;

  // Sort
  const sortFn = params.sortOrder === "asc" ? asc : desc;
  let orderByCol;
  switch (params.sortBy) {
    case "created":
      orderByCol = sortFn(conversations.createdAt);
      break;
    case "visitor":
      orderByCol = sortFn(conversations.visitorName);
      break;
    case "status":
      orderByCol = sortFn(conversations.status);
      break;
    case "customerId":
      orderByCol = sortFn(conversations.customerId);
      break;
    case "salesRep":
      orderByCol = sortFn(conversations.salesRep);
      break;
    case "agent":
      // Valid only because the page query below joins `users`.
      orderByCol = sortFn(users.name);
      break;
    default:
      orderByCol = sortFn(conversations.updatedAt);
      break;
  }

  // Count (filters reference only `conversations`, so no join is needed here)
  const countResult = await db
    .select({ count: sql<number>`COUNT(*)` })
    .from(conversations)
    .where(whereClause);
  const total = Number(countResult[0]?.count || 0);
  const totalPages = Math.ceil(total / pageSize);

  // Fetch page with agent name join
  const rows = await db
    .select({
      id: conversations.id,
      sessionId: conversations.sessionId,
      visitorName: conversations.visitorName,
      visitorEmail: conversations.visitorEmail,
      customerId: conversations.customerId,
      salesRep: conversations.salesRep,
      status: conversations.status,
      assignedAgentId: conversations.assignedAgentId,
      metadata: conversations.metadata,
      createdAt: conversations.createdAt,
      updatedAt: conversations.updatedAt,
      agentName: users.name,
    })
    .from(conversations)
    .leftJoin(users, eq(conversations.assignedAgentId, users.id))
    .where(whereClause)
    .orderBy(orderByCol)
    .limit(pageSize)
    .offset(offset);

  return { conversations: rows, total, page, pageSize, totalPages };
}

/** Get all agents (users with agent or admin role) for filter dropdown */
export async function getAgentUsers() {
  const db = await getDb();
  if (!db) return [];
  return db
    .select({
      id: users.id,
      name: users.name,
      email: users.email,
      role: users.role,
    })
    .from(users)
    .where(inArray(users.role, ["agent", "admin"]))
    .orderBy(users.name);
}

/**
 * Get message count per conversation.
 *
 * @returns A map from conversation id to message count; conversations without
 *   messages are absent from the map. Empty when no ids are given or the
 *   database is unavailable.
 */
export async function getConversationMessageCounts(conversationIds: number[]) {
  const db = await getDb();
  if (!db || conversationIds.length === 0) return {};
  const result = await db
    .select({
      conversationId: messages.conversationId,
      count: sql<number>`COUNT(*)`,
    })
    .from(messages)
    .where(inArray(messages.conversationId, conversationIds))
    .groupBy(messages.conversationId);

  const counts: Record<number, number> = {};
  for (const row of result) {
    counts[row.conversationId] = Number(row.count);
  }
  return counts;
}

/**
 * Bulk update conversation status (and optionally the assigned agent) in one statement.
 *
 * @returns `{ updated }` — the number of ids requested, not the number of rows matched.
 * @throws Error when the database is unavailable.
 */
export async function bulkUpdateConversationStatus(
  ids: number[],
  status: "active" | "escalated" | "resolved" | "closed",
  agentId?: number
) {
  const db = await requireDb();
  if (ids.length === 0) return { updated: 0 };
  const updateData: Record<string, unknown> = { status };
  if (agentId !== undefined) updateData.assignedAgentId = agentId;
  await db.update(conversations).set(updateData).where(inArray(conversations.id, ids));
  return { updated: ids.length };
}

/**
 * Delete conversations together with their messages.
 *
 * Both deletes run in one transaction so a failure cannot leave a conversation
 * stripped of its messages.
 *
 * @returns `{ deleted }` — the number of ids requested, not the number of rows matched.
 * @throws Error when the database is unavailable.
 */
export async function deleteConversations(ids: number[]) {
  const db = await requireDb();
  if (ids.length === 0) return { deleted: 0 };
  await db.transaction(async (tx) => {
    // Messages first, then their parent conversations.
    await tx.delete(messages).where(inArray(messages.conversationId, ids));
    await tx.delete(conversations).where(inArray(conversations.id, ids));
  });
  return { deleted: ids.length };
}

/** Looks up a conversation by primary key; `undefined` when not found or the database is unavailable. */
export async function getConversationById(id: number) {
  const db = await getDb();
  if (!db) return undefined;
  const result = await db.select().from(conversations).where(eq(conversations.id, id)).limit(1);
  return result[0];
}

/** Looks up a conversation by session id; `undefined` when not found or the database is unavailable. */
export async function getConversationBySessionId(sessionId: string) {
  const db = await getDb();
  if (!db) return undefined;
  const result = await db
    .select()
    .from(conversations)
    .where(eq(conversations.sessionId, sessionId))
    .limit(1);
  return result[0];
}

/**
 * Updates one conversation's status (and optionally its assigned agent) and returns the re-read row.
 *
 * @throws Error when the database is unavailable.
 */
export async function updateConversationStatus(
  id: number,
  status: "active" | "escalated" | "resolved" | "closed",
  agentId?: number,
) {
  const db = await requireDb();
  const updateData: Record<string, unknown> = { status };
  if (agentId !== undefined) updateData.assignedAgentId = agentId;
  await db.update(conversations).set(updateData).where(eq(conversations.id, id));
  return getConversationById(id);
}

/** Returns the number of conversations per status plus the overall total. */
export async function getConversationStats() {
  const stats = { total: 0, active: 0, escalated: 0, resolved: 0, closed: 0 };
  const db = await getDb();
  if (!db) return stats;
  const result = await db
    .select({
      status: conversations.status,
      count: sql<number>`COUNT(*)`,
    })
    .from(conversations)
    .groupBy(conversations.status);

  for (const row of result) {
    const count = Number(row.count);
    stats[row.status as keyof typeof stats] = count;
    stats.total += count;
  }
  return stats;
}

/* ─── Message helpers ─── */

/**
 * Stores a message and bumps the parent conversation's `updatedAt`.
 *
 * @returns The input data together with the generated id.
 * @throws Error when the database is unavailable.
 */
export async function addMessage(data: InsertMessage) {
  const db = await requireDb();
  const result = await db.insert(messages).values(data);
  await db
    .update(conversations)
    .set({ updatedAt: new Date() })
    .where(eq(conversations.id, data.conversationId));
  // NOTE(review): `insertId` is the MySQL result shape — see the dialect note at the top.
  return { id: Number(result[0].insertId), ...data };
}

/** Lists a conversation's messages ordered by creation time (ascending). */
export async function getMessagesByConversation(conversationId: number) {
  const db = await getDb();
  if (!db) return [];
  return db
    .select()
    .from(messages)
    .where(eq(messages.conversationId, conversationId))
    .orderBy(messages.createdAt);
}

/* ─── Workflow helpers ─── */

/**
 * Replaces the stored graph of a workflow with the given nodes and edges.
 *
 * The delete-then-insert sequence runs in one transaction so a failed insert
 * cannot wipe the previously saved workflow.
 *
 * @throws Error when the database is unavailable.
 */
export async function saveWorkflow(
  workflowId: string,
  nodes: InsertWorkflowNode[],
  edges: InsertWorkflowEdge[],
) {
  const db = await requireDb();
  await db.transaction(async (tx) => {
    await tx.delete(workflowEdges).where(eq(workflowEdges.workflowId, workflowId));
    await tx.delete(workflowNodes).where(eq(workflowNodes.workflowId, workflowId));
    if (nodes.length > 0) await tx.insert(workflowNodes).values(nodes);
    if (edges.length > 0) await tx.insert(workflowEdges).values(edges);
  });
  return { workflowId, nodeCount: nodes.length, edgeCount: edges.length };
}

/** Loads the nodes and edges stored for a workflow; both lists are empty when the database is unavailable. */
export async function getWorkflow(workflowId: string) {
  const db = await getDb();
  if (!db) return { nodes: [], edges: [] };
  const nodes = await db.select().from(workflowNodes).where(eq(workflowNodes.workflowId, workflowId));
  const edges = await db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, workflowId));
  return { nodes, edges };
}

/* ─── Workflow suggestion helpers ─── */

/**
 * Stores a workflow suggestion and returns the input data unchanged.
 *
 * @throws Error when the database is unavailable.
 */
export async function createWorkflowSuggestion(data: InsertWorkflowSuggestion) {
  const db = await requireDb();
  await db.insert(workflowSuggestions).values(data);
  return data;
}

/** Lists a workflow's suggestions, highest `frequency` first, optionally restricted to one status. */
export async function getWorkflowSuggestions(workflowId: string, status?: string) {
  const db = await getDb();
  if (!db) return [];
  const conditions: SQL[] = [eq(workflowSuggestions.workflowId, workflowId)];
  if (status) conditions.push(eq(workflowSuggestions.status, status as any));
  return db
    .select()
    .from(workflowSuggestions)
    .where(and(...conditions))
    .orderBy(desc(workflowSuggestions.frequency));
}

/**
 * Records a review decision on a suggestion, stamping reviewer and review time.
 *
 * @throws Error when the database is unavailable.
 */
export async function updateWorkflowSuggestionStatus(
  id: number,
  status: "approved" | "declined" | "waiting",
  reviewedById: number,
) {
  const db = await requireDb();
  await db
    .update(workflowSuggestions)
    .set({ status, reviewedById, reviewedAt: new Date() })
    .where(eq(workflowSuggestions.id, id));
  return { id, status };
}

/**
 * Inserts many workflow suggestions in one statement; a no-op for an empty list.
 *
 * @throws Error when the database is unavailable.
 */
export async function bulkCreateWorkflowSuggestions(suggestions: InsertWorkflowSuggestion[]) {
  const db = await requireDb();
  if (suggestions.length === 0) return { created: 0 };
  await db.insert(workflowSuggestions).values(suggestions);
  return { created: suggestions.length };
}

/* ─── Analytics helpers ─── */

/**
 * Stores an analytics event.
 *
 * @returns The generated id reported by the driver, or `null` when the database is unavailable.
 */
// NOTE(review): the omitted-key list of this `Omit` was lost in the source; "id" is assumed — confirm.
export async function trackAnalyticsEvent(event: Omit<InsertAnalyticsEvent, "id">) {
  const db = await getDb();
  if (!db) return null;
  const [result] = await db.insert(analyticsEvents).values(event as any);
  // NOTE(review): `insertId` is the MySQL result shape — see the dialect note at the top.
  return result.insertId;
}

/** Returns up to 1000 analytics events, newest first, narrowed by the optional filters. */
export async function getAnalyticsEvents(filters?: {
  eventType?: string;
  category?: string;
  startDate?: Date;
  endDate?: Date;
}) {
  const db = await getDb();
  if (!db) return [];
  const conditions: SQL[] = [];
  if (filters?.eventType) conditions.push(eq(analyticsEvents.eventType, filters.eventType as any));
  if (filters?.category) conditions.push(eq(analyticsEvents.category, filters.category));
  if (filters?.startDate) conditions.push(gte(analyticsEvents.createdAt, filters.startDate));
  if (filters?.endDate) conditions.push(lte(analyticsEvents.createdAt, filters.endDate));

  return db
    .select()
    .from(analyticsEvents)
    .where(conditions.length > 0 ? and(...conditions) : undefined)
    .orderBy(desc(analyticsEvents.createdAt))
    .limit(1000);
}

/**
 * Aggregates analytics events in the optional date window into headline counters,
 * resolution rates (whole percentages of `session_start` events) and a per-category breakdown.
 *
 * Counting is done by the database (GROUP BY event type and category) rather than
 * by loading every event row into memory.
 *
 * @returns The summary object, or `null` when the database is unavailable.
 */
export async function getAnalyticsSummary(startDate?: Date, endDate?: Date) {
  const db = await getDb();
  if (!db) return null;
  const conditions: SQL[] = [];
  if (startDate) conditions.push(gte(analyticsEvents.createdAt, startDate));
  if (endDate) conditions.push(lte(analyticsEvents.createdAt, endDate));

  const grouped = await db
    .select({
      eventType: analyticsEvents.eventType,
      category: analyticsEvents.category,
      count: sql<number>`COUNT(*)`,
    })
    .from(analyticsEvents)
    .where(conditions.length > 0 ? and(...conditions) : undefined)
    .groupBy(analyticsEvents.eventType, analyticsEvents.category);

  // Single pass over the (small) grouped result to build every tally.
  const byEventType = new Map<unknown, number>();
  const byCategory = new Map<unknown, { count: number; resolved: number }>();
  let totalEvents = 0;
  for (const row of grouped) {
    const n = Number(row.count);
    totalEvents += n;
    byEventType.set(row.eventType, (byEventType.get(row.eventType) ?? 0) + n);

    const bucket = byCategory.get(row.category) ?? { count: 0, resolved: 0 };
    bucket.count += n;
    if (row.eventType === "resolved_by_bot" || row.eventType === "resolved_by_agent") {
      bucket.resolved += n;
    }
    byCategory.set(row.category, bucket);
  }
  const countOf = (eventType: string) => byEventType.get(eventType) ?? 0;

  const totalSessions = countOf("session_start");
  const resolvedByBot = countOf("resolved_by_bot");
  const resolvedByAgent = countOf("resolved_by_agent");
  const escalated = countOf("escalated");
  const abandoned = countOf("abandoned");
  const messagesSent = countOf("message_sent");
  const messagesReceived = countOf("message_received");
  const buttonClicks = countOf("button_clicked");
  const positiveFeedback = countOf("feedback_positive");
  const negativeFeedback = countOf("feedback_negative");

  // Category breakdown
  const categories = ["orders", "shipping", "returning", "cancelling"];
  const categoryBreakdown = categories.map(cat => ({
    category: cat,
    count: byCategory.get(cat)?.count ?? 0,
    resolved: byCategory.get(cat)?.resolved ?? 0,
  }));

  return {
    totalSessions,
    resolvedByBot,
    resolvedByAgent,
    escalated,
    abandoned,
    messagesSent,
    messagesReceived,
    buttonClicks,
    positiveFeedback,
    negativeFeedback,
    resolutionRate: totalSessions > 0 ? Math.round(((resolvedByBot + resolvedByAgent) / totalSessions) * 100) : 0,
    botResolutionRate: totalSessions > 0 ? Math.round((resolvedByBot / totalSessions) * 100) : 0,
    categoryBreakdown,
    totalEvents,
  };
}

/* ─── Data Sources helpers ─── */

/**
 * Stores a data source.
 *
 * @returns The generated id reported by the driver, or `null` when the database is unavailable.
 */
// NOTE(review): the omitted-key list of this `Omit` was lost in the source; "id" is assumed — confirm.
export async function createDataSource(source: Omit<InsertDataSource, "id">) {
  const db = await getDb();
  if (!db) return null;
  const [result] = await db.insert(dataSources).values(source as any);
  // NOTE(review): `insertId` is the MySQL result shape — see the dialect note at the top.
  return result.insertId;
}

/** Lists all data sources, newest first. */
export async function getDataSources() {
  const db = await getDb();
  if (!db) return [];
  return db.select().from(dataSources).orderBy(desc(dataSources.createdAt));
}

/** Looks up a data source by primary key; `null` when not found or the database is unavailable. */
export async function getDataSourceById(id: number) {
  const db = await getDb();
  if (!db) return null;
  const [source] = await db.select().from(dataSources).where(eq(dataSources.id, id));
  return source || null;
}

/** Applies a partial update to a data source and returns the re-read row (`null` when unavailable or missing). */
export async function updateDataSource(id: number, updates: Partial<InsertDataSource>) {
  const db = await getDb();
  if (!db) return null;
  await db.update(dataSources).set(updates as any).where(eq(dataSources.id, id));
  return getDataSourceById(id);
}

/** Deletes a data source; does nothing when the database is unavailable. */
export async function deleteDataSource(id: number) {
  const db = await getDb();
  if (!db) return;
  await db.delete(dataSources).where(eq(dataSources.id, id));
}

/* ─── API Connections helpers ─── */

/**
 * Stores an API connection.
 *
 * @returns The generated id reported by the driver, or `null` when the database is unavailable.
 */
// NOTE(review): the omitted-key list of this `Omit` was lost in the source; "id" is assumed — confirm.
export async function createApiConnection(conn: Omit<InsertApiConnection, "id">) {
  const db = await getDb();
  if (!db) return null;
  const [result] = await db.insert(apiConnections).values(conn as any);
  // NOTE(review): `insertId` is the MySQL result shape — see the dialect note at the top.
  return result.insertId;
}

/** Lists all API connections, newest first. */
export async function getApiConnections() {
  const db = await getDb();
  if (!db) return [];
  return db.select().from(apiConnections).orderBy(desc(apiConnections.createdAt));
}

/** Looks up an API connection by primary key; `null` when not found or the database is unavailable. */
export async function getApiConnectionById(id: number) {
  const db = await getDb();
  if (!db) return null;
  const [conn] = await db.select().from(apiConnections).where(eq(apiConnections.id, id));
  return conn || null;
}

/** Applies a partial update to an API connection and returns the re-read row (`null` when unavailable or missing). */
export async function updateApiConnection(id: number, updates: Partial<InsertApiConnection>) {
  const db = await getDb();
  if (!db) return null;
  await db.update(apiConnections).set(updates as any).where(eq(apiConnections.id, id));
  return getApiConnectionById(id);
}

/** Deletes an API connection; does nothing when the database is unavailable. */
export async function deleteApiConnection(id: number) {
  const db = await getDb();
  if (!db) return;
  await db.delete(apiConnections).where(eq(apiConnections.id, id));
}

/**
 * Increments an API connection's execution counter and stamps `lastExecutedAt`.
 *
 * The increment is evaluated by the database (`executionCount + 1`) instead of a
 * read-then-write round trip, so concurrent executions cannot overwrite each
 * other's count. A non-existent id simply matches no rows.
 */
export async function incrementApiConnectionExecution(id: number) {
  const db = await getDb();
  if (!db) return;
  await db
    .update(apiConnections)
    .set({
      executionCount: sql`${apiConnections.executionCount} + 1`,
      lastExecutedAt: new Date(),
    } as any)
    .where(eq(apiConnections.id, id));
}