""" Homelegance ERP Bridge — FastAPI service Calls chatbot_api.* stored functions in the ERP PostgreSQL database. Listens on 127.0.0.1:8080 (internal only, never exposed to the internet). Permission model (passed by Node.js via request headers): X-User-Role : "admin" | "agent" | "user" X-Customer-CID : ERP ContactID (required when role == "user") admin / agent → calls *_staff function variants — sees all customers user (dealer) → calls *_dealer function variants — CID enforced at DB level Column selection is handled entirely by the chatbot_api.* functions in PostgreSQL. The dealer variants have p_customer_cid as a mandatory parameter with no default, making it physically impossible to return another customer's data even if this application code is compromised. """ from __future__ import annotations import json import logging import os from contextlib import asynccontextmanager from dataclasses import dataclass from typing import Any import asyncpg from dotenv import load_dotenv from fastapi import Depends, FastAPI, Header, HTTPException, Path from pydantic import BaseModel, Field # ────────────────────────────────────────────────────────────────────────────── # Config # ────────────────────────────────────────────────────────────────────────────── load_dotenv() ERP_DATABASE_URL: str = os.environ["ERP_DATABASE_URL"] ERP_API_KEY: str = os.environ["ERP_API_KEY"] PORT: int = int(os.getenv("PORT", "8080")) logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") log = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────────────────────── # DB Pool (lifespan) # ────────────────────────────────────────────────────────────────────────────── DB_POOL: asyncpg.Pool | None = None @asynccontextmanager async def lifespan(app: FastAPI): global DB_POOL log.info("Connecting to ERP database …") DB_POOL = await asyncpg.create_pool( ERP_DATABASE_URL, min_size=2, max_size=10, command_timeout=30, statement_cache_size=0, # required for pgBouncer transaction mode ) log.info("ERP database pool ready.") yield if DB_POOL: await DB_POOL.close() log.info("ERP database pool closed.") app = FastAPI( title="Homelegance ERP Bridge", version="3.0.0", docs_url=None, # disable Swagger UI in production redoc_url=None, lifespan=lifespan, ) # ────────────────────────────────────────────────────────────────────────────── # Auth & permission dependencies # ────────────────────────────────────────────────────────────────────────────── def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")) -> None: if x_api_key != ERP_API_KEY: raise HTTPException(status_code=401, detail="Invalid API key") @dataclass class UserCtx: role: str # "admin" | "agent" | "user" customer_cid: str | None # ERP ContactID; required when role == "user" @property def is_staff(self) -> bool: return self.role in ("admin", "agent") @property def is_dealer(self) -> bool: return self.role == "user" def get_user_ctx( x_user_role: str = Header("admin", alias="X-User-Role"), x_customer_cid: str | None = Header(None, alias="X-Customer-CID"), ) -> UserCtx: role = x_user_role.lower().strip() if role not in ("admin", "agent", "user"): role = "admin" # safe default for unknown values if role == "user" and not x_customer_cid: raise HTTPException( status_code=403, detail="X-Customer-CID header required for dealer (user) role", ) return UserCtx(role=role, customer_cid=x_customer_cid) # ────────────────────────────────────────────────────────────────────────────── # DB call helpers # ────────────────────────────────────────────────────────────────────────────── async def call_list(sql: str, *args: Any) -> list[dict]: """Call a chatbot_api function that returns a JSON array.""" assert DB_POOL is not None row = await DB_POOL.fetchrow(sql, *args) if row is None or row[0] is None: return [] data = json.loads(row[0]) if isinstance(row[0], str) else row[0] return data if isinstance(data, list) else [] async def call_single(sql: str, *args: Any) -> dict | None: """Call a chatbot_api function that returns a single JSON object (or NULL).""" assert DB_POOL is not None row = await DB_POOL.fetchrow(sql, *args) if row is None or row[0] is None: return None data = json.loads(row[0]) if isinstance(row[0], str) else row[0] return data if isinstance(data, dict) else None # ────────────────────────────────────────────────────────────────────────────── # Request models # ────────────────────────────────────────────────────────────────────────────── class CatalogRequest(BaseModel): model: str | None = Field(None, description="Partial model number (ILIKE)") description: str | None = Field(None, description="Description keyword") # manufacturer and category are accepted for API compatibility but are # combined into the keyword search — chatbot_api.catalog_search searches # model + description fields only manufacturer: str | None = Field(None, description="Manufacturer (combined into keyword)") category: str | None = Field(None, description="Category (combined into keyword)") status: str | None = Field(None, description="Item status (not filtered at DB level)") limit: int = Field(20, ge=1, le=200) class ContactsRequest(BaseModel): contact_id: str | None = Field(None, description="ContactID (exact)") company: str | None = Field(None, description="Company name (partial)") name: str | None = Field(None, description="Contact person name (not used, for API compat)") limit: int = Field(20, ge=1, le=200) class OrdersListRequest(BaseModel): so_id: str | None = Field(None, description="Sales Order ID") customer_name: str | None = Field(None, description="Customer name (partial, staff only)") customer_cid: str | None = Field(None, description="Customer CID — overridden for dealer role") po_id: str | None = Field(None, description="PO number") status: str | None = Field(None, description="Order status") limit: int = Field(20, ge=1, le=200) class StockRequest(BaseModel): model: str | None = Field(None, description="Model number (partial)") warehouse_cid: str | None = Field(None, description="Warehouse code") limit: int = Field(50, ge=1, le=500) class ProductImagesRequest(BaseModel): model: str = Field(..., description="Model number (exact or partial ILIKE)") limit: int = Field(10, ge=1, le=50) # ────────────────────────────────────────────────────────────────────────────── # Endpoints # ────────────────────────────────────────────────────────────────────────────── @app.get("/health") async def health() -> dict: """Liveness probe — no auth required.""" return {"status": "ok", "service": "erp-bridge", "version": "3.0.0"} @app.post("/catalog", dependencies=[Depends(verify_api_key)]) async def catalog_search( req: CatalogRequest, ctx: UserCtx = Depends(get_user_ctx), ) -> list[dict[str, Any]]: """Search the product catalog. Catalog is not customer-scoped.""" # Combine description, manufacturer, category into a single keyword search keyword_parts = [p for p in [req.description, req.manufacturer, req.category] if p] keyword = " ".join(keyword_parts) or None return await call_list( "SELECT chatbot_api.catalog_search($1, $2, $3)", keyword, req.model, req.limit, ) @app.post("/contacts", dependencies=[Depends(verify_api_key)]) async def contacts_search( req: ContactsRequest, ctx: UserCtx = Depends(get_user_ctx), ) -> list[dict[str, Any]]: """ Search customers/contacts. Dealer role is scoped to their own record at the DB level. """ if ctx.is_dealer: # contact_get_dealer returns a single object; wrap in list for consistent response record = await call_single( "SELECT chatbot_api.contact_get_dealer($1)", ctx.customer_cid, ) return [record] if record else [] else: return await call_list( "SELECT chatbot_api.contact_get_staff($1, $2, $3)", req.company, req.contact_id, req.limit, ) @app.post("/orders", dependencies=[Depends(verify_api_key)]) async def orders_list( req: OrdersListRequest, ctx: UserCtx = Depends(get_user_ctx), ) -> list[dict[str, Any]]: """ List sales orders. Dealer variant enforces CustomerCID at the DB level — cannot be bypassed. """ if ctx.is_dealer: return await call_list( "SELECT chatbot_api.orders_list_dealer($1, $2, $3, $4, $5)", ctx.customer_cid, req.so_id, req.po_id, req.status, req.limit, ) else: return await call_list( "SELECT chatbot_api.orders_list_staff($1, $2, $3, $4, $5, $6)", req.so_id, req.po_id, req.customer_cid, req.customer_name, req.status, req.limit, ) @app.get("/orders/{so_id}", dependencies=[Depends(verify_api_key)]) async def order_get( so_id: str = Path(..., description="Sales Order ID"), ctx: UserCtx = Depends(get_user_ctx), ) -> dict[str, Any]: """ Get a single sales order with line items and notes. Dealer variant validates CID match at the DB level and returns NULL on mismatch (treated as 404 — prevents order-ID enumeration). """ if ctx.is_dealer: result = await call_single( "SELECT chatbot_api.order_get_dealer($1, $2)", so_id, ctx.customer_cid, ) else: result = await call_single( "SELECT chatbot_api.order_get_staff($1)", so_id, ) if result is None: raise HTTPException(status_code=404, detail=f"Sales order not found: {so_id}") return result @app.post("/stock", dependencies=[Depends(verify_api_key)]) async def stock_search( req: StockRequest, ctx: UserCtx = Depends(get_user_ctx), ) -> list[dict[str, Any]]: """ Search available stock. Dealer variant returns availability info only (no pallet/hold/logistics detail). """ if not req.model: return [] if ctx.is_dealer: return await call_list( "SELECT chatbot_api.stock_search_dealer($1, $2, $3)", req.model, req.warehouse_cid, req.limit, ) else: return await call_list( "SELECT chatbot_api.stock_search_staff($1, $2, $3)", req.model, req.warehouse_cid, req.limit, ) @app.post("/catalog/images", dependencies=[Depends(verify_api_key)]) async def catalog_images( req: ProductImagesRequest, ctx: UserCtx = Depends(get_user_ctx), ) -> dict[str, Any]: """ Return product images for a given model number. Delegates to chatbot_api.catalog_images which reads File_Url from public.config and joins shop.shop_product_picture with catalog. Returns: { base_url, images: [{ model, apppicture_path, full_url }] } """ result = await call_single( "SELECT chatbot_api.catalog_images($1, $2)", req.model, req.limit, ) return result or {"base_url": "", "images": []} # ────────────────────────────────────────────────────────────────────────────── # Dev runner (production uses uvicorn via systemd) # ────────────────────────────────────────────────────────────────────────────── if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=PORT, reload=False)