FastAPI Guide
36

Project Structure

Architecture
As FastAPI apps grow beyond a handful of routes, how you organize files determines how easy it is to navigate, test, and extend the codebase. There are two main philosophies. Flat/functional layouts group by technical role (all models in one file, all routers in one file). Domain-driven layouts group by feature (each feature folder contains its own models, schemas, router, service, and repository; tests can live alongside or in a shared tests/ folder, as in the layout below). Flat is fine for small APIs; domain-driven scales better as teams and features grow because each feature is self-contained.
Project Layout — domain-driven
# ── Domain-Driven Structure (recommended for medium-large apps) ───────────
#
# app/
# ├── main.py               ← FastAPI() instance, lifespan, router registration
# ├── core/
# │   ├── config.py         ← Pydantic Settings, env vars
# │   ├── database.py       ← engine, session factory, Base
# │   └── security.py       ← JWT helpers, password hashing
# ├── users/
# │   ├── __init__.py
# │   ├── router.py         ← APIRouter for the /users endpoints
# │   ├── service.py        ← business logic
# │   ├── repository.py     ← database queries
# │   ├── models.py         ← SQLAlchemy ORM models
# │   └── schemas.py        ← Pydantic request/response schemas
# ├── items/
# │   ├── router.py
# │   ├── service.py
# │   ├── repository.py
# │   ├── models.py
# │   └── schemas.py
# └── tests/
#     ├── conftest.py       ← shared fixtures, test DB setup
#     ├── test_users.py
#     └── test_items.py

# ── main.py ───────────────────────────────────────────────────────────────
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.users.router import router as users_router
from app.items.router import router as items_router
from app.core.database import init_db

@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_db()
    yield

app = FastAPI(lifespan=lifespan)

# Register all routers; prefix and tags are assigned here, at registration
app.include_router(users_router, prefix="/users", tags=["Users"])
app.include_router(items_router, prefix="/items", tags=["Items"])

# ── users/router.py ───────────────────────────────────────────────────────
from fastapi import APIRouter, Depends
from app.users.service import UserService
from app.users.schemas import UserCreate, UserOut
from app.core.database import get_session
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated

router = APIRouter()

# The router doesn't know about DB details — it delegates to the service
@router.post("/", response_model=UserOut, status_code=201)
async def create_user(
    payload: UserCreate,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    svc = UserService(session)
    return await svc.create(payload)

# ── users/service.py ──────────────────────────────────────────────────────
# Service layer holds business logic: no SQL and no request parsing.
# Raising HTTPException here is a shortcut that suits small apps; stricter
# designs raise a domain exception and let the router translate it.
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.users.repository import UserRepository
from app.users.schemas import UserCreate, UserOut

class UserService:
    def __init__(self, session: AsyncSession):
        self.repo = UserRepository(session)

    async def create(self, data: UserCreate) -> UserOut:
        # Business logic: check for duplicates, hash password, etc.
        existing = await self.repo.get_by_email(data.email)
        if existing:
            raise HTTPException(status_code=409, detail="Email already registered")
        user = await self.repo.create(data)
        return UserOut.model_validate(user)
Concept Breakdown
router.py
HTTP layer only — parse request, call service, return response. No business logic, no SQL. Thin and fast to read.
service.py
Business logic layer. Knows about domain rules (no duplicate emails) but not SQL syntax, and ideally not HTTP status codes either (the example raises HTTPException as a shortcut). Testable without a web server.
repository.py
Database layer. Only place that writes SQL or ORM queries. Swappable (e.g., replace SQLAlchemy with asyncpg) without touching the rest.
schemas.py
Pydantic models for request validation (UserCreate) and response serialization (UserOut). Separate from ORM models.
core/
Cross-cutting concerns shared by all features: config, database engine, security utilities. Not feature-specific.
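The split between router.py and service.py can be kept strict by having the service raise domain exceptions and letting the HTTP layer translate them. The sketch below uses only the standard library; the names DomainError, STATUS_BY_ERROR, and handle_register are hypothetical stand-ins, and in a real FastAPI app the translation would more likely sit in a handler registered with @app.exception_handler(DomainError).

```python
class DomainError(Exception):
    """Base class for business-rule violations (hypothetical name)."""

class EmailAlreadyRegistered(DomainError):
    pass

class UserNotFound(DomainError):
    pass

# HTTP-side translation table: the only place that knows status codes
STATUS_BY_ERROR = {EmailAlreadyRegistered: 409, UserNotFound: 404}

def to_http(exc: DomainError) -> tuple[int, str]:
    """Map a domain error to (status_code, detail); unknown errors become 400."""
    return STATUS_BY_ERROR.get(type(exc), 400), str(exc)

def register(existing_emails: set[str], email: str) -> str:
    """Service-level function: enforces the rule, knows nothing about HTTP."""
    if email in existing_emails:
        raise EmailAlreadyRegistered(f"{email} is already registered")
    existing_emails.add(email)
    return email

def handle_register(existing_emails: set[str], email: str) -> tuple[int, str]:
    """Stand-in for a route handler: call the service, translate failures."""
    try:
        return 201, register(existing_emails, email)
    except DomainError as exc:
        return to_http(exc)

emails: set[str] = set()
first_response = handle_register(emails, "a@b.com")
second_response = handle_register(emails, "a@b.com")
print(first_response)
print(second_response)
```

With this shape the same register() function can back a CLI command, which would catch DomainError and print a message instead of returning a status code.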

💡 Interview Tip

"How do you structure a FastAPI application?" is a common senior-level question. Walk through the domain-driven layout and explain the responsibility of each file type. Mention that separating router/service/repository means each layer can be tested in isolation.

★ Important

The key rule: dependencies only flow downward — router → service → repository → database. A repository should never import from a router. A service should never import from a router. Violations create circular imports and tightly coupled layers.
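One way to keep the downward-only rule honest is a small check that scans a module's imports and flags any that point at a higher layer. This is a minimal sketch, not an established tool: LAYER_ORDER and upward_imports are hypothetical names, and it assumes the file naming from the layout above (router, service, repository, models).

```python
import ast

# Lower number = higher layer. A module may import only from layers below it.
LAYER_ORDER = {"router": 0, "service": 1, "repository": 2, "models": 3}

def upward_imports(module_name: str, source: str) -> list[str]:
    """Return imported module paths that sit in a higher layer than module_name."""
    own_layer = LAYER_ORDER[module_name.rsplit(".", 1)[-1]]
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module:
            targets = [node.module]
        elif isinstance(node, ast.Import):
            targets = [alias.name for alias in node.names]
        else:
            continue
        for target in targets:
            # Modules outside the layer map (schemas, core, ...) are ignored
            layer = LAYER_ORDER.get(target.rsplit(".", 1)[-1])
            if layer is not None and layer < own_layer:
                violations.append(target)
    return violations

good = "from app.users.repository import UserRepository\n"
bad = "from app.users.router import router\n"
print(upward_imports("app.users.service", good))  # []
print(upward_imports("app.users.service", bad))   # ['app.users.router']
```

Run over every file in a feature folder, a check like this could sit in the test suite so that a service importing from a router fails CI rather than surfacing later as a circular import.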

37

Repository Pattern

Architecture
The Repository pattern wraps your database access behind an abstract interface. Instead of writing ORM queries scattered across services and routes, all database interaction lives in one place per domain. The key benefit: testability — you can swap the real SQLAlchemy repository with a fake in-memory implementation during tests, making tests fast and database-independent. The pattern uses an Abstract Base Class (ABC) to define the contract, with a concrete implementation for each storage backend.
Python — repository.py
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.users.models import User
from app.users.schemas import UserCreate

# ── Abstract interface (the contract) ────────────────────────────────────
# Any class that implements this is a valid UserRepository.
# Services depend on this interface, not on a specific implementation.
class AbstractUserRepository(ABC):
    @abstractmethod
    async def get(self, user_id: int) -> User | None:
        ...

    @abstractmethod
    async def get_by_email(self, email: str) -> User | None:
        ...

    @abstractmethod
    async def create(self, data: UserCreate) -> User:
        ...

    @abstractmethod
    async def delete(self, user_id: int) -> bool:
        ...

    @abstractmethod
    async def list(self, skip: int = 0, limit: int = 20) -> list[User]:
        ...

# ── SQLAlchemy implementation ──────────────────────────────────────────────
class SQLUserRepository(AbstractUserRepository):
    def __init__(self, session: AsyncSession):
        self.session = session

    async def get(self, user_id: int) -> User | None:
        return await self.session.get(User, user_id)

    async def get_by_email(self, email: str) -> User | None:
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()

    async def create(self, data: UserCreate) -> User:
        user = User(name=data.name, email=data.email)
        self.session.add(user)
        await self.session.flush()   # assigns the DB-generated ID without committing
        await self.session.refresh(user)
        return user

    async def delete(self, user_id: int) -> bool:
        user = await self.get(user_id)
        if not user:
            return False
        await self.session.delete(user)
        return True

    async def list(self, skip: int = 0, limit: int = 20) -> list[User]:
        result = await self.session.execute(
            select(User).offset(skip).limit(limit).order_by(User.id)
        )
        return list(result.scalars().all())

# ── In-memory fake for tests ───────────────────────────────────────────────
# This is not a mock — it's a real working implementation using a dict.
# Tests run instantly with no database required.
class FakeUserRepository(AbstractUserRepository):
    def __init__(self):
        self._store: dict[int, User] = {}
        self._next_id = 1

    async def get(self, user_id: int) -> User | None:
        return self._store.get(user_id)

    async def get_by_email(self, email: str) -> User | None:
        return next((u for u in self._store.values() if u.email == email), None)

    async def create(self, data: UserCreate) -> User:
        user = User(id=self._next_id, name=data.name, email=data.email)
        self._store[self._next_id] = user
        self._next_id += 1
        return user

    async def delete(self, user_id: int) -> bool:
        return self._store.pop(user_id, None) is not None

    async def list(self, skip: int = 0, limit: int = 20) -> list[User]:
        return list(self._store.values())[skip : skip + limit]

# ── Usage in tests ─────────────────────────────────────────────────────────
# async def test_create_user():
#     repo = FakeUserRepository()
#     svc = UserService(repo)         # inject fake repo
#     result = await svc.create(UserCreate(name="Alice", email="a@b.com"))
#     assert result.id == 1
#     assert result.email == "a@b.com"
Concept Breakdown
ABC / @abstractmethod
Defines the interface contract. If a subclass doesn't implement all abstract methods, instantiating it raises TypeError. Python enforces the contract at instantiation time, not when the class is defined.
session.flush()
Sends SQL to the DB but doesn't commit. The transaction is still open. Used to get a DB-generated ID (from a sequence) while staying in the current transaction.
scalar_one_or_none()
Returns a single result or None. Raises MultipleResultsFound if more than one row matches — safer than first() which silently ignores extras.
FakeRepository
A real in-memory implementation, not a mock. Fake = real logic with fake storage. Mock = returns pre-configured values. Because a fake executes real logic, it tends to catch bugs that a pre-configured mock would hide.
Dependency on interface
UserService takes AbstractUserRepository — not SQLUserRepository. This is the Dependency Inversion Principle (the "D" in SOLID).

💡 Interview Tip

"How do you test database code without hitting a real DB?" — Inject a FakeUserRepository into the service via dependency injection. The service doesn't know or care what storage is behind the interface. This makes tests run in microseconds, not seconds.

⚠ Gotcha

Don't confuse flush() with commit(). Flush sends SQL within the current transaction (useful for getting IDs). Commit ends the transaction and makes changes permanent. Flushed changes that are never committed are discarded when the transaction is rolled back or the session is closed.
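The flush/commit distinction comes from ordinary transaction behavior, which can be seen with the standard library's sqlite3 module. This is an analogy rather than SQLAlchemy itself: an INSERT executed inside an open transaction plays the role of flush(), and the table and variable names are made up for the sketch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.commit()

# "Flush": the INSERT is sent to the database inside an open transaction
cur = conn.execute("INSERT INTO users (email) VALUES (?)", ("a@b.com",))
pending_id = cur.lastrowid  # the generated ID is already available

# Roll back instead of committing: the inserted row disappears
conn.rollback()
rows_after_rollback = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]

# Insert again and commit this time: a later rollback has nothing to undo
conn.execute("INSERT INTO users (email) VALUES (?)", ("a@b.com",))
conn.commit()
conn.rollback()
rows_after_commit = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]

print(pending_id, rows_after_rollback, rows_after_commit)  # 1 0 1
```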

38

Service Layer Pattern

Architecture
The service layer is where your application's business logic lives — it's the layer that knows the rules of your domain. The router knows about HTTP; the repository knows about SQL; the service knows about business rules (can't transfer money to yourself, can't book a slot that's already taken, must send a welcome email when a user registers). Keeping this layer separate makes logic testable without a web server or database, and reusable across different entry points (HTTP API, CLI, background job).
Python — service.py
from fastapi import HTTPException, status
from app.users.repository import AbstractUserRepository
from app.users.schemas import UserCreate, UserUpdate, UserOut
from app.core.security import hash_password, verify_password
from app.core.email import send_welcome_email  # imaginary email service

class UserService:
    """
    Business logic for user management.
    Knows nothing about SQL or request parsing. Raising HTTPException
    is the one HTTP-aware shortcut it takes.
    Depends on the repository INTERFACE, not on any specific DB.
    """
    def __init__(self, repo: AbstractUserRepository):
        self.repo = repo

    async def register(self, data: UserCreate) -> UserOut:
        # ── Business rule 1: no duplicate emails ─────────────────────────
        existing = await self.repo.get_by_email(data.email)
        if existing:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="An account with this email already exists.",
            )

        # ── Business rule 2: hash password before storage ─────────────────
        hashed = hash_password(data.password)
        user = await self.repo.create(
            UserCreate(name=data.name, email=data.email, password=hashed)
        )

        # ── Business rule 3: send welcome email ───────────────────────────
        # In production: use a background task or message queue
        await send_welcome_email(user.email, user.name)

        return UserOut.model_validate(user)

    async def update(self, user_id: int, data: UserUpdate) -> UserOut:
        user = await self.repo.get(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        # Only update fields that were actually provided
        # (Pydantic model with exclude_unset)
        updates = data.model_dump(exclude_unset=True)
        if "password" in updates:
            updates["password"] = hash_password(updates["password"])

        for field, value in updates.items():
            setattr(user, field, value)

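        # NOTE: save() is not part of AbstractUserRepository as defined earlier;
        # it is assumed here to flush the modified instance and return it.
        updated = await self.repo.save(user)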
        return UserOut.model_validate(updated)

    async def authenticate(self, email: str, password: str) -> UserOut:
        user = await self.repo.get_by_email(email)
        # Deliberately vague error — don't reveal whether email exists
        if not user or not verify_password(password, user.password):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid email or password",
            )
        return UserOut.model_validate(user)

    async def delete(self, user_id: int, requesting_user_id: int) -> None:
        # ── Business rule: users can only delete their own account ────────
        # (an admin override is not implemented here and would be checked first)
        if user_id != requesting_user_id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot delete another user's account",
            )
        deleted = await self.repo.delete(user_id)
        if not deleted:
            raise HTTPException(status_code=404, detail="User not found")
Concept Breakdown
Business rule
A constraint that exists in the problem domain, not in HTTP or SQL. "No duplicate emails" is a business rule. "400 Bad Request" is an HTTP detail. Keep them separate.
HTTPException in service
Acceptable in small/medium projects — FastAPI catches it anywhere in the call chain. In strict architectures, raise domain exceptions and let the router convert them to HTTP errors.
exclude_unset
model_dump(exclude_unset=True) — only includes fields the client actually provided, not defaults. Critical for PATCH semantics (partial update without overwriting unchanged fields).
Vague auth error
"Invalid email or password" (not "email not found") prevents user enumeration attacks — attackers can't distinguish missing accounts from wrong passwords.
Self-contained service
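The service can be called from an HTTP handler, a CLI script, or a background job because it doesn't import anything from FastAPI routing: just business logic plus a repo. The one caveat is HTTPException, which non-HTTP callers have to catch, or which can be replaced with domain exceptions.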

💡 Interview Tip

"What goes in a service layer vs a route handler?" — Services: business rules, validation, orchestration, error logic. Routes: HTTP parsing, dependency injection, serialization, status codes. A good rule of thumb: if the logic would still matter for a CLI version of the same feature, it belongs in the service.

39

Unit of Work Pattern

Architecture
The Unit of Work (UoW) pattern groups all operations in a business transaction into a single atomic unit — either all succeed and are committed, or any failure rolls everything back. Instead of passing a raw database session around, the UoW acts as a transaction boundary. It holds all the repositories for a given transaction, so a service can span multiple repositories (creating a user AND creating their default settings) inside one atomic commit. The async with uow: context manager handles commit on success and rollback on failure automatically.
Python — unit_of_work.py
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.users.repository import AbstractUserRepository, SQLUserRepository
from app.settings.repository import AbstractSettingsRepository, SQLSettingsRepository

# ── Abstract UoW ──────────────────────────────────────────────────────────
class AbstractUnitOfWork(ABC):
    users: AbstractUserRepository
    settings: AbstractSettingsRepository

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            await self.rollback()   # any exception → rollback
        else:
            await self.commit()     # clean exit → commit

    @abstractmethod
    async def commit(self): ...

    @abstractmethod
    async def rollback(self): ...

# ── SQLAlchemy implementation ──────────────────────────────────────────────
class SQLUnitOfWork(AbstractUnitOfWork):
    def __init__(self, session_factory: async_sessionmaker):
        self._session_factory = session_factory

    async def __aenter__(self):
        # Open a new session for this unit of work
        self._session: AsyncSession = self._session_factory()
        # Attach all repositories — they share the same session
        self.users    = SQLUserRepository(self._session)
        self.settings = SQLSettingsRepository(self._session)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            await super().__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Release the connection even if commit() or rollback() raises
            await self._session.close()

    async def commit(self):
        await self._session.commit()

    async def rollback(self):
        await self._session.rollback()

# ── Service using UoW ──────────────────────────────────────────────────────
class RegistrationService:
    def __init__(self, uow: AbstractUnitOfWork):
        self.uow = uow

    async def register_user(self, name: str, email: str) -> dict:
        async with self.uow as uow:
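            # Both operations happen inside the same DB transaction.
            # create_raw() and create_defaults() are assumed repository helpers;
            # they are not part of the interfaces defined earlier.
            user = await uow.users.create_raw(name=name, email=email)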

            # If this fails, the user creation above is also rolled back
            settings = await uow.settings.create_defaults(user_id=user.id)

            # Leaving the async with block without an exception makes __aexit__ commit
            return {"user_id": user.id, "settings_id": settings.id}
        # If an exception was raised, __aexit__ rolled back both operations

# ── FastAPI dependency ────────────────────────────────────────────────────
from fastapi import Depends
from app.core.database import session_factory  # async_sessionmaker instance
from typing import Annotated

def get_uow() -> AbstractUnitOfWork:
    return SQLUnitOfWork(session_factory)

# Annotate with the abstraction so routes and services never name the SQL class
UoWDep = Annotated[AbstractUnitOfWork, Depends(get_uow)]

# In a router:
# @router.post("/register")
# async def register(payload: RegisterRequest, uow: UoWDep):
#     svc = RegistrationService(uow)
#     return await svc.register_user(payload.name, payload.email)
Concept Breakdown
async with uow
Context manager protocol: __aenter__ opens session, __aexit__ commits on clean exit or rolls back on exception — automatic transaction management.
Shared session
Both uow.users and uow.settings share the same AsyncSession — this is what makes them part of the same atomic transaction.
rollback on exc
If anything raises inside async with uow, both the user creation and settings creation are rolled back — no half-committed state in the DB.
session_factory
async_sessionmaker configured with the engine. Each UoW creates its own session — sessions are not shared between requests.
Abstract UoW in tests
Swap with a FakeUnitOfWork that uses FakeUserRepository and records commit/rollback calls — test business logic without any I/O.
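A FakeUnitOfWork along those lines might look like the sketch below. It is deliberately reduced: users is a plain dict rather than a FakeUserRepository, the register() function is a stand-in for a service method, and the committed / rolled_back flags are hypothetical names for whatever the test wants to assert on.

```python
import asyncio

class FakeUnitOfWork:
    """In-memory unit of work that records how the transaction ended."""
    def __init__(self) -> None:
        self.users: dict[str, dict] = {}  # stand-in for a fake repository
        self.committed = False
        self.rolled_back = False

    async def __aenter__(self) -> "FakeUnitOfWork":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if exc_type:
            await self.rollback()
        else:
            await self.commit()
        # Returning None lets any exception propagate to the caller

    async def commit(self) -> None:
        self.committed = True

    async def rollback(self) -> None:
        self.users.clear()  # discard everything written in this unit of work
        self.rolled_back = True

async def register(uow: FakeUnitOfWork, email: str, fail: bool = False) -> None:
    async with uow:
        uow.users[email] = {"email": email}
        if fail:
            # Simulates the second operation (e.g. default settings) failing
            raise RuntimeError("settings creation failed")

async def demo() -> tuple[FakeUnitOfWork, FakeUnitOfWork]:
    ok = FakeUnitOfWork()
    await register(ok, "a@b.com")

    failed = FakeUnitOfWork()
    try:
        await register(failed, "a@b.com", fail=True)
    except RuntimeError:
        pass
    return ok, failed

ok_uow, failed_uow = asyncio.run(demo())
print(ok_uow.committed, failed_uow.rolled_back, failed_uow.users)  # True True {}
```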

💡 Interview Tip

"How do you ensure two database operations succeed or fail together?" — The Unit of Work pattern. One session, one transaction. Either both commit (return from async with uow cleanly) or both rollback (any exception). Never use two separate sessions for related operations.

⚠ Gotcha

Don't hold a UoW open for a long time (like across multiple HTTP requests or while waiting for user input). Keep transactions short. Long-running transactions hold a pooled connection and any locks they have taken, which can block other writers and, depending on the database and isolation level, readers.

40

DTO vs Domain Model

Architecture
A Domain Model (ORM class) represents how data is stored in the database — it maps to tables and columns, has DB-specific fields like id and created_at, and may contain relationships loaded lazily. A DTO (Data Transfer Object) — Pydantic schema — represents how data crosses a boundary (HTTP request/response). They serve different purposes and should be separate classes: the ORM model might have a hashed_password column you never want to expose in the API response, or the API might accept a password field that never maps directly to the ORM.
Python — dto_vs_domain.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, func
from pydantic import BaseModel, EmailStr, ConfigDict
from datetime import datetime

# ── Domain Model (ORM — what's in the DB) ────────────────────────────────
class Base(DeclarativeBase): pass

class UserORM(Base):
    """Represents the 'users' table. Never sent directly to clients."""
    __tablename__ = "users"

    id:             Mapped[int]      = mapped_column(primary_key=True)
    name:           Mapped[str]      = mapped_column(String(100))
    email:          Mapped[str]      = mapped_column(String(255), unique=True)
    hashed_password: Mapped[str]     = mapped_column(String(200))  # NEVER expose
    is_active:      Mapped[bool]     = mapped_column(default=True)
    created_at:     Mapped[datetime] = mapped_column(server_default=func.now())
    # Relationships (may lazy-load additional queries)
    # items: Mapped[list["ItemORM"]] = relationship(back_populates="user")

# ── DTOs (Pydantic Schemas — what crosses the HTTP boundary) ──────────────

# Request: what the CLIENT sends when creating a user
class UserCreate(BaseModel):
    name: str
    email: EmailStr
    password: str     # ← plain text; service will hash it before saving

# Request: what the CLIENT sends when updating (all fields optional)
class UserUpdate(BaseModel):
    name: str | None = None
    email: EmailStr | None = None
    password: str | None = None   # optional password change

# Response: what the SERVER returns (no sensitive fields)
class UserOut(BaseModel):
    id: int
    name: str
    email: str
    is_active: bool
    created_at: datetime
    # hashed_password is intentionally ABSENT here

    # from_attributes=True allows Pydantic to read ORM instances
    # (reads attributes like user.id, user.name from the ORM object)
    model_config = ConfigDict(from_attributes=True)

# Response: minimal version for lists (less data = faster response)
class UserSummary(BaseModel):
    id: int
    name: str
    model_config = ConfigDict(from_attributes=True)

# ── Conversion ────────────────────────────────────────────────────────────
# In service layer, after fetching an ORM object:
# user_orm = await repo.get(user_id)          # returns UserORM instance
# user_out = UserOut.model_validate(user_orm)  # ORM → DTO (uses from_attributes)
# return user_out

# ── Why separate? ─────────────────────────────────────────────────────────
# ORM model change (add column) ≠ API change (no client breakage)
# API change (add field) ≠ DB change (no migration required)
# Never accidentally expose internal fields (hashed_password, internal flags)
# Different endpoints can use different response shapes (full vs summary)
Concept Breakdown
Domain Model (ORM)
Maps to DB tables. Contains all stored fields including internal ones (hashed passwords, soft-delete flags). Never returned directly from API routes.
DTO (Pydantic schema)
Defines the API contract. Different schemas for different contexts: UserCreate (input), UserOut (full response), UserSummary (list response).
from_attributes=True
Enables UserOut.model_validate(orm_instance) — Pydantic reads the ORM object's attributes directly without needing a dict conversion step.
model_validate()
Pydantic v2 method that converts an ORM instance (or dict) into a Pydantic model. Validates and maps fields. Replaces v1's from_orm().
Separate schemas
Having UserOut (full) and UserSummary (minimal) for different routes optimizes payload size — list endpoints send less data per item.

★ Important

Always use response_model=UserOut on routes that return user data. FastAPI will serialize the ORM object through the Pydantic model, automatically stripping any field not declared in UserOut — even if the ORM accidentally has more fields.
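The stripping behavior comes down to copying only the fields the response schema declares. The sketch below imitates that with standard-library dataclasses instead of Pydantic and SQLAlchemy, so it shows the idea rather than FastAPI's actual mechanism; UserRecord and from_record are hypothetical names.

```python
from dataclasses import asdict, dataclass, fields

class UserRecord:
    """Stand-in for an ORM instance, including an internal field."""
    def __init__(self, id: int, name: str, email: str, hashed_password: str):
        self.id = id
        self.name = name
        self.email = email
        self.hashed_password = hashed_password

@dataclass(frozen=True)
class UserOut:
    """Response DTO: hashed_password is intentionally not declared."""
    id: int
    name: str
    email: str

    @classmethod
    def from_record(cls, record: object) -> "UserOut":
        # Read only the attributes this DTO declares; everything else is ignored
        return cls(**{f.name: getattr(record, f.name) for f in fields(cls)})

record = UserRecord(1, "Alice", "a@b.com", "not-a-real-hash")
payload = asdict(UserOut.from_record(record))
print(payload)  # {'id': 1, 'name': 'Alice', 'email': 'a@b.com'}
```

Adding a new column to UserRecord changes nothing in payload until UserOut declares it, which is the decoupling the section argues for.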

⚠ Gotcha

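A plain SQLAlchemy class is not a valid response_model; FastAPI is expected to reject it when the route is registered. The realistic risks are returning the ORM object from a route that has no response_model at all, or using a schema that mirrors every column (as can happen with combined ORM/schema classes). In both cases hashed_password or other internal fields can end up in the response. Always use a dedicated UserOut schema.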

41

Dependency Inversion with Interfaces

Architecture
The Dependency Inversion Principle (the "D" in SOLID) says: high-level modules should not depend on low-level modules — both should depend on abstractions. In practice: your UserService should depend on AbstractUserRepository (an interface), not on SQLUserRepository (a concrete class tied to SQLAlchemy). The concrete implementation is injected at runtime via FastAPI's Depends(). This enables swapping implementations (SQLite ↔ PostgreSQL ↔ MongoDB ↔ FakeRepo) without changing the service.
Python — dependency_inversion.py
from abc import ABC, abstractmethod
from fastapi import FastAPI, Depends
from typing import Annotated, Protocol

app = FastAPI()

# ── Define the interface with Protocol (structural typing) ────────────────
# Protocol is more Pythonic than ABC for interfaces — no explicit inheritance
# required. Any class with matching methods satisfies the protocol.
class EmailService(Protocol):
    async def send(self, to: str, subject: str, body: str) -> None: ...

class NotificationService(Protocol):
    async def notify(self, user_id: int, message: str) -> None: ...

# ── Concrete implementations ───────────────────────────────────────────────
class SMTPEmailService:
    """Production: sends real emails via SMTP."""
    async def send(self, to: str, subject: str, body: str) -> None:
        # In production: use aiosmtplib or sendgrid
        print(f"[SMTP] Sending to {to}: {subject}")

class ConsoleEmailService:
    """Development: prints emails to console instead of sending."""
    async def send(self, to: str, subject: str, body: str) -> None:
        print(f"[CONSOLE] Email to {to}: {subject}\n{body}")

class FakeEmailService:
    """Tests: records sent emails for assertion."""
    def __init__(self):
        self.sent: list[dict] = []

    async def send(self, to: str, subject: str, body: str) -> None:
        self.sent.append({"to": to, "subject": subject, "body": body})

# ── Dependency providers ───────────────────────────────────────────────────
# These functions are the "wiring" — they decide which implementation to use.
# Swap implementations here without touching any service or route.
from app.core.config import settings

def get_email_service() -> EmailService:
    if settings.environment == "production":
        return SMTPEmailService()
    return ConsoleEmailService()

# ── Service depending on the INTERFACE, not the implementation ────────────
class WelcomeService:
    def __init__(self, email_svc: EmailService):
        # This class has no idea whether email_svc is SMTP, console, or fake.
        # It just calls .send() — the contract defined by the Protocol.
        self.email_svc = email_svc

    async def welcome_user(self, name: str, email: str) -> None:
        await self.email_svc.send(
            to=email,
            subject=f"Welcome, {name}!",
            body=f"Thanks for signing up, {name}.",
        )

# ── FastAPI wiring ─────────────────────────────────────────────────────────
EmailDep = Annotated[EmailService, Depends(get_email_service)]

@app.post("/welcome/{email}")
async def welcome(email: str, email_svc: EmailDep):
    svc = WelcomeService(email_svc)
    await svc.welcome_user("Guest", email)
    return {"sent": True}

# ── In tests: override with FakeEmailService ──────────────────────────────
# from fastapi.testclient import TestClient
#
# fake_email = FakeEmailService()
# app.dependency_overrides[get_email_service] = lambda: fake_email
# client = TestClient(app)
#
# def test_welcome():   # TestClient is synchronous, so a plain def is enough
#     response = client.post("/welcome/test@example.com")
#     assert response.status_code == 200
#     assert len(fake_email.sent) == 1
#     assert fake_email.sent[0]["to"] == "test@example.com"
Concept Breakdown
Protocol
Structural typing, available since Python 3.8. A class satisfies a Protocol if it has the required methods; it does not need to inherit from the Protocol (no class SMTPEmailService(EmailService) required). Duck typing with type safety.
ABC vs Protocol
ABC requires explicit inheritance (class SQL(AbstractRepo)). Protocol is structural — any class with matching methods qualifies. Protocol is preferred for external or third-party classes you can't modify.
Wiring function
The get_email_service() function is the composition root — it decides which concrete class to use. In tests, dependency_overrides replaces this function.
Swap without changing service
WelcomeService never imports SMTPEmailService — it only knows the Protocol. Change the provider from SMTP to SES in get_email_service() and nothing else needs to change.
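Because WelcomeService only depends on the Protocol, it can also be tested by constructing it directly with a fake, with no FastAPI app or dependency_overrides involved. A minimal standard-library sketch that repeats the relevant classes from the listing above in reduced form:

```python
import asyncio
from typing import Protocol

class EmailService(Protocol):
    async def send(self, to: str, subject: str, body: str) -> None: ...

class FakeEmailService:
    """Satisfies EmailService structurally; note there is no inheritance."""
    def __init__(self) -> None:
        self.sent: list[dict] = []

    async def send(self, to: str, subject: str, body: str) -> None:
        self.sent.append({"to": to, "subject": subject, "body": body})

class WelcomeService:
    def __init__(self, email_svc: EmailService) -> None:
        self.email_svc = email_svc

    async def welcome_user(self, name: str, email: str) -> None:
        await self.email_svc.send(
            to=email,
            subject=f"Welcome, {name}!",
            body=f"Thanks for signing up, {name}.",
        )

fake = FakeEmailService()
asyncio.run(WelcomeService(fake).welcome_user("Ada", "ada@example.com"))
print(fake.sent[0]["subject"])  # Welcome, Ada!
```

A static type checker accepts FakeEmailService wherever EmailService is expected because the method signatures match, which is the structural typing the section describes.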

◆ Rarely Known

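Python's typing.runtime_checkable decorator on a Protocol enables isinstance(obj, EmailService) checks at runtime. The check only confirms that the required members exist, not that their signatures match, so it is useful for debugging but not needed in normal code. Without the decorator, isinstance() against a Protocol raises TypeError, and the Protocol is only used for static type checking (mypy/pyright).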
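A short sketch of what runtime_checkable does and does not verify. The class names WrongSignature, NoSend, and PlainProtocol are made up for the demonstration.

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class EmailService(Protocol):
    async def send(self, to: str, subject: str, body: str) -> None: ...

class PlainProtocol(Protocol):
    """Same shape, but without @runtime_checkable."""
    async def send(self, to: str, subject: str, body: str) -> None: ...

class ConsoleEmailService:
    async def send(self, to: str, subject: str, body: str) -> None:
        print(f"[CONSOLE] Email to {to}: {subject}")

class WrongSignature:
    def send(self) -> None:  # a member named send exists, but it is not compatible
        pass

class NoSend:
    pass

matches = isinstance(ConsoleEmailService(), EmailService)
wrong_signature_matches = isinstance(WrongSignature(), EmailService)
missing_method_matches = isinstance(NoSend(), EmailService)

try:
    isinstance(ConsoleEmailService(), PlainProtocol)
    plain_check_raised = False
except TypeError:
    plain_check_raised = True

print(matches, wrong_signature_matches, missing_method_matches, plain_check_raised)
# True True False True
```

The second result is the one to keep in mind: a runtime check passes for any object with a send attribute, so signature mismatches are still only caught by a static type checker.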

42

Event-Driven Patterns

Architecture
Instead of directly calling services from other services (creating tight coupling), an event-driven architecture has code publish domain events ("UserRegistered", "OrderPlaced") and separate handlers subscribe to those events. The publisher doesn't know who handles the event or what they do. This decouples features — adding a new reaction to "UserRegistered" (e.g., send a Slack notification) doesn't require modifying the registration code. For in-process use, a simple EventBus class works; for cross-service events, use message queues (Redis Pub/Sub, RabbitMQ, Kafka).
Python — events.py
from dataclasses import dataclass, field
from datetime import datetime, timezone
from collections import defaultdict
from typing import Callable, Awaitable
import asyncio

def _utcnow() -> datetime:
    # datetime.utcnow() is deprecated since Python 3.12; use an aware timestamp
    return datetime.now(timezone.utc)

# ── Domain Events (frozen data classes, no behavior) ──────────────────────
@dataclass(frozen=True)
class DomainEvent:
    occurred_at: datetime = field(default_factory=_utcnow)

@dataclass(frozen=True)
class UserRegistered(DomainEvent):
    user_id: int = 0
    email: str = ""
    name: str = ""

@dataclass(frozen=True)
class OrderPlaced(DomainEvent):
    order_id: int = 0
    user_id: int = 0
    total: float = 0.0

# ── In-process EventBus ────────────────────────────────────────────────────
# Handlers are async functions: async def handler(event: SomeEvent) -> None
Handler = Callable[[DomainEvent], Awaitable[None]]

class EventBus:
    def __init__(self):
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        """Register a handler for an event type."""
        self._handlers[event_type].append(handler)

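    async def publish(self, event: DomainEvent) -> None:
        """Run all handlers for this event type concurrently."""
        handlers = self._handlers.get(type(event), [])
        # return_exceptions=True lets every handler finish even if one fails
        results = await asyncio.gather(
            *(h(event) for h in handlers), return_exceptions=True
        )
        for result in results:
            if isinstance(result, Exception):
                # Report the failure without stopping the other handlers
                print(f"[EVENTBUS] handler failed: {result!r}")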

# ── Global bus (in real apps: inject via Depends) ─────────────────────────
bus = EventBus()

# ── Handlers ──────────────────────────────────────────────────────────────
async def send_welcome_email(event: UserRegistered) -> None:
    print(f"[EMAIL] Welcome email sent to {event.email}")

async def create_default_settings(event: UserRegistered) -> None:
    print(f"[SETTINGS] Default settings created for user {event.user_id}")

async def notify_analytics(event: UserRegistered) -> None:
    print(f"[ANALYTICS] New user tracked: {event.user_id}")

# Register handlers — each handler is independent of the others
bus.subscribe(UserRegistered, send_welcome_email)
bus.subscribe(UserRegistered, create_default_settings)
bus.subscribe(UserRegistered, notify_analytics)

# ── Service publishing an event ───────────────────────────────────────────
class UserService:
    def __init__(self, repo, event_bus: EventBus):
        self.repo = repo
        self.bus = event_bus

    async def register(self, name: str, email: str) -> dict:
        user = await self.repo.create_raw(name=name, email=email)

        # Publish event — don't call send_welcome_email directly.
        # The service doesn't know or care what happens after this line.
        await self.bus.publish(
            UserRegistered(user_id=user.id, email=user.email, name=user.name)
        )
        return {"id": user.id}

# ── FastAPI integration ────────────────────────────────────────────────────
from fastapi import FastAPI

app = FastAPI()

@app.post("/register")
async def register(name: str, email: str):
    # repo=... is a placeholder: pass your real repository here.
    svc = UserService(repo=..., event_bus=bus)
    # publish() is awaited inside register(), so all handlers finish
    # before the response is sent.
    return await svc.register(name, email)

# ── Tip: for consistency, publish events AFTER commit ─────────────────────
# If you publish before commit and the commit fails, handlers have already
# run against data that was never saved.
# Use the "transactional outbox" pattern or publish inside the UoW's commit() method.
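The tip above can be sketched with the standard library's sqlite3 module. This is a minimal illustration under stated assumptions, not a production outbox: the users and outbox tables, the helpers init_schema, register_user, and drain_outbox, and the JSON payload shape are all hypothetical and do not come from the listing above.

```python
import json
import sqlite3

def init_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: one business table plus the outbox table.
    conn.executescript(
        """
        CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);
        CREATE TABLE outbox (
            id INTEGER PRIMARY KEY,
            event_type TEXT NOT NULL,
            payload TEXT NOT NULL,
            published INTEGER NOT NULL DEFAULT 0
        );
        """
    )

def register_user(conn: sqlite3.Connection, email: str) -> int:
    """Insert the user and its event row in ONE transaction."""
    with conn:  # commits on success, rolls back if anything raises
        cur = conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        user_id = cur.lastrowid
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("UserRegistered", json.dumps({"user_id": user_id, "email": email})),
        )
    return user_id

def drain_outbox(conn: sqlite3.Connection, publish) -> int:
    """Publish committed, unpublished events and mark each one as published."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)

conn = sqlite3.connect(":memory:")
init_schema(conn)
register_user(conn, "alice@example.com")

# A failed registration (duplicate email) leaves no outbox row behind.
try:
    register_user(conn, "alice@example.com")
except sqlite3.IntegrityError:
    pass

published = []
drained = drain_outbox(conn, lambda etype, data: published.append((etype, data)))
drained_again = drain_outbox(conn, lambda etype, data: published.append((etype, data)))
```

In this sketch a crash between publish() and the UPDATE would cause the event to be published again on the next drain, so delivery is at-least-once and handlers should be idempotent. In a real app, drain_outbox would typically run in a background worker rather than in the request path.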
Concept Breakdown
Domain Event
A plain data object describing something that happened in the domain. Named in the past tense (UserRegistered, not RegisterUser). Treated as immutable and free of behavior; @dataclass(frozen=True) enforces the immutability.
EventBus.publish()
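Runs all registered handlers concurrently with asyncio.gather(). With the default settings, the first handler exception propagates to the publisher while the other handlers keep running; pass return_exceptions=True if one failing handler should not fail the publish call.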
Loose coupling
The registration service doesn't import the email module. Adding a new reaction (Slack, SMS) = subscribe a new handler. No existing code changes.
Transactional outbox
Advanced pattern: store events in a DB table within the same transaction as the main data change, then publish from the outbox asynchronously. This prevents events from being published for data that was never committed.
In-process vs external
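The in-process EventBus is simple and fast, but events are lost on a crash. External brokers such as RabbitMQ or Kafka can persist messages but add latency and infrastructure. Note that plain Redis Pub/Sub is fire-and-forget; use Redis Streams if you need durability.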
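As a sketch of handler isolation, the variant below passes return_exceptions=True to asyncio.gather() so that one failing handler neither hides the others' work nor raises into the publisher. SafeEventBus, the Ping event, and both demo handlers are hypothetical names chosen for this illustration; returning the list of errors from publish() is one possible design, not the only one.

```python
import asyncio
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Awaitable, Callable

logger = logging.getLogger("events")

@dataclass(frozen=True)
class Ping:  # hypothetical event used only for this demo
    value: int

Handler = Callable[[Ping], Awaitable[None]]

class SafeEventBus:
    """Variant of the in-process bus that isolates handler failures."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event: Ping) -> list[BaseException]:
        handlers = self._handlers.get(type(event), [])
        # return_exceptions=True collects failures as results instead of
        # raising the first one into the caller.
        results = await asyncio.gather(
            *(h(event) for h in handlers), return_exceptions=True
        )
        errors: list[BaseException] = []
        for handler, result in zip(handlers, results):
            if isinstance(result, BaseException):
                logger.error("handler %s failed: %r", handler.__name__, result)
                errors.append(result)
        return errors

seen: list[int] = []

async def failing_handler(event: Ping) -> None:
    raise RuntimeError("mail server down")

async def recording_handler(event: Ping) -> None:
    seen.append(event.value)

safe_bus = SafeEventBus()
safe_bus.subscribe(Ping, failing_handler)
safe_bus.subscribe(Ping, recording_handler)
errors = asyncio.run(safe_bus.publish(Ping(value=7)))
```

Here the publisher receives the collected errors and can decide whether to log, retry, or ignore them, which keeps a broken email handler from failing a user registration.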

💡 Interview Tip

"How do you avoid tight coupling between features?" — Event-driven architecture. Publishing a UserRegistered event keeps the registration service unaware of downstream effects. Interviewers at senior level often ask how you'd extend behavior without modifying existing code — this is the answer.

43

CQRS Basics

Architecture
CQRS (Command Query Responsibility Segregation) separates read operations (queries) from write operations (commands). Commands change state and return nothing (or just a success/ID). Queries read state and never change it. This separation lets you optimize each path independently: queries can use denormalized read models optimized for display, while commands go through validation and business logic. In its simplest form in FastAPI, this just means separate route handlers, services, and even database models for reads vs writes.
Python — cqrs.py
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from typing import Annotated

app = FastAPI()

# ── Commands (write side) ──────────────────────────────────────────────────
# Commands describe an INTENT to change something.
# They are validated, go through business rules, raise on failure.
# Return: None, a created ID, or a minimal result.

class CreateUserCommand(BaseModel):
    name: str
    email: str
    password: str

class UpdateUserCommand(BaseModel):
    user_id: int
    name: str | None = None
    email: str | None = None

class DeleteUserCommand(BaseModel):
    user_id: int
    requested_by: int  # who is deleting — for authorization

class UserCommandService:
    """Handles all write operations. Business logic lives here."""
    async def handle_create(self, cmd: CreateUserCommand) -> int:
        # Validate, create, emit event, return new ID
        # ... business logic ...
        return 42  # new user ID

    async def handle_update(self, cmd: UpdateUserCommand) -> None:
        # Validate, update, emit event
        pass

    async def handle_delete(self, cmd: DeleteUserCommand) -> None:
        if cmd.user_id != cmd.requested_by:
            from fastapi import HTTPException
            raise HTTPException(403, "Cannot delete other users")
        # Delete logic...

# ── Queries (read side) ────────────────────────────────────────────────────
# Queries describe a READ request. They NEVER modify state.
# Can use different, optimized data sources (read replicas, denormalized views).

class UserDetailView(BaseModel):
    """Optimized read model — may join multiple tables, denormalized."""
    id: int
    name: str
    email: str
    item_count: int   # pre-aggregated — not in ORM model
    last_login: str | None

class UserListView(BaseModel):
    id: int
    name: str

class UserQueryService:
    """Handles all read operations. Can use raw SQL for performance."""
    async def get_user(self, user_id: int) -> UserDetailView | None:
        # May query a read-optimized view or materialized view
        # No business rules — just read and return
        return UserDetailView(
            id=user_id, name="Alice", email="a@b.com",
            item_count=5, last_login="2024-01-15"
        )

    async def list_users(self, skip: int = 0, limit: int = 20) -> list[UserListView]:
        return [UserListView(id=i, name=f"User {i}") for i in range(skip, skip+limit)]

# ── FastAPI routes ─────────────────────────────────────────────────────────
# Services are injected with Depends() instead of being built inside each route.
CommandSvc = Annotated[UserCommandService, Depends()]
QuerySvc = Annotated[UserQueryService, Depends()]

# Command routes: action-named paths that change state
@app.post("/users/create", status_code=201)
async def create_user(cmd: CreateUserCommand, svc: CommandSvc):
    user_id = await svc.handle_create(cmd)
    return {"id": user_id}

# NOTE: requesting_user_id arrives as a query parameter for brevity only;
# in a real app, derive it from the authenticated user.
@app.delete("/users/{uid}", status_code=204)
async def delete_user(uid: int, requesting_user_id: int, svc: CommandSvc):
    await svc.handle_delete(DeleteUserCommand(user_id=uid, requested_by=requesting_user_id))

# Query routes: noun-first paths, return data
@app.get("/users/{uid}", response_model=UserDetailView)
async def get_user(uid: int, svc: QuerySvc):
    return await svc.get_user(uid)

@app.get("/users", response_model=list[UserListView])
async def list_users(svc: QuerySvc, skip: int = 0, limit: int = 20):
    return await svc.list_users(skip, limit)
Concept Breakdown
Command
An object expressing intent to change state. Named imperatively: CreateUser, PlaceOrder. Goes through validation + business logic. Returns minimal data or nothing.
Query
An object requesting data without side effects. Named descriptively: GetUser, ListOrders. Can bypass domain model entirely for raw SQL performance.
Read model
A Pydantic schema optimized for display — may include pre-joined data (item_count) not in any single ORM model. Avoids N+1 queries by pre-aggregating.
Separate services
UserCommandService handles writes (business rules). UserQueryService handles reads (raw SQL, denormalized views). Each evolves independently.
Read replica
In advanced CQRS, queries run against a read replica (no write load) while commands go to the primary. Query service just needs a different DB URL.
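To make the read-model idea concrete, here is a minimal sketch in which the query side bypasses the domain model and fills a display-oriented view with a single aggregate SQL statement. It uses sqlite3 and a dataclass so that it runs standalone; the users/items schema, the UserDetailRow class, and fetch_user_detail are assumptions made for this example and are not part of the listing above.

```python
import sqlite3
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class UserDetailRow:
    """Hypothetical read model: one row shaped for display."""
    id: int
    name: str
    email: str
    item_count: int  # aggregated in SQL, not stored on the user

def fetch_user_detail(conn: sqlite3.Connection, user_id: int) -> Optional[UserDetailRow]:
    # One query returns the user plus the aggregate, so the caller never
    # issues a follow-up query per user (no N+1).
    row = conn.execute(
        """
        SELECT u.id, u.name, u.email, COUNT(i.id) AS item_count
        FROM users AS u
        LEFT JOIN items AS i ON i.owner_id = u.id
        WHERE u.id = ?
        GROUP BY u.id, u.name, u.email
        """,
        (user_id,),
    ).fetchone()
    return UserDetailRow(*row) if row else None

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT);
    CREATE TABLE items (id INTEGER PRIMARY KEY, owner_id INTEGER REFERENCES users(id));
    INSERT INTO users VALUES (1, 'Alice', 'a@b.com'), (2, 'Bob', 'b@b.com');
    INSERT INTO items (owner_id) VALUES (1), (1), (1);
    """
)

alice = fetch_user_detail(conn, 1)
bob = fetch_user_detail(conn, 2)
missing = fetch_user_detail(conn, 99)
```

The LEFT JOIN keeps users who own no items (their count is 0), and an unknown ID yields None, which a route could translate into a 404.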

💡 Interview Tip

"What is CQRS and when would you use it?" — Separate read/write models. Use when: read queries are complex aggregations that don't map to write models, when you need separate scaling for reads vs writes, or when adding read projections shouldn't touch write logic. Don't use it for simple CRUD — the overhead isn't worth it.

⚠ Gotcha

Full CQRS with separate read/write databases introduces eventual consistency: after a write, the read replica may lag by milliseconds, or by seconds under load. This is usually acceptable, but if you redirect to a list page immediately after a write, the new item may not appear yet. Plan for this in your UI.
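Besides handling the lag in the UI, one possible server-side mitigation is to send a user's reads to the primary for a short window after that user writes. The sketch below is only an illustration under assumptions: ReadRouter, the 2-second window, and the "primary"/"replica" labels are hypothetical, and a multi-process deployment would need to share the last-write timestamps (for example in a cache or a cookie) rather than keep them in memory.

```python
import time
from typing import Callable

class ReadRouter:
    """Chooses a read target per user based on how recently they wrote."""

    def __init__(
        self,
        stickiness_seconds: float = 2.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._window = stickiness_seconds
        self._clock = clock  # injectable so the demo and tests are deterministic
        self._last_write: dict[int, float] = {}

    def record_write(self, user_id: int) -> None:
        # Call this after a command for this user has been committed.
        self._last_write[user_id] = self._clock()

    def target_for_read(self, user_id: int) -> str:
        last = self._last_write.get(user_id)
        if last is not None and self._clock() - last < self._window:
            return "primary"  # replica may not have the user's write yet
        return "replica"

# Demo with a fake clock instead of real sleeping.
now = [100.0]
router = ReadRouter(stickiness_seconds=2.0, clock=lambda: now[0])

before_write = router.target_for_read(1)
router.record_write(1)
right_after = router.target_for_read(1)
other_user = router.target_for_read(2)
now[0] += 5.0
later = router.target_for_read(1)
```

Only the user who just wrote is pinned to the primary, so the replica still absorbs most read traffic.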