Project Structure
Architecture
# ── Domain-Driven Structure (recommended for medium-large apps) ───────────
#
# app/
# ├── main.py ← FastAPI() instance, lifespan, router registration
# ├── core/
# │ ├── config.py ← Pydantic Settings, env vars
# │ ├── database.py ← engine, session factory, Base
# │ └── security.py ← JWT helpers, password hashing
# ├── users/
# │ ├── __init__.py
# │ ├── router.py ← APIRouter with /users prefix
# │ ├── service.py ← business logic
# │ ├── repository.py ← database queries
# │ ├── models.py ← SQLAlchemy ORM models
# │ └── schemas.py ← Pydantic request/response schemas
# ├── items/
# │ ├── router.py
# │ ├── service.py
# │ ├── repository.py
# │ ├── models.py
# │ └── schemas.py
# └── tests/
# ├── conftest.py ← shared fixtures, test DB setup
# ├── test_users.py
# └── test_items.py
# ── main.py ───────────────────────────────────────────────────────────────
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.users.router import router as users_router
from app.items.router import router as items_router
from app.core.database import init_db
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: initialise the database at startup.

    Code before ``yield`` runs when the context is entered. Nothing follows
    the ``yield``, so this hook performs no shutdown work.
    """
    # Startup step: awaited to completion before control is yielded back.
    await init_db()
    yield
app = FastAPI(lifespan=lifespan)
# Register all routers — the URL prefix and tags are assigned here, at registration time
app.include_router(users_router, prefix="/users", tags=["Users"])
app.include_router(items_router, prefix="/items", tags=["Items"])
# ── users/router.py ───────────────────────────────────────────────────────
from fastapi import APIRouter, Depends
from app.users.service import UserService
from app.users.schemas import UserCreate, UserOut
from app.core.database import get_session
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
router = APIRouter()
# The router doesn't know about DB details — it delegates to the service
@router.post("/", response_model=UserOut, status_code=201)
async def create_user(
    payload: UserCreate,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create a user from the request body.

    The handler is pure wiring: it wraps the injected session in a
    ``UserService`` and returns whatever ``UserService.create`` produces.
    """
    return await UserService(session).create(payload)
# ── users/service.py ──────────────────────────────────────────────────────
# Service layer holds business logic — no HTTP, no SQL
from app.users.repository import UserRepository
from app.users.schemas import UserCreate, UserOut
from app.users.models import User
from sqlalchemy.ext.asyncio import AsyncSession
class UserService:
    """User-creation business logic on top of a ``UserRepository``."""

    def __init__(self, session: AsyncSession):
        # All persistence goes through the repository bound to this session.
        self.repo = UserRepository(session)

    async def create(self, data: UserCreate) -> UserOut:
        """Create a user unless the e-mail address is already taken.

        Returns:
            The new user converted with ``UserOut.model_validate``.

        Raises:
            HTTPException: 409 when ``get_by_email`` finds an existing user.
        """
        duplicate = await self.repo.get_by_email(data.email)
        if duplicate:
            # Imported only on the failure path, as in the original snippet.
            from fastapi import HTTPException

            raise HTTPException(409, "Email already registered")

        created = await self.repo.create(data)
        return UserOut.model_validate(created)
schemas.py — Pydantic models for request validation (UserCreate) and response serialization (UserOut). Separate from ORM models.
💡 Interview Tip
"How do you structure a FastAPI application?" is a common senior-level question. Walk through the domain-driven layout and explain the responsibility of each file type. Mention that separating router/service/repository means each layer can be tested in isolation.
★ Important
The key rule: dependencies only flow downward — router → service → repository → database. A repository should never import from a router. A service should never import from a router. Violations create circular imports and tightly coupled layers.
Repository Pattern
Architecture
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.users.models import User
from app.users.schemas import UserCreate
# ── Abstract interface (the contract) ────────────────────────────────────
# Any class that implements this is a valid UserRepository.
# Services depend on this interface, not on a specific implementation.
class AbstractUserRepository(ABC):
    """Storage contract for users.

    Every method is abstract, so a subclass can only be instantiated once it
    implements all five of them.
    """

    @abstractmethod
    async def get(self, user_id: int) -> User | None:
        """Look a user up by primary key; ``None`` means not found."""

    @abstractmethod
    async def get_by_email(self, email: str) -> User | None:
        """Look a user up by e-mail address; ``None`` means not found."""

    @abstractmethod
    async def create(self, data: UserCreate) -> User:
        """Persist a new user built from ``data`` and return it."""

    @abstractmethod
    async def delete(self, user_id: int) -> bool:
        """Remove the user with ``user_id``; report whether one was removed."""

    @abstractmethod
    async def list(self, skip: int = 0, limit: int = 20) -> list[User]:
        """Return one page of users: ``limit`` rows after skipping ``skip``."""
# ── SQLAlchemy implementation ──────────────────────────────────────────────
class SQLUserRepository(AbstractUserRepository):
    """``AbstractUserRepository`` implemented with a SQLAlchemy ``AsyncSession``.

    The repository never commits; transaction boundaries belong to whoever
    owns the session.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def get(self, user_id: int) -> User | None:
        """Fetch a user by primary key through ``session.get``."""
        return await self.session.get(User, user_id)

    async def get_by_email(self, email: str) -> User | None:
        """Fetch the user whose ``email`` column equals ``email``."""
        query = select(User).where(User.email == email)
        rows = await self.session.execute(query)
        return rows.scalar_one_or_none()

    async def create(self, data: UserCreate) -> User:
        """Add a user built from ``data.name`` / ``data.email`` to the session."""
        new_user = User(name=data.name, email=data.email)
        self.session.add(new_user)
        # flush() sends the INSERT without committing, so the DB-generated ID
        # is available; refresh() then reloads the row's current state.
        await self.session.flush()
        await self.session.refresh(new_user)
        return new_user

    async def delete(self, user_id: int) -> bool:
        """Delete the user if it exists; return ``False`` when it does not."""
        target = await self.get(user_id)
        if target:
            await self.session.delete(target)
            return True
        return False

    async def list(self, skip: int = 0, limit: int = 20) -> list[User]:
        """Return a page of users ordered by ``User.id``."""
        query = select(User).order_by(User.id).offset(skip).limit(limit)
        rows = await self.session.execute(query)
        return [*rows.scalars().all()]
# ── In-memory fake for tests ───────────────────────────────────────────────
# This is not a mock — it's a real working implementation using a dict.
# Tests run instantly with no database required.
class FakeUserRepository(AbstractUserRepository):
    """Dict-backed ``AbstractUserRepository`` for tests; IDs count up from 1."""

    def __init__(self):
        self._store: dict[int, User] = {}
        self._next_id = 1

    async def get(self, user_id: int) -> User | None:
        """Return the stored user for ``user_id`` or ``None``."""
        return self._store.get(user_id)

    async def get_by_email(self, email: str) -> User | None:
        """Return the first stored user whose ``email`` matches, else ``None``."""
        for candidate in self._store.values():
            if candidate.email == email:
                return candidate
        return None

    async def create(self, data: UserCreate) -> User:
        """Store a new ``User`` under the next free ID and return it."""
        assigned_id = self._next_id
        record = User(id=assigned_id, name=data.name, email=data.email)
        self._store[assigned_id] = record
        self._next_id = assigned_id + 1
        return record

    async def delete(self, user_id: int) -> bool:
        """Drop ``user_id`` from the store; ``True`` if something was removed."""
        removed = self._store.pop(user_id, None)
        return removed is not None

    async def list(self, skip: int = 0, limit: int = 20) -> list[User]:
        """Return ``limit`` users after skipping ``skip``, in insertion order."""
        everyone = [*self._store.values()]
        return everyone[skip : skip + limit]
# ── Usage in tests ─────────────────────────────────────────────────────────
# async def test_create_user():
# repo = FakeUserRepository()
# svc = UserService(repo) # inject fake repo
# result = await svc.create(UserCreate(name="Alice", email="a@b.com"))
# assert result.id == 1
# assert result.email == "a@b.com"
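- A subclass of AbstractUserRepository that leaves any @abstractmethod unimplemented cannot be instantiated — Python raises TypeError when you try to create an instance (the check happens at instantiation, not at class definition).
- scalar_one_or_none() returns the single matching row or None. Raises MultipleResultsFound if more than one row matches — safer than first() which silently ignores extras.
- UserService takes AbstractUserRepository — not SQLUserRepository. This is the Dependency Inversion Principle (the "D" in SOLID).
💡 Interview Tip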
"How do you test database code without hitting a real DB?" — Inject a FakeUserRepository into the service via dependency injection. The service doesn't know or care what storage is behind the interface. This makes tests run in microseconds, not seconds.
⚠ Gotcha
Don't confuse flush() with commit(). Flush sends SQL within the current transaction (useful for getting IDs). Commit ends the transaction and makes changes permanent. A flush without a commit is rolled back if an error occurs.
Service Layer Pattern
Architecture
from fastapi import HTTPException, status
from app.users.repository import AbstractUserRepository
from app.users.schemas import UserCreate, UserUpdate, UserOut
from app.core.security import hash_password, verify_password
from app.core.email import send_welcome_email # imaginary email service
class UserService:
    """
    Business rules for user management on top of a repository interface.

    The service accepts any ``AbstractUserRepository`` and contains no SQL.
    Rule violations are reported by raising ``HTTPException``.
    """

    def __init__(self, repo: AbstractUserRepository):
        self.repo = repo

    async def register(self, data: UserCreate) -> UserOut:
        """Register a new account and send the welcome e-mail.

        Raises:
            HTTPException: 409 when ``data.email`` already belongs to a user.
        """
        # Rule 1: e-mail addresses must be unique.
        if await self.repo.get_by_email(data.email):
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="An account with this email already exists.",
            )

        # Rule 2: the repository only ever receives the hashed password.
        to_store = UserCreate(
            name=data.name,
            email=data.email,
            password=hash_password(data.password),
        )
        user = await self.repo.create(to_store)

        # Rule 3: greet the new user. The call is awaited inline here; in
        # production use a background task or message queue instead.
        await send_welcome_email(user.email, user.name)
        return UserOut.model_validate(user)

    async def update(self, user_id: int, data: UserUpdate) -> UserOut:
        """Apply a partial update to an existing user.

        Only fields the caller actually set (``exclude_unset=True``) are
        written; a new password is hashed before it is assigned.

        Raises:
            HTTPException: 404 when ``user_id`` does not exist.
        """
        user = await self.repo.get(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")

        changes = data.model_dump(exclude_unset=True)
        if "password" in changes:
            changes["password"] = hash_password(changes["password"])
        for attr_name, new_value in changes.items():
            setattr(user, attr_name, new_value)

        # NOTE(review): ``save`` is not among the methods declared on
        # AbstractUserRepository in this file — confirm the interface has it.
        saved = await self.repo.save(user)
        return UserOut.model_validate(saved)

    async def authenticate(self, email: str, password: str) -> UserOut:
        """Return the user when e-mail and password match.

        Raises:
            HTTPException: 401 with one deliberately vague message for both
                "unknown e-mail" and "wrong password".
        """
        user = await self.repo.get_by_email(email)
        if user and verify_password(password, user.password):
            return UserOut.model_validate(user)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid email or password",
        )

    async def delete(self, user_id: int, requesting_user_id: int) -> None:
        """Delete an account on behalf of ``requesting_user_id``.

        Raises:
            HTTPException: 403 when the requester is not the account owner,
                404 when the repository reports nothing was deleted.
        """
        # Users may only delete themselves. NOTE(review): the original comment
        # mentions admins, but no admin exception is implemented here.
        if user_id != requesting_user_id:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Cannot delete another user's account",
            )
        if not await self.repo.delete(user_id):
            raise HTTPException(status_code=404, detail="User not found")
model_dump(exclude_unset=True) — only includes fields the client actually provided, not defaults. Critical for PATCH semantics (partial update without overwriting unchanged fields).
💡 Interview Tip
"What goes in a service layer vs a route handler?" — Services: business rules, validation, orchestration, error logic. Routes: HTTP parsing, dependency injection, serialization, status codes. A good rule of thumb: if the logic would still matter for a CLI version of the same feature, it belongs in the service.
Unit of Work Pattern
Architecture
The `async with uow:` context manager handles commit on success and rollback on failure automatically.
from abc import ABC, abstractmethod
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from app.users.repository import AbstractUserRepository, SQLUserRepository
from app.settings.repository import AbstractSettingsRepository, SQLSettingsRepository
# ── Abstract UoW ──────────────────────────────────────────────────────────
class AbstractUnitOfWork(ABC):
    """Async context manager that turns an ``async with`` block into one transaction.

    Leaving the block without an exception calls ``commit()``; leaving it with
    one calls ``rollback()``. ``__aexit__`` returns ``None``, so the exception
    is not suppressed.
    """

    # Repositories exposed to code running inside the unit of work.
    users: AbstractUserRepository
    settings: AbstractSettingsRepository

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            await self.commit()  # clean exit
        else:
            await self.rollback()  # the block raised

    @abstractmethod
    async def commit(self):
        """Make the work done inside the block permanent."""

    @abstractmethod
    async def rollback(self):
        """Discard the work done inside the block."""
# ── SQLAlchemy implementation ──────────────────────────────────────────────
class SQLUnitOfWork(AbstractUnitOfWork):
    """Unit of work backed by one SQLAlchemy ``AsyncSession`` per ``async with``.

    Entering the context opens a fresh session from the factory and binds
    both repositories to it, so they share a single transaction. Leaving the
    context commits or rolls back (see ``AbstractUnitOfWork.__aexit__``) and
    then always closes the session.
    """

    def __init__(self, session_factory: async_sessionmaker):
        self._session_factory = session_factory

    async def __aenter__(self):
        # Open a new session for this unit of work.
        self._session: AsyncSession = self._session_factory()
        # Attach all repositories — they share the same session.
        self.users = SQLUserRepository(self._session)
        self.settings = SQLSettingsRepository(self._session)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # commit()/rollback() can themselves raise (e.g. a constraint
        # violation surfacing at commit). try/finally guarantees the session
        # is still closed in that case instead of leaking its connection.
        try:
            await super().__aexit__(exc_type, exc_val, exc_tb)
        finally:
            await self._session.close()

    async def commit(self):
        """Commit the session's current transaction."""
        await self._session.commit()

    async def rollback(self):
        """Roll back the session's current transaction."""
        await self._session.rollback()
# ── Service using UoW ──────────────────────────────────────────────────────
class RegistrationService:
    """Creates a user together with its default settings in one unit of work."""

    def __init__(self, uow: AbstractUnitOfWork):
        self.uow = uow

    async def register_user(self, name: str, email: str) -> dict:
        """Create the user row and its settings row atomically.

        Returns:
            ``{"user_id": ..., "settings_id": ...}`` with the IDs of the two
            created objects.
        """
        async with self.uow as work:
            # Both calls run inside the same unit of work, so an exception
            # from either one makes __aexit__ roll back instead of commit.
            new_user = await work.users.create_raw(name=name, email=email)
            defaults = await work.settings.create_defaults(user_id=new_user.id)
            # Returning from inside the ``async with`` block is a clean exit,
            # which is what triggers the commit in __aexit__.
            return {"user_id": new_user.id, "settings_id": defaults.id}
# ── FastAPI dependency ────────────────────────────────────────────────────
from fastapi import Depends
from app.core.database import session_factory # async_sessionmaker instance
from typing import Annotated
def get_uow() -> SQLUnitOfWork:
    """Dependency provider: build a new ``SQLUnitOfWork`` on the shared session factory."""
    unit_of_work = SQLUnitOfWork(session_factory)
    return unit_of_work


# Annotated alias so route signatures can simply declare ``uow: UoWDep``.
UoWDep = Annotated[SQLUnitOfWork, Depends(get_uow)]
# In a router:
# @router.post("/register")
# async def register(payload: RegisterRequest, uow: UoWDep):
# svc = RegistrationService(uow)
# return await svc.register_user(payload.name, payload.email)
- __aenter__ opens session, __aexit__ commits on clean exit or rolls back on exception — automatic transaction management.
- uow.users and uow.settings share the same AsyncSession — this is what makes them part of the same atomic transaction.
- If an exception is raised inside async with uow, both the user creation and settings creation are rolled back — no half-committed state in the DB.
- session_factory is an async_sessionmaker configured with the engine. Each UoW creates its own session — sessions are not shared between requests.
- In tests, use a FakeUnitOfWork that uses FakeUserRepository and records commit/rollback calls — test business logic without any I/O.
💡 Interview Tip
"How do you ensure two database operations succeed or fail together?" — The Unit of Work pattern. One session, one transaction. Either both commit (return from async with uow cleanly) or both rollback (any exception). Never use two separate sessions for related operations.
⚠ Gotcha
Don't hold a UoW open for a long time (like across multiple HTTP requests or while waiting for user input). Keep transactions short. Long-running transactions hold DB locks, blocking other writers and readers.
DTO vs Domain Model
Architecture
A domain model (the ORM class) represents how data is stored: it carries database-managed columns such as id and created_at, and may contain relationships loaded lazily. A DTO (Data Transfer Object) — Pydantic schema — represents how data crosses a boundary (HTTP request/response). They serve different purposes and should be separate classes: the ORM model might have a hashed_password column you never want to expose in the API response, or the API might accept a password field that never maps directly to the ORM.
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, DateTime, func
from pydantic import BaseModel, EmailStr, ConfigDict
from datetime import datetime
# ── Domain Model (ORM — what's in the DB) ────────────────────────────────
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this module inherit from."""
class UserORM(Base):
    """ORM mapping for the ``users`` table. Never returned to clients directly."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)
    # Stored hash only — this column must NEVER appear in an API response.
    hashed_password: Mapped[str] = mapped_column(String(200))
    is_active: Mapped[bool] = mapped_column(default=True)
    # Filled in by the database (server-side default of now()).
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())

    # Relationships (may lazy-load additional queries)
    # items: Mapped[list["ItemORM"]] = relationship(back_populates="user")
# ── DTOs (Pydantic Schemas — what crosses the HTTP boundary) ──────────────
# Request: what the CLIENT sends when creating a user
class UserCreate(BaseModel):
    """Request body for creating a user."""

    name: str
    email: EmailStr
    # Plain text as sent by the client; the service hashes it before saving.
    password: str
# Request: what the CLIENT sends when updating (all fields optional)
class UserUpdate(BaseModel):
    """Request body for a partial update; every field defaults to ``None``."""

    name: str | None = None
    email: EmailStr | None = None
    # Present only when the client wants to change the password.
    password: str | None = None
# Response: what the SERVER returns (no sensitive fields)
class UserOut(BaseModel):
    """Full user representation returned by the API.

    ``hashed_password`` is deliberately not declared here, so it can never be
    serialized through this schema.
    """

    # from_attributes=True lets Pydantic build this schema from an ORM
    # instance by reading attributes such as ``user.id`` and ``user.name``.
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
    email: str
    is_active: bool
    created_at: datetime
# Response: minimal version for lists (less data = faster response)
class UserSummary(BaseModel):
    """Minimal user representation (ID and name) for list responses."""

    # Readable straight from ORM instances, like UserOut.
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str
# ── Conversion ────────────────────────────────────────────────────────────
# In service layer, after fetching an ORM object:
# user_orm = await repo.get(user_id) # returns UserORM instance
# user_out = UserOut.model_validate(user_orm) # ORM → DTO (uses from_attributes)
# return user_out
# ── Why separate? ─────────────────────────────────────────────────────────
# ORM model change (add column) ≠ API change (no client breakage)
# API change (add field) ≠ DB change (no migration required)
# Never accidentally expose internal fields (hashed_password, internal flags)
# Different endpoints can use different response shapes (full vs summary)
- Separate schemas per use case: UserCreate (input), UserOut (full response), UserSummary (list response).
- UserOut.model_validate(orm_instance) — Pydantic reads the ORM object's attributes directly without needing a dict conversion step.
- In Pydantic v2, model_validate() with from_attributes=True replaces the v1 from_orm().
- Using UserOut (full) and UserSummary (minimal) for different routes optimizes payload size — list endpoints send less data per item.
★ Important
Always use response_model=UserOut on routes that return user data. FastAPI will serialize the ORM object through the Pydantic model, automatically stripping any field not declared in UserOut — even if the ORM accidentally has more fields.
⚠ Gotcha
If you use the ORM model directly as a response model (response_model=UserORM), you risk accidentally exposing hashed_password or other sensitive fields if Pydantic can read the ORM attributes. Always use a dedicated UserOut schema.
Dependency Inversion with Interfaces
Architecture
UserService should depend on AbstractUserRepository (an interface), not on SQLUserRepository (a concrete class tied to SQLAlchemy). The concrete implementation is injected at runtime via FastAPI's Depends(). This enables swapping implementations (SQLite ↔ PostgreSQL ↔ MongoDB ↔ FakeRepo) without changing the service.
from abc import ABC, abstractmethod
from fastapi import FastAPI, Depends
from typing import Annotated, Protocol
app = FastAPI()
# ── Define the interface with Protocol (structural typing) ────────────────
# Protocol is more Pythonic than ABC for interfaces — no explicit inheritance
# required. Any class with matching methods satisfies the protocol.
class EmailService(Protocol):
    """Structural interface: anything with a matching async ``send`` qualifies."""

    async def send(self, to: str, subject: str, body: str) -> None:
        """Deliver one message with ``subject`` and ``body`` to ``to``."""
class NotificationService(Protocol):
    """Structural interface: anything with a matching async ``notify`` qualifies."""

    async def notify(self, user_id: int, message: str) -> None:
        """Send ``message`` to the user identified by ``user_id``."""
# ── Concrete implementations ───────────────────────────────────────────────
class SMTPEmailService:
    """Production e-mail sender (placeholder: only prints what it would send)."""

    async def send(self, to: str, subject: str, body: str) -> None:
        """Print a one-line ``[SMTP]`` trace; ``body`` is not used yet."""
        # A real implementation would hand off to aiosmtplib or sendgrid here.
        trace = f"[SMTP] Sending to {to}: {subject}"
        print(trace)
class ConsoleEmailService:
    """Development e-mail sender: writes the message to stdout instead of sending."""

    async def send(self, to: str, subject: str, body: str) -> None:
        """Print a ``[CONSOLE]`` header line followed by the body."""
        rendered = f"[CONSOLE] Email to {to}: {subject}\n{body}"
        print(rendered)
class FakeEmailService:
    """Test double: records every message in ``sent`` instead of sending it."""

    def __init__(self):
        # One dict per send() call, in call order.
        self.sent: list[dict] = []

    async def send(self, to: str, subject: str, body: str) -> None:
        """Append the message as ``{"to", "subject", "body"}`` to ``sent``."""
        record = {"to": to, "subject": subject, "body": body}
        self.sent.append(record)
# ── Dependency providers ───────────────────────────────────────────────────
# These functions are the "wiring" — they decide which implementation to use.
# Swap implementations here without touching any service or route.
from app.core.config import settings
def get_email_service() -> EmailService:
    """Pick the e-mail implementation for the configured environment.

    Returns ``SMTPEmailService`` when ``settings.environment`` is
    ``"production"`` and ``ConsoleEmailService`` for every other value.
    """
    if settings.environment != "production":
        return ConsoleEmailService()
    return SMTPEmailService()
# ── Service depending on the INTERFACE, not the implementation ────────────
class WelcomeService:
    """Sends the sign-up welcome message through whatever ``EmailService`` it is given."""

    def __init__(self, email_svc: EmailService):
        # Only the Protocol's ``send`` is used, so SMTP, console and fake
        # implementations are interchangeable here.
        self.email_svc = email_svc

    async def welcome_user(self, name: str, email: str) -> None:
        """Send the welcome e-mail for ``name`` to ``email``."""
        subject = f"Welcome, {name}!"
        body = f"Thanks for signing up, {name}."
        await self.email_svc.send(to=email, subject=subject, body=body)
# ── FastAPI wiring ─────────────────────────────────────────────────────────
EmailDep = Annotated[EmailService, Depends(get_email_service)]
@app.post("/welcome/{email}")
async def welcome(email: str, email_svc: EmailDep):
    """Send the welcome e-mail to the address in the path, addressed to "Guest"."""
    await WelcomeService(email_svc).welcome_user("Guest", email)
    return {"sent": True}
# ── In tests: override with FakeEmailService ──────────────────────────────
# fake_email = FakeEmailService()
# app.dependency_overrides[get_email_service] = lambda: fake_email
#
# async def test_welcome():
# response = client.post("/welcome/test@example.com")
# assert len(fake_email.sent) == 1
# assert fake_email.sent[0]["to"] == "test@example.com"
- Protocol: declare the interface with class Foo(Protocol); implementations need no explicit inheritance. Duck typing with type safety.
- ABC is nominal — implementations must inherit explicitly (class SQL(AbstractRepo)). Protocol is structural — any class with matching methods qualifies. Protocol is preferred for external or third-party classes you can't modify.
- The get_email_service() function is the composition root — it decides which concrete class to use. In tests, dependency_overrides replaces this function.
- WelcomeService never imports SMTPEmailService — it only knows the Protocol. Change the provider from SMTP to SES in get_email_service() and nothing else needs to change.
◆ Rarely Known
Python's typing.runtime_checkable decorator on a Protocol enables isinstance(obj, EmailService) checks at runtime — useful for debugging but not needed in normal code. Without it, Protocol is only used for static type checking (mypy/pyright).
Event-Driven Patterns
Architecture
For in-process events, a simple EventBus class works; for cross-service events, use message queues (Redis Pub/Sub, RabbitMQ, Kafka).
from dataclasses import dataclass, field
from datetime import datetime
from collections import defaultdict
from typing import Callable, Awaitable
import asyncio
# ── Domain Events (data classes — no behavior) ────────────────────────────
def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    # Local import: only ``datetime`` itself is imported at file level.
    from datetime import timezone

    return datetime.now(timezone.utc)


@dataclass
class DomainEvent:
    """Base class for domain events; records when the event occurred.

    ``occurred_at`` defaults to the creation time as an *aware* UTC datetime.
    The previous default, ``datetime.utcnow``, is deprecated since Python 3.12
    and returns a naive value that is easily mistaken for local time.
    """

    occurred_at: datetime = field(default_factory=_utc_now)
@dataclass
class UserRegistered(DomainEvent):
    """Event describing a newly registered user.

    Every field has a default because the inherited ``occurred_at`` field
    already has one, and dataclass fields without defaults cannot follow it.
    """

    user_id: int = 0
    email: str = ""
    name: str = ""
@dataclass
class OrderPlaced(DomainEvent):
    """Event describing a placed order: which order, which user, what total."""

    order_id: int = 0
    user_id: int = 0
    total: float = 0.0
# ── In-process EventBus ────────────────────────────────────────────────────
# Handlers are async functions: async def handler(event: SomeEvent) -> None
Handler = Callable[[DomainEvent], Awaitable[None]]
class EventBus:
    """In-process publish/subscribe hub keyed by the exact event class."""

    def __init__(self):
        # Event class -> handlers, in subscription order.
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Handler) -> None:
        """Register ``handler`` to be called for events of ``event_type``."""
        self._handlers[event_type].append(handler)

    async def publish(self, event: DomainEvent) -> None:
        """Call every handler subscribed to ``type(event)`` concurrently.

        Lookup uses the event's exact class, so handlers subscribed to a base
        class are not called for its subclasses. With no handlers this is a
        no-op.
        """
        # .get() rather than indexing, so publishing an unknown event type
        # does not create an empty entry in the defaultdict.
        subscribers = self._handlers.get(type(event))
        if not subscribers:
            return
        await asyncio.gather(*(callback(event) for callback in subscribers))
# ── Global bus (in real apps: inject via Depends) ─────────────────────────
bus = EventBus()
# ── Handlers ──────────────────────────────────────────────────────────────
async def send_welcome_email(event: UserRegistered) -> None:
    """Handler (placeholder): print that a welcome e-mail went to ``event.email``."""
    line = f"[EMAIL] Welcome email sent to {event.email}"
    print(line)
async def create_default_settings(event: UserRegistered) -> None:
    """Handler (placeholder): print that settings were created for ``event.user_id``."""
    line = f"[SETTINGS] Default settings created for user {event.user_id}"
    print(line)
async def notify_analytics(event: UserRegistered) -> None:
    """Handler (placeholder): print that ``event.user_id`` was tracked as a new user."""
    line = f"[ANALYTICS] New user tracked: {event.user_id}"
    print(line)
# Register handlers — each handler is independent of the others
bus.subscribe(UserRegistered, send_welcome_email)
bus.subscribe(UserRegistered, create_default_settings)
bus.subscribe(UserRegistered, notify_analytics)
# ── Service publishing an event ───────────────────────────────────────────
class UserService:
    """Registers users and announces each registration on an ``EventBus``."""

    def __init__(self, repo, event_bus: EventBus):
        self.repo = repo
        self.bus = event_bus

    async def register(self, name: str, email: str) -> dict:
        """Create the user, publish ``UserRegistered``, return ``{"id": ...}``.

        ``publish`` is awaited, so this method returns only after the bus has
        finished dispatching the event.
        """
        user = await self.repo.create_raw(name=name, email=email)

        # The service only announces what happened; it does not call any
        # follow-up action (e-mail, settings, analytics) itself.
        announcement = UserRegistered(
            user_id=user.id,
            email=user.email,
            name=user.name,
        )
        await self.bus.publish(announcement)
        return {"id": user.id}
# ── FastAPI integration ────────────────────────────────────────────────────
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
@app.post("/register")
async def register(name: str, email: str, background_tasks: BackgroundTasks):
    """Register a user through ``UserService`` and return its result.

    ``repo=...`` is a literal placeholder for a real repository.
    NOTE(review): ``background_tasks`` is accepted but never used here.
    """
    service = UserService(repo=..., event_bus=bus)
    return await service.register(name, email)
# ── Tip: for true decoupling publish events AFTER commit ──────────────────
# If you publish before commit and the commit fails, handlers already ran.
# Use "transactional outbox" pattern or publish inside UoW's commit() method.
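- Domain events are named in the past tense (UserRegistered, not RegisterUser). They are plain data with no behavior; add frozen=True to the dataclasses if you want immutability enforced.
- Handlers run concurrently via asyncio.gather(). With the default return_exceptions=False, the first handler exception propagates to the publisher while the other handlers keep running; pass return_exceptions=True if a failing handler must not fail publish().
💡 Interview Tip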
"How do you avoid tight coupling between features?" — Event-driven architecture. Publishing a UserRegistered event keeps the registration service unaware of downstream effects. Interviewers at senior level often ask how you'd extend behavior without modifying existing code — this is the answer.
CQRS Basics
Architecture
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from typing import Annotated
app = FastAPI()
# ── Commands (write side) ──────────────────────────────────────────────────
# Commands describe an INTENT to change something.
# They are validated, go through business rules, raise on failure.
# Return: None, a created ID, or a minimal result.
class CreateUserCommand(BaseModel):
    """Command: intent to create a user with the given name, e-mail and password."""

    name: str
    email: str
    password: str
class UpdateUserCommand(BaseModel):
    """Command: intent to change a user's name and/or e-mail.

    ``user_id`` is required; ``name`` and ``email`` default to ``None``.
    """

    user_id: int
    name: str | None = None
    email: str | None = None
class DeleteUserCommand(BaseModel):
    """Command: intent to delete a user account."""

    user_id: int
    # ID of the user asking for the deletion — used for the authorization check.
    requested_by: int
class UserCommandService:
    """Write side of the CQRS split: one handler method per command."""

    async def handle_create(self, cmd: CreateUserCommand) -> int:
        """Handle a create command and return the new user's ID.

        Placeholder: no validation or persistence yet, always returns 42.
        """
        new_user_id = 42
        return new_user_id

    async def handle_update(self, cmd: UpdateUserCommand) -> None:
        """Handle an update command (placeholder: does nothing)."""
        return None

    async def handle_delete(self, cmd: DeleteUserCommand) -> None:
        """Handle a delete command; users may only delete themselves.

        Raises:
            HTTPException: 403 when ``cmd.requested_by`` is not ``cmd.user_id``.
        """
        requester_is_owner = cmd.user_id == cmd.requested_by
        if not requester_is_owner:
            # Imported only on the failure path, as in the original snippet.
            from fastapi import HTTPException

            raise HTTPException(403, "Cannot delete other users")
        # Delete logic...
# ── Queries (read side) ────────────────────────────────────────────────────
# Queries describe a READ request. They NEVER modify state.
# Can use different, optimized data sources (read replicas, denormalized views).
class UserDetailView(BaseModel):
    """Read model for a single user; may be denormalized across several tables."""

    id: int
    name: str
    email: str
    # Pre-aggregated count — not a column of the ORM model.
    item_count: int
    # Required field (no default), but the value itself may be None.
    last_login: str | None
class UserListView(BaseModel):
    """Read model for one row of the user list: ID and name only."""

    id: int
    name: str
class UserQueryService:
    """Read side of the CQRS split. Both methods return hard-coded sample data."""

    async def get_user(self, user_id: int) -> UserDetailView | None:
        """Return the detail view for ``user_id``.

        Placeholder: always builds the same sample user around ``user_id``;
        a real version would read from a query-optimized source.
        """
        sample = UserDetailView(
            id=user_id,
            name="Alice",
            email="a@b.com",
            item_count=5,
            last_login="2024-01-15",
        )
        return sample

    async def list_users(self, skip: int = 0, limit: int = 20) -> list[UserListView]:
        """Return one generated ``UserListView`` per ID in ``range(skip, skip + limit)``."""
        page: list[UserListView] = []
        for user_id in range(skip, skip + limit):
            page.append(UserListView(id=user_id, name=f"User {user_id}"))
        return page
# ── FastAPI routes ─────────────────────────────────────────────────────────
# Command routes: verb-first paths, meaningful actions
@app.post("/users/create", status_code=201)
async def create_user(cmd: CreateUserCommand):
    """Command route: create a user and return ``{"id": <new id>}``."""
    new_id = await UserCommandService().handle_create(cmd)
    return {"id": new_id}
@app.delete("/users/{uid}")
async def delete_user(uid: int, requesting_user_id: int):
    """Command route: delete user ``uid`` on behalf of ``requesting_user_id``.

    NOTE(review): ``requesting_user_id`` is a plain request parameter here,
    so the caller chooses it — presumably a real app would take it from the
    authenticated user instead; confirm before reusing this pattern.
    """
    command = DeleteUserCommand(user_id=uid, requested_by=requesting_user_id)
    await UserCommandService().handle_delete(command)
# Query routes: noun-first paths, return data
@app.get("/users/{uid}", response_model=UserDetailView)
async def get_user(uid: int):
    """Query route: return the detail view for user ``uid``."""
    return await UserQueryService().get_user(uid)
@app.get("/users", response_model=list[UserListView])
async def list_users(skip: int = 0, limit: int = 20):
    """Query route: return one page of the user list (``skip`` / ``limit``)."""
    return await UserQueryService().list_users(skip, limit)
- Command: an intent to change state, e.g. CreateUser, PlaceOrder. Goes through validation + business logic. Returns minimal data or nothing.
- Query: a read request, e.g. GetUser, ListOrders. Can bypass domain model entirely for raw SQL performance.
- Read models may include pre-aggregated fields (item_count) not in any single ORM model. Avoids N+1 queries by pre-aggregating.
- UserCommandService handles writes (business rules). UserQueryService handles reads (raw SQL, denormalized views). Each evolves independently.
💡 Interview Tip
"What is CQRS and when would you use it?" — Separate read/write models. Use when: read queries are complex aggregations that don't map to write models, when you need separate scaling for reads vs writes, or when adding read projections shouldn't touch write logic. Don't use it for simple CRUD — the overhead isn't worth it.
⚠ Gotcha
Full CQRS with separate read/write databases introduces eventual consistency — after a write, the read replica may lag by milliseconds. This is usually acceptable, but if you immediately redirect after a write and show a list page, the new item may not appear yet. Plan for this in your UI.