""" Homelegance ERP Bridge — FastAPI service Calls erp_api.* stored functions in the ERP PostgreSQL database. Listens on 127.0.0.1:8080 (internal only, never exposed to the internet). Selected columns per endpoint are defined in COLUMNS_* dicts below. Adjust those dicts to add / remove fields without touching query logic. """ from __future__ import annotations import json import logging import os from contextlib import asynccontextmanager from typing import Any import asyncpg from dotenv import load_dotenv from fastapi import Depends, FastAPI, Header, HTTPException, Path from pydantic import BaseModel, Field # ────────────────────────────────────────────────────────────────────────────── # Config # ────────────────────────────────────────────────────────────────────────────── load_dotenv() ERP_DATABASE_URL: str = os.environ["ERP_DATABASE_URL"] ERP_API_KEY: str = os.environ["ERP_API_KEY"] PORT: int = int(os.getenv("PORT", "8080")) logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") log = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────────────────────── # Selected-column configuration # Each key is the JSON key returned by the ERP stored function. # Set the value to True to include it in the API response. # ────────────────────────────────────────────────────────────────────────────── COLUMNS_CATALOG: set[str] = { "Model", "Manufacturer", "Description", "Type", # product category "Department", "Color", "UOM", "Status", "StockQTY", "EstimatedValue", } COLUMNS_CONTACTS: set[str] = { "ContactID", "Company", "ContactName1", "Phone1", "Email1", "Address1", "City", "State", "Country", "Sales1", # primary sales rep "AR_CreditHold", "Active", "Terms", "PriceType", } COLUMNS_ORDERS_LIST: set[str] = { "SOID", "OrderType", "Status", "SubStatus", "CustomerName", "CustomerCID", "POID", "Model", "Quantity", "UnitPrice", "TotalPrice", "InvoiceNo", "InvoiceDate", "TrackingNumber", "ETD", "ShipVia", "Carrier", "SODate", "ScheduleDate", } # Columns for the sales_order_get header (flat fields) COLUMNS_ORDER_HEADER: set[str] = { "SOID", "POID", "OrderType", "Status", "StatusDesc", "CustomerName", "CustomerCID", "ReceiverName", "ReceiverAddress1", "ReceiverCity", "ReceiverState", "ReceiverCountry", "ReceiverZipcode", "ShipVia", "Shipper", "ScheduleDate", "ArrivalDate", "Notes", "InternalNote", "ModifyTime", } # Columns for each item inside the Items[] array COLUMNS_ORDER_ITEM: set[str] = { "LineNo", "ItemID", "Model", "Manufacturer", "Description", "Quantity", "UnitPrice", "TotalPrice", "Status", "StatusDesc", "ETD", "ItemNote", "WarehouseID", "InvoiceQuantity", "TransactedQuantity", } COLUMNS_STOCK: set[str] = { "Model", "Manufacturer", "Description", "Quantity", "PalletID", "WarehouseCID", "SubInv", "HoldByPalletID", "HoldByRef", "B_ETA", "A_ETA", "SalesOrderID", } # ────────────────────────────────────────────────────────────────────────────── # DB Pool (lifespan) # ────────────────────────────────────────────────────────────────────────────── DB_POOL: asyncpg.Pool | None = None @asynccontextmanager async def lifespan(app: FastAPI): global DB_POOL log.info("Connecting to ERP database …") DB_POOL = await asyncpg.create_pool( ERP_DATABASE_URL, min_size=2, max_size=10, command_timeout=30, statement_cache_size=0, # required when using pgBouncer in transaction mode ) log.info("ERP database pool ready.") yield if DB_POOL: await DB_POOL.close() log.info("ERP database pool closed.") app = FastAPI( title="Homelegance ERP Bridge", version="1.0.0", docs_url=None, # disable Swagger UI in production redoc_url=None, lifespan=lifespan, ) # ────────────────────────────────────────────────────────────────────────────── # Auth dependency # ────────────────────────────────────────────────────────────────────────────── def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")) -> None: if x_api_key != ERP_API_KEY: raise HTTPException(status_code=401, detail="Invalid API key") # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def filter_keys(obj: dict, allowed: set[str]) -> dict: """Return a new dict containing only the allowed keys.""" return {k: v for k, v in obj.items() if k in allowed} def filter_list(rows: list[dict], allowed: set[str]) -> list[dict]: return [filter_keys(r, allowed) for r in rows] async def call_list_fn(fn_name: str, conditions: dict, limit: int) -> list[dict]: """Call an erp_api list stored function that accepts (jsonb, int) and returns json.""" assert DB_POOL is not None sql = f"SELECT erp_api.{fn_name}($1::jsonb, $2)" row = await DB_POOL.fetchrow(sql, json.dumps(conditions), limit) if row is None: return [] result = row[0] if result is None: return [] data = json.loads(result) if isinstance(result, str) else result return data if isinstance(data, list) else [] def pick(obj: dict | None, keys: set[str]) -> dict: if not obj: return {} return {k: obj[k] for k in keys if k in obj} # ────────────────────────────────────────────────────────────────────────────── # Request / response models # ────────────────────────────────────────────────────────────────────────────── class CatalogRequest(BaseModel): model: str | None = Field(None, description="Partial model number (ILIKE search)") manufacturer: str | None = Field(None, description="Manufacturer filter") description: str | None = Field(None, description="Description keyword") category: str | None = Field(None, description="Product category / Type") status: str | None = Field(None, description="Item status code (e.g. 'A' for active)") limit: int = Field(20, ge=1, le=200) class ContactsRequest(BaseModel): contact_id: str | None = Field(None, description="Exact ContactID") company: str | None = Field(None, description="Company name (partial match)") name: str | None = Field(None, description="Contact person name") limit: int = Field(20, ge=1, le=200) class OrdersListRequest(BaseModel): so_id: str | None = Field(None, description="Sales Order ID (partial match)") customer_name: str | None = Field(None, description="Customer name (partial match)") customer_cid: str | None = Field(None, description="Customer CID (exact)") po_id: str | None = Field(None, description="PO number (partial match)") status: str | None = Field(None, description="Order status code") limit: int = Field(20, ge=1, le=200) class StockRequest(BaseModel): model: str | None = Field(None, description="Model number (partial match)") warehouse_cid: str | None = Field(None, description="Warehouse code") limit: int = Field(50, ge=1, le=500) # ────────────────────────────────────────────────────────────────────────────── # Endpoints # ────────────────────────────────────────────────────────────────────────────── @app.get("/health") async def health() -> dict: """Simple liveness probe — no auth required.""" return {"status": "ok", "service": "erp-bridge"} @app.post("/catalog", dependencies=[Depends(verify_api_key)]) async def catalog_search(req: CatalogRequest) -> list[dict[str, Any]]: """ Search the product catalog. Calls erp_api.catalog_lists(conditions jsonb, limit int). """ conditions: dict[str, Any] = {} if req.model: conditions["Model"] = req.model if req.manufacturer: conditions["Manufacturer"] = req.manufacturer if req.description: conditions["Description"] = req.description if req.category: conditions["Type"] = req.category if req.status: conditions["Status"] = req.status rows = await call_list_fn("catalog_lists", conditions, req.limit) return filter_list(rows, COLUMNS_CATALOG) @app.post("/contacts", dependencies=[Depends(verify_api_key)]) async def contacts_search(req: ContactsRequest) -> list[dict[str, Any]]: """ Search customers / contacts. Calls erp_api.contact_lists(conditions jsonb, limit int). """ conditions: dict[str, Any] = {} if req.contact_id: conditions["ContactID"] = req.contact_id if req.company: conditions["Company"] = req.company if req.name: conditions["ContactName"] = req.name rows = await call_list_fn("contact_lists", conditions, req.limit) return filter_list(rows, COLUMNS_CONTACTS) @app.post("/orders", dependencies=[Depends(verify_api_key)]) async def orders_list(req: OrdersListRequest) -> list[dict[str, Any]]: """ List sales orders (one row per line item). Calls erp_api.sales_orders_lists(conditions jsonb, limit int). """ conditions: dict[str, Any] = {} if req.so_id: conditions["SOID"] = req.so_id if req.customer_name: conditions["CustomerName"] = req.customer_name if req.customer_cid: conditions["CustomerCID"] = req.customer_cid if req.po_id: conditions["POID"] = req.po_id if req.status: conditions["Status"] = req.status rows = await call_list_fn("sales_orders_lists", conditions, req.limit) return filter_list(rows, COLUMNS_ORDER_HEADER) @app.get("/orders/{so_id}", dependencies=[Depends(verify_api_key)]) async def order_get(so_id: str = Path(..., description="Sales Order ID")) -> dict[str, Any]: """ Retrieve a single sales order with nested Items and Notes. Calls erp_api.sales_order_get(so_id text). """ assert DB_POOL is not None sql = "SELECT erp_api.sales_order_get($1)" try: row = await DB_POOL.fetchrow(sql, so_id) except asyncpg.PostgresError as e: raise HTTPException(status_code=404, detail=str(e)) if row is None or row[0] is None: raise HTTPException(status_code=404, detail=f"Sales order not found: {so_id}") full: dict = json.loads(row[0]) if isinstance(row[0], str) else row[0] # Filter header columns result = pick(full, COLUMNS_ORDER_HEADER) # Filter Items array items_raw: list[dict] = full.get("Items") or [] result["Items"] = [pick(item, COLUMNS_ORDER_ITEM) for item in items_raw] # Include Notes array as-is (already minimal) result["Notes"] = full.get("Notes") or [] return result @app.post("/stock", dependencies=[Depends(verify_api_key)]) async def stock_search(req: StockRequest) -> list[dict[str, Any]]: """ Search available stock (pallet level). Calls erp_api.stock_lists(conditions jsonb, limit int). """ conditions: dict[str, Any] = {} if req.model: conditions["Model"] = req.model if req.warehouse_cid: conditions["WarehouseCID"] = req.warehouse_cid rows = await call_list_fn("stock_lists", conditions, req.limit) return filter_list(rows, COLUMNS_STOCK) # ────────────────────────────────────────────────────────────────────────────── # Dev runner (not used in production — uvicorn is started by systemd) # ────────────────────────────────────────────────────────────────────────────── if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="127.0.0.1", port=PORT, reload=False)