|
|
@@ -0,0 +1,384 @@
|
|
|
+"""
|
|
|
+Homelegance ERP Bridge — FastAPI service
|
|
|
+Calls erp_api.* stored functions in the ERP PostgreSQL database.
|
|
|
+Listens on 127.0.0.1:8080 (internal only, never exposed to the internet).
|
|
|
+
|
|
|
+Selected columns per endpoint are defined in COLUMNS_* dicts below.
|
|
|
+Adjust those dicts to add / remove fields without touching query logic.
|
|
|
+"""
|
|
|
+
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import json
|
|
|
+import logging
|
|
|
+import os
|
|
|
+from contextlib import asynccontextmanager
|
|
|
+from typing import Any
|
|
|
+
|
|
|
+import asyncpg
|
|
|
+from dotenv import load_dotenv
|
|
|
+from fastapi import Depends, FastAPI, Header, HTTPException, Path
|
|
|
+from pydantic import BaseModel, Field
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# Config
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+load_dotenv()
|
|
|
+
|
|
|
+ERP_DATABASE_URL: str = os.environ["ERP_DATABASE_URL"]
|
|
|
+ERP_API_KEY: str = os.environ["ERP_API_KEY"]
|
|
|
+PORT: int = int(os.getenv("PORT", "8080"))
|
|
|
+
|
|
|
+logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
|
|
|
+log = logging.getLogger(__name__)
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# Selected-column configuration
|
|
|
+# Each key is the JSON key returned by the ERP stored function.
|
|
|
+# Set the value to True to include it in the API response.
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
+COLUMNS_CATALOG: set[str] = {
|
|
|
+ "Model",
|
|
|
+ "Manufacturer",
|
|
|
+ "Description",
|
|
|
+ "Type", # product category
|
|
|
+ "Department",
|
|
|
+ "Color",
|
|
|
+ "UOM",
|
|
|
+ "Status",
|
|
|
+ "StockQTY",
|
|
|
+ "EstimatedValue",
|
|
|
+}
|
|
|
+
|
|
|
+COLUMNS_CONTACTS: set[str] = {
|
|
|
+ "ContactID",
|
|
|
+ "Company",
|
|
|
+ "ContactName1",
|
|
|
+ "Phone1",
|
|
|
+ "Email1",
|
|
|
+ "Address1",
|
|
|
+ "City",
|
|
|
+ "State",
|
|
|
+ "Country",
|
|
|
+ "Sales1", # primary sales rep
|
|
|
+ "AR_CreditHold",
|
|
|
+ "Active",
|
|
|
+ "Terms",
|
|
|
+ "PriceType",
|
|
|
+}
|
|
|
+
|
|
|
+COLUMNS_ORDERS_LIST: set[str] = {
|
|
|
+ "SOID",
|
|
|
+ "OrderType",
|
|
|
+ "Status",
|
|
|
+ "SubStatus",
|
|
|
+ "CustomerName",
|
|
|
+ "CustomerCID",
|
|
|
+ "POID",
|
|
|
+ "Model",
|
|
|
+ "Quantity",
|
|
|
+ "UnitPrice",
|
|
|
+ "TotalPrice",
|
|
|
+ "InvoiceNo",
|
|
|
+ "InvoiceDate",
|
|
|
+ "TrackingNumber",
|
|
|
+ "ETD",
|
|
|
+ "ShipVia",
|
|
|
+ "Carrier",
|
|
|
+ "SODate",
|
|
|
+ "ScheduleDate",
|
|
|
+}
|
|
|
+
|
|
|
+# Columns for the sales_order_get header (flat fields)
|
|
|
+COLUMNS_ORDER_HEADER: set[str] = {
|
|
|
+ "SOID",
|
|
|
+ "POID",
|
|
|
+ "OrderType",
|
|
|
+ "Status",
|
|
|
+ "StatusDesc",
|
|
|
+ "CustomerName",
|
|
|
+ "CustomerCID",
|
|
|
+ "ReceiverName",
|
|
|
+ "ReceiverAddress1",
|
|
|
+ "ReceiverCity",
|
|
|
+ "ReceiverState",
|
|
|
+ "ReceiverCountry",
|
|
|
+ "ReceiverZipcode",
|
|
|
+ "ShipVia",
|
|
|
+ "Shipper",
|
|
|
+ "ScheduleDate",
|
|
|
+ "ArrivalDate",
|
|
|
+ "Notes",
|
|
|
+ "InternalNote",
|
|
|
+ "ModifyTime",
|
|
|
+}
|
|
|
+
|
|
|
+# Columns for each item inside the Items[] array
|
|
|
+COLUMNS_ORDER_ITEM: set[str] = {
|
|
|
+ "LineNo",
|
|
|
+ "ItemID",
|
|
|
+ "Model",
|
|
|
+ "Manufacturer",
|
|
|
+ "Description",
|
|
|
+ "Quantity",
|
|
|
+ "UnitPrice",
|
|
|
+ "TotalPrice",
|
|
|
+ "Status",
|
|
|
+ "StatusDesc",
|
|
|
+ "ETD",
|
|
|
+ "ItemNote",
|
|
|
+ "WarehouseID",
|
|
|
+ "InvoiceQuantity",
|
|
|
+ "TransactedQuantity",
|
|
|
+}
|
|
|
+
|
|
|
+COLUMNS_STOCK: set[str] = {
|
|
|
+ "Model",
|
|
|
+ "Manufacturer",
|
|
|
+ "Description",
|
|
|
+ "Quantity",
|
|
|
+ "PalletID",
|
|
|
+ "WarehouseCID",
|
|
|
+ "SubInv",
|
|
|
+ "HoldByPalletID",
|
|
|
+ "HoldByRef",
|
|
|
+ "B_ETA",
|
|
|
+ "A_ETA",
|
|
|
+ "SalesOrderID",
|
|
|
+}
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# DB Pool (lifespan)
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+DB_POOL: asyncpg.Pool | None = None
|
|
|
+
|
|
|
+
|
|
|
+@asynccontextmanager
|
|
|
+async def lifespan(app: FastAPI):
|
|
|
+ global DB_POOL
|
|
|
+ log.info("Connecting to ERP database …")
|
|
|
+ DB_POOL = await asyncpg.create_pool(
|
|
|
+ ERP_DATABASE_URL,
|
|
|
+ min_size=2,
|
|
|
+ max_size=10,
|
|
|
+ command_timeout=30,
|
|
|
+ statement_cache_size=0, # required when using pgBouncer in transaction mode
|
|
|
+ )
|
|
|
+ log.info("ERP database pool ready.")
|
|
|
+ yield
|
|
|
+ if DB_POOL:
|
|
|
+ await DB_POOL.close()
|
|
|
+ log.info("ERP database pool closed.")
|
|
|
+
|
|
|
+
|
|
|
+app = FastAPI(
|
|
|
+ title="Homelegance ERP Bridge",
|
|
|
+ version="1.0.0",
|
|
|
+ docs_url=None, # disable Swagger UI in production
|
|
|
+ redoc_url=None,
|
|
|
+ lifespan=lifespan,
|
|
|
+)
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# Auth dependency
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
+def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")) -> None:
|
|
|
+ if x_api_key != ERP_API_KEY:
|
|
|
+ raise HTTPException(status_code=401, detail="Invalid API key")
|
|
|
+
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# Helpers
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
+def filter_keys(obj: dict, allowed: set[str]) -> dict:
|
|
|
+ """Return a new dict containing only the allowed keys."""
|
|
|
+ return {k: v for k, v in obj.items() if k in allowed}
|
|
|
+
|
|
|
+
|
|
|
+def filter_list(rows: list[dict], allowed: set[str]) -> list[dict]:
|
|
|
+ return [filter_keys(r, allowed) for r in rows]
|
|
|
+
|
|
|
+
|
|
|
+async def call_list_fn(fn_name: str, conditions: dict, limit: int) -> list[dict]:
|
|
|
+ """Call an erp_api list stored function that accepts (jsonb, int) and returns json."""
|
|
|
+ assert DB_POOL is not None
|
|
|
+ sql = f"SELECT erp_api.{fn_name}($1::jsonb, $2)"
|
|
|
+ row = await DB_POOL.fetchrow(sql, json.dumps(conditions), limit)
|
|
|
+ if row is None:
|
|
|
+ return []
|
|
|
+ result = row[0]
|
|
|
+ if result is None:
|
|
|
+ return []
|
|
|
+ data = json.loads(result) if isinstance(result, str) else result
|
|
|
+ return data if isinstance(data, list) else []
|
|
|
+
|
|
|
+
|
|
|
+def pick(obj: dict | None, keys: set[str]) -> dict:
|
|
|
+ if not obj:
|
|
|
+ return {}
|
|
|
+ return {k: obj[k] for k in keys if k in obj}
|
|
|
+
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# Request / response models
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
+class CatalogRequest(BaseModel):
|
|
|
+ model: str | None = Field(None, description="Partial model number (ILIKE search)")
|
|
|
+ manufacturer: str | None = Field(None, description="Manufacturer filter")
|
|
|
+ description: str | None = Field(None, description="Description keyword")
|
|
|
+ category: str | None = Field(None, description="Product category / Type")
|
|
|
+ status: str | None = Field(None, description="Item status code (e.g. 'A' for active)")
|
|
|
+ limit: int = Field(20, ge=1, le=200)
|
|
|
+
|
|
|
+
|
|
|
+class ContactsRequest(BaseModel):
|
|
|
+ contact_id: str | None = Field(None, description="Exact ContactID")
|
|
|
+ company: str | None = Field(None, description="Company name (partial match)")
|
|
|
+ name: str | None = Field(None, description="Contact person name")
|
|
|
+ limit: int = Field(20, ge=1, le=200)
|
|
|
+
|
|
|
+
|
|
|
+class OrdersListRequest(BaseModel):
|
|
|
+ so_id: str | None = Field(None, description="Sales Order ID (partial match)")
|
|
|
+ customer_name: str | None = Field(None, description="Customer name (partial match)")
|
|
|
+ customer_cid: str | None = Field(None, description="Customer CID (exact)")
|
|
|
+ po_id: str | None = Field(None, description="PO number (partial match)")
|
|
|
+ status: str | None = Field(None, description="Order status code")
|
|
|
+ limit: int = Field(20, ge=1, le=200)
|
|
|
+
|
|
|
+
|
|
|
+class StockRequest(BaseModel):
|
|
|
+ model: str | None = Field(None, description="Model number (partial match)")
|
|
|
+ warehouse_cid: str | None = Field(None, description="Warehouse code")
|
|
|
+ limit: int = Field(50, ge=1, le=500)
|
|
|
+
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# Endpoints
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+
|
|
|
+@app.get("/health")
|
|
|
+async def health() -> dict:
|
|
|
+ """Simple liveness probe — no auth required."""
|
|
|
+ return {"status": "ok", "service": "erp-bridge"}
|
|
|
+
|
|
|
+
|
|
|
+@app.post("/catalog", dependencies=[Depends(verify_api_key)])
|
|
|
+async def catalog_search(req: CatalogRequest) -> list[dict[str, Any]]:
|
|
|
+ """
|
|
|
+ Search the product catalog.
|
|
|
+ Calls erp_api.catalog_lists(conditions jsonb, limit int).
|
|
|
+ """
|
|
|
+ conditions: dict[str, Any] = {}
|
|
|
+ if req.model:
|
|
|
+ conditions["Model"] = req.model
|
|
|
+ if req.manufacturer:
|
|
|
+ conditions["Manufacturer"] = req.manufacturer
|
|
|
+ if req.description:
|
|
|
+ conditions["Description"] = req.description
|
|
|
+ if req.category:
|
|
|
+ conditions["Type"] = req.category
|
|
|
+ if req.status:
|
|
|
+ conditions["Status"] = req.status
|
|
|
+
|
|
|
+ rows = await call_list_fn("catalog_lists", conditions, req.limit)
|
|
|
+ return filter_list(rows, COLUMNS_CATALOG)
|
|
|
+
|
|
|
+
|
|
|
+@app.post("/contacts", dependencies=[Depends(verify_api_key)])
|
|
|
+async def contacts_search(req: ContactsRequest) -> list[dict[str, Any]]:
|
|
|
+ """
|
|
|
+ Search customers / contacts.
|
|
|
+ Calls erp_api.contact_lists(conditions jsonb, limit int).
|
|
|
+ """
|
|
|
+ conditions: dict[str, Any] = {}
|
|
|
+ if req.contact_id:
|
|
|
+ conditions["ContactID"] = req.contact_id
|
|
|
+ if req.company:
|
|
|
+ conditions["Company"] = req.company
|
|
|
+ if req.name:
|
|
|
+ conditions["ContactName"] = req.name
|
|
|
+
|
|
|
+ rows = await call_list_fn("contact_lists", conditions, req.limit)
|
|
|
+ return filter_list(rows, COLUMNS_CONTACTS)
|
|
|
+
|
|
|
+
|
|
|
+@app.post("/orders", dependencies=[Depends(verify_api_key)])
|
|
|
+async def orders_list(req: OrdersListRequest) -> list[dict[str, Any]]:
|
|
|
+ """
|
|
|
+ List sales orders (one row per line item).
|
|
|
+ Calls erp_api.sales_orders_lists(conditions jsonb, limit int).
|
|
|
+ """
|
|
|
+ conditions: dict[str, Any] = {}
|
|
|
+ if req.so_id:
|
|
|
+ conditions["SOID"] = req.so_id
|
|
|
+ if req.customer_name:
|
|
|
+ conditions["CustomerName"] = req.customer_name
|
|
|
+ if req.customer_cid:
|
|
|
+ conditions["CustomerCID"] = req.customer_cid
|
|
|
+ if req.po_id:
|
|
|
+ conditions["POID"] = req.po_id
|
|
|
+ if req.status:
|
|
|
+ conditions["Status"] = req.status
|
|
|
+
|
|
|
+ rows = await call_list_fn("sales_orders_lists", conditions, req.limit)
|
|
|
+ return filter_list(rows, COLUMNS_ORDER_HEADER)
|
|
|
+
|
|
|
+
|
|
|
+@app.get("/orders/{so_id}", dependencies=[Depends(verify_api_key)])
|
|
|
+async def order_get(so_id: str = Path(..., description="Sales Order ID")) -> dict[str, Any]:
|
|
|
+ """
|
|
|
+ Retrieve a single sales order with nested Items and Notes.
|
|
|
+ Calls erp_api.sales_order_get(so_id text).
|
|
|
+ """
|
|
|
+ assert DB_POOL is not None
|
|
|
+ sql = "SELECT erp_api.sales_order_get($1)"
|
|
|
+ try:
|
|
|
+ row = await DB_POOL.fetchrow(sql, so_id)
|
|
|
+ except asyncpg.PostgresError as e:
|
|
|
+ raise HTTPException(status_code=404, detail=str(e))
|
|
|
+
|
|
|
+ if row is None or row[0] is None:
|
|
|
+ raise HTTPException(status_code=404, detail=f"Sales order not found: {so_id}")
|
|
|
+
|
|
|
+ full: dict = json.loads(row[0]) if isinstance(row[0], str) else row[0]
|
|
|
+
|
|
|
+ # Filter header columns
|
|
|
+ result = pick(full, COLUMNS_ORDER_HEADER)
|
|
|
+
|
|
|
+ # Filter Items array
|
|
|
+ items_raw: list[dict] = full.get("Items") or []
|
|
|
+ result["Items"] = [pick(item, COLUMNS_ORDER_ITEM) for item in items_raw]
|
|
|
+
|
|
|
+ # Include Notes array as-is (already minimal)
|
|
|
+ result["Notes"] = full.get("Notes") or []
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+@app.post("/stock", dependencies=[Depends(verify_api_key)])
|
|
|
+async def stock_search(req: StockRequest) -> list[dict[str, Any]]:
|
|
|
+ """
|
|
|
+ Search available stock (pallet level).
|
|
|
+ Calls erp_api.stock_lists(conditions jsonb, limit int).
|
|
|
+ """
|
|
|
+ conditions: dict[str, Any] = {}
|
|
|
+ if req.model:
|
|
|
+ conditions["Model"] = req.model
|
|
|
+ if req.warehouse_cid:
|
|
|
+ conditions["WarehouseCID"] = req.warehouse_cid
|
|
|
+
|
|
|
+ rows = await call_list_fn("stock_lists", conditions, req.limit)
|
|
|
+ return filter_list(rows, COLUMNS_STOCK)
|
|
|
+
|
|
|
+
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+# Dev runner (not used in production — uvicorn is started by systemd)
|
|
|
+# ──────────────────────────────────────────────────────────────────────────────
|
|
|
+if __name__ == "__main__":
|
|
|
+ import uvicorn
|
|
|
+ uvicorn.run("main:app", host="127.0.0.1", port=PORT, reload=False)
|