/**
 * API Connection Runner — fetches live data from external APIs defined in the
 * apiConnections table and stores the result as a knowledge entry.
 */
import { getDb } from "./db";
import { apiConnections, knowledgeEntries } from "../drizzle/schema";
import { eq, and } from "drizzle-orm";

/** Outcome of a single `runApiConnection` call. */
export interface RunResult {
  /** `true` when the fetch succeeded and the knowledge entry was written. */
  success: boolean;
  /** Id of the connection that was requested (echoed back on every path). */
  connectionId: number;
  /** Number of knowledge entries written; set only on success. */
  rowsStored?: number;
  /** Human-readable failure reason; set only when `success` is `false`. */
  error?: string;
}

/**
 * Execute a single API connection, store the response in knowledgeEntries,
 * and update lastExecutedAt + executionCount on the connection record.
 *
 * Lookup, activity-check and fetch failures are reported through the returned
 * `RunResult` (`success: false` plus `error`). Errors raised by the database
 * calls themselves are not caught here and propagate to the caller.
 *
 * @param connectionId - Primary key of the row in `apiConnections` to run.
 * @returns The outcome of the run; `rowsStored` is 1 on success.
 */
export async function runApiConnection(connectionId: number): Promise<RunResult> {
  const db = await getDb();
  if (!db) return { success: false, connectionId, error: "Database not available" };

  const [conn] = await db
    .select()
    .from(apiConnections)
    .where(eq(apiConnections.id, connectionId));

  if (!conn) return { success: false, connectionId, error: "Connection not found" };
  if (!conn.isActive) return { success: false, connectionId, error: "Connection is inactive" };

  // Build request headers: JSON content type by default, overridable by the
  // headers configured on the connection.
  const headers: Record<string, string> = { "Content-Type": "application/json" };
  if (conn.headers && typeof conn.headers === "object") {
    Object.assign(headers, conn.headers);
  }

  // Build request body (POST/PUT only)
  const hasBody = conn.method === "POST" || conn.method === "PUT";
  const body = hasBody ? JSON.stringify(conn.testPayload ?? {}) : undefined;

  let rawData: unknown;
  try {
    const res = await fetch(conn.endpoint, { method: conn.method, headers, body });
    if (!res.ok) {
      throw new Error(`HTTP ${res.status} ${res.statusText}`);
    }
    rawData = await readResponseBody(res);
  } catch (err: unknown) {
    const msg = err instanceof Error ? err.message : String(err);
    return { success: false, connectionId, error: `Fetch failed: ${msg}` };
  }

  // Apply output variable mapping to extract relevant fields
  const extracted = applyOutputMapping(
    rawData,
    conn.outputVariables as Record<string, string> | null
  );

  // Persist as a knowledge entry. The entry is located by source + title so
  // that each connection updates its own entry; matching on source alone would
  // make every connection overwrite whichever "api" entry happened to come
  // back first.
  // NOTE(review): connections that share the same name still share one entry;
  // no connection-id column on knowledgeEntries is visible from this file.
  const content = JSON.stringify(extracted, null, 2);
  const title = `API: ${conn.name}`;

  const [existing] = await db
    .select({ id: knowledgeEntries.id })
    .from(knowledgeEntries)
    .where(and(eq(knowledgeEntries.source, "api"), eq(knowledgeEntries.question, title)))
    .limit(1);

  if (existing) {
    await db
      .update(knowledgeEntries)
      .set({ answer: content, updatedAt: new Date() })
      .where(eq(knowledgeEntries.id, existing.id));
  } else {
    await db.insert(knowledgeEntries).values({
      question: title,
      answer: content,
      category: conn.category ?? "api",
      source: "api",
    });
  }

  // Update connection metadata
  await db
    .update(apiConnections)
    .set({
      lastExecutedAt: new Date(),
      executionCount: (conn.executionCount ?? 0) + 1,
    })
    .where(eq(apiConnections.id, connectionId));

  return { success: true, connectionId, rowsStored: 1 };
}

/**
 * Read a response body as JSON when it parses, otherwise as plain text.
 *
 * The body stream can be consumed only once, so the text is read a single time
 * and parsed from that string; calling `res.text()` after a failed
 * `res.json()` would reject because the body has already been used.
 */
async function readResponseBody(res: Response): Promise<unknown> {
  const text = await res.text();
  try {
    return JSON.parse(text) as unknown;
  } catch {
    return text;
  }
}

/**
 * Apply dot-path output variable mappings to extract fields from the response.
 * outputVariables format: { "fieldName": "path.to.value" }
 * If no mappings defined, returns the raw data as-is.
 *
 * @param data - Parsed response body (or raw text when it was not JSON).
 * @param mappings - Output field name → dot-separated path into `data`.
 * @returns An object with one key per mapping, or `data` itself when
 *   `mappings` is null or empty. Paths that do not resolve yield `undefined`.
 */
function applyOutputMapping(
  data: unknown,
  mappings: Record<string, string> | null
): unknown {
  if (!mappings || Object.keys(mappings).length === 0) return data;

  const result: Record<string, unknown> = {};
  for (const [key, path] of Object.entries(mappings)) {
    result[key] = resolvePath(data, path);
  }
  return result;
}

/**
 * Walk a dot-separated path through nested objects.
 *
 * @returns The value at the end of the path, or `undefined` as soon as a
 *   segment is reached on something that is not a non-null object.
 */
function resolvePath(obj: unknown, path: string): unknown {
  return path.split(".").reduce((cur: unknown, key) => {
    if (cur && typeof cur === "object") return (cur as Record<string, unknown>)[key];
    return undefined;
  }, obj);
}