FastAPI Guide
25

Async vs Sync Route Handlers

Advanced

FastAPI runs on a single-threaded async event loop (via Uvicorn). When you declare a route with async def, it runs directly on that loop — any blocking call inside it (a slow database query, time.sleep(), file I/O via the normal open()) will freeze all other requests until it finishes. Regular def routes are automatically offloaded to a thread pool so the event loop stays free, but they cannot use await. The rule: use async def when all I/O is async-native; use plain def when working with synchronous libraries (pandas, psycopg2, etc.).

import asyncio, time
from fastapi import FastAPI
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()

# ── ASYNC route ─────────────────────────────────────────────────────────
# Runs on the event loop. Safe ONLY with async-native I/O (httpx, asyncpg,
# aiofiles). Never call time.sleep() or requests.get() here.
@app.get("/async-ok")
async def async_ok():
    await asyncio.sleep(1)   # yields control → other requests can run
    return {"source": "async"}

# ── SYNC route ───────────────────────────────────────────────────────────
# FastAPI automatically runs this in a ThreadPoolExecutor thread, so the
# event loop is NOT blocked. Use this with sync libraries.
@app.get("/sync-ok")
def sync_ok():
    time.sleep(1)            # blocking — but we're in a thread, so fine
    return {"source": "sync"}

# ── WRONG: blocking call inside async ────────────────────────────────────
# This looks async but blocks the ENTIRE server for 1 second.
@app.get("/async-bad")
async def async_bad():
    time.sleep(1)            # ← BLOCKS the event loop!
    return {"broken": True}

# ── CORRECT FIX: run_in_executor ─────────────────────────────────────────
# Offload a sync-only call to a thread while still using async def.
@app.get("/async-fixed")
async def async_fixed():
    loop = asyncio.get_event_loop()
    # run_in_executor returns a coroutine that resolves when the thread finishes
    result = await loop.run_in_executor(None, time.sleep, 1)
    return {"fixed": True}

# ── Custom thread pool for CPU-heavy work ────────────────────────────────
_cpu_pool = ThreadPoolExecutor(max_workers=4)

@app.get("/cpu-task")
async def cpu_task():
    loop = asyncio.get_event_loop()
    # Pin CPU work to a dedicated pool so it doesn't steal I/O threads
    await loop.run_in_executor(_cpu_pool, expensive_computation)
    return {"done": True}

def expensive_computation():
    # Simulate heavy CPU work (e.g., image resize, ML inference)
    total = sum(i * i for i in range(10_000_000))
    return total
📖 Concept Breakdown
async def route

Runs on the event loop. All I/O must be awaited — blocking calls freeze every other request.

def route

FastAPI wraps it in anyio.to_thread.run_sync() automatically. Blocking is safe; await is not available.

asyncio.get_event_loop()

Returns the running loop. Prefer asyncio.get_running_loop() in Python 3.10+ (raises if no loop, safer).

run_in_executor(None, fn)

None uses the default thread pool. Pass a custom Executor to isolate CPU-bound work.

ThreadPoolExecutor

Cap workers to available CPU cores for CPU tasks. For I/O tasks, higher counts are acceptable.

💬 Interview Tip

“When would you use def vs async def?” — Answer: async def only when every I/O call inside it is awaitable. Otherwise use def and let FastAPI handle threading. The common mistake is using async def with a synchronous ORM like SQLAlchemy’s sync engine.

⚠ Gotcha

FastAPI’s thread pool defaults to 40 workers. If you have 100 concurrent def routes and each blocks for 10s, the 41st request queues. Tune with anyio.from_thread.start_blocking_portal() or switch to async I/O.

26

Lifespan Events

Advanced

Applications need to do work on startup (open a DB connection pool, warm a cache, load an ML model) and on shutdown (flush queues, close connections). FastAPI’s modern approach uses a single @asynccontextmanager lifespan function — everything before the yield runs on startup, everything after runs on shutdown. This replaces the older @app.on_event("startup") decorator, which still works but is deprecated. The lifespan object is passed directly to FastAPI(lifespan=…).

from contextlib import asynccontextmanager
from fastapi import FastAPI
import httpx, asyncpg

# A dict attached to app.state — accessible from any request handler
# Think of it as the app's shared "context bag"
db_pool: asyncpg.Pool | None = None
http_client: httpx.AsyncClient | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    # ── STARTUP ──────────────────────────────────────────────────────────
    # Code here runs ONCE before the server begins accepting requests.
    global db_pool, http_client

    db_pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
    )
    http_client = httpx.AsyncClient(timeout=10.0)

    # Store on app.state so route handlers can access via request.app.state
    app.state.db_pool = db_pool
    app.state.http_client = http_client

    print("✅ Startup complete")

    yield  # ← server is running and accepting requests

    # ── SHUTDOWN ─────────────────────────────────────────────────────────
    # Code here runs ONCE after the last request is served.
    await db_pool.close()
    await http_client.aclose()
    print("🛑 Shutdown complete")

# Pass the lifespan manager to FastAPI
app = FastAPI(lifespan=lifespan)

@app.get("/users/{user_id}")
async def get_user(user_id: int, request: Request):
    # Access the shared pool created during startup
    pool = request.app.state.db_pool
    row = await pool.fetchrow("SELECT * FROM users WHERE id=$1", user_id)
    return dict(row) if row else {"error": "not found"}

