main.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
"""
Homelegance ERP Bridge — FastAPI service
Calls erp_api.* stored functions in the ERP PostgreSQL database.
Listens on 127.0.0.1:8080 (internal only, never exposed to the internet).
Selected columns per endpoint are defined in the COLUMNS_* sets below.
Adjust those sets to add / remove fields without touching query logic.
"""
  8. from __future__ import annotations
  9. import json
  10. import logging
  11. import os
  12. from contextlib import asynccontextmanager
  13. from typing import Any
  14. import asyncpg
  15. from dotenv import load_dotenv
  16. from fastapi import Depends, FastAPI, Header, HTTPException, Path
  17. from pydantic import BaseModel, Field
  18. # ──────────────────────────────────────────────────────────────────────────────
  19. # Config
  20. # ──────────────────────────────────────────────────────────────────────────────
# Run python-dotenv first so that values from a .env file are available to the
# environment lookups below.
load_dotenv()
# Both settings are mandatory: a missing variable raises KeyError at import time.
ERP_DATABASE_URL: str = os.environ["ERP_DATABASE_URL"]  # DSN handed to asyncpg.create_pool
ERP_API_KEY: str = os.environ["ERP_API_KEY"]  # shared secret checked by verify_api_key
# Optional; falls back to 8080 and is used by the __main__ dev runner.
PORT: int = int(os.getenv("PORT", "8080"))
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
log = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────────────────────
# Selected-column configuration
# Each COLUMNS_* set lists the JSON keys (as returned by the ERP stored
# function) that are kept in the API response.
# Add a key to a set to expose it; remove the key to drop it from the response.
# ──────────────────────────────────────────────────────────────────────────────
# Keys kept from each row returned by erp_api.catalog_lists (POST /catalog).
COLUMNS_CATALOG: set[str] = {
    "Model",
    "Manufacturer",
    "Description",
    "Type",  # product category
    "Department",
    "Color",
    "UOM",
    "Status",
    "StockQTY",
    "EstimatedValue",
}
# Keys kept from each row returned by erp_api.contact_lists (POST /contacts).
COLUMNS_CONTACTS: set[str] = {
    "ContactID",
    "Company",
    "ContactName1",
    "Phone1",
    "Email1",
    "Address1",
    "City",
    "State",
    "Country",
    "Sales1",  # primary sales rep
    "AR_CreditHold",
    "Active",
    "Terms",
    "PriceType",
}
# Presumably the whitelist for the rows of the sales-order list endpoint
# (POST /orders, one row per line item) — verify that the endpoint actually
# filters with this set.
COLUMNS_ORDERS_LIST: set[str] = {
    "SOID",
    "OrderType",
    "Status",
    "SubStatus",
    "CustomerName",
    "CustomerCID",
    "POID",
    "Model",
    "Quantity",
    "UnitPrice",
    "TotalPrice",
    "InvoiceNo",
    "InvoiceDate",
    "TrackingNumber",
    "ETD",
    "ShipVia",
    "Carrier",
    "SODate",
    "ScheduleDate",
}
# Columns for the sales_order_get header (flat fields).
# order_get() copies these top-level keys from the ERP result into its response;
# the nested Items / Notes arrays are handled separately there.
COLUMNS_ORDER_HEADER: set[str] = {
    "SOID",
    "POID",
    "OrderType",
    "Status",
    "StatusDesc",
    "CustomerName",
    "CustomerCID",
    "ReceiverName",
    "ReceiverAddress1",
    "ReceiverCity",
    "ReceiverState",
    "ReceiverCountry",
    "ReceiverZipcode",
    "ShipVia",
    "Shipper",
    "ScheduleDate",
    "ArrivalDate",
    "Notes",
    "InternalNote",
    "ModifyTime",
}
# Columns for each item inside the Items[] array.
# order_get() applies this whitelist to every element of the "Items" list.
COLUMNS_ORDER_ITEM: set[str] = {
    "LineNo",
    "ItemID",
    "Model",
    "Manufacturer",
    "Description",
    "Quantity",
    "UnitPrice",
    "TotalPrice",
    "Status",
    "StatusDesc",
    "ETD",
    "ItemNote",
    "WarehouseID",
    "InvoiceQuantity",
    "TransactedQuantity",
}
# Keys kept from each row returned by erp_api.stock_lists (POST /stock).
COLUMNS_STOCK: set[str] = {
    "Model",
    "Manufacturer",
    "Description",
    "Quantity",
    "PalletID",
    "WarehouseCID",
    "SubInv",
    "HoldByPalletID",
    "HoldByRef",
    "B_ETA",
    "A_ETA",
    "SalesOrderID",
}
  136. # ──────────────────────────────────────────────────────────────────────────────
  137. # DB Pool (lifespan)
  138. # ──────────────────────────────────────────────────────────────────────────────
