| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- """
- Homelegance ERP Bridge — FastAPI service
- Calls erp_api.* stored functions in the ERP PostgreSQL database.
- Listens on 127.0.0.1:8080 (internal only, never exposed to the internet).
- Selected columns per endpoint are defined in COLUMNS_* dicts below.
- Adjust those dicts to add / remove fields without touching query logic.
- """
- from __future__ import annotations
- import json
- import logging
- import os
- from contextlib import asynccontextmanager
- from typing import Any
- import asyncpg
- from dotenv import load_dotenv
- from fastapi import Depends, FastAPI, Header, HTTPException, Path
- from pydantic import BaseModel, Field
- # ──────────────────────────────────────────────────────────────────────────────
- # Config
- # ──────────────────────────────────────────────────────────────────────────────
- load_dotenv()
- ERP_DATABASE_URL: str = os.environ["ERP_DATABASE_URL"]
- ERP_API_KEY: str = os.environ["ERP_API_KEY"]
- PORT: int = int(os.getenv("PORT", "8080"))
- logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
- log = logging.getLogger(__name__)
- # ──────────────────────────────────────────────────────────────────────────────
- # Selected-column configuration
- # Each key is the JSON key returned by the ERP stored function.
- # Set the value to True to include it in the API response.
- # ──────────────────────────────────────────────────────────────────────────────
- COLUMNS_CATALOG: set[str] = {
- "Model",
- "Manufacturer",
- "Description",
- "Type", # product category
- "Department",
- "Color",
- "UOM",
- "Status",
- "StockQTY",
- "EstimatedValue",
- }
- COLUMNS_CONTACTS: set[str] = {
- "ContactID",
- "Company",
- "ContactName1",
- "Phone1",
- "Email1",
- "Address1",
- "City",
- "State",
- "Country",
- "Sales1", # primary sales rep
- "AR_CreditHold",
- "Active",
- "Terms",
- "PriceType",
- }
- COLUMNS_ORDERS_LIST: set[str] = {
- "SOID",
- "OrderType",
- "Status",
- "SubStatus",
- "CustomerName",
- "CustomerCID",
- "POID",
- "Model",
- "Quantity",
- "UnitPrice",
- "TotalPrice",
- "InvoiceNo",
- "InvoiceDate",
- "TrackingNumber",
- "ETD",
- "ShipVia",
- "Carrier",
- "SODate",
- "ScheduleDate",
- }
- # Columns for the sales_order_get header (flat fields)
- COLUMNS_ORDER_HEADER: set[str] = {
- "SOID",
- "POID",
- "OrderType",
- "Status",
- "StatusDesc",
- "CustomerName",
- "CustomerCID",
- "ReceiverName",
- "ReceiverAddress1",
- "ReceiverCity",
- "ReceiverState",
- "ReceiverCountry",
- "ReceiverZipcode",
- "ShipVia",
- "Shipper",
- "ScheduleDate",
- "ArrivalDate",
- "Notes",
- "InternalNote",
- "ModifyTime",
- }
- # Columns for each item inside the Items[] array
- COLUMNS_ORDER_ITEM: set[str] = {
- "LineNo",
- "ItemID",
- "Model",
- "Manufacturer",
- "Description",
- "Quantity",
- "UnitPrice",
- "TotalPrice",
- "Status",
- "StatusDesc",
- "ETD",
- "ItemNote",
- "WarehouseID",
- "InvoiceQuantity",
- "TransactedQuantity",
- }
- COLUMNS_STOCK: set[str] = {
- "Model",
- "Manufacturer",
- "Description",
- "Quantity",
- "PalletID",
- "WarehouseCID",
- "SubInv",
- "HoldByPalletID",
- "HoldByRef",
- "B_ETA",
- "A_ETA",
- "SalesOrderID",
- }
- # ──────────────────────────────────────────────────────────────────────────────
- # DB Pool (lifespan)
- # ──────────────────────────────────────────────────────────────────────────────
- DB_POOL: asyncpg.Pool | None = None
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- global DB_POOL
- log.info("Connecting to ERP database …")
- DB_POOL = await asyncpg.create_pool(
- ERP_DATABASE_URL,
- min_size=2,
- max_size=10,
- command_timeout=30,
- statement_cache_size=0, # required when using pgBouncer in transaction mode
- )
- log.info("ERP database pool ready.")
- yield
- if DB_POOL:
- await DB_POOL.close()
- log.info("ERP database pool closed.")
- app = FastAPI(
- title="Homelegance ERP Bridge",
- version="1.0.0",
- docs_url=None, # disable Swagger UI in production
- redoc_url=None,
- lifespan=lifespan,
- )
- # ──────────────────────────────────────────────────────────────────────────────
- # Auth dependency
- # ──────────────────────────────────────────────────────────────────────────────
- def verify_api_key(x_api_key: str = Header(..., alias="X-API-Key")) -> None:
- if x_api_key != ERP_API_KEY:
- raise HTTPException(status_code=401, detail="Invalid API key")
- # ──────────────────────────────────────────────────────────────────────────────
- # Helpers
- # ──────────────────────────────────────────────────────────────────────────────
- def filter_keys(obj: dict, allowed: set[str]) -> dict:
- """Return a new dict containing only the allowed keys."""
- return {k: v for k, v in obj.items() if k in allowed}
- def filter_list(rows: list[dict], allowed: set[str]) -> list[dict]:
- return [filter_keys(r, allowed) for r in rows]
- async def call_list_fn(fn_name: str, conditions: dict, limit: int) -> list[dict]:
- """Call an erp_api list stored function that accepts (jsonb, int) and returns json."""
- assert DB_POOL is not None
- sql = f"SELECT erp_api.{fn_name}($1::jsonb, $2)"
- row = await DB_POOL.fetchrow(sql, json.dumps(conditions), limit)
- if row is None:
- return []
- result = row[0]
- if result is None:
- return []
- data = json.loads(result) if isinstance(result, str) else result
- return data if isinstance(data, list) else []
- def pick(obj: dict | None, keys: set[str]) -> dict:
- if not obj:
- return {}
- return {k: obj[k] for k in keys if k in obj}
- # ──────────────────────────────────────────────────────────────────────────────
- # Request / response models
- # ──────────────────────────────────────────────────────────────────────────────
- class CatalogRequest(BaseModel):
- model: str | None = Field(None, description="Partial model number (ILIKE search)")
- manufacturer: str | None = Field(None, description="Manufacturer filter")
- description: str | None = Field(None, description="Description keyword")
- category: str | None = Field(None, description="Product category / Type")
- status: str | None = Field(None, description="Item status code (e.g. 'A' for active)")
- limit: int = Field(20, ge=1, le=200)
- class ContactsRequest(BaseModel):
- contact_id: str | None = Field(None, description="Exact ContactID")
- company: str | None = Field(None, description="Company name (partial match)")
- name: str | None = Field(None, description="Contact person name")
- limit: int = Field(20, ge=1, le=200)
- class OrdersListRequest(BaseModel):
- so_id: str | None = Field(None, description="Sales Order ID (partial match)")
- customer_name: str | None = Field(None, description="Customer name (partial match)")
- customer_cid: str | None = Field(None, description="Customer CID (exact)")
- po_id: str | None = Field(None, description="PO number (partial match)")
- status: str | None = Field(None, description="Order status code")
- limit: int = Field(20, ge=1, le=200)
- class StockRequest(BaseModel):
- model: str | None = Field(None, description="Model number (partial match)")
- warehouse_cid: str | None = Field(None, description="Warehouse code")
- limit: int = Field(50, ge=1, le=500)
- # ──────────────────────────────────────────────────────────────────────────────
- # Endpoints
- # ──────────────────────────────────────────────────────────────────────────────
- @app.get("/health")
- async def health() -> dict:
- """Simple liveness probe — no auth required."""
- return {"status": "ok", "service": "erp-bridge"}
- @app.post("/catalog", dependencies=[Depends(verify_api_key)])
- async def catalog_search(req: CatalogRequest) -> list[dict[str, Any]]:
- """
- Search the product catalog.
- Calls erp_api.catalog_lists(conditions jsonb, limit int).
- """
- conditions: dict[str, Any] = {}
- if req.model:
- conditions["Model"] = req.model
- if req.manufacturer:
- conditions["Manufacturer"] = req.manufacturer
- if req.description:
- conditions["Description"] = req.description
- if req.category:
- conditions["Type"] = req.category
- if req.status:
- conditions["Status"] = req.status
- rows = await call_list_fn("catalog_lists", conditions, req.limit)
- return filter_list(rows, COLUMNS_CATALOG)
- @app.post("/contacts", dependencies=[Depends(verify_api_key)])
- async def contacts_search(req: ContactsRequest) -> list[dict[str, Any]]:
- """
- Search customers / contacts.
- Calls erp_api.contact_lists(conditions jsonb, limit int).
- """
- conditions: dict[str, Any] = {}
- if req.contact_id:
- conditions["ContactID"] = req.contact_id
- if req.company:
- conditions["Company"] = req.company
- if req.name:
- conditions["ContactName"] = req.name
- rows = await call_list_fn("contact_lists", conditions, req.limit)
- return filter_list(rows, COLUMNS_CONTACTS)
- @app.post("/orders", dependencies=[Depends(verify_api_key)])
- async def orders_list(req: OrdersListRequest) -> list[dict[str, Any]]:
- """
- List sales orders (one row per line item).
- Calls erp_api.sales_orders_lists(conditions jsonb, limit int).
- """
- conditions: dict[str, Any] = {}
- if req.so_id:
- conditions["SOID"] = req.so_id
- if req.customer_name:
- conditions["CustomerName"] = req.customer_name
- if req.customer_cid:
- conditions["CustomerCID"] = req.customer_cid
- if req.po_id:
- conditions["POID"] = req.po_id
- if req.status:
- conditions["Status"] = req.status
- rows = await call_list_fn("sales_orders_lists", conditions, req.limit)
- return filter_list(rows, COLUMNS_ORDER_HEADER)
- @app.get("/orders/{so_id}", dependencies=[Depends(verify_api_key)])
- async def order_get(so_id: str = Path(..., description="Sales Order ID")) -> dict[str, Any]:
- """
- Retrieve a single sales order with nested Items and Notes.
- Calls erp_api.sales_order_get(so_id text).
- """
- assert DB_POOL is not None
- sql = "SELECT erp_api.sales_order_get($1)"
- try:
- row = await DB_POOL.fetchrow(sql, so_id)
- except asyncpg.PostgresError as e:
- raise HTTPException(status_code=404, detail=str(e))
- if row is None or row[0] is None:
- raise HTTPException(status_code=404, detail=f"Sales order not found: {so_id}")
- full: dict = json.loads(row[0]) if isinstance(row[0], str) else row[0]
- # Filter header columns
- result = pick(full, COLUMNS_ORDER_HEADER)
- # Filter Items array
- items_raw: list[dict] = full.get("Items") or []
- result["Items"] = [pick(item, COLUMNS_ORDER_ITEM) for item in items_raw]
- # Include Notes array as-is (already minimal)
- result["Notes"] = full.get("Notes") or []
- return result
- @app.post("/stock", dependencies=[Depends(verify_api_key)])
- async def stock_search(req: StockRequest) -> list[dict[str, Any]]:
- """
- Search available stock (pallet level).
- Calls erp_api.stock_lists(conditions jsonb, limit int).
- """
- conditions: dict[str, Any] = {}
- if req.model:
- conditions["Model"] = req.model
- if req.warehouse_cid:
- conditions["WarehouseCID"] = req.warehouse_cid
- rows = await call_list_fn("stock_lists", conditions, req.limit)
- return filter_list(rows, COLUMNS_STOCK)
- # ──────────────────────────────────────────────────────────────────────────────
- # Dev runner (not used in production — uvicorn is started by systemd)
- # ──────────────────────────────────────────────────────────────────────────────
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run("main:app", host="127.0.0.1", port=PORT, reload=False)
|