# ── OLD STYLE (deprecated, still works) ──────────────────────────────────
# @app.on_event("startup")
# async def startup():
#     app.state.db = await connect_db()
#
# @app.on_event("shutdown")
# async def shutdown():
#     await app.state.db.close()
📖 Concept Breakdown
@asynccontextmanager

Converts a generator function into a context manager. The yield splits it into enter (startup) and exit (shutdown) phases.

yield

The dividing line — everything before is startup, everything after is shutdown. The server is live between enter and exit.

app.state

A starlette.datastructures.State object — you can attach anything to it. Access it via request.app.state in handlers.

FastAPI(lifespan=…)

Registers the lifespan context manager. Replaces both on_event("startup") and on_event("shutdown").

asyncpg.create_pool

Creates a connection pool once at startup. Re-using pool connections per request is far faster than opening a new connection each time.

💬 Interview Tip

Interviewers often ask where to initialize expensive shared resources. Answer: in the lifespan function, stored on app.state. This ensures the resource is created once, reused across requests, and properly closed on shutdown — no global variable initialization order issues.

★ Important

If an exception occurs in the startup phase (before yield), the server will fail to start and the shutdown code will NOT run. Guard startup with try/except and clean up partial initialization manually.

27

Advanced Dependency Patterns

Advanced

Beyond simple Depends(get_db), FastAPI’s DI system supports three powerful patterns. Caching: by default, a dependency called multiple times within the same request is only executed once (same instance returned). Factory pattern: a dependency that itself accepts parameters, enabling runtime-configurable behavior. Overrides: app.dependency_overrides lets you swap any dependency for tests or feature flags without touching route code.

from fastapi import FastAPI, Depends, Query
from functools import lru_cache
from typing import Annotated

app = FastAPI()

# ── 1. DEPENDENCY CACHING (use_cache=True by default) ────────────────────
# This function is only called ONCE per request, even if two routes depend
# on it in the same call chain.
class DBConnection:
    def __init__(self):
        print("Opening DB connection")  # prints once per request, not twice

    def query(self, sql: str):
        return f"result of: {sql}"

def get_db() -> DBConnection:
    return DBConnection()

@app.get("/a")
async def route_a(
    db: Annotated[DBConnection, Depends(get_db)],
    # sub_dep also calls Depends(get_db) — same instance returned
):
    return {"data": db.query("SELECT 1")}

# To opt OUT of caching (get a fresh instance each time):
# Depends(get_db, use_cache=False)


# ── 2. FACTORY PATTERN (parameterized dependencies) ───────────────────────
# A function that returns a dependency function — lets you configure
# behavior at route definition time, not just at request time.
def require_role(role: str):
    """Factory: creates a dependency that checks for a specific role."""
    def check_role(token: str = Query(...)):
        # In real code: decode JWT and verify role claim
        if token != f"valid-{role}-token":
            from fastapi import HTTPException, status
            raise HTTPException(status.HTTP_403_FORBIDDEN, f"Requires {role} role")
        return token
    return check_role   # ← returns the actual dependency callable

@app.get("/admin")
async def admin_route(
    _: Annotated[str, Depends(require_role("admin"))]
):
    return {"access": "admin panel"}

@app.get("/editor")
async def editor_route(
    _: Annotated[str, Depends(require_role("editor"))]
):
    return {"access": "editor panel"}


# ── 3. DEPENDENCY OVERRIDES (testing / feature flags) ────────────────────
# Replace any dependency at runtime — most commonly used in tests.
def get_real_settings():
    return {"db_url": "postgresql://prod/db"}

@app.get("/settings")
async def show_settings(
    cfg: Annotated[dict, Depends(get_real_settings)]
):
    return cfg

# In tests:
# def get_fake_settings():
#     return {"db_url": "sqlite:///:memory:"}
#
# app.dependency_overrides[get_real_settings] = get_fake_settings
# # ... run tests ...
# app.dependency_overrides.clear()   # always clean up after tests


# ── 4. CLASS-BASED DEPENDENCIES ──────────────────────────────────────────
# Use a class with __call__ when the dependency needs to hold state
# (e.g., a query parser that normalizes common parameters).
class PaginationParams:
    def __init__(
        self,
        skip: int = Query(0, ge=0, description="Records to skip"),
        limit: int = Query(20, ge=1, le=100, description="Max records"),
    ):
        self.skip = skip
        self.limit = limit

@app.get("/items")
async def list_items(
    page: Annotated[PaginationParams, Depends(PaginationParams)]
):
    return {"skip": page.skip, "limit": page.limit}
📖 Concept Breakdown
use_cache=True

Default behavior: same dependency instance is reused within a single request. Set False to force a fresh call each time (e.g., for randomness).

Factory pattern

A function that returns a dependency function. Used when you need to pass arguments at route-definition time (like role names, permission strings).

dependency_overrides

A dict on the app object: {original_dep: replacement_dep}. FastAPI swaps them at call time. Key pattern for test isolation.

Class with __call__

FastAPI calls __init__ to inject Query/Path params, then passes the fully constructed object to the route. Clean way to bundle related params.

Annotated[T, Depends(…)]

The modern way to declare dependencies — keeps the function signature clean and the type hint accurate for IDEs and type checkers.

💬 Interview Tip

The factory pattern (a function returning a dependency) is a common interview follow-up. Example: “How would you implement role-based access without duplicating code?” — Answer: a require_role("admin") factory that returns a reusable Depends-able callable.

⚠ Gotcha

After setting dependency_overrides in tests, always call app.dependency_overrides.clear() in teardown. If you forget, overrides from one test bleed into the next.

28

Custom Middleware & Request Context

