FastAPI Mastery
Topic 12 of 22 — Database Integration
54.5% complete
Topic 12
Database Integration 🗄️
SQLAlchemy is the most popular Python ORM — it maps Python classes to database tables and lets you query with Python instead of raw SQL. Combined with Alembic for migrations, it gives you a complete, production-ready database layer. This topic covers sync SQLAlchemy, async SQLAlchemy (the FastAPI-native way), and Alembic migrations end-to-end.
Stack Overview
4FastAPI Route HandlerReceives request, calls service/repo
3AsyncSession (SQLAlchemy)Unit of Work — tracks changes, manages transactions
2Async Engine + Connection PoolManages DB connections, executes SQL
1PostgreSQL / MySQL / SQLiteThe actual database
12.1 SQLAlchemy
⚙️
Engine
The Engine is the entry point to the database. It holds the connection pool — a cache of reusable DB connections. Creating an engine doesn't open a connection; connections are opened lazily when needed.
Connection String Format

dialect+driver://user:pass@host:port/dbname

e.g. postgresql+psycopg2://user:pw@localhost/db

Connection Pool

Reuses DB connections instead of opening a new one per request. Dramatically reduces latency and DB server load.

python — engine setup (sync)
# pip install sqlalchemy psycopg2-binary
from sqlalchemy import create_engine

# Synchronous engine — for scripts, Alembic, background tasks
engine = create_engine(
    "postgresql+psycopg2://user:password@localhost:5432/mydb",

    # Connection pool settings
    pool_size=5,          # Number of persistent connections
    max_overflow=10,      # Extra connections when pool is exhausted
    pool_timeout=30,      # Wait time before giving up (seconds)
    pool_recycle=1800,    # Recycle connections every 30 min (avoids stale)
    pool_pre_ping=True,   # Test connection before use — auto-reconnect

    # Debugging: prints every SQL statement executed
    echo=False,           # Set True during development to see SQL
)

# SQLite (for local dev / testing — no server needed)
sqlite_engine = create_engine(
    "sqlite:///./local.db",
    connect_args={"check_same_thread": False}  # Required for SQLite + FastAPI
)

# Other dialects:
# MySQL:  "mysql+pymysql://user:pw@localhost/db"
# SQLite: "sqlite:///./mydb.db"
ℹ️
In modern FastAPI apps, you'll mostly use the async engine (section 12.2). The sync engine is still useful for Alembic migrations and one-off scripts.
📋
Session
The Session is your workspace for a single unit of work. It tracks changes to objects (add, modify, delete) and commits them to the DB together — or rolls them back if something fails. Think of it like a shopping cart: you add items, then checkout (commit) or cancel (rollback).
SessionFactory (sessionmaker) is a factory that creates Session objects bound to an engine. You typically create one factory at startup and call it per request.
python — session factory + dependency
from sqlalchemy.orm import sessionmaker, Session from fastapi import Depends from typing import Generator # Create a session factory — called once at app startup SessionLocal = sessionmaker( bind=engine, autocommit=False, # Don't auto-commit; we control transactions autoflush=False, # Don't auto-flush before queries expire_on_commit=True, # Expire objects after commit (reload from DB) ) # Dependency: one session per request, auto-closed def get_db() -> Generator[Session, None, None]: db = SessionLocal() try: yield db # Route handler gets db here db.commit() # Commit on success except Exception: db.rollback() # Rollback on any error raise finally: db.close() # Always close the session # Use in a route from fastapi import FastAPI app = FastAPI() @app.get("/users/{user_id}") def get_user(user_id: int, db: Session = Depends(get_db)): user = db.get(User, user_id) # Fetch by primary key return user # Common session operations def crud_examples(db: Session): # CREATE new_user = User(name="Alice", email="alice@example.com") db.add(new_user) db.commit() db.refresh(new_user) # Reload from DB to get auto-generated ID # READ user = db.get(User, 1) # By PK users = db.query(User).filter(User.active == True).all() # Filtered list # UPDATE user.name = "Alice Smith" db.commit() # DELETE db.delete(user) db.commit()
🗂️
Metadata
Metadata is a container that holds the description of all your tables and columns. It's the bridge between your Python ORM models and the actual database schema. When you call Base.metadata.create_all(), SQLAlchemy reads the Metadata and creates the tables.
python — declarative base + metadata
from sqlalchemy.orm import DeclarativeBase

# Modern SQLAlchemy 2.0 style — Base holds shared Metadata
class Base(DeclarativeBase):
    pass

