FastAPI Guide
44

Gunicorn + Uvicorn Workers

Production

Running uvicorn main:app directly starts a single process with a single event loop — fine for development, not for production. In production, Gunicorn acts as a process manager that spawns multiple Uvicorn workers, each with its own event loop and Python interpreter. This uses all CPU cores and provides fault isolation (a crashed worker is restarted automatically). The key: use Gunicorn with UvicornWorker class, not the default sync worker.

# ── Direct command ─────────────────────────────────────────────────────────
# -w = number of worker processes
# -k = worker class (UvicornWorker = async ASGI worker)
# --bind = address:port
gunicorn main:app \
  -w 4 \
  -k uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 60 \
  --graceful-timeout 30 \
  --keep-alive 5

# Worker count rule of thumb:
# CPU-bound: workers = CPU_cores + 1
# I/O-bound (typical FastAPI): workers = CPU_cores * 2 + 1
# With 4 cores → 4*2+1 = 9 workers for an I/O-heavy API

# gunicorn.conf.py: configuration file (cleaner than command-line flags)
import multiprocessing

# Worker settings
worker_class = "uvicorn.workers.UvicornWorker"
workers = multiprocessing.cpu_count() * 2 + 1
threads = 1       # Uvicorn workers are async — threads add no benefit

# Networking
bind = "0.0.0.0:8000"
backlog = 2048    # max pending connections before refusing

# Timeouts (seconds)
timeout = 60           # kill worker if it doesn't respond in 60s
graceful_timeout = 30  # wait 30s for in-flight requests on shutdown
keepalive = 5          # seconds to keep idle connections open

# Process management
max_requests = 1000        # restart worker after about 1000 requests (limits the impact of memory leaks)
max_requests_jitter = 100  # adds a random 0..100 to each worker's limit (staggers restarts)
preload_app = True         # load app code once in master, copy-on-write to workers

# Logging
accesslog = "-"     # log to stdout (captured by Docker/K8s)
errorlog  = "-"
loglevel  = "info"

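def on_starting(server):
    print("Gunicorn starting")

def post_worker_init(worker):
    # Gunicorn hook: called just after a worker has initialized the application
    print(f"Worker {worker.pid} initialized")

def worker_exit(server, worker):
    # Hook signature is (server, worker), in that order
    print(f"Worker {worker.pid} exiting")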
📖 Concept Breakdown
UvicornWorker
A Gunicorn worker class that runs Uvicorn’s ASGI event loop. Enables async FastAPI code inside Gunicorn’s process management framework.
workers formula
CPU * 2 + 1 for I/O-heavy apps. Each worker has its own event loop so this isn’t the same as threads — each handles many concurrent requests.
max_requests jitter
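Without jitter, all workers reach the restart limit at roughly the same time, causing a brief traffic cliff. Gunicorn adds a random value between 0 and max_requests_jitter to each worker's limit, so restarts are spread over a range of requests.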
preload_app=True
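App code is loaded in the master process before forking, so workers share those memory pages copy-on-write, which saves memory. Import-time startup code (such as model loading) therefore runs once, not per worker. Open database connections after the fork, because a socket created in the master would be shared by every worker.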
graceful_timeout
On SIGTERM, Gunicorn stops accepting new connections and waits this many seconds for in-flight requests to finish before force-killing workers.
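
The worker-count rule of thumb and the jitter setting can be sketched in a few lines. This is only an illustration: recommended_workers and restart_threshold are hypothetical helper names, and the jitter calculation assumes Gunicorn's documented behaviour of adding a random value between 0 and max_requests_jitter to each worker's limit.

```python
import random


def recommended_workers(cpu_cores: int, io_bound: bool = True) -> int:
    """Rule of thumb from the text: cores*2+1 if I/O-bound, cores+1 if CPU-bound."""
    if cpu_cores < 1:
        raise ValueError("cpu_cores must be at least 1")
    return cpu_cores * 2 + 1 if io_bound else cpu_cores + 1


def restart_threshold(max_requests: int, jitter: int, rng: random.Random) -> int:
    """Request count after which one worker restarts.

    Assumption: a random value in [0, jitter] is added once per worker.
    """
    return max_requests + rng.randint(0, jitter)


rng = random.Random(0)  # seeded only so the sketch is repeatable
workers = recommended_workers(4)
thresholds = [restart_threshold(1000, 100, rng) for _ in range(workers)]
print(workers, thresholds)
```

Each worker ends up with a slightly different limit, so the restarts are staggered rather than all happening at the same request count.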

💬 Interview Tip

“Why use Gunicorn in front of Uvicorn?” Gunicorn adds process management (auto-restart of crashed workers, zero-downtime reload with kill -HUP, worker count control). A single Uvicorn process has nothing supervising it, so nothing restarts it if it crashes. Together they give production resilience.

⚠ Gotcha

Never put shared mutable state (counters, caches) in Python global variables when using multiple workers — each worker is a separate process with its own memory. They don’t share globals. Use Redis or a database for shared state.

45

Docker & Docker Compose

Production

Docker packages your FastAPI app and all its dependencies into an immutable image that runs identically in development, staging, and production. A multi-stage Dockerfile uses one stage to install dependencies and a second stage that copies only what’s needed into the final image — keeping the image small and secure. Docker Compose orchestrates multiple containers (app + database + Redis) locally.

# ── Stage 1: build dependencies ───────────────────────────────────────────
FROM python:3.12-slim AS builder

WORKDIR /build

RUN pip install --upgrade pip

COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# ── Stage 2: production image ──────────────────────────────────────────────
FROM python:3.12-slim AS production

# Create non-root user — never run as root in production
RUN addgroup --system app && adduser --system --group app

WORKDIR /app

COPY --from=builder /install /usr/local
COPY --chown=app:app . .

USER app

EXPOSE 8000

# Use exec form — PID 1 receives signals correctly
CMD ["gunicorn", "main:app", \
     "-k", "uvicorn.workers.UvicornWorker", \
     "-w", "4", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "60"]

# docker-compose.yml: local development stack
# (the top-level "version" key is obsolete in Compose V2 and can be omitted)
version: "3.9"

services:
  api:
    build:
      context: .
      target: production
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/mydb
      - REDIS_URL=redis://redis:6379
      - SECRET_KEY=dev-secret-key
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
    volumes:
      - .:/app
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: mydb
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d mydb"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

volumes:
  postgres_data:
📖 Concept Breakdown
Multi-stage build
First stage installs dependencies, along with any build tools (such as gcc) they need to compile. Second stage copies only runtime artifacts. Final image has no build tools, so it is smaller and has fewer vulnerabilities.
Non-root user
Running as root in Docker means a container escape gives an attacker root on the host. Always create and use a dedicated user (adduser --system).
CMD exec form
CMD ["gunicorn", …] (JSON array) vs CMD gunicorn … (string). Exec form: process is PID 1, receives SIGTERM directly. Shell form: shell is PID 1, signals may not propagate.
depends_on + healthcheck
condition: service_healthy waits until the DB passes its health check before starting the API — prevents “connection refused” on startup.
Layer caching
Copy requirements.txt before application code — only if requirements change does the pip install layer rebuild. Saves minutes on every build.

⚠ Gotcha

Never copy .env files into Docker images — they end up in image history and can be extracted. Use --env-file at runtime (docker run --env-file .env) or mount secrets as files. Add .env to .dockerignore.

46

Environment Management

Production

Production apps need different configuration for different environments (dev has a local DB; prod has a managed cloud DB). The FastAPI-idiomatic approach uses Pydantic Settings (from pydantic-settings) to read configuration from environment variables with type validation and secret handling. A single cached settings instance is injected via Depends(get_settings), making it testable and overridable.

from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import PostgresDsn, RedisDsn, SecretStr, AnyHttpUrl
from functools import lru_cache
from typing import Literal

class Settings(BaseSettings):
    # ── App ────────────────────────────────────────────────────────────────
    app_name: str = "My FastAPI App"
    environment: Literal["development", "staging", "production"] = "development"
    debug: bool = False
    log_level: str = "INFO"

    # ── Security ───────────────────────────────────────────────────────────
    # SecretStr prevents the value from appearing in repr/logs
    secret_key: SecretStr
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # ── Database ───────────────────────────────────────────────────────────
    # PostgresDsn validates the URL format
    database_url: PostgresDsn
    db_pool_size: int = 10
    db_max_overflow: int = 20

    # ── Redis (optional) ──────────────────────────────────────────────────
    redis_url: RedisDsn | None = None

    # ── CORS ───────────────────────────────────────────────────────────────
    allowed_origins: list[AnyHttpUrl] = ["http://localhost:3000"]

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    @property
    def is_production(self) -> bool:
        return self.environment == "production"

    @property
    def db_url_str(self) -> str:
        return str(self.database_url)

# lru_cache: settings are read ONCE and cached for the app lifetime.
@lru_cache
def get_settings() -> Settings:
    return Settings()

from fastapi import Depends
from typing import Annotated

SettingsDep = Annotated[Settings, Depends(get_settings)]

# Override in tests:
# def get_test_settings():
#     return Settings(database_url="postgresql+asyncpg://user:pass@localhost/test", ...)
# app.dependency_overrides[get_settings] = get_test_settings
📖 Concept Breakdown
BaseSettings
Pydantic model that reads from env vars automatically. Field database_url maps to env var DATABASE_URL. Validates types — wrong format raises on startup, not mid-request.
SecretStr
Wraps sensitive strings. str(settings.secret_key) returns **********. Access real value with settings.secret_key.get_secret_value(). Prevents accidental log exposure.
PostgresDsn
Validates that the URL is a valid PostgreSQL DSN. Raises ValidationError on startup if the format is wrong — fail fast, not at first query.
@lru_cache
Settings are parsed once. Without it, every request creates a new Settings() instance and re-reads env vars. lru_cache ensures a singleton.
env_file='.env'
For local development. In production, set real environment variables (K8s secrets, cloud provider env config) — don’t deploy .env files.
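
The caching behaviour described for @lru_cache can be seen without pydantic at all. The sketch below uses a hypothetical DemoSettings dataclass as a stand-in for the Settings class and reads a single environment variable; it illustrates the caching only, not pydantic-settings itself.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Hypothetical stand-in for the Settings class; reads one env var."""
    log_level: str


@lru_cache
def get_demo_settings() -> DemoSettings:
    # The body runs once; later calls return the cached instance.
    return DemoSettings(log_level=os.environ.get("LOG_LEVEL", "INFO"))


os.environ["LOG_LEVEL"] = "DEBUG"
first = get_demo_settings()
os.environ["LOG_LEVEL"] = "WARNING"   # changed after the first read
second = get_demo_settings()          # still the cached object
get_demo_settings.cache_clear()       # e.g. in a test fixture
third = get_demo_settings()           # re-reads the environment

print(first is second, second.log_level, third.log_level)  # → True DEBUG WARNING
```

The same cache_clear() call is a handy way to reset a cached settings function between tests when environment variables are patched.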

★ Key Info

Put .env in .gitignore. Check in an .env.example with placeholder values so teammates know which variables are needed. Never commit real secrets — they persist in git history even after deletion.

47

Structured JSON Logging

Production

Plain text logs are hard to query in log aggregators (Datadog, CloudWatch, Loki). Structured logging emits one JSON object per line, for example {"level":"info","event":"user_created","user_id":5,"request_id":"abc"}, so each field is searchable and filterable. structlog is a widely used third-party library for this in Python (it is not part of the standard library), with context binding that carries request-scoped data (request_id, user_id) through all log calls without passing it manually.

import logging
import sys
import uuid

import structlog
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

# ── Configure structlog ───────────────────────────────────────────────────
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # inject bound context vars
        structlog.stdlib.add_log_level,            # add "level" field
        # add_logger_name is omitted: it expects a stdlib logger, and the
        # loggers created by PrintLoggerFactory have no .name attribute
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.JSONRenderer(),       # final output as JSON
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
)

log = structlog.get_logger()

# ── Middleware: bind request context to all logs ──────────────────────────
class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        req_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))

        # Start from a clean slate, then bind this request's fields
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=req_id,
            method=request.method,
            path=str(request.url.path),
        )

        try:
            response = await call_next(request)
            log.info("request_complete", status_code=response.status_code)
            response.headers["X-Request-ID"] = req_id
            return response
        finally:
            # Runs even if the handler raises, so context never leaks
            structlog.contextvars.clear_contextvars()

app = FastAPI()
app.add_middleware(LoggingMiddleware)

# ── Usage in route handlers ───────────────────────────────────────────────
@app.post("/users")
async def create_user(name: str, email: str):
    log.info("creating_user", name=name, email=email)
    user_id = 42
    structlog.contextvars.bind_contextvars(user_id=user_id)
    log.info("user_created", user_id=user_id)
    return {"id": user_id}

# Output (one JSON line per log — queryable in Datadog/CloudWatch):
# {"event":"creating_user","name":"Alice","email":"a@b.com",
#  "request_id":"abc-123","method":"POST","path":"/users",
#  "level":"info","timestamp":"2024-01-15T10:30:00Z"}
📖 Concept Breakdown
structlog
Structured logging library. Each log.info("event", key=value) call emits a JSON line. Fields are machine-readable and searchable in log aggregators.
contextvars integration
merge_contextvars processor automatically injects values bound with bind_contextvars() into every log call — no need to pass request_id to every function.
clear_contextvars()
Clears bound context after the request. Without this, a worker’s context bleeds into the next request if the worker is reused.
JSONRenderer
Final processor that converts the event dict to a JSON string. Other renderers: ConsoleRenderer (human-readable dev output), KeyValueRenderer.

💬 Interview Tip

“How do you trace a request through your logs?” — Assign a UUID to each request (from X-Request-ID header or generated), bind it to the logging context, and return it in the response header. Log aggregators can then filter all logs for a specific request_id value.
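
To make the context-binding mechanism less magical, here is a minimal standard-library analogue. It is a sketch of the idea only, not structlog's implementation: bind_context, clear_context, and JsonFormatter are hypothetical names, and an in-memory stream stands in for stdout so the output can be inspected.

```python
import io
import json
import logging
import uuid
from contextvars import ContextVar

# Request-scoped context; each asyncio task or thread sees its own value.
_log_context: ContextVar[dict] = ContextVar("log_context", default={})


def bind_context(**fields) -> None:
    """Hypothetical helper: merge fields into the current context."""
    _log_context.set({**_log_context.get(), **fields})


def clear_context() -> None:
    _log_context.set({})


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            **_log_context.get(),  # bound fields are injected on every call
        }
        return json.dumps(payload)


stream = io.StringIO()  # stands in for stdout
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

clear_context()
bind_context(request_id=str(uuid.uuid4()), path="/users")
logger.info("creating_user")   # carries request_id and path
clear_context()
logger.info("after_request")   # context has been cleared

lines = [json.loads(line) for line in stream.getvalue().splitlines()]
print(lines)
```

The log call itself never mentions request_id; the formatter picks it up from the context variable, which is the same division of labour as bind_contextvars() and merge_contextvars.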

48

Distributed Tracing

Production

In a microservices architecture, a single user request may touch 5 different services. Logs tell you what happened inside each service, but not how they connect. Distributed tracing follows a request across service boundaries using a shared trace_id propagated in HTTP headers. OpenTelemetry is the open standard — FastAPIInstrumentor auto-instruments every route, creating spans with timing, HTTP method, status code, and error details, all sent to a tracing backend (Jaeger, Tempo, Datadog APM).

# pip install opentelemetry-sdk opentelemetry-instrumentation-fastapi
#             opentelemetry-exporter-otlp opentelemetry-instrumentation-sqlalchemy
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from fastapi import FastAPI

# ── 1. Configure the tracer ───────────────────────────────────────────────
resource = Resource.create({"service.name": "my-fastapi-service", "deployment.environment": "production"})

exporter = OTLPSpanExporter(endpoint="http://otel-collector:4317", insecure=True)

provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

app = FastAPI()

# ── 2. Auto-instrument FastAPI ────────────────────────────────────────────
# Automatically creates spans for every route with HTTP attributes.
FastAPIInstrumentor.instrument_app(app)

# ── 3. Manual spans for custom operations ─────────────────────────────────
@app.get("/users/{uid}")
async def get_user(uid: int):
    with tracer.start_as_current_span("fetch_user_from_db") as span:
        span.set_attribute("user.id", uid)
        span.set_attribute("db.operation", "SELECT")

        user = {"id": uid, "name": "Alice"}

        if not user:
            span.set_status(trace.StatusCode.ERROR, "User not found")
            span.record_exception(ValueError(f"User {uid} not found"))

    return user

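# ── 4. Propagate trace context in outgoing HTTP calls ─────────────────────
# Requires an extra package: pip install opentelemetry-instrumentation-httpx
# from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
# HTTPXClientInstrumentor().instrument()  # auto-propagates W3C traceparent header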
📖 Concept Breakdown
Span
A single timed operation (a DB query, an HTTP call, a function). Has a start time, end time, status, and attributes. Multiple spans form a trace tree.
Trace ID
A 128-bit ID shared across all services handling one request. Propagated in the traceparent HTTP header (W3C Trace Context standard).
FastAPIInstrumentor
Auto-instruments every route — no code changes needed. Creates a span per request with HTTP metadata. Call .instrument_app(app) once at startup.
BatchSpanProcessor
Buffers spans and sends in batches — non-blocking. SimpleSpanProcessor sends synchronously (only for debugging — blocks the event loop).
OTLP
OpenTelemetry Protocol — the wire format for sending telemetry. Works with Jaeger, Zipkin, Grafana Tempo, Datadog, AWS X-Ray via an OTEL collector.

★ Rarely Known

OpenTelemetry supports baggage — key-value pairs propagated alongside traces across service boundaries. Useful for propagating tenant ID, feature flags, or user context without modifying every function signature.

49

Health Check & Readiness Endpoints

Production

Kubernetes (and load balancers) use liveness and readiness probes to manage your containers. Liveness: “is the process alive?” — if this fails, K8s restarts the container. Readiness: “is the app ready to serve traffic?” — if this fails, K8s stops routing to the pod (but doesn’t restart it). A readiness probe should check that all dependencies (DB, Redis) are reachable, not just that the process is running. Return actual dependency health, not just HTTP 200.

from fastapi import FastAPI, Response
from pydantic import BaseModel
from typing import Literal
import asyncio

app = FastAPI()

# ── Liveness probe ────────────────────────────────────────────────────────
# Just proves the process is alive. K8s restarts pod if this fails.
# Keep it FAST — no DB calls.
@app.get("/health/live", include_in_schema=False)
async def liveness():
    return {"status": "alive"}

# ── Readiness probe ───────────────────────────────────────────────────────
class DependencyStatus(BaseModel):
    status: Literal["ok", "error"]
    latency_ms: float | None = None
    error: str | None = None

class ReadinessResponse(BaseModel):
    status: Literal["ready", "degraded", "not_ready"]
    dependencies: dict[str, DependencyStatus]

@app.get("/health/ready", response_model=ReadinessResponse, include_in_schema=False)
async def readiness(response: Response):
    checks: dict[str, DependencyStatus] = {}

    # Check database
    try:
        import time
        start = time.perf_counter()
        await asyncio.sleep(0.001)   # replace with: pool.fetchval("SELECT 1")
        checks["database"] = DependencyStatus(
            status="ok",
            latency_ms=round((time.perf_counter() - start) * 1000, 2),
        )
    except Exception as e:
        checks["database"] = DependencyStatus(status="error", error=str(e))

    # Check Redis
    try:
        start = time.perf_counter()
        # await redis.ping()
        checks["redis"] = DependencyStatus(
            status="ok",
            latency_ms=round((time.perf_counter() - start) * 1000, 2),
        )
    except Exception as e:
        checks["redis"] = DependencyStatus(status="error", error=str(e))

    all_ok = all(v.status == "ok" for v in checks.values())
    critical_ok = checks.get("database", DependencyStatus(status="error")).status == "ok"

    if all_ok:
        overall, response.status_code = "ready", 200
    elif critical_ok:
        overall, response.status_code = "degraded", 200   # still serve traffic
    else:
        overall, response.status_code = "not_ready", 503  # remove from load balancer

    return ReadinessResponse(status=overall, dependencies=checks)

# ── Startup probe (K8s 1.16+) ─────────────────────────────────────────────
@app.get("/health/startup", include_in_schema=False)
async def startup_probe():
    return {"initialized": True}
📖 Concept Breakdown
Liveness probe
Minimal check — just return 200. If deadlocked or OOM, the process won’t respond and K8s will restart. Don’t add DB calls here — it would cause restarts when DB is slow.
Readiness probe
Checks real dependencies. Returning 503 removes the pod from the load balancer — traffic goes to healthy pods. Does NOT restart the pod.
503 status code
The correct status for “not ready.” Load balancers interpret 5xx on the health path as “unhealthy” and stop routing to that instance.
include_in_schema=False
Hides health endpoints from the OpenAPI docs — they’re internal infrastructure, not part of your public API contract.
Startup probe
Prevents K8s from killing a slow-starting pod before it’s ready. Use when startup takes >10s (ML model loading, running migrations).

⚠ Gotcha

Health endpoints should be excluded from authentication middleware — Kubernetes probes don’t send auth tokens. Check that your JWT middleware has an allowlist for /health/* paths, or they’ll always return 401.
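
One practical detail for readiness checks is bounding how long each dependency check may take, so a hung database does not make the probe itself hang. The sketch below runs checks concurrently with a per-check timeout; run_check, run_all_checks, and the fake ping functions are hypothetical stand-ins for real calls such as pool.fetchval("SELECT 1") or redis.ping().

```python
import asyncio
import time
from typing import Awaitable, Callable

CheckFn = Callable[[], Awaitable[None]]


async def run_check(name: str, check: CheckFn, timeout_s: float) -> tuple[str, dict]:
    """Run one dependency check, bounding how long it may take."""
    start = time.perf_counter()
    try:
        await asyncio.wait_for(check(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return name, {"status": "error", "error": f"timed out after {timeout_s}s"}
    except Exception as exc:
        return name, {"status": "error", "error": str(exc)}
    latency_ms = round((time.perf_counter() - start) * 1000, 2)
    return name, {"status": "ok", "latency_ms": latency_ms}


async def run_all_checks(checks: dict[str, CheckFn], timeout_s: float = 0.5) -> dict[str, dict]:
    # Checks run concurrently, so total time is about the slowest check.
    results = await asyncio.gather(
        *(run_check(name, fn, timeout_s) for name, fn in checks.items())
    )
    return dict(results)


async def fake_db_ping() -> None:
    await asyncio.sleep(0.001)  # stands in for a fast SELECT 1


async def fake_slow_redis_ping() -> None:
    await asyncio.sleep(5)  # simulates a hung dependency


results = asyncio.run(
    run_all_checks({"database": fake_db_ping, "redis": fake_slow_redis_ping}, timeout_s=0.05)
)
print(results)
```

The resulting dictionary has the same shape as the checks mapping in the readiness endpoint, so the status decision (ready, degraded, not_ready) could be applied to it unchanged.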

50

Horizontal Scaling & Stateless Design

Production

Horizontal scaling means running multiple identical copies of your API (pods/containers) behind a load balancer. This only works if each instance is stateless — it doesn’t store any request-specific data in local memory between requests. In-memory sessions, local file storage, or in-process caches all break horizontal scaling. Everything shared between requests must live in an external service: session state in Redis, files in S3, rate limit counters in Redis, cache in Redis.

# ── WRONG: stateful designs that break horizontal scaling ─────────────────
import uuid

from fastapi import FastAPI, File

app = FastAPI()

# ❌ In-memory session store: only visible on THIS pod
in_memory_sessions: dict[str, dict] = {}

@app.post("/login-bad")
async def login_bad(username: str):
    session_id = str(uuid.uuid4())
    in_memory_sessions[session_id] = {"username": username}  # lost on restart!
    return {"session_id": session_id}

# ❌ Local file storage: only on THIS pod's filesystem
@app.post("/upload-bad")
async def upload_bad(file: bytes = File()):   # File() requires python-multipart
    with open(f"/tmp/{uuid.uuid4()}.bin", "wb") as f:
        f.write(file)   # other pods can't access /tmp on this pod
    return {"saved": True}

# ── CORRECT: stateless designs ────────────────────────────────────────────

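# ✅ JWT: self-contained token, no server-side state needed
from datetime import datetime, timedelta, timezone

from jose import jwt as jose_jwt

SECRET_KEY = "secret-key"  # placeholder: load from settings, never from request input

@app.post("/login")
async def login(username: str):
    expires = datetime.now(timezone.utc) + timedelta(minutes=30)
    token = jose_jwt.encode(
        {"sub": username, "exp": expires},
        SECRET_KEY, algorithm="HS256"
    )
    return {"access_token": token}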

# ✅ Redis-backed sessions (shared across pods)
# async def get_session(session_id: str, redis) -> dict | None:
#     data = await redis.get(f"session:{session_id}")
#     return json.loads(data) if data else None

# ✅ S3/object storage for files (shared across pods)
# async def upload_file(filename: str, data: bytes, s3_client) -> str:
#     await s3_client.put_object(Bucket="my-bucket", Key=filename, Body=data)
#     return f"s3://my-bucket/{filename}"

# ── Stateless checklist ───────────────────────────────────────────────────
# ✅ Auth: JWT (stateless) or Redis sessions
# ✅ Rate limits: Redis counters (not in-memory)
# ✅ Caches: Redis (not dict)
# ✅ Files: S3 / GCS / Azure Blob (not local disk)
# ✅ Background jobs: task queue (Celery/ARQ) — not asyncio tasks on one pod
# ✅ WebSocket connections: Redis Pub/Sub for cross-pod messaging (see topic 52)
# ✅ App config: env vars (not files baked into image with local state)
📖 Concept Breakdown
Stateless
Each request is fully self-contained — the server doesn’t need to remember anything about previous requests from the same client. Any pod can handle any request.
JWT (stateless auth)
The token itself carries identity and claims. Server only needs the secret key to verify — no DB lookup, no shared session store. Works with any number of pods.
Sticky sessions
Load balancer always routes the same user to the same pod. This works around the lack of stateless design, but the user's state is lost if that pod dies. Prefer Redis-backed shared state instead.
Shared external state
Redis, PostgreSQL, S3 — visible to all pods simultaneously. This is where state lives in a horizontally scaled system.

💬 Interview Tip

“How do you scale a FastAPI app?” — Run multiple instances behind a load balancer. The key prerequisite is stateless design: JWT for auth, Redis for sessions/caches/rate limits, S3 for files. If someone asks why, the answer is: any instance can handle any request without sharing in-process memory.

51

Background Job Queues

Production

Some work shouldn’t block the HTTP response: sending emails, resizing images, generating reports. FastAPI’s built-in BackgroundTasks runs work in the same process after the response is sent — simple but not durable (crashes lose queued work). For production, use a persistent job queue: ARQ (async, Redis-backed), Celery (battle-tested, multi-language), or TaskIQ (modern async). Jobs survive restarts, can be distributed across worker machines, and provide retry logic.

# ── Option 1: FastAPI BackgroundTasks (simple, in-process) ────────────────
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_email_sync(email: str, message: str):
    """Runs after response is sent — in the same process."""
    print(f"Sending email to {email}: {message}")

@app.post("/register-simple")
async def register_simple(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email_sync, email, "Welcome!")
    return {"status": "registered"}

# ── Option 2: ARQ (async, Redis-backed, production-grade) ─────────────────
# pip install arq redis
import arq

async def send_welcome_email(ctx: dict, email: str, name: str):
    """ARQ task — ctx contains the Redis connection and settings."""
    print(f"Sending welcome email to {email}")
    return {"sent_to": email}

async def generate_report(ctx: dict, report_id: int):
    print(f"Generating report {report_id}")
    return {"report_id": report_id, "status": "complete"}

class WorkerSettings:
    functions = [send_welcome_email, generate_report]
    redis_settings = arq.connections.RedisSettings(host="localhost", port=6379)
    max_jobs = 10
    job_timeout = 300
    keep_result = 3600

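@app.on_event("startup")
async def create_arq_pool():
    # Create the Redis pool once and reuse it; opening a new pool on
    # every request wastes connections
    app.state.arq_pool = await arq.create_pool(arq.connections.RedisSettings())

@app.post("/register")
async def register(email: str, name: str):
    await app.state.arq_pool.enqueue_job("send_welcome_email", email, name)
    return {"status": "registered", "email_queued": True}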

# Run worker: arq app.jobs.WorkerSettings

# ── Option 3: Celery (mature, multi-language workers) ────────────────────
# from celery import Celery
# celery_app = Celery("tasks", broker="redis://localhost:6379", backend="redis://localhost:6379")
#
# @celery_app.task(bind=True, max_retries=3)
# def send_email_task(self, email: str, message: str):
#     try:
#         pass  # ... send email ...
#     except Exception as exc:
#         raise self.retry(exc=exc, countdown=60)
📖 Concept Breakdown
BackgroundTasks
In-process, runs after response is sent. Not durable — lost if process crashes. Good for fire-and-forget tasks that are fast and non-critical.
ARQ
Async Redis Queue. Jobs are serialized to Redis — survive restarts. Worker processes run separately. Native asyncio — no threading needed.
Job durability
Redis-backed queues persist enqueued jobs. If a worker dies mid-job, the job is not silently lost: depending on the queue, it is re-run or recorded as failed so it can be retried. BackgroundTasks provides no such guarantee.
Separate worker process
Job workers run independently of the web server. Scale them independently: more web pods for traffic, more worker pods for heavy background processing.
max_retries
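Caps how many times a failed job is retried, which handles transient failures (email server timeout, momentary DB unavailability). The Celery example above waits a fixed 60 seconds between attempts (countdown=60); exponential backoff, where each wait is longer than the last, has to be configured or computed separately.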

★ Key Info

Use BackgroundTasks only for truly non-critical, fast work. For anything you must complete (email on signup, payment confirmation, report generation), use a durable queue. Ask: “what happens if the server crashes 1ms after the response is sent?” — if the answer is “data is lost,” use a queue.

52

WebSocket Scaling with Redis Pub/Sub

Production

WebSocket connections are stateful — they’re held open to a specific server instance. With multiple instances, a message published to Pod A won’t reach clients connected to Pod B. The solution: use Redis Pub/Sub as a message bus. When any pod receives a message to broadcast, it publishes to a Redis channel. All pods subscribe to that channel and forward the message to their local WebSocket connections.

import asyncio, json
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as aioredis

app = FastAPI()

# ── Local connection manager (per-pod) ───────────────────────────────────
class LocalConnectionManager:
    def __init__(self):
        # room_id -> set of WebSocket connections ON THIS POD
        self.rooms: dict[str, set[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str):
        await ws.accept()
        self.rooms.setdefault(room, set()).add(ws)

    def disconnect(self, ws: WebSocket, room: str):
        if room in self.rooms:
            self.rooms[room].discard(ws)

    async def broadcast_local(self, room: str, message: str):
        """Send to all WebSocket connections on THIS pod."""
        for ws in list(self.rooms.get(room, set())):
            try:
                await ws.send_text(message)
            except Exception:
                self.disconnect(ws, room)

manager = LocalConnectionManager()
redis_client = aioredis.from_url("redis://localhost:6379")

async def redis_listener():
    """Subscribe to Redis and forward messages to local connections."""
    pubsub = redis_client.pubsub()
    await pubsub.psubscribe("room:*")   # subscribe to all room channels

    async for message in pubsub.listen():
        if message["type"] == "pmessage":
            channel = message["channel"].decode()
            room_id = channel.split(":", 1)[1]
            data = message["data"].decode()
            await manager.broadcast_local(room_id, data)

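@app.on_event("startup")   # deprecated in recent FastAPI in favor of lifespan handlers
async def start_redis_listener():
    # Keep a reference: the event loop holds only weak references to tasks,
    # so an unreferenced task can be garbage-collected mid-run
    app.state.redis_listener_task = asyncio.create_task(redis_listener())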

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(ws: WebSocket, room_id: str):
    await manager.connect(ws, room_id)
    try:
        while True:
            data = await ws.receive_text()
            # Publish to Redis — ALL pods subscribed to this room receive it
            payload = json.dumps({"room": room_id, "message": data})
            await redis_client.publish(f"room:{room_id}", payload)
    except WebSocketDisconnect:
        manager.disconnect(ws, room_id)
📖 Concept Breakdown
Redis Pub/Sub
Publish a message to a channel; all subscribers receive it. Ephemeral — no persistence, no replay. Ideal for live event broadcasting where losing a message is acceptable.
psubscribe('room:*')
Pattern subscribe — matches all channels starting with “room:”. One Redis subscription handles all rooms on this pod.
asyncio.create_task()
Spawns the listener as a concurrent background coroutine. It runs alongside request handling — the event loop switches between them.
broadcast_local
Only sends to connections on THIS pod — the Redis message has already done the cross-pod fan-out. Don’t re-publish to Redis (infinite loop).

⚠ Gotcha

Redis Pub/Sub is not durable — if a pod is down when a message is published, it misses it. For guaranteed delivery, use Redis Streams (XADD/XREAD) instead, which allow catching up on missed messages after reconnect.
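
The fan-out pattern can be tried without Redis or real sockets. In the sketch below, FakeBus is an in-memory stand-in for Redis Pub/Sub and each Pod keeps its own local connections (plain lists stand in for WebSockets); all names are hypothetical, and the point is only to show that one publish reaches clients on every pod.

```python
import asyncio


class FakeBus:
    """In-memory stand-in for Redis Pub/Sub: every subscriber gets every message."""

    def __init__(self) -> None:
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(queue)
        return queue

    async def publish(self, room: str, message: str) -> None:
        for queue in self._subscribers:
            await queue.put((room, message))


class Pod:
    """One API instance with its own local connections."""

    def __init__(self, name: str, bus: FakeBus) -> None:
        self.name = name
        self.bus = bus
        self.inbox = bus.subscribe()
        self.rooms: dict[str, list[list[str]]] = {}

    def connect(self, room: str) -> list[str]:
        client: list[str] = []  # messages delivered to this client
        self.rooms.setdefault(room, []).append(client)
        return client

    async def listen(self) -> None:
        while True:
            room, message = await self.inbox.get()
            for client in self.rooms.get(room, []):  # local fan-out only
                client.append(message)
            self.inbox.task_done()


async def demo() -> tuple[list[str], list[str], list[str]]:
    bus = FakeBus()
    pod_a, pod_b = Pod("a", bus), Pod("b", bus)
    alice = pod_a.connect("lobby")
    bob = pod_b.connect("lobby")
    carol = pod_b.connect("other-room")
    listeners = [asyncio.create_task(p.listen()) for p in (pod_a, pod_b)]

    await pod_a.bus.publish("lobby", "hello")  # arrives at pod A, published once
    await asyncio.gather(pod_a.inbox.join(), pod_b.inbox.join())

    for task in listeners:
        task.cancel()
    await asyncio.gather(*listeners, return_exceptions=True)
    return alice, bob, carol


alice, bob, carol = asyncio.run(demo())
print(alice, bob, carol)  # → ['hello'] ['hello'] []
```

Bob receives the message even though it entered the system through pod A, while Carol, who is in a different room, receives nothing.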

53

Database Connection Pooling

Production

Every database operation needs a connection. Opening a new connection takes 10–100ms and consumes server resources. A connection pool maintains a set of open connections, lending them to requests and returning them when done. With multiple Gunicorn workers, each worker has its own pool, so the worst-case number of connections to the database is (pool_size + max_overflow) × workers. PostgreSQL's default max_connections is 100; exceeding it causes “too many connections” errors.

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from typing import AsyncGenerator

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"

engine = create_async_engine(
    DATABASE_URL,
    # ── Pool size math ────────────────────────────────────────────────────
    # Total max per worker = pool_size + max_overflow
    # With 4 Gunicorn workers: total DB connections = 4 × (5 + 10) = 60
    pool_size=5,
    max_overflow=10,

    pool_timeout=30,       # seconds to wait for a free connection before error
    pool_recycle=1800,     # recycle connections after 30min (prevents stale state)
    pool_pre_ping=True,    # test connection health before lending from pool

    echo=False,            # set True to log all SQL (dev only — noisy in prod)
)

AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autoflush=False,
    autocommit=False,
)

async def get_session() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# ── Pool sizing formula ────────────────────────────────────────────────────
# Max DB connections = (pool_size + max_overflow) × gunicorn_workers
# PostgreSQL default max_connections = 100
# With 4 workers and max_overflow=0:
#   pool_size=20 → 4 × 20 = 80 connections (safe)
#   pool_size=30 → 4 × 30 = 120 → ERROR: too many connections

def pool_status() -> dict:
    pool = engine.pool
    return {
        "size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
    }
📖 Concept Breakdown
pool_size
Number of connections the pool keeps open persistently. When all are checked out, the pool opens up to max_overflow extra connections; only once those are exhausted too do new requests wait up to pool_timeout seconds.
max_overflow
Burst capacity above pool_size. These connections are created on demand and closed when returned — not kept warm.
pool_pre_ping
Before lending a pooled connection, sends a lightweight ping (SELECT 1). Detects dead connections (DB restart, firewall timeout) before they cause request errors.
pool_recycle
Connections older than this (seconds) are closed and replaced. Prevents server-side timeout issues where the DB server closes idle connections unilaterally.
PgBouncer
A connection pooler that sits between app and PostgreSQL. Multiplexes many app connections into fewer real DB connections. Critical for large deployments.

⚠ Gotcha

(pool_size + max_overflow) × gunicorn_workers must stay below PostgreSQL’s max_connections (100 by default). With 10 workers and pool_size=15, you’d need 150 DB connections even with max_overflow=0, which causes connection errors under load. Always calculate this and set pool_size accordingly, or put PgBouncer in front of the database.
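The budget check above is easy to automate. The following is a minimal sketch, not part of SQLAlchemy or Gunicorn: the PoolBudget class, its replicas field, and the reserved parameter (modelling PostgreSQL's superuser_reserved_connections, 3 by default) are names assumed here for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolBudget:
    """Hypothetical helper: worst-case DB connections for one deployment."""

    workers: int        # Gunicorn worker processes (each has its own pool)
    pool_size: int      # SQLAlchemy pool_size
    max_overflow: int   # SQLAlchemy max_overflow
    replicas: int = 1   # pods/containers running the app (assumed knob)

    def peak_connections(self) -> int:
        # Worst case: every pool in every worker is fully extended.
        return self.replicas * self.workers * (self.pool_size + self.max_overflow)

    def fits(self, max_connections: int = 100, reserved: int = 3) -> bool:
        # `reserved` models superuser_reserved_connections (PostgreSQL default: 3).
        return self.peak_connections() <= max_connections - reserved


budget = PoolBudget(workers=4, pool_size=5, max_overflow=10)
print(budget.peak_connections(), budget.fits())     # 60 True

too_big = PoolBudget(workers=10, pool_size=15, max_overflow=0)
print(too_big.peak_connections(), too_big.fits())   # 150 False
```

Running a check like this at startup or in CI is one way to catch a pool setting that only fails once traffic pushes every worker into overflow.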

54

Async Task Patterns

Production

Python’s asyncio provides powerful primitives for concurrent async work. gather() runs coroutines concurrently and returns when all finish. wait_for() adds a timeout. Semaphore limits how many coroutines run at once (prevents overwhelming a downstream service). Python 3.11’s TaskGroup is the modern, safer alternative to gather() — it cancels all sibling tasks when one fails.

import asyncio
from fastapi import FastAPI

app = FastAPI()

# ── 1. asyncio.gather — run multiple tasks concurrently ───────────────────
@app.get("/dashboard")
async def dashboard(user_id: int):
    # Sequential awaits: ~230ms with the stub coroutines below (100 + 50 + 80)
    # With gather: concurrent, ~100ms total (bounded by the slowest call)
    user, orders, notifications = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
    )
    return {"user": user, "orders": orders, "notifications": notifications}

# ── 2. gather with error handling ─────────────────────────────────────────
@app.get("/dashboard-safe")
async def dashboard_safe(user_id: int):
    # return_exceptions=True: failed tasks return the exception, not raise it
    results = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
        return_exceptions=True,
    )
    return {
        "user":          results[0] if not isinstance(results[0], Exception) else None,
        "orders":        results[1] if not isinstance(results[1], Exception) else [],
        "notifications": results[2] if not isinstance(results[2], Exception) else [],
    }

# ── 3. asyncio.wait_for — timeout on slow operations ─────────────────────
@app.get("/slow-service")
async def call_slow_service():
    try:
        result = await asyncio.wait_for(slow_external_api(), timeout=2.0)
        return result
    except asyncio.TimeoutError:
        from fastapi import HTTPException
        raise HTTPException(504, "Upstream service timed out")

# ── 4. Semaphore — limit concurrency ──────────────────────────────────────
_db_semaphore = asyncio.Semaphore(10)

async def fetch_with_limit(item_id: int):
    async with _db_semaphore:   # blocks here if 10 are already running
        return await fetch_item(item_id)

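from fastapi import Query

@app.get("/bulk")
async def bulk_fetch(ids: list[int] = Query(...)):
    # Query(...) is required: a bare list[int] would be read from the request body
    return await asyncio.gather(*[fetch_with_limit(i) for i in ids])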

# ── 5. TaskGroup (Python 3.11+) — structured concurrency ────────────────
@app.get("/taskgroup/{user_id}")
async def with_task_group(user_id: int):
    async with asyncio.TaskGroup() as tg:
        user_task   = tg.create_task(fetch_user(user_id))
        orders_task = tg.create_task(fetch_orders(user_id))
        # If ANY task raises, the others are cancelled and the errors surface as an ExceptionGroup
    return {"user": user_task.result(), "orders": orders_task.result()}

async def fetch_user(uid): await asyncio.sleep(0.1); return {"id": uid}
async def fetch_orders(uid): await asyncio.sleep(0.05); return []
async def fetch_notifications(uid): await asyncio.sleep(0.08); return []
async def slow_external_api(): await asyncio.sleep(5); return {}
async def fetch_item(item_id): await asyncio.sleep(0.01); return {"id": item_id}
📖 Concept Breakdown
asyncio.gather()
Runs all awaitables concurrently. Completes when all finish. If one fails (without return_exceptions), the first exception propagates to the caller immediately, but the other awaitables are not cancelled and keep running in the background.
return_exceptions=True
Failed tasks return their exception as a value instead of raising. Lets you inspect each result individually and handle partial failures gracefully.
wait_for(coro, timeout)
Wraps a coroutine with a deadline. Raises asyncio.TimeoutError if not done in time. Critical for calls to external services that may hang.
Semaphore
A counter-based lock. async with sem decrements; waits if count = 0. Limits concurrency to prevent overwhelming downstream resources.
TaskGroup (3.11+)
Structured concurrency: all tasks in the group share a lifetime. One failure cancels the siblings, and the errors are re-raised together as an ExceptionGroup (catch with except*). Safer than bare gather().
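One behavior of gather() worth checking for yourself: when one awaitable fails, the caller sees the exception, but the siblings are left running. The sketch below demonstrates this with two stand-in coroutines (fails_fast and slow_sibling are hypothetical names, not part of the guide's app).

```python
import asyncio

finished: list[str] = []


async def fails_fast() -> None:
    await asyncio.sleep(0.01)
    raise RuntimeError("boom")


async def slow_sibling() -> None:
    await asyncio.sleep(0.05)
    finished.append("sibling")   # only reached if the task was NOT cancelled


async def main() -> str:
    outcome = "gather returned normally"
    try:
        await asyncio.gather(fails_fast(), slow_sibling())
    except RuntimeError as exc:
        outcome = f"gather raised: {exc}"
    # Give the sibling time to finish; gather() did not cancel it.
    await asyncio.sleep(0.1)
    return outcome


outcome = asyncio.run(main())
print(outcome, finished)   # gather raised: boom ['sibling']
```

In a request handler this means a failed gather() can leave work running after the response has been sent, which is the main reason to prefer TaskGroup when siblings should stop on the first error.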

💬 Interview Tip

“How would you call 3 APIs in parallel?” — asyncio.gather(). “What if one might fail?” — return_exceptions=True or TaskGroup. “What if they might be slow?” — wrap each in asyncio.wait_for(coro, timeout=X). This triad of answers shows production async awareness.
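The three answers combine naturally into one pattern. A minimal sketch, assuming a hypothetical call_api coroutine standing in for real HTTP calls: each call gets its own deadline via wait_for(), and return_exceptions=True turns failures and timeouts into values that can be inspected per call.

```python
import asyncio


async def call_api(name: str, delay: float, fail: bool = False) -> dict:
    """Hypothetical stand-in for an HTTP call to an upstream service."""
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{name} unavailable")
    return {"source": name}


async def fetch_all(timeout: float = 0.05) -> dict:
    calls = {
        "profile": call_api("profile", 0.01),
        "billing": call_api("billing", 0.01, fail=True),   # will raise
        "search": call_api("search", 1.0),                  # will time out
    }
    results = await asyncio.gather(
        *(asyncio.wait_for(coro, timeout) for coro in calls.values()),
        return_exceptions=True,
    )
    # Map each name to its payload, or to None when the call failed or timed out.
    return {
        name: None if isinstance(res, BaseException) else res
        for name, res in zip(calls, results)
    }


summary = asyncio.run(fetch_all())
print(summary)
# {'profile': {'source': 'profile'}, 'billing': None, 'search': None}
```

The whole batch finishes in roughly the timeout, not the slowest call, because wait_for() cancels the call that overruns its deadline.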

55

Kubernetes Deployment Basics

Production

Kubernetes (K8s) orchestrates containers at scale: it schedules pods on nodes, restarts crashed pods, scales based on CPU/memory, and does rolling deployments with zero downtime. A FastAPI app in K8s needs: a Deployment (manages replicas), a Service (stable internal DNS), an HPA (auto-scaling), and properly configured liveness/readiness probes and resource limits.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastapi-app
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1        # allow 1 extra pod during rollout
      maxUnavailable: 0  # never have fewer than desired count
  template:
    metadata:
      labels:
        app: fastapi-app
    spec:
      containers:
      - name: fastapi
        image: myrepo/fastapi-app:v1.2.3   # always use specific tags, never "latest"
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health/live
            port: 8000
          initialDelaySeconds: 15
          periodSeconds: 10
          failureThreshold: 3
        readinessProbe:
          httpGet:
            path: /health/ready
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
          failureThreshold: 3
        env:
        - name: SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: fastapi-secrets
              key: secret-key
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: fastapi-secrets
              key: database-url
---
apiVersion: v1
kind: Service
metadata:
  name: fastapi-service
spec:
  selector:
    app: fastapi-app
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: fastapi-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: fastapi-app
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
📖 Concept Breakdown
RollingUpdate
Replaces pods one at a time. Old pods serve traffic while new ones start. With maxUnavailable: 0 and maxSurge: 1, always have full capacity during deploy.
resources.requests
What K8s guarantees the pod will have. Used for scheduling (only place pod on node with enough capacity). Container may use more up to limits.
resources.limits
Hard cap. CPU: throttled if exceeded. Memory: OOM killed if exceeded. Without limits, one pod can consume all node resources.
secretKeyRef
Reads env var from a K8s Secret (base64-encoded, optionally encrypted at rest). Safer than baking secrets into ConfigMaps or image.
HPA
Horizontal Pod Autoscaler. Watches CPU/memory metrics and adjusts replicas automatically between minReplicas and maxReplicas.
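The scaling decision itself is simple arithmetic. The Kubernetes documentation gives the core rule as desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric); the sketch below applies it with the values from the HPA manifest above. The function name is hypothetical, and the real controller adds stabilization windows and readiness handling that are left out here.

```python
import math


def desired_replicas(
    current_replicas: int,
    current_utilization: float,       # average CPU utilization across pods, percent
    target_utilization: float = 70.0,
    min_replicas: int = 3,
    max_replicas: int = 20,
    tolerance: float = 0.1,           # no scaling while the ratio is within 1.0 ± 0.1
) -> int:
    """Simplified model of the HPA replica calculation (illustration only)."""
    ratio = current_utilization / target_utilization
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    wanted = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, wanted))


print(desired_replicas(3, 140))    # 6   (load doubled, replicas double)
print(desired_replicas(3, 72))     # 3   (within tolerance, no change)
print(desired_replicas(10, 20))    # 3   (scale down, clamped to minReplicas)
print(desired_replicas(15, 140))   # 20  (clamped to maxReplicas)
```

Utilization is measured relative to resources.requests, so the cpu: "250m" request in the Deployment is the baseline that the 70% target refers to.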

⚠ Gotcha

Never use image: myapp:latest in K8s — pods may pull different versions if the tag updates. Always use immutable tags (:v1.2.3 or a git SHA). :latest in production is a deployment reliability anti-pattern.

56

CI/CD Pipeline

Production

A CI/CD pipeline automates: run tests → build Docker image → push to registry → deploy. For FastAPI, CI needs a running PostgreSQL service so integration tests hit a real database instead of a mock. GitHub Actions is a common platform for this. The pipeline ensures every commit is tested before it can be merged, and every merge to main produces a freshly built image. The workflow below stops at the registry push; the deploy step depends on your platform and is not shown.

name: CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
          POSTGRES_DB: testdb
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 5s
          --health-timeout 5s
          --health-retries 5

      redis:
        image: redis:7-alpine
        ports:
          - 6379:6379

    steps:
    - uses: actions/checkout@v4

    - uses: actions/setup-python@v5
      with:
        python-version: "3.12"
        cache: pip

    - name: Install dependencies
      run: pip install -r requirements.txt -r requirements-dev.txt

    - name: Run Alembic migrations
      env:
        DATABASE_URL: postgresql+asyncpg://test:test@localhost:5432/testdb
      run: alembic upgrade head

    - name: Run tests with coverage
      env:
        DATABASE_URL: postgresql+asyncpg://test:test@localhost:5432/testdb
        REDIS_URL: redis://localhost:6379
        SECRET_KEY: test-secret-key-ci
      run: pytest --cov=app --cov-report=xml -v

    - name: Upload coverage
      uses: codecov/codecov-action@v4

  build-and-push:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write   # lets GITHUB_TOKEN push to ghcr.io

    steps:
    - uses: actions/checkout@v4

    - uses: docker/setup-buildx-action@v3   # Buildx is needed for the type=gha cache backend

    - name: Login to Container Registry
      uses: docker/login-action@v3
      with:
        registry: ghcr.io
        username: ${{ github.actor }}
        password: ${{ secrets.GITHUB_TOKEN }}

    - name: Build and push
      uses: docker/build-push-action@v5
      with:
        context: .
        push: true
        tags: |
          ghcr.io/${{ github.repository }}:${{ github.sha }}
          ghcr.io/${{ github.repository }}:latest
        cache-from: type=gha
        cache-to: type=gha,mode=max
📖 Concept Breakdown
services: postgres
GitHub Actions spins up a real PostgreSQL container for the job. Tests connect to localhost:5432 — same as running locally. No mocking needed.
needs: test
The build job only runs if test succeeds. This gates Docker builds — broken code never makes it into a deployable image.
github.sha tag
Tags the image with the git commit SHA — immutable, traceable. You can always tell exactly which code version is running in production.
cache: pip
Caches pip downloads between runs. Saves 1–2 minutes per run for large dependency sets.
cache-from: type=gha
Docker layer caching in GitHub Actions. Layers that haven’t changed are pulled from cache — faster builds when only application code changes.

💬 Interview Tip

“How do you test with a real database in CI?” — GitHub Actions services: block. Runs a sidecar container (postgres, redis) accessible on localhost. This catches schema migration issues and query bugs that mocks would miss.

57

API Versioning Strategies

Production

When you need to change an API in a breaking way (remove a field, change a type), existing clients break. API versioning lets you run old and new versions simultaneously. Three main approaches: URL path versioning (/v1/users, /v2/users) — most common, explicit, easy to route; header versioning (API-Version: 2) — cleaner URLs but harder to test in browsers; mount-based versioning (separate FastAPI apps mounted under a prefix) — most isolation, recommended for major version differences.

from fastapi import FastAPI, APIRouter, Header, HTTPException
from typing import Annotated

# ── Approach 1: URL path versioning ───────────────────────────────────────
app = FastAPI()

v1_router = APIRouter(prefix="/v1")
v2_router = APIRouter(prefix="/v2")

@v1_router.get("/users/{uid}")
async def get_user_v1(uid: int):
    return {"id": uid, "name": "Alice"}

@v2_router.get("/users/{uid}")
async def get_user_v2(uid: int):
    return {"id": uid, "name": "Alice", "created_at": "2024-01-15", "role": "user"}

app.include_router(v1_router)
app.include_router(v2_router)

# ── Approach 2: Header versioning ─────────────────────────────────────────
@app.get("/users/{uid}")
async def get_user_versioned(
    uid: int,
    api_version: Annotated[str, Header(alias="API-Version")] = "1",
):
    if api_version == "2":
        return {"id": uid, "name": "Alice", "created_at": "2024-01-15"}
    elif api_version == "1":
        return {"id": uid, "name": "Alice"}
    raise HTTPException(400, f"Unsupported API version: {api_version}")

# ── Approach 3: Mount separate apps (maximum isolation) ──────────────────
v1_app = FastAPI(title="API v1", docs_url="/docs")
v2_app = FastAPI(title="API v2", docs_url="/docs")

@v1_app.get("/users/{uid}")
async def v1_users(uid: int):
    return {"id": uid, "format": "v1"}

@v2_app.get("/users/{uid}")
async def v2_users(uid: int):
    return {"id": uid, "format": "v2", "extra": True}

main_app = FastAPI(docs_url=None)
main_app.mount("/v1", v1_app)
main_app.mount("/v2", v2_app)

# ── Version deprecation strategy ─────────────────────────────────────────
from starlette.middleware.base import BaseHTTPMiddleware

class DeprecationWarningMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)
        if request.url.path.startswith("/v1/"):
            # RFC 9745 defines the value as a timestamp ("@<unix-seconds>"); "true" follows earlier drafts
            response.headers["Deprecation"] = "true"
            response.headers["Sunset"] = "Wed, 31 Dec 2025 23:59:59 GMT"
            response.headers["Link"] = '</v2/>; rel="successor-version"'
        return response
📖 Concept Breakdown
URL versioning
Easiest to discover, test, and cache. CDNs can cache /v1/* and /v2/* independently. The standard choice for public APIs.
Header versioning
Cleaner URLs. Preferred by some REST purists. Downside: can’t bookmark/share versioned URLs directly; harder to test in browser address bar.
app.mount()
Mounts a sub-application at a path prefix. Each app has its own middleware, exception handlers, and OpenAPI docs. Maximum isolation between versions.
Deprecation header
Response header signaling that a version is deprecated (RFC 9745). Clients that parse headers can warn developers. Pair it with a Sunset header (RFC 8594) that gives the removal date.

★ Rarely Known

The Deprecation and Sunset response headers are IETF standards (RFC 9745 and RFC 8594, respectively). Well-behaved API clients and developer tooling can read these and warn developers that they’re using an API version scheduled for removal.
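Hand-written HTTP dates are easy to get wrong (the weekday must match the date), so it is safer to generate the header values. A minimal sketch using only the standard library; the sunset_headers function and the dates passed to it are assumptions for illustration.

```python
from datetime import datetime, timezone
from email.utils import format_datetime


def sunset_headers(deprecated_at: datetime, sunset_at: datetime, successor: str) -> dict[str, str]:
    """Build deprecation headers from timezone-aware UTC datetimes."""
    return {
        # RFC 9745: a Structured Fields Date, written as "@" + Unix seconds
        "Deprecation": f"@{int(deprecated_at.timestamp())}",
        # RFC 8594: an HTTP-date, always expressed in GMT
        "Sunset": format_datetime(sunset_at, usegmt=True),
        "Link": f'<{successor}>; rel="successor-version"',
    }


headers = sunset_headers(
    deprecated_at=datetime(2025, 1, 1, tzinfo=timezone.utc),
    sunset_at=datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc),
    successor="/v2/",
)
print(headers["Sunset"])   # Wed, 31 Dec 2025 23:59:59 GMT
```

format_datetime(..., usegmt=True) raises ValueError for naive datetimes or non-UTC offsets, which guards against emitting a local time labelled as GMT.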

58

Performance Profiling

Production

Before optimizing, measure. Two major FastAPI performance problems: N+1 queries (fetching 100 users then making a separate DB query per user, 101 queries instead of 1 or 2) and synchronous code in async routes (blocking the event loop). Tools: pyinstrument (call-tree profiler, low overhead), ORJSONResponse (faster JSON serialization via orjson; the often-quoted 5–10x depends heavily on the payload), and SQLAlchemy query logging.

from fastapi import FastAPI, Request, Response
from fastapi.responses import ORJSONResponse
import pyinstrument

# ── Use ORJSONResponse globally (faster JSON serialization; requires orjson) ──
app = FastAPI(default_response_class=ORJSONResponse)

# ── 1. Profiling middleware (enable in dev/staging only) ───────────────────
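class ProfilingMiddleware:
    """Pure ASGI middleware: returns a pyinstrument HTML report instead of the
    normal response when the request targets profile_path or sends X-Profile: true."""

    def __init__(self, app, profile_path: str = "/_profile"):
        self.app = app
        self.profile_path = profile_path

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":          # pass lifespan/websocket through untouched
            await self.app(scope, receive, send)
            return
        # ASGI headers are a list of (name, value) byte pairs, not a dict
        headers = dict(scope.get("headers") or [])
        if scope.get("path") != self.profile_path and headers.get(b"x-profile") != b"true":
            await self.app(scope, receive, send)
            return

        async def discard(message):
            """Drop the wrapped app's response so only one response is sent."""

        profiler = pyinstrument.Profiler()
        profiler.start()
        try:
            await self.app(scope, receive, discard)
        finally:
            profiler.stop()
        response = Response(content=profiler.output_html(), media_type="text/html")
        await response(scope, receive, send)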

# app.add_middleware(ProfilingMiddleware)  # enable for profiling sessions

# ── 2. N+1 query problem and fix ──────────────────────────────────────────
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload

# ❌ N+1: 1 query for users + 1 query per user for that user's items
async def get_users_bad(session: AsyncSession):
    users = (await session.execute(select(User))).scalars().all()
    for user in users:
        # Sync Session: lazy load here, so 100 users = 101 queries.
        # AsyncSession: implicit lazy loading raises MissingGreenlet instead.
        items = user.items

# ✅ Eager loading: 2 queries total with selectinload (1 JOINed query with joinedload)
async def get_users_good(session: AsyncSession):
    users = (await session.execute(
        select(User).options(
            selectinload(User.items)   # loads all users' items in ONE extra query
        )
    )).scalars().all()
    return users

# ── 3. Enable SQL query logging (find slow queries) ───────────────────────
import logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# ── 4. Route-level profiling ──────────────────────────────────────────────
@app.get("/profile-me")
async def profile_me():
    profiler = pyinstrument.Profiler(async_mode="enabled")
    with profiler:
        result = await some_expensive_operation()
    print(profiler.output_text(unicode=True, color=True))
    return result

async def some_expensive_operation():
    import asyncio; await asyncio.sleep(0.01); return {"done": True}
📖 Concept Breakdown
N+1 problem
Fetching a list of N items then making N more queries for related data. Fix: selectinload() or joinedload() to fetch everything in 1–2 queries.
selectinload
SQLAlchemy eager loading strategy: runs a second SELECT ... WHERE id IN (...) query for all related items at once. Avoids N+1 without a cartesian product JOIN.
ORJSONResponse
Uses orjson (Rust-based) instead of Python’s json module. Often several times faster, depending on the payload. Serializes datetime, UUID, and dataclasses natively without custom encoders.
pyinstrument
Statistical profiler: samples the call stack at a fixed interval (1 ms by default) instead of tracing every call, which keeps overhead low. Shows a call tree with time per function. Async-aware (works with FastAPI).
default_response_class
FastAPI(default_response_class=ORJSONResponse) — applies globally. All routes use ORJSON without having to specify it per route.

💬 Interview Tip

“How would you investigate a slow FastAPI endpoint?” — (1) Check for N+1 queries by enabling SQLAlchemy logging. (2) Profile with pyinstrument to find CPU hotspots. (3) Check if async routes are calling blocking code. (4) Look at DB query explain plans. (5) Consider caching frequently-read data.
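Point (3) is easy to demonstrate without a web server. The sketch below runs three concurrent "handlers" that each perform 0.2 s of blocking work, once directly on the event loop and once offloaded with asyncio.to_thread(); blocking_io and the handler names are hypothetical stand-ins for a sync driver call inside an async route.

```python
import asyncio
import time


def blocking_io() -> None:
    time.sleep(0.2)   # stand-in for a sync DB driver, requests.get(), heavy file I/O


async def handler_blocking() -> None:
    blocking_io()                          # freezes the event loop for 0.2 s


async def handler_offloaded() -> None:
    await asyncio.to_thread(blocking_io)   # runs in a worker thread; loop stays free


async def timed(handler, n: int = 3) -> float:
    """Run n handlers concurrently and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    return await timed(handler_blocking), await timed(handler_offloaded)


blocking_s, offloaded_s = asyncio.run(main())
print(f"blocking: {blocking_s:.2f}s  offloaded: {offloaded_s:.2f}s")
```

The blocking variant serializes the three calls (about 0.6 s) while the offloaded one overlaps them (about 0.2 s). In FastAPI, declaring a route with plain def instead of async def has a similar effect, because such routes are run in a thread pool.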

★ Key Info

N+1 is one of the most common real-world FastAPI performance issues. It’s invisible until you look at query logs: the app “works” but makes 100 queries per page load instead of 1. Always check SQLAlchemy logs in development and add selectinload or joinedload wherever you access relationships in a loop.
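The query-count difference can be reproduced with nothing but the standard library. This sketch uses sqlite3 instead of SQLAlchemy and PostgreSQL (an assumption made to keep it self-contained) and counts executed statements with a trace callback; the second function issues the same kind of WHERE ... IN (...) query that selectinload generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE items (id INTEGER PRIMARY KEY, user_id INTEGER, title TEXT);
""")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [(i, f"user{i}") for i in range(1, 101)])
conn.executemany("INSERT INTO items VALUES (?, ?, ?)",
                 [(i, (i % 100) + 1, f"item{i}") for i in range(1, 301)])
conn.commit()

statements: list[str] = []
conn.set_trace_callback(statements.append)   # record every SQL statement executed


def n_plus_one() -> int:
    """One query for the users, then one more per user."""
    statements.clear()
    users = conn.execute("SELECT id FROM users").fetchall()
    for (uid,) in users:
        conn.execute("SELECT title FROM items WHERE user_id = ?", (uid,)).fetchall()
    return len(statements)


def select_in() -> int:
    """One query for the users, one IN query for all their items."""
    statements.clear()
    ids = [uid for (uid,) in conn.execute("SELECT id FROM users")]
    placeholders = ",".join("?" * len(ids))
    conn.execute(
        f"SELECT user_id, title FROM items WHERE user_id IN ({placeholders})", ids
    ).fetchall()
    return len(statements)


print(n_plus_one(), select_in())   # 101 2
```

Against a networked database each of those 101 statements also pays a round trip, which is why the pattern hurts far more in production than on a local SQLite file.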