Advanced

Middleware sits between the ASGI server and your route handlers, intercepting every request and response. FastAPI offers two approaches: the simpler BaseHTTPMiddleware (subclass and override dispatch), or pure ASGI middleware for zero overhead. A related pattern is request-scoped context via contextvars.ContextVar — a Python standard library feature that stores a value per-coroutine (like thread-locals but for async code), enabling you to set a request ID in middleware and read it anywhere downstream without threading it through function arguments.

import uuid, time
from contextvars import ContextVar
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
from starlette.types import ASGIApp, Receive, Scope, Send

app = FastAPI()

# ── ContextVar: per-request storage ──────────────────────────────────────
# ContextVar is isolated per async task — safe to set in middleware and
# read anywhere in the same request, even in nested async calls.
request_id_var: ContextVar[str] = ContextVar("request_id", default="unknown")

# ── Approach 1: BaseHTTPMiddleware (simple, slight overhead) ──────────────
class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Generate a unique ID for this request
        req_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))

        # Store in ContextVar — visible to all code in this request's coroutine
        token = request_id_var.set(req_id)

        # Attach to request.state too, for route handlers that prefer it
        request.state.request_id = req_id

        start = time.perf_counter()
        try:
            response: Response = await call_next(request)
        finally:
            # Always reset ContextVar to avoid leaking into other requests
            request_id_var.reset(token)

        elapsed = (time.perf_counter() - start) * 1000
        response.headers["X-Request-ID"] = req_id
        response.headers["X-Response-Time"] = f"{elapsed:.1f}ms"
        return response

app.add_middleware(RequestIDMiddleware)

# ── Approach 2: Pure ASGI middleware (no overhead, more complex) ──────────
class PureASGIMiddleware:
    """Wraps the ASGI app directly — no Starlette abstractions."""
    def __init__(self, app: ASGIApp):
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send):
        if scope["type"] == "http":
            # scope is a plain dict — extract headers manually
            headers = dict(scope.get("headers", []))
            req_id = headers.get(b"x-request-id", b"").decode() or str(uuid.uuid4())
            scope["state"] = scope.get("state", {})
            scope["state"]["request_id"] = req_id

        await self.app(scope, receive, send)

# app.add_middleware(PureASGIMiddleware)  # uncomment to use

# ── Route that reads from ContextVar ─────────────────────────────────────
@app.get("/trace")
async def trace_example():
    # No need to pass request_id through parameters — ContextVar handles it
    current_id = request_id_var.get()
    return {"request_id": current_id, "message": "traceable request"}

# ── Middleware execution order note ──────────────────────────────────────
# Middleware is applied LIFO (last-added, first-executed).
# app.add_middleware(A)
# app.add_middleware(B)  ← B executes first on the way in, last on the way out
📖 Concept Breakdown
ContextVar

Stores a value scoped to the current async task (coroutine chain). Like thread-locals but for async code — each request gets its own isolated value.

ContextVar.set()

Returns a Token object. Pass this to .reset(token) to restore the previous value — important for cleanup in async contexts.

BaseHTTPMiddleware

Starlette’s convenience class. Override dispatch(request, call_next). Slightly slower than pure ASGI due to extra wrapping overhead.

call_next(request)

Passes control to the next middleware or the route handler. The response it returns can be modified before returning to the client.

scope["type"]

In pure ASGI, check this before processing — values are "http", "websocket", or "lifespan". Handle each appropriately.

LIFO order

Last middleware added is first to run on request and last on response. Think of it as wrapping layers — innermost layer is added first.

💬 Interview Tip

If asked “how do you propagate a request ID through async code without passing it everywhere?” — the answer is ContextVar. Set it in middleware, read it in logging, DB calls, or anywhere downstream. No parameter threading needed.

⚠ Gotcha

BaseHTTPMiddleware has a known issue with streaming responses in some Starlette versions — the response body may be buffered entirely. For streaming endpoints (video, large files), use pure ASGI middleware instead.

29

Streaming Responses

Advanced

Normal FastAPI responses serialize the entire body to a string/bytes, store it in memory, then send it. For large files, database exports, or AI-generated text, this wastes memory and makes the client wait for everything before it sees anything. StreamingResponse takes an async (or sync) generator — FastAPI sends each yielded chunk immediately as it’s produced. The client starts receiving data right away, and your server never holds the full payload in memory at once.

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import aiofiles

app = FastAPI()

