elements SDK
Python library pre-wired to your XeroOps infrastructure. Database connections, Redis, FastAPI boilerplate, email, and structured logging — all pre-configured to your deployed cluster.
Installed at /opt/venv/lib/python3.13/site-packages/elements/ on every app node. Your services import from it directly, so no pip install is needed on production nodes.
Database — FastAPI Routes
In FastAPI routes, use Depends() injection. The connection pool is initialized once and shared across all requests.
# your_service/core/databases.py
from elements.db.db_dependencies import create_db_dependency
get_db = create_db_dependency("your_db", use_replica=False) # writes
get_read_db = create_db_dependency("your_db", use_replica=True) # reads
# your_service/api/routes.py
import uuid

from fastapi import APIRouter, Depends

from ..core.databases import get_db, get_read_db

router = APIRouter()

# ItemSchema is assumed to be your own request model with a `name` field (not shown).
@router.post("/items")
async def create_item(data: ItemSchema, db=Depends(get_db)):
    await db.execute(
        "INSERT INTO items (id, name) VALUES ($1, $2)",
        uuid.uuid4(), data.name,
    )

@router.get("/items")
async def list_items(db=Depends(get_read_db)):
    return await db.fetch("SELECT * FROM items ORDER BY created_at DESC")
Database — Async Workers
In background workers (not FastAPI), use the async for generator pattern. The connection is scoped to the block and released automatically.
from elements.db.db_dependencies import get_database_connection

# Run this inside an async function; item_id comes from your worker's own logic.
async for conn in get_database_connection("your_db", use_replica=False):
    await conn.execute("UPDATE items SET status=$1 WHERE id=$2", "done", item_id)
    rows = await conn.fetch("SELECT * FROM queue WHERE status='pending'")
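To illustrate why a single-yield async generator scopes a connection to the loop body, here is a minimal sketch. It does not use the SDK: `FakeConnection` and `get_fake_connection` are hypothetical stand-ins, and the real `get_database_connection` may be implemented differently.

```python
import asyncio


class FakeConnection:
    """Stand-in for a pooled connection; records queries and release state."""

    def __init__(self):
        self.released = False
        self.queries = []

    async def execute(self, query, *args):
        self.queries.append((query, args))


async def get_fake_connection(conn):
    """Hypothetical single-yield async generator (not the SDK's code)."""
    try:
        yield conn  # the `async for` body runs while the generator is suspended here
    finally:
        conn.released = True  # runs when the loop asks for the next item


async def main():
    conn = FakeConnection()
    async for c in get_fake_connection(conn):
        await c.execute("UPDATE items SET status=$1 WHERE id=$2", "done", 42)
        held_during_block = not c.released
    return conn, held_during_block


conn, held_during_block = asyncio.run(main())
```

Because the generator yields exactly once, the loop body executes a single time, and the cleanup code runs as soon as the body finishes normally.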
Database — Connection Details
| Pool | Endpoint | Purpose |
|---|---|---|
| primary | management:5432 | Writes → HAProxy → PgBouncer → PostgreSQL primary |
| replica | management:5433 | Reads → HAProxy → PgBouncer → PostgreSQL replica |
Pools are initialized lazily on first use and shared for the process lifetime. Pool sizes are configured from environment (default: 5–20 connections for primary, half that for replica).
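Lazy, process-wide pool initialization can be sketched as follows. This is an illustration of the general pattern only, not the SDK's internals: `LazyPools`, `create_pool`, and the `(name, use_replica)` cache key are assumptions made for the example.

```python
import asyncio

created_count = 0


async def create_pool(name, use_replica):
    """Hypothetical pool factory; a real one would open database connections."""
    global created_count
    await asyncio.sleep(0)  # simulate asynchronous connection setup
    created_count += 1
    return {"db": name, "replica": use_replica}


class LazyPools:
    """Creates each pool on first use and reuses it for every later caller."""

    def __init__(self):
        self._pools = {}
        self._lock = asyncio.Lock()

    async def get(self, name, use_replica=False):
        key = (name, use_replica)
        if key not in self._pools:
            async with self._lock:
                if key not in self._pools:  # re-check after waiting for the lock
                    self._pools[key] = await create_pool(name, use_replica)
        return self._pools[key]


async def main():
    pools = LazyPools()
    # Ten concurrent first requests must still produce a single primary pool.
    primaries = await asyncio.gather(*(pools.get("your_db") for _ in range(10)))
    replica = await pools.get("your_db", use_replica=True)
    return primaries, replica


primaries, replica = asyncio.run(main())
```

The second membership check inside the lock is what keeps concurrent first requests from each creating their own pool.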
Redis — FastAPI Routes
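import json

from fastapi import Depends
from elements.redis.redis_dependencies import get_redis

@router.post("/cache")
async def cache_item(redis=Depends(get_redis)):
    await redis.set("key", "value", ex=3600)  # expires after one hour
    value = await redis.get("key")
    # payload stands for any JSON-serializable object from your request
    await redis.lpush("my:queue", json.dumps(payload))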
Redis — Async Workers
import json

from elements.redis.redis_dependencies import init_discovery_redis

class MyWorker:
    async def start(self):
        self.redis_manager = await init_discovery_redis()

    async def process(self):
        async with self.redis_manager.get_connection() as redis:
            # brpop returns a (key, value) pair, or None on timeout
            item = await redis.brpop("my:queue", timeout=5)
            if item:
                await self.handle(json.loads(item[1]))
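The route pushes with LPUSH and the worker pops with BRPOP, which together give first-in, first-out ordering. The sketch below shows this with an in-memory stand-in; `FakeRedisList` is a hypothetical class for illustration, not a Redis client, and it does not block the way the real BRPOP does.

```python
import json
from collections import deque


class FakeRedisList:
    """In-memory stand-in for one Redis list (illustration only)."""

    def __init__(self, key):
        self.key = key
        self._items = deque()

    def lpush(self, value):
        self._items.appendleft(value)  # LPUSH inserts at the head of the list

    def brpop(self):
        # BRPOP removes from the tail and returns (key, value). The real
        # command waits up to `timeout` seconds and returns None on timeout.
        if not self._items:
            return None
        return (self.key, self._items.pop())


queue = FakeRedisList("my:queue")
for n in (1, 2, 3):
    queue.lpush(json.dumps({"job": n}))

processed = []
while (item := queue.brpop()) is not None:
    processed.append(json.loads(item[1])["job"])
```

Jobs come out in the order they were pushed, and `item[1]` is the payload because index 0 holds the list key.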
FastAPI App Factory
Use create_app() from elements.fastapi.app to get a pre-configured FastAPI instance with middleware, CORS, error handlers, and dependency injection already wired up.
# your_service/main.py
from contextlib import asynccontextmanager

from elements.fastapi.app import create_app
from elements.internal.state_check import validate, LicenseValidationError

from .api import router
from .core.databases import init_databases, shutdown_databases

@asynccontextmanager
async def lifespan(app):
    try:
        validate()  # runs once, result cached for process lifetime
    except LicenseValidationError as e:
        raise SystemExit(f"License validation failed: {e}")
    await init_databases()
    yield
    await shutdown_databases()

app = create_app(lifespan=lifespan)
app.include_router(router, prefix="/api/your-service")
Logging
Structured JSON logging. All logs are written to /var/log/xeroops/{service}-{component}.log, uploaded to S3 by cron, and indexed in management Redis for full-text search via the ops dashboard.
from elements.logger import get_logger
logger = get_logger(component="my_component")
logger.info("User created", extra={"user_id": str(user_id)})
logger.error("Payment failed", extra={"intent_id": intent_id, "error": str(e)})
logger.debug("Cache hit", extra={"key": cache_key})
Log output (JSON, one line per entry; wrapped here for readability):
{"timestamp": "2026-05-13T10:14:29Z", "level": "info", "service": "your_service",
"component": "my_component", "hostname": "app1", "message": "User created",
"user_id": "6eb083f1-..."}
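A rough sketch of how one-line JSON entries like the one above can be produced with the standard library. It is not the SDK's logger: `JsonFormatter` is a hypothetical class, and the way `extra` fields are merged into the top level is an assumption based on the sample output.

```python
import io
import json
import logging
import socket
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter; the SDK's real field handling may differ."""

    # Attributes every LogRecord has; anything else was passed via `extra`.
    _standard = set(vars(logging.LogRecord("", 0, "", 0, "", (), None)))
    _standard |= {"message", "asctime"}

    def __init__(self, service, component):
        super().__init__()
        self.service = service
        self.component = component

    def format(self, record):
        created = datetime.fromtimestamp(record.created, timezone.utc)
        entry = {
            "timestamp": created.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname.lower(),
            "service": self.service,
            "component": self.component,
            "hostname": socket.gethostname(),
            "message": record.getMessage(),
        }
        # Merge caller-supplied `extra` fields into the top-level object.
        for key, value in vars(record).items():
            if key not in self._standard:
                entry[key] = value
        return json.dumps(entry, default=str)


stream = io.StringIO()  # stands in for the log file
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter("your_service", "my_component"))

logger = logging.getLogger("sketch.json")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info("User created", extra={"user_id": "6eb083f1"})
entry = json.loads(stream.getvalue())
```

Keeping each entry on a single line means the file can be processed line by line, with each line parsed as an independent JSON document.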
Email via SES
import json

from elements.email.email_service import send_email

# Both calls must run inside an async function.

# Plain email
await send_email(
    to_email="user@example.com",
    subject="Welcome to Acme",
    html_body="<p>Thanks for signing up!</p>",
    from_email="noreply@yourdomain.com",
)

# Email with attachment
await send_email(
    to_email="user@example.com",
    subject="Your license",
    html_body=html,
    from_email="noreply@yourdomain.com",
    attachments=[{
        "filename": "license.json",
        "content": json.dumps(license_data),
        "content_type": "application/json",
    }],
)
Config Manager
All environment variables are available through ConfigManager — a singleton backed by lru_cache that reads /etc/environment once.
from elements.config.manager import get_config_manager
config = get_config_manager()
# DB config
config.db_config.haproxy_host # "management"
config.db_config.haproxy_primary_port # 5432
config.db_config.user # "postgres"
config.db_config.password # from PGPASSWORD env
# App config
config.app_env # "prod"
config.aws_region # "us-east-1"
config.s3_bucket # "uploads-123456789012"
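The "singleton backed by lru_cache" idea can be sketched like this. The example reads a throwaway temp file rather than /etc/environment, and `SketchConfig`, `get_sketch_config`, and the variable names in the file are hypothetical; the real ConfigManager's parsing rules are not shown in this document.

```python
import os
import tempfile
from functools import lru_cache

# Write a throwaway file so the sketch never touches the real /etc/environment.
_tmp = tempfile.NamedTemporaryFile("w", suffix=".env", delete=False)
_tmp.write('APP_ENV="prod"\nAWS_REGION=us-east-1\n# a comment line\n')
_tmp.close()
ENV_PATH = _tmp.name

read_count = 0  # how many times the file was actually parsed


class SketchConfig:
    """Parses KEY=VALUE lines once, at construction time."""

    def __init__(self, path):
        global read_count
        read_count += 1
        self.values = {}
        with open(path) as fh:
            for line in fh:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, _, value = line.partition("=")
                self.values[key.strip()] = value.strip().strip('"')


@lru_cache(maxsize=1)
def get_sketch_config():
    """Every call after the first returns the same cached instance."""
    return SketchConfig(ENV_PATH)


first = get_sketch_config()
second = get_sketch_config()
os.unlink(ENV_PATH)  # clean up the temp file
```

A consequence of this pattern is that edits to the file are not seen until the process restarts or the cache is cleared with `get_sketch_config.cache_clear()`.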
License Validation
Call validate() once in your service's lifespan startup. It's decorated with @lru_cache — the IMDSv2 call and signature verification happen exactly once per process. All subsequent calls return the cached result instantly.
from elements.internal.state_check import validate, LicenseValidationError
# In lifespan startup — raises LicenseValidationError on failure
try:
    validate()
except LicenseValidationError as e:
    raise SystemExit(f"License check failed: {e}")
Call validate() in your lifespan function, not at import time. Module-level calls run at import, causing confusing failures and redundant network calls during testing.
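One property of `functools.lru_cache` is relevant to the caching described here: it stores return values, not raised exceptions. The sketch below demonstrates this with two hypothetical stand-in functions; it makes no claim about how the SDK's validate() behaves after a failure.

```python
from functools import lru_cache

calls = {"ok": 0, "bad": 0}


class SketchValidationError(Exception):
    """Hypothetical error type for this example only."""


@lru_cache(maxsize=None)
def validate_ok():
    calls["ok"] += 1  # the expensive check would happen here
    return True


@lru_cache(maxsize=None)
def validate_bad():
    calls["bad"] += 1
    raise SketchValidationError("signature mismatch")


# A successful result is cached: the body runs once for two calls.
validate_ok()
validate_ok()

# A raised exception is not cached: the body runs again on each call.
for _ in range(2):
    try:
        validate_bad()
    except SketchValidationError:
        pass
```

Under this behavior, only a successful check is guaranteed to run once, which fits the approach of exiting the process on failure rather than retrying in place.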
Building & Deploying Your Service
# your_service/pyproject.toml defines the wheel
# build-and-deploy.sh does the rest:
cd your-service-repo/
./build-and-deploy.sh
Building wheel... OK
Uploading to S3... OK
Publishing to Redis... OK
[app1] Installing... OK
[app2] Installing... OK
Deployment complete.
The deployment subscriber on each app node picks up the Redis message, downloads the wheel from S3, installs it, and restarts the service — all in under 30 seconds, in parallel across all nodes.