FastAPI Mastery Course
Topic 2 / 22  ·  Async Programming
Topic 2 · Section 2.1
The Event Loop
The event loop is the beating heart of every FastAPI application. It's a single-threaded loop that juggles thousands of concurrent connections by cleverly switching between tasks whenever one is waiting for I/O. Understanding it explains every async design decision FastAPI makes.
2.1.1 Event Loop Fundamentals
🔄 Event Loop Architecture
The event loop is a single loop that runs forever, continuously checking: "Is there a task ready to run? Run it. Did it pause to wait for I/O? Switch to the next task." This is called cooperative multitasking — tasks voluntarily yield control using await.
Event Loop: Core Cycle
┌─────────────────────────────────────────────────────────┐
│                       EVENT LOOP                        │
│                                                         │
│  ┌──────────┐    ┌──────────┐    ┌───────────────────┐  │
│  │  Ready   │───▶│   Run    │───▶│ Task hits `await` │  │
│  │  Queue   │    │ callback │    │ → registers I/O   │  │
│  └──────────┘    └──────────┘    └────────┬──────────┘  │
│       ▲                                   │             │
│  ┌────┴──────┐                   ┌────────▼──────────┐  │
│  │ I/O done  │◀──────────────────│ OS / selector     │  │
│  │ → wake up │                   │ (epoll / kqueue)  │  │
│  └───────────┘                   └───────────────────┘  │
└─────────────────────────────────────────────────────────┘
One thread. No blocking. Thousands of concurrent I/O ops.
💡
Why FastAPI uses this: A traditional threaded web server ties up one thread per request while it waits on the database or an HTTP call. With the event loop, a single thread can keep thousands of requests in flight concurrently: whenever one waits for a DB query, another runs.
python · event_loop_basics.py
import asyncio

async def main():
    # Inside a coroutine, get_running_loop() returns the loop executing it
    loop = asyncio.get_running_loop()
    print(f"Loop running: {loop.is_running()}")  # True (we're inside it)

    # Schedule a callback to run in the NEXT loop iteration
    loop.call_soon(lambda: print("Scheduled callback ran!"))

    # Schedule a callback after a delay
    loop.call_later(1.0, lambda: print("Ran after 1 second"))

    # Yield to let other tasks run (+ let callbacks fire)
    await asyncio.sleep(0)   # ← zero sleep = yield to event loop

    # Stay alive past the 1s timer: asyncio.run() closes the loop when
    # main() returns, and a pending call_later callback would never fire
    await asyncio.sleep(1.1)

# Entry point: runs the event loop until main() completes
asyncio.run(main())

# In FastAPI — Uvicorn creates and manages the event loop for you:
# uvicorn main:app --host 0.0.0.0 --port 8000
# Uvicorn runs the event loop, FastAPI registers route handlers as coroutines.
🤝 Cooperative Multitasking
Unlike OS threads (preemptive — OS forcibly switches), asyncio uses cooperative multitasking — a coroutine runs until it explicitly yields with await. This means: if you never await, you block the entire event loop for every other request.
🚨
Critical FastAPI pitfall: Calling time.sleep(), a blocking DB driver (psycopg2), or any CPU-heavy code inside an async def handler BLOCKS the event loop — all other requests stall until it finishes. Always use async alternatives or offload to a thread pool.
python · cooperative_multitasking.py
import asyncio, time

# ❌ WRONG — blocks the event loop (request B waits for request A's sleep)
async def bad_handler():
    time.sleep(2)            # BLOCKING — freezes the entire loop!
    return {"status": "done"}

# ✅ CORRECT — yields to the event loop during the wait
async def good_handler():
    await asyncio.sleep(2)   # NON-BLOCKING — other requests run during this
    return {"status": "done"}

# Demo: cooperative switching between two coroutines
async def task_a():
    print("A: starting")
    await asyncio.sleep(1)   # yields here → task_b gets to run
    print("A: done")

async def task_b():
    print("B: starting")
    await asyncio.sleep(0.5) # yields → but finishes before A
    print("B: done")

async def main():
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())
# Output:
# A: starting
# B: starting   ← B runs while A is sleeping
# B: done       ← B finishes first (0.5s)
# A: done       ← A finishes later (1s)
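
The cost of a blocking call can also be measured directly. The sketch below is illustrative only: the handler names, the `timed` helper, and the 0.1 s delay are assumptions chosen for the demo, not part of FastAPI. It runs three copies of each handler concurrently and compares the elapsed time.

```python
import asyncio
import time


async def blocking_handler(delay: float) -> None:
    time.sleep(delay)            # blocks the thread; the loop cannot switch tasks


async def yielding_handler(delay: float) -> None:
    await asyncio.sleep(delay)   # suspends; the loop runs the other handlers


async def timed(handler, n: int, delay: float) -> float:
    """Run n copies of handler concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking_total = asyncio.run(timed(blocking_handler, 3, 0.1))
yielding_total = asyncio.run(timed(yielding_handler, 3, 0.1))
print(f"time.sleep:    {blocking_total:.2f}s")   # roughly 3 x 0.1s, serialized
print(f"asyncio.sleep: {yielding_total:.2f}s")   # roughly 0.1s, overlapped
```

The blocking version takes about three times as long because each `time.sleep` call holds the only thread, so the three "requests" run one after another.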

2.1.2 Scheduling — Ready Queue & Delayed Queue
📋 Ready Queue, Delayed Queue & Task Switching
The event loop maintains two internal queues:

Ready queue — callbacks/coroutines that can run right now (I/O completed, sleep expired, freshly created tasks).
Delayed queue — callbacks scheduled for a future time via call_later() or asyncio.sleep(n). A heap (min-priority queue) sorted by scheduled time.

Each loop iteration: poll the OS for completed I/O, promote any delayed callbacks whose time has elapsed to the ready queue, then run the callbacks that are ready. Callbacks scheduled while those run wait for the next iteration.
python · scheduling_internals.py
import asyncio

async def demonstrate_scheduling():
    loop = asyncio.get_running_loop()

    # call_soon → added to READY queue immediately
    loop.call_soon(lambda: print("[ready] I run next iteration"))

    # call_later → added to DELAYED queue, moved to ready after 0.1s
    loop.call_later(0.1, lambda: print("[delayed] I run after 0.1s"))
    loop.call_later(0.5, lambda: print("[delayed] I run after 0.5s"))

    # call_at → schedule at an absolute event loop time
    now = loop.time()
    loop.call_at(now + 0.2, lambda: print("[delayed] I run at loop_time+0.2"))

    # asyncio.sleep internally uses call_later
    await asyncio.sleep(1)  # wait for all callbacks to fire

# Task switching: every `await` is a potential switch point
async def show_switch_points():
    print("1: before await")
    await asyncio.sleep(0)   # switch point — other tasks can run here
    print("2: after first await")
    await asyncio.sleep(0)   # another switch point
    print("3: done")

asyncio.run(demonstrate_scheduling())
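
To make the two queues concrete, here is a toy scheduler that models them with a `deque` and a `heapq`. It is a simplified illustration, not asyncio's actual implementation: the `ToyLoop` class is hypothetical, it uses a simulated clock, and it leaves out I/O polling entirely.

```python
import heapq
import itertools
from collections import deque


class ToyLoop:
    """Illustrative model only: a ready deque plus a min-heap of timers."""

    def __init__(self) -> None:
        self.now = 0.0                     # simulated clock, in seconds
        self.ready = deque()               # callbacks runnable right now
        self.delayed = []                  # heap of (when, seq, callback)
        self._seq = itertools.count()      # tie-breaker for equal times

    def call_soon(self, callback) -> None:
        self.ready.append(callback)

    def call_later(self, delay: float, callback) -> None:
        heapq.heappush(self.delayed, (self.now + delay, next(self._seq), callback))

    def run(self) -> None:
        while self.ready or self.delayed:
            if not self.ready:
                # Nothing runnable: jump the clock to the earliest timer.
                self.now = self.delayed[0][0]
            # Promote every timer whose deadline has passed.
            while self.delayed and self.delayed[0][0] <= self.now:
                _, _, callback = heapq.heappop(self.delayed)
                self.ready.append(callback)
            # Run only the callbacks that were ready at this point.
            for _ in range(len(self.ready)):
                self.ready.popleft()()


order = []
loop = ToyLoop()
loop.call_later(0.5, lambda: order.append("delayed 0.5"))
loop.call_later(0.1, lambda: order.append("delayed 0.1"))
loop.call_soon(lambda: order.append("ready"))
loop.run()
print(order)  # ['ready', 'delayed 0.1', 'delayed 0.5']
```

The heap keeps the earliest deadline at index 0, so finding the next timer to promote is cheap no matter how many timers are pending.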
Topic 2 · Section 2.2
Coroutines
Coroutines are the unit of concurrency in FastAPI. Every route handler you define with async def is a coroutine. Understanding how they're created, suspended, and resumed explains exactly how your API handles concurrent requests.
2.2.1 async — Coroutine Creation & Lifecycle
🌀 Coroutine Creation
Adding async def instead of def turns a function into a coroutine function. Calling it doesn't run it — it returns a coroutine object. The event loop runs it when you await it or wrap it in a Task.
async def fn() → fn() called → coroutine object → await / Task → executes
python · coroutine_creation.py
import asyncio, inspect

# 1. Define a coroutine function
async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)          # simulates DB/HTTP call
    return {"id": user_id, "name": "Alice"}

# 2. Calling it does NOT run the code — returns a coroutine object
coro = fetch_user(42)
print(type(coro))                   # <class 'coroutine'>
print(inspect.iscoroutine(coro))   # True
coro.close()                        # discard the unused object; otherwise Python warns it was "never awaited"

# 3. To actually run it, you must await it or pass to asyncio.run()
async def main():
    result = await fetch_user(42)   # NOW it executes
    print(result)                    # {'id': 42, 'name': 'Alice'}

asyncio.run(main())

# ── FastAPI route: every handler is a coroutine ──
# @app.get("/users/{user_id}")
# async def get_user(user_id: int):          ← coroutine function
#     user = await user_service.get(user_id) ← suspends here, other requests run
#     return user
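
What "running" a coroutine object means can be shown without any event loop. The sketch below drives a coroutine by hand with `send(None)`, which is roughly what a Task does on the loop's behalf. The `add` function is a hypothetical example with no `await` inside, so it finishes on the first step.

```python
async def add(a: int, b: int) -> int:
    return a + b          # no await inside, so it never suspends


result = None
coro = add(2, 3)          # nothing has run yet: this is only a coroutine object
try:
    coro.send(None)       # start it; it runs straight through to `return`
except StopIteration as stop:
    result = stop.value   # the return value travels on StopIteration
print(result)  # 5
```

In application code you would never call `send` yourself; `await` and `asyncio.run()` do it for you.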
📊 Coroutine Lifecycle
A coroutine goes through four states: Created → Running → Suspended (at every await) → Closed (returned or raised). You can inspect these states programmatically.
Coroutine Lifecycle

CREATED          RUNNING          SUSPENDED          CLOSED
────────         ───────          ─────────          ──────
coro = fn()  →   await coro   →   hits await    →    return/raise
(not started)    (executing)      (waiting I/O)      (done)
                     │                 │
                     └─────────────────┘
             can oscillate many times before finally closing
python
import asyncio, inspect

async def my_coro():
    await asyncio.sleep(0.1)
    return "done"

async def inspect_lifecycle():
    coro = my_coro()
    print(coro.cr_frame)              # frame object; becomes None only once the coroutine has finished
    print(inspect.getcoroutinestate(coro))  # CORO_CREATED

    task = asyncio.create_task(coro)
    await asyncio.sleep(0)            # let it start
    print(inspect.getcoroutinestate(coro))  # CORO_SUSPENDED (at sleep)

    result = await task
    print(inspect.getcoroutinestate(coro))  # CORO_CLOSED
    print(result)                    # "done"

asyncio.run(inspect_lifecycle())

2.2.2 await — Suspension & Resumption
⏸️ How await suspends and resumes
await expr does three things:

1. Suspends the current coroutine (saves its state/frame).
2. Gives control back to the event loop — other coroutines can now run.
3. Resumes this coroutine once the awaited thing completes, with the result.

You can only await awaitable objects: coroutines, Tasks, Futures, or objects implementing __await__.
python · await_mechanics.py
import asyncio

async def slow_db_query(query: str) -> list:
    print(f"  DB: executing '{query}'")
    await asyncio.sleep(0.2)   # simulates 200ms DB latency
    print(f"  DB: '{query}' done")
    return ["row1", "row2"]

async def handle_request_a():
    print("Request A: start")
    rows = await slow_db_query("SELECT * FROM users")
    # ↑ SUSPENDS here. Event loop runs request B while waiting.
    print(f"Request A: got {len(rows)} rows")

async def handle_request_b():
    print("Request B: start")
    rows = await slow_db_query("SELECT * FROM orders")
    print(f"Request B: got {len(rows)} rows")

async def main():
    # Both run CONCURRENTLY — total time ~0.2s, not 0.4s
    await asyncio.gather(
        handle_request_a(),
        handle_request_b(),
    )

asyncio.run(main())

# What you CANNOT await (runtime TypeError):
# await 42          → not awaitable
# await "hello"     → not awaitable
# await time.sleep(1) → blocks for 1s, then fails: it returns None, which is not awaitable (use asyncio.sleep!)
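
The last kind of awaitable, an object implementing `__await__`, can be sketched as follows. The `Delay` class is a hypothetical example, not a library type. It delegates to `asyncio.sleep`, whose second argument is the value returned once the delay elapses.

```python
import asyncio


class Delay:
    """Hypothetical awaitable: waits `seconds`, then produces `value`."""

    def __init__(self, seconds: float, value: str) -> None:
        self.seconds = seconds
        self.value = value

    def __await__(self):
        # Delegate to asyncio.sleep's own awaitable protocol;
        # sleep(delay, result) returns `result` once the delay elapses.
        return asyncio.sleep(self.seconds, self.value).__await__()


async def main() -> str:
    return await Delay(0.01, "ready")


delay_result = asyncio.run(main())
print(delay_result)  # ready
```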
FastAPI rule: Use async def for route handlers that await async I/O (DB, HTTP, file). Use plain def for handlers that call blocking or CPU-heavy synchronous code; FastAPI runs those in a thread pool automatically, so they do not stall the event loop.
Topic 2 · Section 2.3
Tasks
Tasks are scheduled coroutines — they run concurrently in the background without you having to await them immediately. Mastering tasks lets you run multiple DB queries in parallel, fire background jobs, and implement timeouts in FastAPI routes.
2.3.1 Task Management
🚀 create_task — Fire and Concurrently Run
asyncio.create_task(coro) wraps a coroutine in a Task and schedules it on the event loop right away, without waiting for it to finish. The task starts running the next time the current coroutine yields at an await. This is how you overlap several async operations in a FastAPI route: start them all, then await their results.
python · create_task.py
import asyncio, time

async def fetch_user(uid: int):
    await asyncio.sleep(0.2)
    return {"id": uid, "name": "Alice"}

async def fetch_orders(uid: int):
    await asyncio.sleep(0.3)
    return ["order1", "order2"]

async def fetch_stats(uid: int):
    await asyncio.sleep(0.1)
    return {"logins": 42}

async def get_user_dashboard(uid: int):
    start = time.time()

    # ❌ Sequential — total time: 0.2 + 0.3 + 0.1 = 0.6s
    # user   = await fetch_user(uid)
    # orders = await fetch_orders(uid)
    # stats  = await fetch_stats(uid)

    # ✅ Parallel with create_task — total time: max(0.2, 0.3, 0.1) = 0.3s
    task_user   = asyncio.create_task(fetch_user(uid))
    task_orders = asyncio.create_task(fetch_orders(uid))
    task_stats  = asyncio.create_task(fetch_stats(uid))

    # Now await their results
    user   = await task_user
    orders = await task_orders
    stats  = await task_stats

    print(f"Done in {time.time()-start:.2f}s")  # ~0.3s
    return {"user": user, "orders": orders, "stats": stats}

asyncio.run(get_user_dashboard(1))
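
A small sketch of when a Task created with `create_task` first gets to run. The `child` coroutine and the `log` list are illustrative names. The point is that scheduling is not the same as starting: the child only executes once the creating coroutine yields.

```python
import asyncio

log = []


async def child() -> None:
    log.append("child started")


async def main() -> None:
    task = asyncio.create_task(child())
    log.append("after create_task")   # the child has not run yet
    await asyncio.sleep(0)            # yield once: the loop now runs the child
    log.append("after first yield")
    await task


asyncio.run(main())
print(log)  # ['after create_task', 'child started', 'after first yield']
```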
🎯 gather, wait & as_completed
Three patterns for running multiple coroutines concurrently — each with different trade-offs for when you need all results, partial results on failure, or streaming results as they arrive.
python · gather_wait_as_completed.py
import asyncio

async def work(name: str, delay: float):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():

    # ── gather() ─────────────────────────────────────────
    # Runs all, waits for ALL to finish, returns list in order.
    # Default (return_exceptions=False): the first exception propagates
    # to the caller, but the other awaitables are NOT cancelled.
    results = await asyncio.gather(
        work("A", 0.1),
        work("B", 0.3),
        work("C", 0.2),
        return_exceptions=True   # exceptions become values, not raised
    )
    print(results)  # ['A done', 'B done', 'C done'] ← in input order

    # ── wait() ───────────────────────────────────────────
    # More control: separate done from not-done sets.
    tasks = {asyncio.create_task(work(n, d)) for n, d in [("X",.1),("Y",.5)]}
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    print(f"done: {len(done)}, pending: {len(pending)}")
    for t in pending:
        t.cancel()   # cancel tasks that didn't finish in time

    # ── as_completed() ───────────────────────────────────
    # Yields results as they finish (fastest first).
    # Great for: "show first result immediately, load rest after"
    coros = [work("fast", 0.1), work("slow", 0.5), work("mid", 0.3)]
    # A plain `for` works on every supported Python version;
    # Python 3.13+ also accepts `async for` here.
    for fut in asyncio.as_completed(coros):
        result = await fut
        print(f"Got: {result}")  # fast → mid → slow

asyncio.run(main())
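
How `gather()` treats a failing awaitable is easy to get wrong, so here is a sketch of both modes. The helper names `ok` and `boom` are hypothetical. It shows an exception returned in its slot with `return_exceptions=True`, and a sibling task that keeps running after the default mode has raised.

```python
import asyncio

finished = []


async def ok(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    finished.append(name)
    return name


async def boom(delay: float) -> None:
    await asyncio.sleep(delay)
    raise ValueError("boom")


async def main():
    # return_exceptions=True: the exception is returned in its slot.
    collected = await asyncio.gather(
        ok("A", 0.01), boom(0.02), ok("C", 0.03), return_exceptions=True
    )

    # Default: the first exception propagates to the caller...
    slow = asyncio.create_task(ok("slow", 0.05))
    try:
        await asyncio.gather(boom(0.01), slow)
    except ValueError:
        pass
    # ...but the sibling task was not cancelled and can still finish.
    was_cancelled = slow.cancelled()
    await slow
    return collected, was_cancelled


collected, was_cancelled = asyncio.run(main())
print(collected)       # ['A', ValueError('boom'), 'C']
print(was_cancelled)   # False
```

If the remaining work should stop after a failure, the tasks have to be cancelled explicitly.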

2.3.2 Cancellation & Timeout Handling
🛑 Cancel Signals, Cleanup & Timeouts
Cancelling a task injects a CancelledError at the next await point. Tasks can catch it for cleanup. asyncio.timeout() (Python 3.11+) or asyncio.wait_for() add deadline enforcement — critical for preventing runaway requests in production FastAPI.
python · cancellation_timeout.py
import asyncio

# ── Manual cancellation ──────────────────────────────────
async def long_running():
    try:
        print("working...")
        await asyncio.sleep(10)    # long operation
        print("done")
    except asyncio.CancelledError:
        print("cleanup: releasing resources")
        raise                        # ALWAYS re-raise CancelledError!

async def cancel_demo():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(0.5)     # let it start
    task.cancel()                  # inject CancelledError
    try:
        await task
    except asyncio.CancelledError:
        print("task was cancelled")

# ── Timeout with wait_for (works in all Python 3.7+) ─────
async def timeout_demo():
    try:
        result = await asyncio.wait_for(
            long_running(),
            timeout=2.0             # raise TimeoutError after 2s
        )
    except asyncio.TimeoutError:
        print("Request timed out after 2s")

# ── asyncio.timeout context manager (Python 3.11+) ───────
async def timeout_context():
    try:
        async with asyncio.timeout(2.0):
            await asyncio.sleep(10)  # will be cancelled
    except TimeoutError:
        print("timed out!")

# ── FastAPI timeout pattern ───────────────────────────────
# @app.get("/data")
# async def get_data():
#     try:
#         result = await asyncio.wait_for(slow_service(), timeout=5.0)
#         return result
#     except asyncio.TimeoutError:
#         raise HTTPException(504, "Gateway Timeout")

asyncio.run(cancel_demo())
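
The timeout pattern can be exercised without a web server. In this sketch `slow_service` and `handler` are stand-ins for a downstream call and a route handler (both hypothetical), and the handler returns a plain string where a real route would raise `HTTPException`. It records that `wait_for` cancels the inner coroutine at the deadline and that its cleanup still runs.

```python
import asyncio

events = []


async def slow_service() -> str:
    try:
        await asyncio.sleep(1.0)        # stands in for a slow downstream call
        return "data"
    except asyncio.CancelledError:
        events.append("cancelled")      # wait_for cancelled us at the deadline
        raise
    finally:
        events.append("cleanup")        # runs on success and on cancellation


async def handler() -> str:
    try:
        return await asyncio.wait_for(slow_service(), timeout=0.05)
    except asyncio.TimeoutError:
        return "504 Gateway Timeout"


outcome = asyncio.run(handler())
print(outcome)  # 504 Gateway Timeout
print(events)   # ['cancelled', 'cleanup']
```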
Topic 2 · Section 2.4
Concurrency Models
Not all work is equal. I/O-bound tasks (waiting for DB, HTTP, files) are handled beautifully by async. CPU-bound tasks (image processing, ML inference, heavy computation) need threads or processes — or they'll block your event loop and kill throughput.
2.4.1 I/O Bound — HTTP, DB calls, File operations
🌐 Async I/O — The Right Tool for FastAPI
I/O-bound operations spend most of their time waiting for an external system (database, HTTP server, file system). During that wait, your CPU is idle. async/await lets you use that idle time to serve other requests — this is the core value of FastAPI's async architecture.
❌ Sync / Blocking
  • Thread waits idle during I/O
  • Need 1 thread per concurrent request
  • 100 concurrent → 100 threads → high memory
  • Thread switching overhead
✅ Async / Non-blocking
  • Loop handles other requests during I/O wait
  • 1 thread → 10,000+ concurrent I/O ops
  • Low memory, no thread switching
  • Native FastAPI model — zero config
python · async_io_fastapi.py
import asyncio
import httpx                   # async HTTP client
import aiofiles               # async file I/O
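from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# `User` is assumed to be an ORM model defined elsewhere in the application.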

# ── Async HTTP requests (non-blocking) ──
async def fetch_github_user(username: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.github.com/users/{username}")
        resp.raise_for_status()
        return resp.json()

# ── Async DB query (non-blocking) ──
async def get_users(db: AsyncSession) -> list:
    result = await db.execute(select(User))
    return result.scalars().all()

# ── Async file read (non-blocking) ──
async def read_config(path: str) -> str:
    async with aiofiles.open(path, 'r') as f:
        return await f.read()

# ── FastAPI: run all three concurrently ──
async def dashboard_endpoint(username: str, db: AsyncSession):
    gh_user, db_users, config = await asyncio.gather(
        fetch_github_user(username),  # async HTTP — non-blocking
        get_users(db),                # async DB   — non-blocking
        read_config("config.yaml"),  # async file — non-blocking
    )
    return {"github": gh_user, "users": db_users}
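
The claim that one thread can carry many concurrent waits can be checked with the standard library alone. In this sketch `fake_io` is a hypothetical stand-in for a 100 ms network or database call. It starts 1,000 of them and records which threads ran the coroutines.

```python
import asyncio
import threading
import time


async def fake_io(i: int, thread_ids: set) -> int:
    thread_ids.add(threading.get_ident())   # record which thread ran us
    await asyncio.sleep(0.1)                # stands in for a 100 ms I/O wait
    return i


async def main(n: int):
    thread_ids: set = set()
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_io(i, thread_ids) for i in range(n)))
    return len(results), len(thread_ids), time.perf_counter() - start


count, threads_used, elapsed = asyncio.run(main(1000))
print(f"{count} waits on {threads_used} thread in {elapsed:.2f}s")
```

Run sequentially, the same waits would take about 100 seconds. Overlapped on the event loop they finish in a small fraction of that, all on a single thread.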

2.4.2 CPU Bound — ThreadPoolExecutor & ProcessPoolExecutor
⚙️ Offloading CPU Work Without Blocking the Loop
CPU-bound work (image resizing, password hashing, ML inference, data processing) runs Python bytecode and is bound by the GIL. Running it in an async def function blocks the event loop.

Solution: offload to a thread pool (for blocking I/O or light CPU) or a process pool (for heavy CPU that bypasses the GIL) using loop.run_in_executor().
🔧
FastAPI shortcut: If you define a route with def (not async def), FastAPI automatically runs it in a thread pool. Only do this for truly synchronous blocking code — don't use it as a lazy alternative to async.
python · cpu_bound_offload.py
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import hashlib, time

# ── Synchronous CPU-heavy function ──
def hash_password(password: str) -> str:
    # PBKDF2 is deliberately CPU-intensive (100,000 iterations here)
    return hashlib.pbkdf2_hmac('sha256', password.encode(), b'salt', 100_000).hex()

def heavy_compute(n: int) -> int:
    return sum(i*i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()

    # ── ThreadPoolExecutor ──────────────────────────────
    # Good for: blocking I/O (sync DB drivers), light CPU work
    # GIL is still held, but I/O ops release it
    with ThreadPoolExecutor(max_workers=4) as pool:
        hashed = await loop.run_in_executor(
            pool,
            hash_password,
            "mysecretpassword"
        )
        print(f"Hash: {hashed[:20]}...")

    # ── ProcessPoolExecutor ─────────────────────────────
    # Good for: heavy CPU (ML inference, image processing)
    # Spawns separate processes → bypasses GIL
    with ProcessPoolExecutor(max_workers=2) as pool:
        result = await loop.run_in_executor(
            pool,
            heavy_compute,
            10_000_000
        )
        print(f"Sum: {result}")

    # ── asyncio.to_thread (Python 3.9+, simpler syntax) ─
    hashed = await asyncio.to_thread(hash_password, "mypassword")
    print(f"to_thread hash: {hashed[:20]}...")

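# The guard matters here: with the "spawn" start method (Windows, macOS),
# ProcessPoolExecutor workers re-import this module.
if __name__ == "__main__":
    asyncio.run(main())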

# ── FastAPI patterns ────────────────────────────────────
#
# Pattern 1: auto thread pool with plain def
# @app.post("/login")
# def login(data: LoginSchema):          ← FastAPI runs in thread pool
#     hashed = hash_password(data.password)  ← safe: not blocking event loop
#     ...
#
# Pattern 2: explicit offload in async def
# @app.post("/process-image")
# async def process_image(file: UploadFile):
#     data = await file.read()
#     result = await asyncio.to_thread(resize_image, data, (800, 600))
#     return {"size": len(result)}
⚠️
ProcessPoolExecutor caution: Each process has its own memory space — you can't share Python objects directly. Use it only for pure functions that take serializable input/output (numbers, bytes, strings). Heavy ML models → load once per process with an initializer.
🗺️ Decision Map — Which Concurrency Tool?
Concurrency Decision Tree for FastAPI

Your task is...

Waiting for DB / HTTP / file?
  └─▶ async def + await (async SQLAlchemy, httpx, aiofiles)

Using a sync library that blocks (psycopg2, requests)?
  └─▶ asyncio.to_thread() or ThreadPoolExecutor

Pure CPU work (hashing, image resize, <100ms)?
  └─▶ asyncio.to_thread() (simple, uses thread pool)

Heavy CPU work (ML inference, video encode, >1s)?
  └─▶ ProcessPoolExecutor (bypasses GIL, separate process)

Long-running background job (email, report)?
  └─▶ FastAPI BackgroundTasks (Topic 13) or Celery / ARQ (Topic 13.2)

Plain sync code that is short / not I/O?
  └─▶ plain def handler (FastAPI auto-threads it)
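
The "sync library that blocks" branch can be sketched with the standard library. Here `blocking_call` is a hypothetical stand-in for a synchronous driver call, and `heartbeat` is an illustrative coroutine that ticks only while the event loop is free. Requires Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


def blocking_call(seconds: float) -> str:
    time.sleep(seconds)          # stands in for a sync driver such as psycopg2
    return "rows"


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main():
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    rows = await asyncio.to_thread(blocking_call, 0.2)   # loop stays free
    stop.set()
    await beat
    return rows, len(ticks)


rows, tick_count = asyncio.run(main())
print(rows, tick_count)
```

The heartbeat keeps ticking during the 0.2 s blocking call because the call runs on a worker thread. Calling `blocking_call` directly inside `main` would stop the ticks for the whole duration.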
Topic 2: Async Programming  ·  4 sections · 10 concepts · All subtopics covered