# ── 1. Stream a large file from disk ─────────────────────────────────────
@app.get("/download/{filename}")
async def download_file(filename: str):
    async def file_generator():
        # aiofiles reads asynchronously — doesn't block the event loop
        async with aiofiles.open(f"./files/{filename}", "rb") as f:
            while chunk := await f.read(64 * 1024):  # 64KB chunks
                yield chunk  # send each chunk immediately to the client

    return StreamingResponse(
        file_generator(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )

# ── 2. Stream a database query result row by row ─────────────────────────
@app.get("/export/users")
async def export_users():
    async def csv_generator():
        yield "id,name,email\n"  # header row

        # In real code: use an async cursor that fetches rows lazily
        fake_rows = [
            (1, "Alice", "alice@example.com"),
            (2, "Bob",   "bob@example.com"),
        ]
        for row in fake_rows:
            yield f"{row[0]},{row[1]},{row[2]}\n"
            await asyncio.sleep(0)  # yield control to event loop between rows

    return StreamingResponse(csv_generator(), media_type="text/csv")

# ── 3. Stream AI / LLM token output ──────────────────────────────────────
@app.get("/ai/stream")
async def stream_ai():
    async def token_generator():
        words = ["The", " quick", " brown", " fox", " jumps"]
        for word in words:
            yield word           # send each token as it arrives
            await asyncio.sleep(0.1)  # simulate LLM latency per token

    # text/plain streaming; use text/event-stream for SSE format (see topic 30)
    return StreamingResponse(token_generator(), media_type="text/plain")

# ── 4. Sync generator (FastAPI handles it in a thread) ───────────────────
@app.get("/sync-stream")
def sync_stream():
    def generate():
        for i in range(5):
            yield f"line {i}\n"
            time.sleep(0.5)  # blocking is OK here — FastAPI threads it

    return StreamingResponse(generate(), media_type="text/plain")
📖 Concept Breakdown
StreamingResponse

Accepts a generator (sync or async). Each yield sends a chunk. No full-body buffering in memory — constant memory regardless of file size.

async generator

A function with async def + yield. Can await between yields, keeping the event loop free while waiting for the next chunk.

aiofiles

Async file I/O library. Wraps Python’s open() to read in a thread pool, making it non-blocking from the event loop’s perspective.

walrus operator :=

while chunk := await f.read(n) — assign and test in one step. Loop ends when read() returns empty bytes (end of file).

media_type

Tells the browser how to handle the stream. application/octet-stream triggers download; text/event-stream enables SSE; text/plain for raw text.

★ Important

If an exception occurs mid-stream, the HTTP response header has already been sent (with status 200). The client won’t see an error status code — it’ll just see the stream end unexpectedly. Add error handling inside the generator and signal errors via your data format.

◆ Rarely Known

You can set Content-Length in the response headers if you know the file size upfront. This lets browsers show accurate download progress bars. With a generator you normally can’t know size in advance, so browsers show indeterminate progress.

30

Server-Sent Events (SSE)

Advanced

SSE is a simple protocol on top of HTTP where the server pushes data to the browser over a long-lived HTTP connection. The browser’s EventSource API handles reconnection automatically. Unlike WebSockets, SSE is one-directional (server → client only) and works over plain HTTP/1.1 with no protocol upgrade. It’s ideal for live dashboards, notification feeds, and AI token streaming. The format is just newline-delimited text: data: {json}\n\n.

import asyncio, json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

# ── SSE event format ──────────────────────────────────────────────────────
# Each SSE message follows this text format:
#   event: <optional event name>
#   id: <optional event id>
#   data: <your data>
#   (blank line ends the message)
#
# The browser's EventSource reads these and fires addEventListener handlers.

def make_sse(data: dict, event: str | None = None, id: str | None = None) -> str:
    """Format a Python dict as an SSE-compliant text chunk."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    if id:
        lines.append(f"id: {id}")
    lines.append(f"data: {json.dumps(data)}")
    lines.append("")  # blank line = end of message
    return "\n".join(lines) + "\n"

# ── SSE endpoint ──────────────────────────────────────────────────────────
@app.get("/events/live")
async def live_events(request: Request):
    async def event_stream():
        counter = 0
        while True:
            # Check if client disconnected — prevents zombie generators
            if await request.is_disconnected():
                break

            counter += 1
            # Send a named event every second
            yield make_sse(
                data={"counter": counter, "status": "ok"},
                event="update",
                id=str(counter),
            )
            await asyncio.sleep(1)

        # Inform clients the stream is intentionally ending
        yield make_sse({"done": True}, event="close")

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",  # ← triggers EventSource in browser
        headers={
            "Cache-Control": "no-cache",       # prevent proxies from caching
            "X-Accel-Buffering": "no",         # disable nginx buffering
            "Connection": "keep-alive",
        },
    )

# ── SSE for AI token streaming ─────────────────────────────────────────
@app.get("/ai/tokens")
async def stream_tokens():
    async def token_stream():
        tokens = ["Hello", ", ", "world", "! ", "How", " are", " you?"]
        for token in tokens:
            yield make_sse({"token": token, "done": False})
            await asyncio.sleep(0.05)
        yield make_sse({"token": "", "done": True})

    return StreamingResponse(token_stream(), media_type="text/event-stream",
                             headers={"Cache-Control": "no-cache"})

# ── Client-side JavaScript (for reference) ────────────────────────────────
# const source = new EventSource("/events/live");
# source.addEventListener("update", (e) => {
#     console.log(JSON.parse(e.data));
# });
# source.addEventListener("close", () => source.close());
📖 Concept Breakdown
text/event-stream

The MIME type that activates the browser’s EventSource protocol. Without it, the browser treats the response as a regular stream download.

SSE message format

The minimal SSE format is data: ...\n\n (double newline ends a message). Multi-line data is allowed: multiple data: lines are concatenated by the browser.

event: name

Optional named event. Client uses addEventListener("name", handler). Default unnamed events fire the onmessage handler.

id: value

Last event ID. Browser sends Last-Event-ID header on reconnect — lets server resume from where it left off after network interruption.

request.is_disconnected()

Async check that returns True when the client closes the connection. Essential to break infinite generators and avoid resource leaks.

X-Accel-Buffering: no

Nginx header that disables its response buffering. Without it, nginx queues chunks and the client sees delayed bursts instead of real-time events.

💬 Interview Tip

“SSE vs WebSocket?” — SSE is HTTP-based (works through proxies, firewalls, CDNs), one-directional, auto-reconnecting, and simpler to implement. WebSocket is bidirectional, requires protocol upgrade, and is better for interactive real-time apps (chat, games). SSE is the right choice for dashboards, notifications, and AI streaming.

⚠ Gotcha

HTTP/1.1 allows only 6 connections per origin in browsers. If you open many SSE connections to the same server, browsers queue them. Use HTTP/2 to eliminate this limit — it multiplexes streams over one connection.

31

OpenAPI Customization

Advanced

FastAPI auto-generates an OpenAPI 3.1 schema from your routes and Pydantic models. You can customize nearly every aspect: the schema document itself (title, version, contact info), per-route documentation (summaries, tags, examples), the Swagger UI and ReDoc appearance, and even generate a completely custom schema. This is important for external APIs where documentation quality matters, and for enterprise environments requiring specific API contract formats.

from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

# ── 1. Rich metadata on app creation ─────────────────────────────────────
app = FastAPI(
    title="My API",
    version="2.1.0",
    description="""
## My API

A well-documented API example.

### Features
- Full CRUD for users
- JWT authentication
    """,
    contact={"name": "Support Team", "email": "support@example.com"},
    license_info={"name": "MIT", "url": "https://opensource.org/licenses/MIT"},
    # Disable auto-generated docs so we can customize them
    docs_url=None,
    redoc_url=None,
)

# ── 2. Rich model documentation ───────────────────────────────────────────
class UserCreate(BaseModel):
    name: str = Field(..., description="Full name", example="Alice Smith")
    email: str = Field(..., description="Email address", example="alice@example.com")
    age: int = Field(..., ge=0, le=150, description="Age in years", example=30)

    model_config = {
        "json_schema_extra": {
            "examples": [
                {"name": "Alice Smith", "email": "alice@example.com", "age": 30}
            ]
        }
    }

# ── 3. Per-route metadata ─────────────────────────────────────────────────
@app.post(
    "/users",
    summary="Create a new user",        # short title in Swagger
    description="Creates a user and sends a welcome email.",  # longer text
    response_description="The created user with ID",
    tags=["Users"],                      # groups routes in Swagger UI
    operation_id="create_user",          # custom operationId for SDK generation
    responses={
        409: {"description": "User with this email already exists"},
        422: {"description": "Validation error"},
    },
)
async def create_user(user: UserCreate):
    return {"id": 1, **user.model_dump()}

# ── 4. Custom Swagger UI with self-hosted assets ──────────────────────────
# Serve swagger-ui assets locally (needed in air-gapped environments)
# app.mount("/static", StaticFiles(directory="static"), name="static")

@app.get("/docs", include_in_schema=False)  # exclude from the schema itself
async def custom_swagger_ui():
    return get_swagger_ui_html(
        openapi_url="/openapi.json",
        title="My API — Docs",
        swagger_js_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui-bundle.js",
        swagger_css_url="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5/swagger-ui.css",
        swagger_favicon_url="/static/favicon.ico",
        oauth2_redirect_url="/docs/oauth2-redirect",
        swagger_ui_parameters={"persistAuthorization": True},  # remember auth token
    )

# ── 5. Fully custom OpenAPI schema ────────────────────────────────────────
def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema   # cached after first call

    schema = get_openapi(
        title=app.title,
        version=app.version,
        description=app.description,
        routes=app.routes,
    )

    # Inject a global security scheme
    schema["components"]["securitySchemes"] = {
        "BearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT",
        }
    }
    # Apply it to every operation by default
    for path in schema["paths"].values():
        for operation in path.values():
            operation.setdefault("security", [{"BearerAuth": []}])

    app.openapi_schema = schema
    return schema

app.openapi = custom_openapi  # replace the default schema generator
📖 Concept Breakdown
docs_url=None

Disables the auto-generated /docs endpoint so you can mount a custom one. Same for redoc_url=None.

include_in_schema=False

Hides a route from the OpenAPI schema entirely (useful for internal endpoints, health checks, or custom docs routes).

operation_id

Custom ID for the route. SDK generators (openapi-generator, orval) use this as the method name. Stable IDs prevent breaking SDK clients on route rename.

app.openapi = fn

Replaces the schema generation function entirely. Called lazily on first /openapi.json request. Cache the result on app.openapi_schema for performance.

json_schema_extra

Pydantic v2 way to add raw JSON Schema fields (like examples) that Pydantic doesn’t generate automatically.

◆ Rarely Known

You can tag individual routes with tags=["Users"] and also define tag metadata (description, external docs link) at the app level: FastAPI(openapi_tags=[{"name": "Users", "description": "User management endpoints"}]). This enriches Swagger UI with grouped descriptions.

32

Security Scopes & RBAC

Advanced

OAuth2 scopes let you express what a token is allowed to do, not just who it belongs to. FastAPI’s Security() function (a superset of Depends()) and SecurityScopes let you declare required scopes per route. The dependency receives the requested scopes at runtime and can verify them against what the token actually grants. This enables fine-grained Role-Based Access Control (RBAC) where different endpoints require different permission levels.

from fastapi import FastAPI, Depends, Security, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from pydantic import BaseModel
from typing import Annotated

app = FastAPI()

# OAuth2 with explicit scopes listed — these appear in Swagger UI
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "users:read":  "Read user information",
        "users:write": "Create and update users",
        "admin":       "Full administrative access",
    }
)

class TokenData(BaseModel):
    username: str
    scopes: list[str] = []

# ── Core scope-checking dependency ────────────────────────────────────────
# SecurityScopes is injected by FastAPI — contains the scopes declared
# by the route that invoked this dependency (via Security()).
async def get_current_user(
    security_scopes: SecurityScopes,      # FastAPI injects this automatically
    token: Annotated[str, Depends(oauth2_scheme)],
) -> TokenData:
    # Build the WWW-Authenticate header describing what scopes are needed
    auth_value = f'Bearer scope="{security_scopes.scope_str}"'

    # In production: decode JWT here
    # For demo, simulate a token with limited scopes
    fake_token_data = TokenData(
        username="alice",
        scopes=["users:read"],   # this user only has read access
    )

    # Check each required scope against what the token actually grants
    for required_scope in security_scopes.scopes:
        if required_scope not in fake_token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Insufficient permissions. Required: {required_scope}",
                headers={"WWW-Authenticate": auth_value},
            )

    return fake_token_data

# ── Type alias for convenience ────────────────────────────────────────────
CurrentUser = Annotated[TokenData, Depends(get_current_user)]

# ── Routes with different scope requirements ──────────────────────────────
@app.get("/users/me")
async def read_my_profile(
    # Security() is like Depends() but also passes scopes to the dependency
    current_user: Annotated[TokenData, Security(get_current_user, scopes=["users:read"])]
):
    return {"user": current_user.username, "scopes": current_user.scopes}

@app.post("/users")
async def create_user(
    current_user: Annotated[TokenData, Security(get_current_user, scopes=["users:write"])]
):
    return {"created_by": current_user.username}

@app.delete("/users/{uid}")
async def delete_user(
    uid: int,
    # Requires BOTH write access AND admin — user must have all listed scopes
    current_user: Annotated[TokenData, Security(get_current_user, scopes=["users:write", "admin"])]
):
    return {"deleted": uid, "by": current_user.username}
📖 Concept Breakdown
Security(dep, scopes=[…])

Like Depends() but additionally injects SecurityScopes into the dependency with the listed scopes. Used for per-route permission declarations.

SecurityScopes

Injected by FastAPI into a dependency. Contains .scopes (list) and .scope_str (space-separated string) of what the calling route required.

scope_str

Used in the WWW-Authenticate header to tell the client exactly what permissions it needs to retry with. Important for OAuth2 compliance.

OAuth2PasswordBearer(scopes=…)

Declares available scopes in the schema — Swagger UI shows them in the “Authorize” dialog so developers can request specific permissions.

403 vs 401

Return 401 Unauthorized when no credentials are provided. Return 403 Forbidden when credentials are valid but lack the required permission.

💬 Interview Tip

“How do you implement RBAC in FastAPI?” — Answer: OAuth2 scopes via Security() + SecurityScopes for fine-grained control, or simpler role-checking in a dependency by reading the role claim from the JWT. Scopes are better for resource-level permissions; roles are better for user-level categories.

⚠ Gotcha

Scopes in the JWT must match exactly — “admin” ≠ “Admin”. Normalize scope strings to lowercase everywhere, and document the canonical scope names. Case mismatches are a common production bug.

33

Rate Limiting

Advanced

Rate limiting prevents abuse by capping how many requests a client can make in a given time window. The most common FastAPI approach uses slowapi, a port of Flask-Limiter that integrates with Starlette. For production, rate limit state should live in Redis so it’s shared across multiple server instances — an in-memory counter resets on each pod restart and can’t coordinate across horizontal replicas.

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

# ── Setup ─────────────────────────────────────────────────────────────────
# get_remote_address extracts the client IP from the request.
# For Redis-backed limiting: Limiter(key_func=…, storage_uri="redis://localhost")
limiter = Limiter(
    key_func=get_remote_address,  # key = client IP
    default_limits=["200/day", "50/hour"],  # global defaults
)

app = FastAPI()
app.state.limiter = limiter  # slowapi reads limiter from app.state

# Register the 429 handler — SlowAPI raises RateLimitExceeded on violation
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

# ── Per-route limits ──────────────────────────────────────────────────────
@app.get("/public")
@limiter.limit("10/minute")   # 10 req/min per IP for this route
async def public_endpoint(request: Request):   # request MUST be in the signature
    return {"data": "public info"}

@app.get("/expensive")
@limiter.limit("2/minute")    # very strict — expensive computation
async def expensive_endpoint(request: Request):
    return {"result": "heavy computation done"}

# ── Per-user limiting (authenticated routes) ──────────────────────────────
def get_user_id(request: Request) -> str:
    """Extract user ID from JWT for per-user rate limiting."""
    auth = request.headers.get("Authorization", "")
    if auth.startswith("Bearer "):
        # In production: decode JWT and return user ID
        return auth.split(" ")[1][:16]   # first 16 chars of token as demo key
    return get_remote_address(request)   # fallback to IP

user_limiter = Limiter(key_func=get_user_id)

@app.get("/user-scoped")
@user_limiter.limit("100/hour")
async def user_scoped(request: Request):
    return {"limited": "per user id"}

# ── Manual rate limit check (without decorator) ───────────────────────────
from slowapi.wrappers import Limit

@app.get("/dynamic")
async def dynamic_limit(request: Request, is_premium: bool = False):
    limit_str = "1000/hour" if is_premium else "10/hour"
    # Programmatic limit check
    limiter._check_request_limit(request, endpoint_func=dynamic_limit,
                                  in_middleware=False)
    return {"premium": is_premium}
📖 Concept Breakdown
Limiter(key_func=…)

Determines what to count requests against. Common: client IP, user ID from JWT, API key. Each unique key gets its own independent counter.

"10/minute"

Limit string format: count/period. Periods: second, minute, hour, day. Multiple limits: ["10/min", "100/hour"].

RateLimitExceeded

Exception raised by slowapi when a limit is hit. The registered handler returns HTTP 429 with a Retry-After header.

request in signature

Required — slowapi needs access to the Request object to extract the key and check limits. Forgetting this causes a runtime error.

storage_uri="redis://…"

Stores counters in Redis. Required for multi-instance deployments — otherwise each server pod tracks limits independently.

💬 Interview Tip

Common question: “How do you rate limit across multiple servers?” — Redis-backed distributed counters. Each server atomically increments a Redis key with a TTL. Redis INCR + EXPIRE is atomic and fast. Mention that in-memory limiters won’t work in Kubernetes with multiple pods.

★ Important

Always put @limiter.limit() after @app.get() (closer to the function). Python decorators apply bottom-up — the limiter decorator must wrap the actual function, not FastAPI’s route wrapper.

34

Caching Strategies

Advanced

Caching stores the result of an expensive operation so subsequent calls return it instantly. In FastAPI apps there are three levels: in-process cache (Python dicts, cachetools.TTLCache — fast, but lost on restart and not shared across instances), Redis cache (shared across all instances, survives restarts, supports TTL), and HTTP cache headers (Cache-Control, ETag — lets CDNs and browsers cache responses, reducing server load entirely). Choose based on whether data changes, how stale is acceptable, and whether you run multiple instances.

import asyncio, hashlib, json
from functools import lru_cache
from fastapi import FastAPI, Request, Response
from cachetools import TTLCache
from typing import Any

app = FastAPI()

# ── 1. In-process TTL cache ───────────────────────────────────────────────
# TTLCache: LRU with automatic expiry. maxsize=1000 items, ttl=300 seconds.
_cache: TTLCache = TTLCache(maxsize=1000, ttl=300)
_cache_lock = asyncio.Lock()  # protect concurrent writes to the cache

async def get_expensive_data(key: str) -> dict:
    """Fetch data with in-process cache. Lost on restart."""
    async with _cache_lock:
        if key in _cache:
            return _cache[key]

    # Cache miss — fetch from DB / external service
    result = {"key": key, "data": "expensive result"}
    await asyncio.sleep(0.5)   # simulate slow DB

    async with _cache_lock:
        _cache[key] = result   # store for next 300 seconds
    return result

@app.get("/data/{key}")
async def get_data(key: str):
    return await get_expensive_data(key)

# ── 2. Redis cache ─────────────────────────────────────────────────────────
# Requires: pip install redis[asyncio]
# import redis.asyncio as aioredis
#
# redis_client = aioredis.from_url("redis://localhost")
#
# async def get_or_cache(key: str, ttl: int = 300) -> dict:
#     cached = await redis_client.get(key)
#     if cached:
#         return json.loads(cached)                  # cache hit
#     result = await fetch_from_db(key)              # cache miss
#     await redis_client.set(key, json.dumps(result), ex=ttl)
#     return result

# ── 3. HTTP Cache-Control headers ─────────────────────────────────────────
# These instruct browsers and CDNs to cache the response.
@app.get("/static-config")
async def static_config(response: Response):
    # Cache for 1 hour publicly (CDN + browser)
    response.headers["Cache-Control"] = "public, max-age=3600"
    return {"config": "rarely changes"}

# ── 4. ETag-based conditional caching ────────────────────────────────────
# ETag = fingerprint of the response. Browser sends it back; server
# returns 304 Not Modified if data hasn't changed — no body transferred.
@app.get("/users/{uid}")
async def get_user_etag(uid: int, request: Request, response: Response):
    user_data = {"id": uid, "name": "Alice", "version": 5}

    # Generate ETag from the content
    content_bytes = json.dumps(user_data, sort_keys=True).encode()
    etag = f'"{hashlib.md5(content_bytes).hexdigest()}"'

    # Check if client already has this version
    if request.headers.get("If-None-Match") == etag:
        return Response(status_code=304)   # Not Modified — no body needed

    response.headers["ETag"] = etag
    response.headers["Cache-Control"] = "private, max-age=60"
    return user_data

# ── 5. Cache invalidation ─────────────────────────────────────────────────
async def invalidate_user_cache(uid: int):
    """Remove a user from cache when their data changes."""
    key = f"user:{uid}"
    async with _cache_lock:
        _cache.pop(key, None)   # remove if present, no error if missing
📖 Concept Breakdown
TTLCache

Cachetools LRU cache with time-based expiry. Items auto-expire after ttl seconds. When maxsize is reached, the least-recently-used item is evicted.

asyncio.Lock

Prevents cache stampede — without a lock, 100 concurrent requests for the same missing key would all simultaneously call the slow DB.

Cache-Control: public

CDNs (Cloudflare, CloudFront) can cache this response. Use private for user-specific data — only the browser should cache it.

ETag

A content fingerprint. Browser caches the response + ETag. On next request, sends If-None-Match: "etag". Server returns 304 if unchanged — saves bandwidth.

Cache invalidation

The hard part of caching. When data changes, must proactively remove or update cached entries. Event-driven or write-through strategies help keep cache consistent.

⚠ Gotcha

Cache stampede (thundering herd): when a popular cache key expires, all concurrent requests miss and hit the DB simultaneously. Solutions: background cache refresh before expiry, probabilistic early expiration, or a “single-flight” lock pattern (only one request fetches; others wait for it).

◆ Rarely Known

stale-while-revalidate in Cache-Control: Cache-Control: max-age=60, stale-while-revalidate=30 — CDN serves stale content for 30s while it fetches a fresh copy in the background. Eliminates cache expiry latency spikes.

35

Pagination Patterns

Advanced

Returning millions of database rows in one response isn’t feasible. Pagination splits results into pages. Offset/limit pagination is simple (OFFSET 100 LIMIT 20) but becomes slow on large datasets (the database must scan and discard the first 100 rows). Cursor-based pagination uses a pointer to the last seen item (WHERE id > cursor) — it’s O(1) regardless of page depth and is safe for real-time feeds where rows can be inserted between pages. Most APIs offer offset for convenience, cursor for performance.

from fastapi import FastAPI, Query
from pydantic import BaseModel
from typing import Generic, TypeVar, Annotated
import base64, json

app = FastAPI()

T = TypeVar("T")

# ── Generic Page response model ───────────────────────────────────────────
# Generic[T] lets this work for any data type: Page[User], Page[Product], etc.
class Page(BaseModel, Generic[T]):
    items: list[T]
    total: int
    skip: int
    limit: int
    has_next: bool
    has_prev: bool

class User(BaseModel):
    id: int
    name: str

# Simulate a database
FAKE_USERS = [User(id=i, name=f"User {i}") for i in range(1, 101)]

# ── 1. Offset / Limit pagination ──────────────────────────────────────────
@app.get("/users", response_model=Page[User])
async def list_users(
    skip: Annotated[int, Query(ge=0, description="Records to skip")] = 0,
    limit: Annotated[int, Query(ge=1, le=100, description="Max items")] = 20,
) -> Page[User]:
    total = len(FAKE_USERS)
    items = FAKE_USERS[skip : skip + limit]

    return Page(
        items=items,
        total=total,
        skip=skip,
        limit=limit,
        has_next=(skip + limit) < total,
        has_prev=skip > 0,
    )

# ── 2. Cursor-based pagination ─────────────────────────────────────────────
class CursorPage(BaseModel, Generic[T]):
    items: list[T]
    next_cursor: str | None   # None means no more pages
    prev_cursor: str | None

def encode_cursor(data: dict) -> str:
    """Encode cursor as URL-safe base64 JSON."""
    return base64.urlsafe_b64encode(json.dumps(data).encode()).decode()

def decode_cursor(cursor: str) -> dict:
    """Decode cursor back to dict."""
    return json.loads(base64.urlsafe_b64decode(cursor).encode()
                       if False else base64.urlsafe_b64decode(cursor))

@app.get("/users/cursor", response_model=CursorPage[User])
async def list_users_cursor(
    cursor: Annotated[str | None, Query(description="Pagination cursor")] = None,
    limit: Annotated[int, Query(ge=1, le=100)] = 20,
) -> CursorPage[User]:
    # Decode cursor to get the last seen ID
    after_id = 0
    if cursor:
        cursor_data = decode_cursor(cursor)
        after_id = cursor_data.get("id", 0)

    # Filter users after the cursor ID — O(1) with a DB index on ID
    remaining = [u for u in FAKE_USERS if u.id > after_id]
    items = remaining[:limit]

    # Build next cursor from the last item's ID
    next_cursor = None
    if len(remaining) > limit:
        next_cursor = encode_cursor({"id": items[-1].id})

    prev_cursor = None
    if after_id > 0:
        prev_cursor = encode_cursor({"id": max(after_id - limit, 0)})

    return CursorPage(items=items, next_cursor=next_cursor, prev_cursor=prev_cursor)

# ── 3. Page number style (convenience wrapper) ────────────────────────────
@app.get("/products")
async def list_products(
    page: Annotated[int, Query(ge=1, description="Page number (1-based)")] = 1,
    per_page: Annotated[int, Query(ge=1, le=100)] = 20,
):
    skip = (page - 1) * per_page   # convert 1-based page to 0-based offset
    items = FAKE_USERS[skip : skip + per_page]
    return {
        "page": page,
        "per_page": per_page,
        "total_pages": -(-len(FAKE_USERS) // per_page),  # ceiling division
        "items": items,
    }
📖 Concept Breakdown
Generic[T] in Pydantic

class Page(BaseModel, Generic[T]) — lets one model work for any item type. FastAPI generates correct OpenAPI schemas for Page[User], Page[Product], etc.

Offset pagination

Simple: OFFSET skip LIMIT limit. Problem: slow on large tables (database scans rows 1–skip just to discard them). Fine for small-to-medium datasets.

Cursor pagination

WHERE id > cursor_id LIMIT n. Uses an index seek — same speed whether page 1 or page 1000. Correct behavior when new rows are inserted between requests.

Opaque cursor

Base64-encode the cursor so clients treat it as a black box, not a manipulable integer. This lets you change the internal cursor format without breaking clients.

Ceiling division -(-n//d)

A Python trick: -(-100 // 3) = 34, the correct ceiling. Equivalent to math.ceil(n/d) but avoids a float conversion.

💬 Interview Tip

“What’s the problem with offset pagination at page 5000?” — The DB performs OFFSET 100000, which forces a full scan of 100,000 rows just to discard them, even with indexes. Cursor-based pagination (keyset pagination) avoids this with an indexed WHERE clause. Always mention this trade-off when designing paginated APIs.

⚠ Gotcha

With offset pagination and a live feed, if a new item is inserted while a user pages through results, they may see a duplicate on the next page (the item shifts others forward). Cursor pagination is immune to this because it’s anchored to a stable ID.