# Base.metadata is the Metadata object
# Every model that inherits from Base is registered here

# Create all tables defined in models (useful for dev / testing)
# In production, use Alembic migrations instead
Base.metadata.create_all(bind=engine)

# Drop all tables (careful!)
# Base.metadata.drop_all(bind=engine)

# Inspect what tables are registered
print(Base.metadata.tables.keys())
# dict_keys(['users', 'posts', 'comments'])
🧱
ORM Models
ORM Models are Python classes that map to database tables. Each class attribute maps to a column. SQLAlchemy handles translating your Python operations into SQL.
⚠️
SQLAlchemy models ≠ Pydantic models. SQLAlchemy models define the DB schema. Pydantic models (schemas) define API request/response shapes. You need both — and convert between them.
python — full ORM model example
from sqlalchemy import ( Column, Integer, String, Boolean, Float, DateTime, ForeignKey, Text, Index ) from sqlalchemy.orm import relationship, DeclarativeBase from sqlalchemy.sql import func from datetime import datetime class Base(DeclarativeBase): pass class User(Base): __tablename__ = "users" # Primary key — auto-incrementing integer id = Column(Integer, primary_key=True, index=True) # String columns with constraints email = Column(String(255), unique=True, nullable=False, index=True) name = Column(String(100), nullable=False) password = Column(String(255), nullable=False) bio = Column(Text, nullable=True) # Boolean with default is_active = Column(Boolean, default=True, nullable=False) is_admin = Column(Boolean, default=False, nullable=False) # Auto-set timestamps created_at = Column(DateTime(timezone=True), server_default=func.now()) updated_at = Column(DateTime(timezone=True), onupdate=func.now()) # Relationship — "a user has many posts" posts = relationship("Post", back_populates="author", cascade="all, delete-orphan") # Composite index for performance __table_args__ = ( Index("ix_users_email_active", "email", "is_active"), ) def __repr__(self): return f"<User id={self.id} email={self.email}>" class Post(Base): __tablename__ = "posts" id = Column(Integer, primary_key=True, index=True) title = Column(String(200), nullable=False) content = Column(Text, nullable=False) views = Column(Integer, default=0) # Foreign key — links to users.id author_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False) # Back-reference — access post.author to get the User object author = relationship("User", back_populates="posts") created_at = Column(DateTime(timezone=True), server_default=func.now())
python — querying with ORM (sync)
from sqlalchemy.orm import Session, joinedload from sqlalchemy import select, and_, or_ def query_examples(db: Session): # Simple select all users = db.query(User).all() # Filter active_users = db.query(User).filter(User.is_active == True).all() # Multiple conditions admins = db.query(User).filter( and_(User.is_active == True, User.is_admin == True) ).all() # OR condition result = db.query(User).filter( or_(User.email.like("%@gmail.com"), User.email.like("%@yahoo.com")) ).all() # Order, limit, offset (pagination) page = db.query(User).order_by(User.created_at.desc()).offset(0).limit(10).all() # Eager load relationships (avoids N+1 queries) users_with_posts = ( db.query(User) .options(joinedload(User.posts)) # JOIN in a single SQL query .filter(User.is_active == True) .all() ) # Count total = db.query(User).count() # SQLAlchemy 2.0 style (recommended) stmt = select(User).where(User.is_active == True).limit(10) result = db.execute(stmt) users = result.scalars().all()
12.2 Async SQLAlchemy
FastAPI is built on async. Sync SQLAlchemy blocks the event loop during DB calls — meaning your entire server freezes while waiting for the DB. Async SQLAlchemy uses async drivers so DB calls become awaitable, keeping the event loop free to handle other requests.
Async Engine
The async engine wraps a sync engine with an async interface. You need an async driverasyncpg for PostgreSQL, aiosqlite for SQLite.
python — async engine setup
# pip install sqlalchemy asyncpg  (PostgreSQL)
# pip install sqlalchemy aiosqlite (SQLite dev)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine

# Note: driver is asyncpg (not psycopg2)
async_engine: AsyncEngine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost:5432/mydb",

    # Same pool settings as sync engine
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,

    # Show SQL in development
    echo=False,
)

# SQLite for local dev/testing
test_engine = create_async_engine(
    "sqlite+aiosqlite:///./test.db",
    connect_args={"check_same_thread": False},
)

