Documentation
ℹ️ Pre-installed on all nodes: the elements library is installed at /opt/venv/lib/python3.13/site-packages/elements/ on every app node. Your services import from it directly; no pip install is needed on production nodes.

Database — FastAPI Routes

In FastAPI routes, use Depends() injection. The connection pool is initialized once and shared across all requests.

# your_service/core/databases.py
from elements.db.db_dependencies import create_db_dependency

get_db      = create_db_dependency("your_db", use_replica=False)  # writes
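get_read_db = create_db_dependency("your_db", use_replica=True)   # reads

# your_service/api/routes.py
import uuid

from fastapi import APIRouter, Depends
from pydantic import BaseModel

from ..core.databases import get_db, get_read_db

router = APIRouter()

class ItemSchema(BaseModel):  # illustrative request model; define your own fields
    name: str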

@router.post("/items")
async def create_item(data: ItemSchema, db=Depends(get_db)):
    await db.execute(
        "INSERT INTO items (id, name) VALUES ($1, $2)",
        uuid.uuid4(), data.name
    )

@router.get("/items")
async def list_items(db=Depends(get_read_db)):
    return await db.fetch("SELECT * FROM items ORDER BY created_at DESC")

Database — Async Workers

In background workers (not FastAPI), use the async for generator pattern. The connection is scoped to the block and released automatically.

from elements.db.db_dependencies import get_database_connection

async for conn in get_database_connection("your_db", use_replica=False):
    await conn.execute("UPDATE items SET status=$1 WHERE id=$2", "done", item_id)
    rows = await conn.fetch("SELECT * FROM queue WHERE status='pending'")
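
The scoping behaviour described above can be illustrated with plain Python. This is a minimal sketch, not the library's implementation: `FakeConnection` and `get_fake_connection` are hypothetical stand-ins that only show why a single-yield async generator releases its resource when the `async for` block finishes.

```python
import asyncio
from typing import AsyncIterator


class FakeConnection:
    """Hypothetical stand-in for a pooled database connection."""

    def __init__(self) -> None:
        self.released = False
        self.statements: list[str] = []

    async def execute(self, sql: str, *args: object) -> None:
        self.statements.append(sql)


async def get_fake_connection(conn: FakeConnection) -> AsyncIterator[FakeConnection]:
    # Yield exactly once. When the caller's loop asks for the next item,
    # execution resumes here, the finally clause runs, and iteration stops.
    try:
        yield conn
    finally:
        conn.released = True


async def run_worker_step() -> FakeConnection:
    conn = FakeConnection()
    async for c in get_fake_connection(conn):
        await c.execute("UPDATE items SET status=$1 WHERE id=$2", "done", 1)
        assert not c.released  # still held inside the block
    return conn


conn = asyncio.run(run_worker_step())
```

One caveat of this pattern in plain Python: if the loop body raises or breaks out early, the generator is not resumed, so its cleanup is deferred until the generator is closed or garbage-collected. Whether `get_database_connection` guards against that is not stated here.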

Database — Connection Details

Pool      Endpoint          Purpose
primary   management:5432   Writes → HAProxy → PgBouncer → PostgreSQL primary
replica   management:5433   Reads → HAProxy → PgBouncer → PostgreSQL replica

Pools are initialized lazily on first use and shared for the process lifetime. Pool sizes are configured from environment (default: 5–20 connections for primary, half that for replica).
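
As a rough illustration of lazy, shared initialization, the sketch below creates a pool on first request and reuses it afterwards. Everything in it is assumed for illustration: `FakePool`, `PoolRegistry`, the `DB_POOL_MIN` / `DB_POOL_MAX` variable names, and the rounding used for "half that" are not taken from the elements library.

```python
import asyncio
from dataclasses import dataclass
from typing import Mapping


@dataclass
class FakePool:
    """Hypothetical stand-in for a real connection pool."""
    db_name: str
    use_replica: bool
    min_size: int
    max_size: int


def pool_sizes(env: Mapping[str, str], use_replica: bool) -> tuple[int, int]:
    # DB_POOL_MIN / DB_POOL_MAX are assumed names, not the library's variables.
    min_size = int(env.get("DB_POOL_MIN", "5"))
    max_size = int(env.get("DB_POOL_MAX", "20"))
    if use_replica:
        # Rounding down with a floor of 1 is an assumption.
        min_size, max_size = max(1, min_size // 2), max(1, max_size // 2)
    return min_size, max_size


class PoolRegistry:
    def __init__(self, env: Mapping[str, str]) -> None:
        self._env = env
        self._pools: dict[tuple[str, bool], FakePool] = {}
        self._lock = asyncio.Lock()
        self.created = 0

    async def get(self, db_name: str, use_replica: bool) -> FakePool:
        key = (db_name, use_replica)
        async with self._lock:  # concurrent first calls must not create two pools
            if key not in self._pools:
                lo, hi = pool_sizes(self._env, use_replica)
                self._pools[key] = FakePool(db_name, use_replica, lo, hi)
                self.created += 1
        return self._pools[key]


async def demo():
    registry = PoolRegistry(env={})
    first, second = await asyncio.gather(
        registry.get("your_db", False), registry.get("your_db", False)
    )
    replica = await registry.get("your_db", True)
    return registry, first, second, replica


registry, first, second, replica = asyncio.run(demo())
```

Two concurrent requests for the primary pool end up sharing one object, and the replica pool is only created when it is first asked for.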

Redis — FastAPI Routes

import json

from fastapi import Depends
from elements.redis.redis_dependencies import get_redis

@router.post("/cache")
async def cache_item(payload: dict, redis=Depends(get_redis)):
    await redis.set("key", "value", ex=3600)
    value = await redis.get("key")
    await redis.lpush("my:queue", json.dumps(payload))

Redis — Async Workers

import json

from elements.redis.redis_dependencies import init_discovery_redis

class MyWorker:
    async def start(self):
        self.redis_manager = await init_discovery_redis()

    async def process(self):
        async with self.redis_manager.get_connection() as redis:
            # brpop returns a (key, value) pair, or None if the timeout expires
            item = await redis.brpop("my:queue", timeout=5)
            if item:
                await self.handle(json.loads(item[1]))
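
The route pushes with lpush and the worker pops with brpop, which together give first-in, first-out ordering. The sketch below shows that ordering with an in-memory stand-in; `FakeRedisList` is hypothetical and only mimics the head/tail semantics of a Redis list, not the real client.

```python
import json
from collections import deque
from typing import Optional


class FakeRedisList:
    """Hypothetical in-memory stand-in for one Redis list."""

    def __init__(self, key: str) -> None:
        self.key = key
        self._items: deque[str] = deque()

    def lpush(self, value: str) -> None:
        self._items.appendleft(value)  # LPUSH inserts at the head

    def rpop(self) -> Optional[tuple[str, str]]:
        # Mirrors the (key, value) shape that BRPOP returns; None when empty.
        if not self._items:
            return None
        return self.key, self._items.pop()  # RPOP removes from the tail


queue = FakeRedisList("my:queue")
for job_id in (1, 2, 3):
    queue.lpush(json.dumps({"job": job_id}))

processed = []
while (item := queue.rpop()) is not None:
    processed.append(json.loads(item[1])["job"])
```

Jobs come out in the order they were enqueued, which is why the worker reads `item[1]` (the value) rather than `item[0]` (the list key).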

FastAPI App Factory

Use create_app() from elements.fastapi.app to get a pre-configured FastAPI instance with all middleware, CORS, error handlers, and dependency injection wired up.

# your_service/main.py
from elements.fastapi.app import create_app
from contextlib import asynccontextmanager
from .core.databases import init_databases, shutdown_databases
from .api import router
from elements.internal.state_check import validate, LicenseValidationError

@asynccontextmanager
async def lifespan(app):
    try:
        validate()  # runs once, result cached for process lifetime
    except LicenseValidationError as e:
        raise SystemExit(f"License validation failed: {e}")
    await init_databases()
    yield
    await shutdown_databases()

app = create_app(lifespan=lifespan)
app.include_router(router, prefix="/api/your-service")
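
The ordering of the lifespan steps can be checked without FastAPI, since the lifespan is an ordinary async context manager. This is a sketch with placeholder steps: the `events` list stands in for the real validate and database calls, and wrapping the `yield` in try/finally is a suggestion of this example, not something the snippet above does.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def demo_lifespan(app):
    events.append("validate")          # placeholder for validate()
    events.append("init_databases")    # placeholder for init_databases()
    try:
        yield                          # the app serves requests here
    finally:
        # Runs on shutdown, including when an error propagates into the context.
        events.append("shutdown_databases")


async def serve() -> None:
    async with demo_lifespan(app=None):
        events.append("serving")


asyncio.run(serve())
```

Everything before the `yield` runs once at startup, and everything after it runs once at shutdown.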

Logging

Structured JSON logging. All logs are written to /var/log/xeroops/{service}-{component}.log, uploaded to S3 by cron, and indexed in management Redis for full-text search via the ops dashboard.

from elements.logger import get_logger

logger = get_logger(component="my_component")

logger.info("User created", extra={"user_id": str(user_id)})
logger.error("Payment failed", extra={"intent_id": intent_id, "error": str(e)})
logger.debug("Cache hit", extra={"key": cache_key})

Log output (JSON, one line per entry):

{"timestamp": "2026-05-13T10:14:29Z", "level": "info", "service": "your_service",
 "component": "my_component", "hostname": "app1", "message": "User created",
 "user_id": "6eb083f1-..."}

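How `extra` fields could end up as top-level keys in a one-line JSON entry can be sketched with the standard logging module. This is an assumption about the general technique, not the elements.logger implementation; `JsonFormatter` and the logger name `json_demo` are hypothetical.

```python
import io
import json
import logging
import socket
from datetime import datetime, timezone

# Attribute names present on every LogRecord; anything else came from `extra`.
_RESERVED = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class JsonFormatter(logging.Formatter):
    def __init__(self, service: str, component: str) -> None:
        super().__init__()
        self.service = service
        self.component = component

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc)
            .strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname.lower(),
            "service": self.service,
            "component": self.component,
            "hostname": socket.gethostname(),
            "message": record.getMessage(),
        }
        # Copy the caller's extra fields next to the fixed ones.
        for key, value in record.__dict__.items():
            if key not in _RESERVED:
                entry[key] = value
        return json.dumps(entry, default=str)  # one line per entry


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter("your_service", "my_component"))
demo_logger = logging.getLogger("json_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)

demo_logger.info("User created", extra={"user_id": "6eb083f1"})
line = stream.getvalue().strip()
```

Because `extra` keys are merged into the record, avoid names that collide with built-in record attributes such as `name` or `message`; the standard library raises a KeyError for those.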
Email via SES

from elements.email.email_service import send_email

# Plain email
await send_email(
    to_email="user@example.com",
    subject="Welcome to Acme",
    html_body="<p>Thanks for signing up!</p>",
    from_email="noreply@yourdomain.com",
)

# Email with attachment
await send_email(
    to_email="user@example.com",
    subject="Your license",
    html_body=html,
    from_email="noreply@yourdomain.com",
    attachments=[{
        "filename": "license.json",
        "content": json.dumps(license_data),
        "content_type": "application/json",
    }]
)
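
To make the attachment fields concrete, the sketch below shows how a dict with filename, content, and content_type could map onto a MIME message using the standard library. It is only an illustration of the shape of the data: `build_message` is hypothetical, and how send_email actually assembles and submits the message is not documented here.

```python
import json
from email.message import EmailMessage
from typing import Optional


def build_message(to_email: str, subject: str, html_body: str, from_email: str,
                  attachments: Optional[list[dict]] = None) -> EmailMessage:
    msg = EmailMessage()
    msg["To"] = to_email
    msg["From"] = from_email
    msg["Subject"] = subject
    msg.set_content(html_body, subtype="html")
    for att in attachments or []:
        # "application/json" -> maintype "application", subtype "json"
        maintype, _, subtype = att["content_type"].partition("/")
        content = att["content"]
        data = content.encode("utf-8") if isinstance(content, str) else content
        msg.add_attachment(data, maintype=maintype, subtype=subtype,
                           filename=att["filename"])
    return msg


msg = build_message(
    to_email="user@example.com",
    subject="Your license",
    html_body="<p>License attached.</p>",
    from_email="noreply@yourdomain.com",
    attachments=[{
        "filename": "license.json",
        "content": json.dumps({"tier": "pro"}),
        "content_type": "application/json",
    }],
)
parts = list(msg.iter_attachments())
```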

Config Manager

All environment variables are available through ConfigManager — a singleton backed by lru_cache that reads /etc/environment once.

from elements.config.manager import get_config_manager

config = get_config_manager()

# DB config
config.db_config.haproxy_host          # "management"
config.db_config.haproxy_primary_port  # 5432
config.db_config.user                  # "postgres"
config.db_config.password              # from PGPASSWORD env

# App config
config.app_env      # "prod"
config.aws_region   # "us-east-1"
config.s3_bucket    # "uploads-123456789012"
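
The "read once, then cached" behaviour can be sketched with functools.lru_cache and a small KEY=VALUE parser. The names here (`parse_env_file`, `DemoConfig`, `get_demo_config`, and the `APP_ENV` / `AWS_REGION` keys) are assumptions for illustration, not the real ConfigManager.

```python
import os
import tempfile
from dataclasses import dataclass
from functools import lru_cache


def parse_env_file(path: str) -> dict[str, str]:
    values: dict[str, str] = {}
    with open(path, encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            values[key.strip()] = value.strip().strip('"')
    return values


@dataclass(frozen=True)
class DemoConfig:
    app_env: str
    aws_region: str


@lru_cache(maxsize=1)
def get_demo_config(path: str = "/etc/environment") -> DemoConfig:
    env = parse_env_file(path)  # the file is read on the first call only
    return DemoConfig(app_env=env.get("APP_ENV", ""), aws_region=env.get("AWS_REGION", ""))


with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as fh:
    fh.write('APP_ENV="prod"\nAWS_REGION=us-east-1\n')
    demo_path = fh.name

first = get_demo_config(demo_path)
with open(demo_path, "w", encoding="utf-8") as fh:
    fh.write("APP_ENV=dev\n")        # later edits are not picked up
second = get_demo_config(demo_path)
os.unlink(demo_path)
```

A consequence of this design is that changes to the file only take effect after a process restart. In tests of a function cached this way, `cache_clear()` resets it between cases.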

License Validation

Call validate() once in your service's lifespan startup. It's decorated with @lru_cache — the IMDSv2 call and signature verification happen exactly once per process. All subsequent calls return the cached result instantly.

from elements.internal.state_check import validate, LicenseValidationError

# In lifespan startup — raises LicenseValidationError on failure
try:
    validate()
except LicenseValidationError as e:
    raise SystemExit(f"License check failed: {e}")

⚠️ Do not check in module-level code: call validate() in your lifespan function, not at import time. Module-level calls run at import, causing confusing failures and redundant network calls during testing.
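
The caching behaviour can be seen with standard functools.lru_cache on two toy functions. This sketch uses hypothetical names (`validate_ok`, `validate_bad`, `DemoValidationError`) and shows only how lru_cache itself behaves; whether validate() adds its own handling for failures is not stated here.

```python
from functools import lru_cache


class DemoValidationError(Exception):
    """Hypothetical stand-in for LicenseValidationError."""


calls = {"ok": 0, "bad": 0}


@lru_cache(maxsize=None)
def validate_ok() -> bool:
    calls["ok"] += 1          # the expensive work would happen here
    return True


@lru_cache(maxsize=None)
def validate_bad() -> bool:
    calls["bad"] += 1
    raise DemoValidationError("signature mismatch")


validate_ok()
validate_ok()                 # served from the cache, body not re-run

for _ in range(2):
    try:
        validate_bad()        # exceptions are not cached, so the body runs again
    except DemoValidationError:
        pass
```

With plain lru_cache, only successful results are cached. Exiting the process on the first failure, as the lifespan example does, avoids repeated failing calls.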

Building & Deploying Your Service

# your_service/pyproject.toml defines the wheel
# build-and-deploy.sh does the rest:

cd your-service-repo/
./build-and-deploy.sh

  Building wheel...        OK
  Uploading to S3...       OK
  Publishing to Redis...   OK
  [app1] Installing...     OK
  [app2] Installing...     OK
  Deployment complete.

The deployment subscriber on each app node picks up the Redis message, downloads the wheel from S3, installs it, and restarts the service — all in under 30 seconds, in parallel across all nodes.
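
The per-node sequence described above (receive message, download, install, restart) can be sketched as a small handler. The message fields `service` and `s3_key`, the function `handle_deploy_message`, and the injected callables are all hypothetical; the real subscriber's message format and commands are not documented here.

```python
import json
from typing import Callable


def handle_deploy_message(
    raw: str,
    download: Callable[[str], str],
    install: Callable[[str], None],
    restart: Callable[[str], None],
) -> str:
    """Run the deployment steps in order for one published message."""
    message = json.loads(raw)
    wheel_path = download(message["s3_key"])   # fetch the wheel from S3
    install(wheel_path)                        # install it into the venv
    restart(message["service"])                # restart the service
    return wheel_path


steps: list[tuple[str, str]] = []


def fake_download(key: str) -> str:
    steps.append(("download", key))
    return "/tmp/" + key.rsplit("/", 1)[-1]


def fake_install(path: str) -> None:
    steps.append(("install", path))


def fake_restart(service: str) -> None:
    steps.append(("restart", service))


raw_message = json.dumps({
    "service": "your_service",
    "s3_key": "wheels/your_service-1.0.0-py3-none-any.whl",
})
wheel = handle_deploy_message(raw_message, fake_download, fake_install, fake_restart)
```

Injecting the three steps as callables keeps the ordering logic testable without S3, pip, or a service manager.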