main.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. """
  2. Homelegance ERP Bridge — FastAPI service
  3. Calls chatbot_api.* stored functions in the ERP PostgreSQL database.
  4. Listens on 127.0.0.1:8080 (internal only, never exposed to the internet).
  5. Permission model (passed by Node.js via request headers):
  6. X-User-Role : "admin" | "agent" | "user"
  7. X-Customer-CID : ERP ContactID (required when role == "user")
  8. admin / agent → calls *_staff function variants — sees all customers
  9. user (dealer) → calls *_dealer function variants — CID enforced at DB level
  10. Column selection is handled entirely by the chatbot_api.* functions in PostgreSQL.
  11. The dealer variants have p_customer_cid as a mandatory parameter with no default,
  12. making it physically impossible to return another customer's data even if this
  13. application code is compromised.
  14. """
  15. from __future__ import annotations
  16. import json
  17. import logging
  18. import os
  19. from contextlib import asynccontextmanager
  20. from dataclasses import dataclass
  21. from typing import Any
  22. import asyncpg
  23. from dotenv import load_dotenv
  24. from fastapi import Depends, FastAPI, Header, HTTPException, Path
  25. from pydantic import BaseModel, Field
  26. # ──────────────────────────────────────────────────────────────────────────────
  27. # Config
  28. # ──────────────────────────────────────────────────────────────────────────────
  29. load_dotenv()
  30. ERP_DATABASE_URL: str = os.environ["ERP_DATABASE_URL"]
  31. ERP_API_KEY: str = os.environ["ERP_API_KEY"]
  32. PORT: int = int(os.getenv("PORT", "8080"))
  33. logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
  34. log = logging.getLogger(__name__)
  35. # ──────────────────────────────────────────────────────────────────────────────
  36. # DB Pool (lifespan)
  37. # ──────────────────────────────────────────────────────────────────────────────
  38. DB_POOL: asyncpg.Pool | None = None
  39. @asynccontextmanager
  40. async def lifespan(app: FastAPI):
  41. global DB_POOL
  42. log.info("Connecting to ERP database …")
  43. DB_POOL = await asyncpg.create_pool(
  44. ERP_DATABASE_URL,
  45. min_size=2,
  46. max_size=10,
  47. command_timeout=30,
  48. statement_cache_size=0, # required for pgBouncer transaction mode
  49. )
  50. log.info("ERP database pool ready.")
  51. yield
  52. if DB_POOL:
  53. await DB_POOL.close()
  54. log.info("ERP database pool closed.")
  55. app = FastAPI(
  56. title="Homelegance ERP Bridge",
  57. version="3.0.0",
  58. docs_url=None, # disable Swagger UI in production
  59. redoc_url=None,
  60. lifespan=lifespan,
  61. )
  62. # ──────────────────────────────────────────────────────────────────────────────
  63. # Auth & permission dependencies
  64. # ──────────────────────────────────────────────────────────────────────────────
  65. def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")) -> None:
  66. if x_api_key != ERP_API_KEY:
  67. raise HTTPException(status_code=401, detail="Invalid API key")
  68. @dataclass
  69. class UserCtx:
  70. role: str # "admin" | "agent" | "user"
  71. customer_cid: str | None # ERP ContactID; required when role == "user"
  72. @property
  73. def is_staff(self) -> bool:
  74. return self.role in ("admin", "agent")
  75. @property
  76. def is_dealer(self) -> bool:
  77. return self.role == "user"
  78. def get_user_ctx(
  79. x_user_role: str = Header("admin", alias="X-User-Role"),
  80. x_customer_cid: str | None = Header(None, alias="X-Customer-CID"),
  81. ) -> UserCtx:
  82. role = x_user_role.lower().strip()
  83. if role not in ("admin", "agent", "user"):
  84. role = "admin" # safe default for unknown values
  85. if role == "user" and not x_customer_cid:
  86. raise HTTPException(
  87. status_code=403,
  88. detail="X-Customer-CID header required for dealer (user) role",
  89. )
  90. return UserCtx(role=role, customer_cid=x_customer_cid)
  91. # ──────────────────────────────────────────────────────────────────────────────
  92. # DB call helpers
  93. # ──────────────────────────────────────────────────────────────────────────────
  94. async def call_list(sql: str, *args: Any) -> list[dict]:
  95. """Call a chatbot_api function that returns a JSON array."""
  96. assert DB_POOL is not None
  97. row = await DB_POOL.fetchrow(sql, *args)
  98. if row is None or row[0] is None:
  99. return []
  100. data = json.loads(row[0]) if isinstance(row[0], str) else row[0]
  101. return data if isinstance(data, list) else []
  102. async def call_single(sql: str, *args: Any) -> dict | None:
  103. """Call a chatbot_api function that returns a single JSON object (or NULL)."""
  104. assert DB_POOL is not None
  105. row = await DB_POOL.fetchrow(sql, *args)
  106. if row is None or row[0] is None:
  107. return None
  108. data = json.loads(row[0]) if isinstance(row[0], str) else row[0]
  109. return data if isinstance(data, dict) else None
  110. # ──────────────────────────────────────────────────────────────────────────────
  111. # Request models
  112. # ──────────────────────────────────────────────────────────────────────────────
  113. class CatalogRequest(BaseModel):
  114. model: str | None = Field(None, description="Partial model number (ILIKE)")
  115. description: str | None = Field(None, description="Description keyword")
  116. # manufacturer and category are accepted for API compatibility but are
  117. # combined into the keyword search — chatbot_api.catalog_search searches
  118. # model + description fields only
  119. manufacturer: str | None = Field(None, description="Manufacturer (combined into keyword)")
  120. category: str | None = Field(None, description="Category (combined into keyword)")
  121. status: str | None = Field(None, description="Item status (not filtered at DB level)")
  122. limit: int = Field(20, ge=1, le=200)
  123. class ContactsRequest(BaseModel):
  124. contact_id: str | None = Field(None, description="ContactID (exact)")
  125. company: str | None = Field(None, description="Company name (partial)")
  126. name: str | None = Field(None, description="Contact person name (not used, for API compat)")
  127. limit: int = Field(20, ge=1, le=200)
  128. class OrdersListRequest(BaseModel):
  129. so_id: str | None = Field(None, description="Sales Order ID")
  130. customer_name: str | None = Field(None, description="Customer name (partial, staff only)")
  131. customer_cid: str | None = Field(None, description="Customer CID — overridden for dealer role")
  132. po_id: str | None = Field(None, description="PO number")
  133. status: str | None = Field(None, description="Order status")
  134. limit: int = Field(20, ge=1, le=200)
  135. class StockRequest(BaseModel):
  136. model: str | None = Field(None, description="Model number (partial)")
  137. warehouse_cid: str | None = Field(None, description="Warehouse code")
  138. limit: int = Field(50, ge=1, le=500)
  139. # ──────────────────────────────────────────────────────────────────────────────
  140. # Endpoints
  141. # ──────────────────────────────────────────────────────────────────────────────
  142. @app.get("/health")
  143. async def health() -> dict:
  144. """Liveness probe — no auth required."""
  145. return {"status": "ok", "service": "erp-bridge", "version": "3.0.0"}
  146. @app.post("/catalog", dependencies=[Depends(verify_api_key)])
  147. async def catalog_search(
  148. req: CatalogRequest,
  149. ctx: UserCtx = Depends(get_user_ctx),
  150. ) -> list[dict[str, Any]]:
  151. """Search the product catalog. Catalog is not customer-scoped."""
  152. # Combine description, manufacturer, category into a single keyword search
  153. keyword_parts = [p for p in [req.description, req.manufacturer, req.category] if p]
  154. keyword = " ".join(keyword_parts) or None
  155. return await call_list(
  156. "SELECT chatbot_api.catalog_search($1, $2, $3)",
  157. keyword, req.model, req.limit,
  158. )
  159. @app.post("/contacts", dependencies=[Depends(verify_api_key)])
  160. async def contacts_search(
  161. req: ContactsRequest,
  162. ctx: UserCtx = Depends(get_user_ctx),
  163. ) -> list[dict[str, Any]]:
  164. """
  165. Search customers/contacts.
  166. Dealer role is scoped to their own record at the DB level.
  167. """
  168. if ctx.is_dealer:
  169. # contact_get_dealer returns a single object; wrap in list for consistent response
  170. record = await call_single(
  171. "SELECT chatbot_api.contact_get_dealer($1)",
  172. ctx.customer_cid,
  173. )
  174. return [record] if record else []
  175. else:
  176. return await call_list(
  177. "SELECT chatbot_api.contact_get_staff($1, $2, $3)",
  178. req.company, req.contact_id, req.limit,
  179. )
  180. @app.post("/orders", dependencies=[Depends(verify_api_key)])
  181. async def orders_list(
  182. req: OrdersListRequest,
  183. ctx: UserCtx = Depends(get_user_ctx),
  184. ) -> list[dict[str, Any]]:
  185. """
  186. List sales orders.
  187. Dealer variant enforces CustomerCID at the DB level — cannot be bypassed.
  188. """
  189. if ctx.is_dealer:
  190. return await call_list(
  191. "SELECT chatbot_api.orders_list_dealer($1, $2, $3, $4, $5)",
  192. ctx.customer_cid, req.so_id, req.po_id, req.status, req.limit,
  193. )
  194. else:
  195. return await call_list(
  196. "SELECT chatbot_api.orders_list_staff($1, $2, $3, $4, $5, $6)",
  197. req.so_id, req.po_id, req.customer_cid, req.customer_name, req.status, req.limit,
  198. )
  199. @app.get("/orders/{so_id}", dependencies=[Depends(verify_api_key)])
  200. async def order_get(
  201. so_id: str = Path(..., description="Sales Order ID"),
  202. ctx: UserCtx = Depends(get_user_ctx),
  203. ) -> dict[str, Any]:
  204. """
  205. Get a single sales order with line items and notes.
  206. Dealer variant validates CID match at the DB level and returns NULL on mismatch
  207. (treated as 404 — prevents order-ID enumeration).
  208. """
  209. if ctx.is_dealer:
  210. result = await call_single(
  211. "SELECT chatbot_api.order_get_dealer($1, $2)",
  212. so_id, ctx.customer_cid,
  213. )
  214. else:
  215. result = await call_single(
  216. "SELECT chatbot_api.order_get_staff($1)",
  217. so_id,
  218. )
  219. if result is None:
  220. raise HTTPException(status_code=404, detail=f"Sales order not found: {so_id}")
  221. return result
  222. @app.post("/stock", dependencies=[Depends(verify_api_key)])
  223. async def stock_search(
  224. req: StockRequest,
  225. ctx: UserCtx = Depends(get_user_ctx),
  226. ) -> list[dict[str, Any]]:
  227. """
  228. Search available stock.
  229. Dealer variant returns availability info only (no pallet/hold/logistics detail).
  230. """
  231. if not req.model:
  232. return []
  233. if ctx.is_dealer:
  234. return await call_list(
  235. "SELECT chatbot_api.stock_search_dealer($1, $2, $3)",
  236. req.model, req.warehouse_cid, req.limit,
  237. )
  238. else:
  239. return await call_list(
  240. "SELECT chatbot_api.stock_search_staff($1, $2, $3)",
  241. req.model, req.warehouse_cid, req.limit,
  242. )
  243. # ──────────────────────────────────────────────────────────────────────────────
  244. # Dev runner (production uses uvicorn via systemd)
  245. # ──────────────────────────────────────────────────────────────────────────────
  246. if __name__ == "__main__":
  247. import uvicorn
  248. uvicorn.run("main:app", host="127.0.0.1", port=PORT, reload=False)