# Create tables (dev only — use Alembic in production)
async def init_db():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
🔄
Async Session — Full FastAPI Integration
The AsyncSession is the async equivalent of Session. All DB operations become await-able. This is the pattern you'll use in production FastAPI apps.
python — database.py (complete setup)
from sqlalchemy.ext.asyncio import ( create_async_engine, AsyncSession, async_sessionmaker ) from sqlalchemy.orm import DeclarativeBase from typing import AsyncGenerator # 1. Base for ORM models class Base(DeclarativeBase): pass # 2. Async engine engine = create_async_engine( "postgresql+asyncpg://user:password@localhost/mydb", pool_size=10, pool_pre_ping=True, ) # 3. Async session factory (SQLAlchemy 2.0 style) AsyncSessionLocal = async_sessionmaker( bind=engine, class_=AsyncSession, autocommit=False, autoflush=False, expire_on_commit=False, # Don't expire objects after commit (async-safe) ) # 4. FastAPI dependency — one AsyncSession per request async def get_db() -> AsyncGenerator[AsyncSession, None]: async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise
python — async CRUD operations
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from sqlalchemy.orm import selectinload from fastapi import FastAPI, Depends, HTTPException app = FastAPI() # CREATE @app.post("/users") async def create_user( name: str, email: str, db: AsyncSession = Depends(get_db) ): user = User(name=name, email=email) db.add(user) await db.flush() # Write to DB but don't commit yet (gets auto-ID) await db.refresh(user) # Reload from DB to get generated values return {"id": user.id, "email": user.email} # READ — by primary key @app.get("/users/{user_id}") async def get_user(user_id: int, db: AsyncSession = Depends(get_db)): user = await db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") return user # READ — filtered query with async @app.get("/users") async def list_users( skip: int = 0, limit: int = 10, db: AsyncSession = Depends(get_db) ): stmt = ( select(User) .where(User.is_active == True) .options(selectinload(User.posts)) # Eager load posts (async-safe) .offset(skip) .limit(limit) .order_by(User.created_at.desc()) ) result = await db.execute(stmt) users = result.scalars().all() return users # UPDATE @app.put("/users/{user_id}") async def update_user(user_id: int, name: str, db: AsyncSession = Depends(get_db)): user = await db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") user.name = name # Modify the object — SQLAlchemy tracks the change await db.flush() # Write to DB (commit happens in get_db on yield exit) return user # DELETE @app.delete("/users/{user_id}") async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)): user = await db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") await db.delete(user) return {"deleted": user_id}
💡
joinedload vs selectinload in async: joinedload uses SQL JOINs but can cause issues in async contexts. Prefer selectinload for async — it fires a separate SELECT IN (...) query which is async-safe and avoids the "MissingGreenlet" error.
python — lifespan: init DB at startup
from contextlib import asynccontextmanager @asynccontextmanager async def lifespan(app: FastAPI): # Startup: create tables (dev) or verify connection async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) print("✅ Database connected") yield # Shutdown: dispose connection pool await engine.dispose() print("🔴 Database pool closed") app = FastAPI(lifespan=lifespan)
12.3 Alembic — Database Migrations
Alembic is a database migration tool for SQLAlchemy. When you change your ORM models (add a column, rename a table, add an index), Alembic generates migration scripts that safely evolve your database schema without losing data. Think of it as git for your database schema.
🔧
Migration Setup + Creation
1
Install and initialise Alembic
Run once per project to create the alembic/ folder and config file.
2
Configure env.py
Tell Alembic where your models and database URL live.
3
Generate migration
Alembic compares your models to the current DB schema and auto-generates the diff.
4
Apply migration
Run alembic upgrade head to apply all pending migrations.
bash — install and init
# Install pip install alembic # Initialise in your project root (creates alembic/ folder) alembic init alembic # For async support, use the async template alembic init -t async alembic
python — alembic/env.py (configure)
from logging.config import fileConfig from sqlalchemy import engine_from_config, pool from alembic import context # Import your Base and ALL models (so Alembic can see them) from app.database import Base from app.models import User, Post # ← must import all model files import os config = context.config # Override DB URL from environment variable config.set_main_option( "sqlalchemy.url", os.environ.get("DATABASE_URL", "postgresql://user:pw@localhost/db") ) # Tell Alembic about your models — enables autogenerate target_metadata = Base.metadata def run_migrations_offline() -> None: url = config.get_main_option("sqlalchemy.url") context.configure(url=url, target_metadata=target_metadata, literal_binds=True) with context.begin_transaction(): context.run_migrations() def run_migrations_online() -> None: connectable = engine_from_config( config.get_section(config.config_ini_section), prefix="sqlalchemy.", poolclass=pool.NullPool, ) with connectable.connect() as connection: context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()
bash — generate a migration
# Auto-generate migration by comparing models to DB schema alembic revision --autogenerate -m "create users and posts tables" # Creates: alembic/versions/abc123_create_users_and_posts_tables.py # Manual migration (when autogenerate can't detect changes) alembic revision -m "add custom index"
python — alembic/versions/abc123_create_users.py
# Auto-generated migration file — always review before applying! from alembic import op import sqlalchemy as sa revision = 'abc123' down_revision = None # None = first migration branch_labels = None depends_on = None def upgrade() -> None: # Create the users table op.create_table( 'users', sa.Column('id', sa.Integer(), nullable=False), sa.Column('email', sa.String(length=255), nullable=False), sa.Column('name', sa.String(length=100), nullable=False), sa.Column('is_active', sa.Boolean(), nullable=False), sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True), sa.PrimaryKeyConstraint('id') ) op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True) op.create_index(op.f('ix_users_id'), 'users', ['id'], unique=False) def downgrade() -> None: # Reverse: drop the users table op.drop_index(op.f('ix_users_email'), table_name='users') op.drop_index(op.f('ix_users_id'), table_name='users') op.drop_table('users')
⬆️
Upgrade & Downgrade
Alembic tracks which migrations have been applied in a alembic_version table in your database. upgrade moves forward (applies changes), downgrade moves back (reverts them).
bash — alembic commands cheatsheet
# ── Applying migrations ───────────────────── # Apply ALL pending migrations (most common) alembic upgrade head # Apply next one migration only alembic upgrade +1 # Apply to a specific revision alembic upgrade abc123 # ── Reverting migrations ──────────────────── # Revert last migration alembic downgrade -1 # Revert to a specific revision alembic downgrade abc123 # Revert ALL migrations (dangerous!) alembic downgrade base # ── Inspection ────────────────────────────── # Show current DB revision alembic current # Show migration history alembic history --verbose # Show pending (unapplied) migrations alembic history -r current:head # Show SQL that would be run (dry run) alembic upgrade head --sql
python — run migrations programmatically at startup
from alembic.config import Config from alembic import command from contextlib import asynccontextmanager def run_migrations(): # Run migrations on app startup (great for Docker deployments) alembic_cfg = Config("alembic.ini") command.upgrade(alembic_cfg, "head") @asynccontextmanager async def lifespan(app: FastAPI): # Run in thread pool — alembic is synchronous import asyncio loop = asyncio.get_event_loop() await loop.run_in_executor(None, run_migrations) print("✅ Migrations applied") yield app = FastAPI(lifespan=lifespan)
python — common migration operations
from alembic import op import sqlalchemy as sa # Add a column to existing table def upgrade(): op.add_column('users', sa.Column('phone', sa.String(20), nullable=True)) def downgrade(): op.drop_column('users', 'phone') # Rename a column def upgrade(): op.alter_column('users', 'username', new_column_name='display_name') def downgrade(): op.alter_column('users', 'display_name', new_column_name='username') # Add index def upgrade(): op.create_index('ix_posts_author_id', 'posts', ['author_id']) def downgrade(): op.drop_index('ix_posts_author_id', table_name='posts') # Data migration — fill a new column with existing data def upgrade(): op.add_column('users', sa.Column('slug', sa.String(100), nullable=True)) # Populate slug from name for existing rows op.execute("UPDATE users SET slug = lower(replace(name, ' ', '-'))") # Now make it non-nullable op.alter_column('users', 'slug', nullable=False)
⚠️
Always review autogenerated migrations before applying. Alembic sometimes misses things (enum changes, function-based indexes). Also, never edit applied migrations — create a new one instead.
💡
Project structure tip:
app/
├── database.py       ← engine, AsyncSessionLocal, get_db
├── models/
│   ├── __init__.py   ← import all models here
│   ├── user.py
│   └── post.py
alembic/
├── env.py
└── versions/
alembic.ini
📋 Topic 12 Summary
12.1 SQLAlchemy
• Engine = connection pool manager
• Session = unit of work per request
• Metadata = schema registry
• ORM Models = Python ↔ DB tables
12.2 Async SQLAlchemy
create_async_engine + asyncpg driver
AsyncSession via async_sessionmaker
await db.execute(select(...))
selectinload for async eager loading
12.3 Alembic
alembic revision --autogenerate
alembic upgrade head applies changes
alembic downgrade -1 reverts
• Always review before applying
← Topic 11: Security
Reply "next" → Topic 13: Background Processing