| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- """
- Homelegance ERP Bridge — FastAPI service
- Calls chatbot_api.* stored functions in the ERP PostgreSQL database.
- Listens on 127.0.0.1:8080 (internal only, never exposed to the internet).
- Permission model (passed by Node.js via request headers):
- X-User-Role : "admin" | "agent" | "user"
- X-Customer-CID : ERP ContactID (required when role == "user")
- admin / agent → calls *_staff function variants — sees all customers
- user (dealer) → calls *_dealer function variants — CID enforced at DB level
- Column selection is handled entirely by the chatbot_api.* functions in PostgreSQL.
- The dealer variants have p_customer_cid as a mandatory parameter with no default,
- making it physically impossible to return another customer's data even if this
- application code is compromised.
- """
- from __future__ import annotations
- import json
- import logging
- import os
- from contextlib import asynccontextmanager
- from dataclasses import dataclass
- from typing import Any
- import asyncpg
- from dotenv import load_dotenv
- from fastapi import Depends, FastAPI, Header, HTTPException, Path
- from pydantic import BaseModel, Field
- # ──────────────────────────────────────────────────────────────────────────────
- # Config
- # ──────────────────────────────────────────────────────────────────────────────
- load_dotenv()
- ERP_DATABASE_URL: str = os.environ["ERP_DATABASE_URL"]
- ERP_API_KEY: str = os.environ["ERP_API_KEY"]
- PORT: int = int(os.getenv("PORT", "8080"))
- logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
- log = logging.getLogger(__name__)
- # ──────────────────────────────────────────────────────────────────────────────
- # DB Pool (lifespan)
- # ──────────────────────────────────────────────────────────────────────────────
- DB_POOL: asyncpg.Pool | None = None
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- global DB_POOL
- log.info("Connecting to ERP database …")
- DB_POOL = await asyncpg.create_pool(
- ERP_DATABASE_URL,
- min_size=2,
- max_size=10,
- command_timeout=30,
- statement_cache_size=0, # required for pgBouncer transaction mode
- )
- log.info("ERP database pool ready.")
- yield
- if DB_POOL:
- await DB_POOL.close()
- log.info("ERP database pool closed.")
- app = FastAPI(
- title="Homelegance ERP Bridge",
- version="3.0.0",
- docs_url=None, # disable Swagger UI in production
- redoc_url=None,
- lifespan=lifespan,
- )
- # ──────────────────────────────────────────────────────────────────────────────
- # Auth & permission dependencies
- # ──────────────────────────────────────────────────────────────────────────────
- def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")) -> None:
- if x_api_key != ERP_API_KEY:
- raise HTTPException(status_code=401, detail="Invalid API key")
- @dataclass
- class UserCtx:
- role: str # "admin" | "agent" | "user"
- customer_cid: str | None # ERP ContactID; required when role == "user"
- @property
- def is_staff(self) -> bool:
- return self.role in ("admin", "agent")
- @property
- def is_dealer(self) -> bool:
- return self.role == "user"
- def get_user_ctx(
- x_user_role: str = Header("admin", alias="X-User-Role"),
- x_customer_cid: str | None = Header(None, alias="X-Customer-CID"),
- ) -> UserCtx:
- role = x_user_role.lower().strip()
- if role not in ("admin", "agent", "user"):
- role = "admin" # safe default for unknown values
- if role == "user" and not x_customer_cid:
- raise HTTPException(
- status_code=403,
- detail="X-Customer-CID header required for dealer (user) role",
- )
- return UserCtx(role=role, customer_cid=x_customer_cid)
- # ──────────────────────────────────────────────────────────────────────────────
- # DB call helpers
- # ──────────────────────────────────────────────────────────────────────────────
- async def call_list(sql: str, *args: Any) -> list[dict]:
- """Call a chatbot_api function that returns a JSON array."""
- assert DB_POOL is not None
- row = await DB_POOL.fetchrow(sql, *args)
- if row is None or row[0] is None:
- return []
- data = json.loads(row[0]) if isinstance(row[0], str) else row[0]
- return data if isinstance(data, list) else []
- async def call_single(sql: str, *args: Any) -> dict | None:
- """Call a chatbot_api function that returns a single JSON object (or NULL)."""
- assert DB_POOL is not None
- row = await DB_POOL.fetchrow(sql, *args)
- if row is None or row[0] is None:
- return None
- data = json.loads(row[0]) if isinstance(row[0], str) else row[0]
- return data if isinstance(data, dict) else None
- # ──────────────────────────────────────────────────────────────────────────────
- # Request models
- # ──────────────────────────────────────────────────────────────────────────────
- class CatalogRequest(BaseModel):
- model: str | None = Field(None, description="Partial model number (ILIKE)")
- description: str | None = Field(None, description="Description keyword")
- # manufacturer and category are accepted for API compatibility but are
- # combined into the keyword search — chatbot_api.catalog_search searches
- # model + description fields only
- manufacturer: str | None = Field(None, description="Manufacturer (combined into keyword)")
- category: str | None = Field(None, description="Category (combined into keyword)")
- status: str | None = Field(None, description="Item status (not filtered at DB level)")
- limit: int = Field(20, ge=1, le=200)
- class ContactsRequest(BaseModel):
- contact_id: str | None = Field(None, description="ContactID (exact)")
- company: str | None = Field(None, description="Company name (partial)")
- name: str | None = Field(None, description="Contact person name (not used, for API compat)")
- limit: int = Field(20, ge=1, le=200)
- class OrdersListRequest(BaseModel):
- so_id: str | None = Field(None, description="Sales Order ID")
- customer_name: str | None = Field(None, description="Customer name (partial, staff only)")
- customer_cid: str | None = Field(None, description="Customer CID — overridden for dealer role")
- po_id: str | None = Field(None, description="PO number")
- status: str | None = Field(None, description="Order status")
- limit: int = Field(20, ge=1, le=200)
- class StockRequest(BaseModel):
- model: str | None = Field(None, description="Model number (partial)")
- warehouse_cid: str | None = Field(None, description="Warehouse code")
- limit: int = Field(50, ge=1, le=500)
- # ──────────────────────────────────────────────────────────────────────────────
- # Endpoints
- # ──────────────────────────────────────────────────────────────────────────────
- @app.get("/health")
- async def health() -> dict:
- """Liveness probe — no auth required."""
- return {"status": "ok", "service": "erp-bridge", "version": "3.0.0"}
- @app.post("/catalog", dependencies=[Depends(verify_api_key)])
- async def catalog_search(
- req: CatalogRequest,
- ctx: UserCtx = Depends(get_user_ctx),
- ) -> list[dict[str, Any]]:
- """Search the product catalog. Catalog is not customer-scoped."""
- # Combine description, manufacturer, category into a single keyword search
- keyword_parts = [p for p in [req.description, req.manufacturer, req.category] if p]
- keyword = " ".join(keyword_parts) or None
- return await call_list(
- "SELECT chatbot_api.catalog_search($1, $2, $3)",
- keyword, req.model, req.limit,
- )
- @app.post("/contacts", dependencies=[Depends(verify_api_key)])
- async def contacts_search(
- req: ContactsRequest,
- ctx: UserCtx = Depends(get_user_ctx),
- ) -> list[dict[str, Any]]:
- """
- Search customers/contacts.
- Dealer role is scoped to their own record at the DB level.
- """
- if ctx.is_dealer:
- # contact_get_dealer returns a single object; wrap in list for consistent response
- record = await call_single(
- "SELECT chatbot_api.contact_get_dealer($1)",
- ctx.customer_cid,
- )
- return [record] if record else []
- else:
- return await call_list(
- "SELECT chatbot_api.contact_get_staff($1, $2, $3)",
- req.company, req.contact_id, req.limit,
- )
- @app.post("/orders", dependencies=[Depends(verify_api_key)])
- async def orders_list(
- req: OrdersListRequest,
- ctx: UserCtx = Depends(get_user_ctx),
- ) -> list[dict[str, Any]]:
- """
- List sales orders.
- Dealer variant enforces CustomerCID at the DB level — cannot be bypassed.
- """
- if ctx.is_dealer:
- return await call_list(
- "SELECT chatbot_api.orders_list_dealer($1, $2, $3, $4, $5)",
- ctx.customer_cid, req.so_id, req.po_id, req.status, req.limit,
- )
- else:
- return await call_list(
- "SELECT chatbot_api.orders_list_staff($1, $2, $3, $4, $5, $6)",
- req.so_id, req.po_id, req.customer_cid, req.customer_name, req.status, req.limit,
- )
- @app.get("/orders/{so_id}", dependencies=[Depends(verify_api_key)])
- async def order_get(
- so_id: str = Path(..., description="Sales Order ID"),
- ctx: UserCtx = Depends(get_user_ctx),
- ) -> dict[str, Any]:
- """
- Get a single sales order with line items and notes.
- Dealer variant validates CID match at the DB level and returns NULL on mismatch
- (treated as 404 — prevents order-ID enumeration).
- """
- if ctx.is_dealer:
- result = await call_single(
- "SELECT chatbot_api.order_get_dealer($1, $2)",
- so_id, ctx.customer_cid,
- )
- else:
- result = await call_single(
- "SELECT chatbot_api.order_get_staff($1)",
- so_id,
- )
- if result is None:
- raise HTTPException(status_code=404, detail=f"Sales order not found: {so_id}")
- return result
- @app.post("/stock", dependencies=[Depends(verify_api_key)])
- async def stock_search(
- req: StockRequest,
- ctx: UserCtx = Depends(get_user_ctx),
- ) -> list[dict[str, Any]]:
- """
- Search available stock.
- Dealer variant returns availability info only (no pallet/hold/logistics detail).
- """
- if not req.model:
- return []
- if ctx.is_dealer:
- return await call_list(
- "SELECT chatbot_api.stock_search_dealer($1, $2, $3)",
- req.model, req.warehouse_cid, req.limit,
- )
- else:
- return await call_list(
- "SELECT chatbot_api.stock_search_staff($1, $2, $3)",
- req.model, req.warehouse_cid, req.limit,
- )
- # ──────────────────────────────────────────────────────────────────────────────
- # Dev runner (production uses uvicorn via systemd)
- # ──────────────────────────────────────────────────────────────────────────────
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run("main:app", host="127.0.0.1", port=PORT, reload=False)
|