# Module-wide asyncpg connection pool; stays None until lifespan() has created it.
DB_POOL: asyncpg.Pool | None = None
  140. @asynccontextmanager
  141. async def lifespan(app: FastAPI):
  142. global DB_POOL
  143. log.info("Connecting to ERP database …")
  144. DB_POOL = await asyncpg.create_pool(
  145. ERP_DATABASE_URL,
  146. min_size=2,
  147. max_size=10,
  148. command_timeout=30,
  149. statement_cache_size=0, # required when using pgBouncer in transaction mode
  150. )
  151. log.info("ERP database pool ready.")
  152. yield
  153. if DB_POOL:
  154. await DB_POOL.close()
  155. log.info("ERP database pool closed.")
# The ASGI application object; startup and shutdown work is delegated to lifespan().
app = FastAPI(
    title="Homelegance ERP Bridge",
    version="1.0.0",
    docs_url=None,  # disable Swagger UI in production
    redoc_url=None,  # no ReDoc page either
    lifespan=lifespan,
)
  163. # ──────────────────────────────────────────────────────────────────────────────
  164. # Auth dependency
  165. # ──────────────────────────────────────────────────────────────────────────────
  166. def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")) -> None:
  167. if x_api_key != ERP_API_KEY:
  168. raise HTTPException(status_code=401, detail="Invalid API key")
  169. # ──────────────────────────────────────────────────────────────────────────────
  170. # Helpers
  171. # ──────────────────────────────────────────────────────────────────────────────
  172. def filter_keys(obj: dict, allowed: set[str]) -> dict:
  173. """Return a new dict containing only the allowed keys."""
  174. return {k: v for k, v in obj.items() if k in allowed}
  175. def filter_list(rows: list[dict], allowed: set[str]) -> list[dict]:
  176. return [filter_keys(r, allowed) for r in rows]
  177. async def call_list_fn(fn_name: str, conditions: dict, limit: int) -> list[dict]:
  178. """Call an erp_api list stored function that accepts (jsonb, int) and returns json."""
  179. assert DB_POOL is not None
  180. sql = f"SELECT erp_api.{fn_name}($1::jsonb, $2)"
  181. row = await DB_POOL.fetchrow(sql, json.dumps(conditions), limit)
  182. if row is None:
  183. return []
  184. result = row[0]
  185. if result is None:
  186. return []
  187. data = json.loads(result) if isinstance(result, str) else result
  188. return data if isinstance(data, list) else []
  189. def pick(obj: dict | None, keys: set[str]) -> dict:
  190. if not obj:
  191. return {}
  192. return {k: obj[k] for k in keys if k in obj}
  193. # ──────────────────────────────────────────────────────────────────────────────
  194. # Request / response models
  195. # ──────────────────────────────────────────────────────────────────────────────
  196. class CatalogRequest(BaseModel):
  197. model: str | None = Field(None, description="Partial model number (ILIKE search)")
  198. manufacturer: str | None = Field(None, description="Manufacturer filter")
  199. description: str | None = Field(None, description="Description keyword")
  200. category: str | None = Field(None, description="Product category / Type")
  201. status: str | None = Field(None, description="Item status code (e.g. 'A' for active)")
  202. limit: int = Field(20, ge=1, le=200)
  203. class ContactsRequest(BaseModel):
  204. contact_id: str | None = Field(None, description="Exact ContactID")
  205. company: str | None = Field(None, description="Company name (partial match)")
  206. name: str | None = Field(None, description="Contact person name")
  207. limit: int = Field(20, ge=1, le=200)
  208. class OrdersListRequest(BaseModel):
  209. so_id: str | None = Field(None, description="Sales Order ID (partial match)")
  210. customer_name: str | None = Field(None, description="Customer name (partial match)")
  211. customer_cid: str | None = Field(None, description="Customer CID (exact)")
  212. po_id: str | None = Field(None, description="PO number (partial match)")
  213. status: str | None = Field(None, description="Order status code")
  214. limit: int = Field(20, ge=1, le=200)
  215. class StockRequest(BaseModel):
  216. model: str | None = Field(None, description="Model number (partial match)")
  217. warehouse_cid: str | None = Field(None, description="Warehouse code")
  218. limit: int = Field(50, ge=1, le=500)
  219. # ──────────────────────────────────────────────────────────────────────────────
  220. # Endpoints
  221. # ──────────────────────────────────────────────────────────────────────────────
  222. @app.get("/health")
  223. async def health() -> dict:
  224. """Simple liveness probe — no auth required."""
  225. return {"status": "ok", "service": "erp-bridge"}
  226. @app.post("/catalog", dependencies=[Depends(verify_api_key)])
  227. async def catalog_search(req: CatalogRequest) -> list[dict[str, Any]]:
  228. """
  229. Search the product catalog.
  230. Calls erp_api.catalog_lists(conditions jsonb, limit int).
  231. """
  232. conditions: dict[str, Any] = {}
  233. if req.model:
  234. conditions["Model"] = req.model
  235. if req.manufacturer:
  236. conditions["Manufacturer"] = req.manufacturer
  237. if req.description:
  238. conditions["Description"] = req.description
  239. if req.category:
  240. conditions["Type"] = req.category
  241. if req.status:
  242. conditions["Status"] = req.status
  243. rows = await call_list_fn("catalog_lists", conditions, req.limit)
  244. return filter_list(rows, COLUMNS_CATALOG)
  245. @app.post("/contacts", dependencies=[Depends(verify_api_key)])
  246. async def contacts_search(req: ContactsRequest) -> list[dict[str, Any]]:
  247. """
  248. Search customers / contacts.
  249. Calls erp_api.contact_lists(conditions jsonb, limit int).
  250. """
  251. conditions: dict[str, Any] = {}
  252. if req.contact_id:
  253. conditions["ContactID"] = req.contact_id
  254. if req.company:
  255. conditions["Company"] = req.company
  256. if req.name:
  257. conditions["ContactName"] = req.name
  258. rows = await call_list_fn("contact_lists", conditions, req.limit)
  259. return filter_list(rows, COLUMNS_CONTACTS)
  260. @app.post("/orders", dependencies=[Depends(verify_api_key)])
  261. async def orders_list(req: OrdersListRequest) -> list[dict[str, Any]]:
  262. """
  263. List sales orders (one row per line item).
  264. Calls erp_api.sales_orders_lists(conditions jsonb, limit int).
  265. """
  266. conditions: dict[str, Any] = {}
  267. if req.so_id:
  268. conditions["SOID"] = req.so_id
  269. if req.customer_name:
  270. conditions["CustomerName"] = req.customer_name
  271. if req.customer_cid:
  272. conditions["CustomerCID"] = req.customer_cid
  273. if req.po_id:
  274. conditions["POID"] = req.po_id
  275. if req.status:
  276. conditions["Status"] = req.status
  277. rows = await call_list_fn("sales_orders_lists", conditions, req.limit)
  278. return filter_list(rows, COLUMNS_ORDER_HEADER)
  279. @app.get("/orders/{so_id}", dependencies=[Depends(verify_api_key)])
  280. async def order_get(so_id: str = Path(..., description="Sales Order ID")) -> dict[str, Any]:
  281. """
  282. Retrieve a single sales order with nested Items and Notes.
  283. Calls erp_api.sales_order_get(so_id text).
  284. """
  285. assert DB_POOL is not None
  286. sql = "SELECT erp_api.sales_order_get($1)"
  287. try:
  288. row = await DB_POOL.fetchrow(sql, so_id)
  289. except asyncpg.PostgresError as e:
  290. raise HTTPException(status_code=404, detail=str(e))
  291. if row is None or row[0] is None:
  292. raise HTTPException(status_code=404, detail=f"Sales order not found: {so_id}")
  293. full: dict = json.loads(row[0]) if isinstance(row[0], str) else row[0]
  294. # Filter header columns
  295. result = pick(full, COLUMNS_ORDER_HEADER)
  296. # Filter Items array
  297. items_raw: list[dict] = full.get("Items") or []
  298. result["Items"] = [pick(item, COLUMNS_ORDER_ITEM) for item in items_raw]
  299. # Include Notes array as-is (already minimal)
  300. result["Notes"] = full.get("Notes") or []
  301. return result
  302. @app.post("/stock", dependencies=[Depends(verify_api_key)])
  303. async def stock_search(req: StockRequest) -> list[dict[str, Any]]:
  304. """
  305. Search available stock (pallet level).
  306. Calls erp_api.stock_lists(conditions jsonb, limit int).
  307. """
  308. conditions: dict[str, Any] = {}
  309. if req.model:
  310. conditions["Model"] = req.model
  311. if req.warehouse_cid:
  312. conditions["WarehouseCID"] = req.warehouse_cid
  313. rows = await call_list_fn("stock_lists", conditions, req.limit)
  314. return filter_list(rows, COLUMNS_STOCK)
  315. # ──────────────────────────────────────────────────────────────────────────────
  316. # Dev runner (not used in production — uvicorn is started by systemd)
  317. # ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run directly.
    import uvicorn
    # "main:app" is an import string, so this file must be importable as `main`.
    # Binds to loopback only, on the configured PORT, without auto-reload.
    uvicorn.run("main:app", host="127.0.0.1", port=PORT, reload=False)