Topic 15
Streaming 🌊
Instead of waiting for the full response to be ready before sending it, streaming lets your server push data to the client chunk by chunk, as soon as each piece is ready. It is essential for large file downloads, live logs, and streaming tokens from LLMs.
Three Streaming Patterns
🌊 StreamingResponse
General-purpose. Stream any bytes/text via a Python generator. Good for files, CSV exports, logs.
📡 SSE
Server-Sent Events — one-way server push over HTTP. Perfect for live dashboards, notifications, progress bars.
🤖 AI Streaming
Stream LLM tokens as they're generated. Users see words appear word-by-word instead of waiting seconds.
```
Normal Response                           Streaming Response
──────────────────────────────────────────────────────────────────────
Client               Server               Client               Server
  │                     │                   │                     │
  │── GET /data ───────▶│                   │── GET /stream ─────▶│
  │                     │ generating        │                     │ chunk 1 ready
  │                     │ ...               │◀── "Hello " ────────│
  │                     │ done              │◀── "World" ─────────│
  │◀── full response ───│                   │◀── "!" ─────────────│
  │                     │                   │◀── [close] ─────────│
User waits 3s, then sees everything       User sees output immediately
```
15.1
StreamingResponse
Generators
A generator is a Python function that uses `yield` instead of `return`. It produces values one at a time, pausing after each one until the caller asks for the next. FastAPI's `StreamingResponse` takes a generator and sends each yielded chunk to the client as soon as it is produced, without collecting the whole body first.
How generators work (quick refresher)
Python — Generator Basics
```python
# Normal function: returns everything at once
def normal():
    return ["chunk1", "chunk2", "chunk3"]  # all in memory

# Generator: yields one value at a time, lazily
def gen():
    yield "chunk1"  # pauses here, hands the value to the caller
    yield "chunk2"  # resumes, hands over the next value
    yield "chunk3"  # done after this

# Generators are memory-efficient: great for large data
for chunk in gen():
    print(chunk)  # chunk1, chunk2, chunk3
```
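To see the pausing concretely, here is a small stdlib-only sketch that drives a generator by hand with `next()` (the `counted_chunks` function and its `log` list are illustrative, not part of FastAPI). Calling the generator function runs none of its body; each `next()` runs only up to the following `yield`.

```python
def counted_chunks(log: list[str]):
    log.append("started")               # runs on the first next(), not at call time
    yield "chunk1"
    log.append("resumed after chunk1")
    yield "chunk2"
    log.append("finished")              # runs when the caller asks for a third value

log: list[str] = []
g = counted_chunks(log)                 # creates the generator; no body code has run yet
print(log)

first = next(g)                         # runs up to the first yield
second = next(g)                        # resumes, runs up to the second yield
rest = list(g)                          # runs the rest; StopIteration ends list()
print(first, second, rest, log)
```

This is exactly the property `StreamingResponse` relies on: it asks for the next chunk only when it is ready to send it.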
StreamingResponse with a sync generator
Python — FastAPI StreamingResponse
```python
import time

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# 1. Simple text streaming
def text_generator():
    words = ["Hello", " ", "from", " ", "FastAPI", "!"]
    for word in words:
        yield word
        time.sleep(0.3)  # simulate delay (OK here: sync generators run in a thread pool)

@app.get("/stream/text")
def stream_text():
    return StreamingResponse(text_generator(), media_type="text/plain")

# 2. Streaming a large CSV (memory efficient)
def csv_generator():
    yield "id,name,score\n"  # header
    for i in range(1, 1_000_001):  # 1 million rows!
        yield f"{i},user_{i},{i * 0.5}\n"  # one row at a time

@app.get("/export/csv")
def export_csv():
    return StreamingResponse(
        csv_generator(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=export.csv"},
    )
# Without streaming: all 1M rows would be built in RAM first
# With streaming: memory stays roughly constant (one row at a time), whatever the size

# 3. Streaming a file from disk
# (for plain files, FileResponse is usually simpler)
def file_generator(path: str, chunk_size: int = 8192):
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):  # walrus operator (Python 3.8+)
            yield chunk

@app.get("/download/video")
def download_video():
    return StreamingResponse(file_generator("/data/movie.mp4"), media_type="video/mp4")
```
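The hand-built f-string rows in example 2 work for numeric data but break as soon as a field contains a comma, quote, or newline. Below is a sketch of the same streaming idea using the stdlib `csv` module for correct quoting. The `csv_stream` name and its `batch_size` parameter are illustrative assumptions: rows are written into a small reusable `io.StringIO` buffer, which is yielded and cleared every `batch_size` rows, so you send fewer, larger chunks instead of a million tiny ones.

```python
import csv
import io
from typing import Iterable, Iterator

def csv_stream(header: list[str], rows: Iterable[Iterable],
               batch_size: int = 1000) -> Iterator[str]:
    """Yield CSV text in batches of rows, with proper quoting/escaping."""
    buf = io.StringIO()
    writer = csv.writer(buf)             # default line terminator is "\r\n"
    writer.writerow(header)
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
        if count % batch_size == 0:      # flush a full batch as one chunk
            yield buf.getvalue()
            buf.seek(0)
            buf.truncate(0)
    if buf.getvalue():                   # leftover rows (or just the header)
        yield buf.getvalue()
```

In an endpoint this would be used as `StreamingResponse(csv_stream(["id", "name", "score"], rows), media_type="text/csv")`. `csv.writer` ends lines with `\r\n`, which matches RFC 4180.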
StreamingResponse signature:
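StreamingResponse(content, status_code=200, headers=None, media_type=None, background=None). The content can be any iterable or async iterable that yields str or bytes; sync and async generators are the usual choice. str chunks are encoded as UTF-8 before sending.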
Async Generators
When your data source is itself async (an async database driver, an async HTTP client, an LLM SDK), use an async generator. It is written with `async def` + `yield` and lets you `await` inside the loop. `StreamingResponse` accepts both sync and async generators.
Async generator syntax
Python — Async Generator Pattern
```python
import asyncio
import json

import httpx
from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Hypothetical app modules (not defined here):
from myapp.db import get_db    # dependency that yields an AsyncSession
from myapp.models import User  # SQLAlchemy model with id and name columns

app = FastAPI()

# Async generator: can use await inside
async def async_text_generator():
    words = ["Hello", " ", "async", " ", "world", "!"]
    for word in words:
        yield word
        await asyncio.sleep(0.2)  # non-blocking sleep

@app.get("/stream/async")
async def stream_async():
    return StreamingResponse(async_text_generator(), media_type="text/plain")

# Real-world: streaming rows from a database
async def stream_db_rows(db_session: AsyncSession):
    # SQLAlchemy async streaming query; selecting columns gives plain Row objects
    result = await db_session.stream(select(User.id, User.name).order_by(User.id))
    async for row in result:
        # serialize each row as one JSON line
        yield json.dumps(row._asdict()) + "\n"

@app.get("/stream/users")
async def stream_users(db: AsyncSession = Depends(get_db)):
    return StreamingResponse(
        stream_db_rows(db),
        media_type="application/x-ndjson",  # newline-delimited JSON
    )

# Real-world: streaming an HTTP response from another service
async def proxy_stream(url: str):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            async for chunk in response.aiter_bytes():
                yield chunk

@app.get("/proxy/download")
async def proxy_download(url: str):
    # Caution: proxying arbitrary user-supplied URLs enables SSRF; use an allow-list
    return StreamingResponse(proxy_stream(url), media_type="application/octet-stream")
```
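A quick stdlib-only way to see why `await` inside an async generator matters: two consumers of the same kind of generator, run concurrently with `asyncio.gather`, take turns because each `await asyncio.sleep(0)` hands control back to the event loop. The names `numbers`, `consume`, and `main` are illustrative. A blocking `time.sleep()` in the same spot would instead freeze every other task on the loop, which in a server means every other request.

```python
import asyncio

async def numbers(n: int):
    for i in range(n):
        yield i
        await asyncio.sleep(0)   # give the event loop a chance to run other tasks

async def consume(name: str, n: int, log: list[str]):
    async for i in numbers(n):
        log.append(f"{name}{i}")

async def main() -> list[str]:
    log: list[str] = []
    # Two streams running on the same event loop at once
    await asyncio.gather(consume("A", 3, log), consume("B", 3, log))
    return log

print(asyncio.run(main()))
```

The printed log alternates between the two consumers, showing that neither stream blocks the other.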
Sync vs Async generator — when to use which?
| Situation | Use | Why |
|---|---|---|
| Reading from disk (sync file IO) | Sync | Sync file ops are fast; use open() |
| CPU computation (no IO) | Sync | No async benefit here |
| Async DB queries | Async | Must await async DB calls |
| Calling external APIs / HTTP | Async | Use httpx.AsyncClient |
| LLM token streaming | Async | LLM SDKs provide async iterators |
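Starlette (which FastAPI builds on) iterates sync generators in a thread pool, so blocking calls inside them, such as `time.sleep()`, don't block the event loop. Async generators run on the event loop itself: they avoid the thread hop and are preferred for IO-heavy work, but they must never call blocking functions.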
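Roughly how that thread-pool hand-off works, as a stdlib sketch. Starlette's actual helper is `starlette.concurrency.iterate_in_threadpool`, built on AnyIO; this version is an assumption-level re-creation using `asyncio.to_thread` (Python 3.9+). Each `next()` call on the sync iterator runs in a worker thread while the event loop keeps serving other tasks, shown here by a ticker that keeps counting.

```python
import asyncio
import time
from typing import AsyncIterator, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel: next() returns it instead of raising StopIteration

async def iterate_in_thread(iterator: Iterator[T]) -> AsyncIterator[T]:
    while True:
        # Run one blocking next() call in a worker thread. StopIteration cannot be
        # passed through a Future, hence next()'s default-value form.
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            return
        yield item

def slow_numbers(n: int) -> Iterator[int]:
    for i in range(n):
        time.sleep(0.05)  # blocking call, but it runs off the event loop
        yield i

async def main() -> tuple[list[int], int]:
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    t = asyncio.create_task(ticker())  # keeps running while the generator blocks
    items = [x async for x in iterate_in_thread(slow_numbers(3))]
    t.cancel()
    return items, ticks

items, ticks = asyncio.run(main())
print(items, ticks > 0)
```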
15.2
Server-Sent Events (SSE)
SSE Event Format
Server-Sent Events (SSE) is a simple web standard (specified in the HTML Living Standard) for one-way server-to-client streaming over regular HTTP. Browsers have a built-in `EventSource` API that handles it. No WebSocket is needed: it is just a long-lived HTTP GET connection with the `text/event-stream` content type.
SSE vs WebSocket — when to use SSE?
✅ Use SSE when
You only need server → client push. Live scores, progress bars, notifications, LLM token streaming, log tailing.
✅ Use WebSocket when
You need bidirectional communication. Chat apps, collaborative editing, multiplayer games.
The SSE wire format — what actually travels over the network:
SSE Wire Format (plain text)
```
# (Lines starting with "#" are annotations for this guide, not part of the protocol)
# Each event has optional fields: id, event, data, retry
# Fields are separated by newlines; events are separated by blank lines

# Simple data event
data: Hello World

# With event type and id
id: 1
event: message
data: {"user": "alice", "text": "hi!"}

# Keep-alive comment (ignored by the browser)
: ping

# Retry interval (browser waits 5s before reconnecting if disconnected)
retry: 5000
data: reconnect in 5s

# Multi-line data (each line prefixed with "data:"; the client joins them with "\n")
data: line 1
data: line 2
data: line 3
```
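To make those rules concrete, here is a minimal parser sketch in Python. `parse_sse` is an illustrative name, and it covers only the rules shown above: comment lines are skipped, `data:` lines are joined with `\n`, a blank line dispatches the pending event, the event type resets to `message` after each dispatch, and the last `id` carries over. It assumes `\n` line endings and ignores `retry:` and unknown fields, so it is not a complete implementation of the spec.

```python
def parse_sse(text: str) -> list[dict]:
    """Parse complete SSE text into [{"event", "data", "id"}, ...]."""
    events: list[dict] = []
    data_lines: list[str] = []
    event_type = "message"            # default type when no "event:" field is given
    last_id = None
    for line in text.split("\n"):
        if line == "":                # blank line: dispatch the pending event
            if data_lines:
                events.append({"event": event_type,
                               "data": "\n".join(data_lines),
                               "id": last_id})
            data_lines, event_type = [], "message"
            continue
        if line.startswith(":"):      # comment (e.g. keep-alive ping): ignored
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]         # strip one optional leading space
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
        elif field == "id":
            last_id = value           # persists across events, like lastEventId
        # "retry" and unknown fields are ignored in this sketch
    return events
```

Note that an event with no `data:` lines (like the lone `: ping`) produces nothing, which is why comments are safe to use as keep-alives.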
Building SSE in FastAPI:
Python — SSE with StreamingResponse
```python
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# Helper to format one SSE event
def sse_event(data: str, event: str | None = None, id: str | None = None) -> str:
    msg = ""
    if id is not None:
        msg += f"id: {id}\n"
    if event is not None:
        msg += f"event: {event}\n"
    # Each line of a multi-line payload needs its own "data:" prefix
    for line in data.splitlines() or [""]:
        msg += f"data: {line}\n"
    return msg + "\n"  # blank line ends the event

# Simple number stream (1, 2, 3 ... every second)
async def number_stream():
    for i in range(1, 11):
        yield sse_event(data=str(i), event="count", id=str(i))
        await asyncio.sleep(1)
    yield sse_event(data="done", event="done")  # signal the end

@app.get("/sse/numbers")
async def sse_numbers():
    return StreamingResponse(
        number_stream(),
        media_type="text/event-stream",  # REQUIRED for SSE
        headers={
            "Cache-Control": "no-cache",  # no caching
            "X-Accel-Buffering": "no",    # disable nginx response buffering
        },
    )
```

JavaScript client (browser):

```javascript
const source = new EventSource("/sse/numbers");

source.addEventListener("count", (e) => {
  console.log("Count:", e.data);  // 1, 2, 3...
  console.log("ID:", e.lastEventId);
});

source.addEventListener("done", () => {
  console.log("Stream ended!");
  source.close();  // close the connection (otherwise EventSource reconnects)
});

source.onerror = (e) => console.error("SSE error", e);
```
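Always set `Cache-Control: no-cache` and `X-Accel-Buffering: no`. Without them, proxies and CDNs may buffer your stream, so the client receives nothing until the buffer fills or the response ends. `X-Accel-Buffering` is understood by nginx; other proxies and CDNs have their own buffering settings.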
Reconnection
The browser's built-in `EventSource` automatically reconnects if the connection drops. When it reconnects, it sends a `Last-Event-ID` header carrying the last `id:` it received (if any), so your server can resume from where the client left off.
Implementing resumable SSE with Last-Event-ID:
Python — Resumable SSE
```python
import asyncio

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

# Simulated event store (use Redis or a DB in production)
events = [
    {"id": "1", "data": "Event 1"},
    {"id": "2", "data": "Event 2"},
    {"id": "3", "data": "Event 3"},
]

async def resumable_stream(request: Request, last_id: str | None):
    # Figure out where to start based on the last received ID
    start_index = 0
    if last_id:
        for i, event in enumerate(events):
            if event["id"] == last_id:
                start_index = i + 1  # resume AFTER the last received event
                break
    # Send missed events first (catch-up)
    for event in events[start_index:]:
        yield f"id: {event['id']}\ndata: {event['data']}\n\n"
        await asyncio.sleep(0.1)
    # Continue with live events. Demo only: live ids are not stored in `events`,
    # so a reconnect after a live event replays the stored events from the start.
    counter = len(events) + 1
    while not await request.is_disconnected():
        yield f"id: {counter}\ndata: Live event {counter}\n\n"
        counter += 1
        await asyncio.sleep(2)

@app.get("/sse/resumable")
async def sse_resumable(request: Request):
    # The browser sends the Last-Event-ID header on reconnect
    last_id = request.headers.get("Last-Event-ID")
    return StreamingResponse(
        resumable_stream(request, last_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )
```
Controlling reconnect timing with `retry:`:
Python — Custom Retry Interval
```python
import asyncio

async def stream_with_retry():
    # Tell the browser to wait 3 seconds before reconnecting
    yield "retry: 3000\n\n"
    for i in range(5):
        yield f"id: {i}\ndata: Message {i}\n\n"
        await asyncio.sleep(1)

# Without a retry: field, the delay is browser-defined (typically a few seconds)
```
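`EventSource` reconnects whenever the connection ends, including when the server simply finishes the response; an event named `close` has no special meaning to it. To stop reconnecting, call `source.close()` in JavaScript (for example in a handler for a custom `done` event), or have the server answer the reconnect request with `204 No Content`.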
Keep Alive
Long-lived HTTP connections are often killed by proxies and load balancers after ~60–90 seconds of inactivity. To keep the connection alive, send periodic comment lines (`: ping`); the browser ignores them, but they reset the idle timer.
Python — SSE with Keep-Alive Pings
```python
import asyncio

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

SSE_HEADERS = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}

async def stream_with_keepalive(request: Request, data_source):
    async def generate():
        # Interleave data events and keep-alive pings through one queue
        queue: asyncio.Queue = asyncio.Queue()

        async def send_pings():
            while True:
                await asyncio.sleep(15)        # every 15s
                await queue.put(": ping\n\n")  # comment line

        async def send_data(source):
            async for item in source:
                await queue.put(f"data: {item}\n\n")
            await queue.put(None)              # sentinel: done

        # Start the tasks only after the queue and helpers exist
        ping_task = asyncio.create_task(send_pings())
        data_task = asyncio.create_task(send_data(data_source))
        try:
            while not await request.is_disconnected():  # client gone?
                item = await queue.get()
                if item is None:               # done signal
                    break
                yield item
        finally:
            ping_task.cancel()                 # stop both background tasks on any exit
            data_task.cancel()

    return StreamingResponse(generate(), media_type="text/event-stream", headers=SSE_HEADERS)

# Simple version: wait up to 15s for the next update; send a ping if none arrives
updates: asyncio.Queue = asyncio.Queue()  # producers elsewhere call updates.put(...)
# (one shared queue hands each update to ONE client; use per-client queues to broadcast)

async def simple_keepalive_stream(request: Request):
    while not await request.is_disconnected():
        try:
            item = await asyncio.wait_for(updates.get(), timeout=15)
            yield f"data: {item}\n\n"
        except asyncio.TimeoutError:
            yield ": ping\n\n"  # keep-alive comment

@app.get("/sse/live")
async def live_feed(request: Request):
    return StreamingResponse(
        simple_keepalive_stream(request),
        media_type="text/event-stream",
        headers=SSE_HEADERS,
    )
```
Use `await request.is_disconnected()` to detect when the client closes the browser tab or navigates away, and stop the loop once it returns True. This prevents your generator from running forever in the background.
SSE comment syntax for keep-alive:
SSE — Comment (Keep-Alive)
```
# Lines starting with ":" are comments: the browser ignores them,
# but they keep the TCP connection alive through proxies
: ping
: heartbeat
: keep-alive

# Send one every 15-30 seconds
```
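An alternative to the queue-based version is to wrap any async data source so that a ping goes out whenever it has been quiet for `interval` seconds. A stdlib sketch, with `with_heartbeat` and `_next_item` as illustrative names. The key design choice is `asyncio.wait()`, which leaves the pending "next item" task running on timeout; `asyncio.wait_for()` would cancel it, throwing `CancelledError` into the source generator and typically ending it.

```python
import asyncio
from typing import AsyncIterator

async def _next_item(it: AsyncIterator[str]) -> str:
    return await it.__anext__()          # raises StopAsyncIteration when exhausted

async def with_heartbeat(source: AsyncIterator[str], interval: float = 15.0,
                         ping: str = ": ping\n\n") -> AsyncIterator[str]:
    """Yield SSE data events from `source`, plus a ping after each quiet `interval`."""
    it = source.__aiter__()
    pending = asyncio.ensure_future(_next_item(it))
    try:
        while True:
            # wait() does NOT cancel `pending` on timeout, so the source keeps its place
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ping                # nothing arrived in time: keep-alive comment
                continue
            try:
                item = pending.result()
            except StopAsyncIteration:    # source finished
                return
            yield f"data: {item}\n\n"
            pending = asyncio.ensure_future(_next_item(it))
    finally:
        pending.cancel()                  # no-op if it already finished
```

In an endpoint this could be used as `StreamingResponse(with_heartbeat(my_source()), media_type="text/event-stream")`, where `my_source` is your own async generator of payload strings.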
15.3
AI Streaming
Token Streaming
LLMs generate text one token at a time. A token is roughly a word or part of a word. Without streaming, users wait 5–10 seconds staring at a blank screen. With token streaming, they see text appearing word-by-word — just like ChatGPT — which dramatically improves perceived responsiveness.
```
Without Streaming                    With Token Streaming
──────────────────────────────────────────────────────────────
t=0s  User: "Write a poem"           t=0s    User: "Write a poem"
t=0s  [spinner spinning...]          t=0.1s  "Roses "
t=0s  [spinner spinning...]          t=0.2s  "are "
t=0s  [spinner spinning...]          t=0.3s  "red, "
t=4s  Full poem appears!             t=0.4s  "violets "
                                     t=0.5s  "are "
                                     t=0.6s  "blue..."
User frustrated after 4s             User engaged from first word
```
Understanding tokens:
Python — What are tokens?
```python
# Roughly 1 token ≈ 0.75 words or 4 characters of English text
# "Hello, world!"      → ["Hello", ",", " world", "!"]       (4 tokens)
# "FastAPI is awesome" → ["Fast", "API", " is", " awesome"]  (4 tokens)
# Exact splits depend on the model's tokenizer; code is tokenized the same way

# The LLM generates tokens sequentially:
# Token 1: "The"
# Token 2: " quick"
# Token 3: " brown"
# Token 4: " fox"
# ...

# If each token takes ~50-200ms (illustrative; varies by model and load),
# a 100-token response takes 5-20 seconds without streaming!
# With streaming, the user sees content as soon as the first token arrives
```
Basic token streaming with OpenAI:
Python — OpenAI Token Streaming
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

class ChatRequest(BaseModel):
    message: str

async def stream_openai_tokens(message: str):
    # stream=True enables token-by-token streaming
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True,  # ← this enables streaming
        max_tokens=500,
    )
    # Iterate over streamed chunks
    async for chunk in stream:
        if not chunk.choices:  # some chunks (e.g. usage-only) carry no choices
            continue
        delta = chunk.choices[0].delta
        if delta.content:  # some chunks have no content
            yield delta.content  # yield each token

@app.post("/ai/stream")
async def ai_stream(req: ChatRequest):
    return StreamingResponse(stream_openai_tokens(req.message), media_type="text/plain")
```
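For local development and tests it can help to swap the OpenAI call for a stub with the same output shape. A sketch under that assumption, where `fake_token_stream` and its canned reply are made up: it yields pieces of text with a small delay, so you can exercise the streaming endpoint and the frontend without an API key or token costs.

```python
import asyncio
from typing import AsyncIterator

async def fake_token_stream(reply: str, delay: float = 0.05) -> AsyncIterator[str]:
    """Yield `reply` word by word, keeping leading spaces like LLM tokens do."""
    words = reply.split(" ")
    for i, word in enumerate(words):
        yield word if i == 0 else " " + word
        await asyncio.sleep(delay)       # simulate per-token latency

async def collect(stream: AsyncIterator[str]) -> list[str]:
    return [piece async for piece in stream]

pieces = asyncio.run(collect(fake_token_stream("Roses are red", delay=0)))
print(pieces)
```

In the endpoint above, `StreamingResponse(fake_token_stream("Roses are red, violets are blue"), media_type="text/plain")` would stand in for `stream_openai_tokens(...)`.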
LLM Streaming (SSE Format)
In production, token streaming is commonly delivered as SSE, and ChatGPT-style frontends typically consume it this way: the server sends each token as an SSE event. `EventSource` only issues GET requests, so for a POST chat endpoint the frontend reads the stream with `fetch` and a streaming response body instead.
Python — LLM Streaming over SSE (Production Pattern)
```python
import json

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()

class ChatRequest(BaseModel):
    messages: list[dict]  # full conversation history
    model: str = "gpt-4o"

async def llm_sse_stream(request: Request, chat_req: ChatRequest):
    stream = await client.chat.completions.create(
        model=chat_req.model,
        messages=chat_req.messages,
        stream=True,
        stream_options={"include_usage": True},  # adds a final usage-only chunk
    )
    finish_reason = None
    usage = None
    async for chunk in stream:
        # Check whether the client disconnected
        if await request.is_disconnected():
            await stream.close()  # stop reading the LLM stream
            return
        if chunk.usage:  # only set on the final, usage-only chunk
            usage = chunk.usage.model_dump()
        if not chunk.choices:  # that final chunk has an empty choices list
            continue
        choice = chunk.choices[0]
        if choice.delta.content:
            # Send each token as an SSE event
            payload = json.dumps({"token": choice.delta.content})
            yield f"event: token\ndata: {payload}\n\n"
        if choice.finish_reason:
            finish_reason = choice.finish_reason

    if finish_reason == "length":
        yield 'event: error\ndata: {"error": "max_tokens reached"}\n\n'
    # Send a done event with usage info once the stream is exhausted
    done_data = json.dumps({"finish_reason": finish_reason, "usage": usage or {}})
    yield f"event: done\ndata: {done_data}\n\n"

@app.post("/chat/stream")
async def chat_stream(request: Request, chat_req: ChatRequest):
    return StreamingResponse(
        llm_sse_stream(request, chat_req),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```

JavaScript client (fetch with a readable stream gives more control than EventSource and allows POST):

```javascript
const response = await fetch("/chat/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ messages: [{ role: "user", content: "Hello!" }] }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const events = buffer.split("\n\n");
  buffer = events.pop();  // keep the incomplete last event for the next read
  for (const chunk of events) {
    if (chunk.startsWith("event: token")) {
      const dataLine = chunk.split("\n").find((l) => l.startsWith("data:"));
      const { token } = JSON.parse(dataLine.slice(5));
      document.getElementById("output").textContent += token;
    }
  }
}
```
Anthropic Claude streaming (alternative LLM):
Python — Anthropic Claude Streaming
```python
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
claude = anthropic.AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment

async def stream_claude(prompt: str):
    async with claude.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        async for text in stream.text_stream:  # an async iterator attribute, not a method
            yield text  # yields text deltas directly

@app.get("/claude/stream")
async def claude_stream(prompt: str):
    return StreamingResponse(stream_claude(prompt), media_type="text/plain")
```
Chunk Handling
Streaming responses don't always arrive as clean individual tokens. Network buffering, LLM SDK behavior, and proxy settings can cause chunks to be merged or split. Robust chunk handling means your code works correctly regardless of how bytes arrive.
Python — Robust Chunk Handling
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# Pattern 1: Buffer until complete JSON lines
async def json_lines_stream(raw_stream):
    """Handles JSON lines split across chunks (raw_stream yields str)."""
    buffer = ""

    def parse(line: str):
        line = line.strip()
        if not line:
            return None
        try:
            return json.dumps(json.loads(line)) + "\n"
        except json.JSONDecodeError:
            return None  # skip malformed lines

    async for chunk in raw_stream:
        buffer += chunk
        # Process all complete lines
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            if (out := parse(line)) is not None:
                yield out
    # The last line may not end with "\n"
    if (out := parse(buffer)) is not None:
        yield out

# Pattern 2: Accumulate the entire response for post-processing
# (llm_client.stream() is a placeholder for your SDK's streaming call)
async def stream_with_accumulator(llm_client, prompt: str):
    full_response = ""
    token_count = 0  # counts chunks, which may not map 1:1 to tokens
    async for chunk in llm_client.stream(prompt):
        token = chunk.content or ""
        full_response += token
        token_count += 1
        # Attach a running count every 10 chunks
        if token_count % 10 == 0:
            progress = {"token": token, "total_so_far": token_count}
            yield f"data: {json.dumps(progress)}\n\n"
        else:
            yield f"data: {json.dumps({'token': token})}\n\n"
    # Final event with the full response
    final = {"done": True, "full_text": full_response, "tokens": token_count}
    yield f"event: complete\ndata: {json.dumps(final)}\n\n"

# Pattern 3: Chunked binary streaming with size control
async def controlled_chunk_stream(data_source, chunk_size: int = 1024):
    """Ensures chunks are always exactly chunk_size bytes (except the last)."""
    buffer = b""
    async for piece in data_source:
        buffer += piece
        while len(buffer) >= chunk_size:
            yield buffer[:chunk_size]
            buffer = buffer[chunk_size:]
    if buffer:  # yield the remaining bytes
        yield buffer
```
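The same buffering idea applies on the consuming side of an SSE stream, where a network chunk can end in the middle of an event. A minimal sketch, assuming `\n`-only line endings (the `SSEBuffer` class is illustrative): feed it raw text chunks as they arrive and it returns only the complete events, keeping the partial tail for the next call. It mirrors the `buffer.split("\n\n")` logic in the JavaScript chat client.

```python
class SSEBuffer:
    """Accumulate raw SSE text and emit complete events (split on blank lines)."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> list[dict]:
        self._buffer += chunk
        # Everything before the last "\n\n" is complete; the tail may be partial
        *complete, self._buffer = self._buffer.split("\n\n")
        events = []
        for block in complete:
            event_type, data = "message", []
            for line in block.split("\n"):
                if not line or line.startswith(":"):   # skip comments / keep-alives
                    continue
                field, _, value = line.partition(":")
                value = value[1:] if value.startswith(" ") else value
                if field == "data":
                    data.append(value)
                elif field == "event":
                    event_type = value
            if data:                                    # events without data are dropped
                events.append({"event": event_type, "data": "\n".join(data)})
        return events
```

Usage: call `buf.feed(decoded_chunk)` for every chunk read from the response and handle whatever events it returns; nothing is lost when a boundary falls mid-event.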
Client-side buffering: the browser dispatches an SSE event only once it receives the blank line that ends it, so always terminate SSE events with `\n\n` (double newline). Plain-text streams have no such delimiter; the client sees bytes as they arrive, so parse them incrementally.
Handling streaming errors gracefully:
Python — Error Handling in Streams
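```python
import json

from openai import APITimeoutError, RateLimitError

async def safe_stream(prompt: str):
    try:
        # llm_generate: your own async token generator (placeholder)
        async for token in llm_generate(prompt):
            yield f"data: {json.dumps({'token': token})}\n\n"
    except RateLimitError:
        yield 'event: error\ndata: {"code": "rate_limit", "retry": true}\n\n'
    except APITimeoutError:
        yield 'event: error\ndata: {"code": "timeout", "retry": true}\n\n'
    except Exception as e:
        # Consider hiding str(e) from clients in production
        error = {"code": "unknown", "message": str(e)}
        yield f"event: error\ndata: {json.dumps(error)}\n\n"
    # Send the close event after try/except, not in `finally`: yielding inside
    # `finally` raises RuntimeError if the generator is being closed
    # (e.g. because the client disconnected)
    yield "event: close\ndata: {}\n\n"
```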
Cancellation
When a user clicks "Stop generating" or navigates away, you should cancel the LLM request immediately: a running request keeps costing money and resources. FastAPI lets you detect disconnection via `await request.is_disconnected()` and cancel async tasks.
Python — Cancellation Patterns
```python
import asyncio
import json

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()

class GenerateRequest(BaseModel):
    prompt: str  # matches the JSON body {"prompt": ...} sent by the JS client

# Pattern 1: Check for disconnection on every chunk
async def cancellable_stream(request: Request, prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    try:
        async for chunk in stream:
            # Check whether the client disconnected before sending each token
            if await request.is_disconnected():
                print("Client disconnected, stopping LLM")
                await stream.close()  # close the OpenAI stream
                return  # exit the generator
            if chunk.choices and chunk.choices[0].delta.content:
                yield f"data: {json.dumps({'token': chunk.choices[0].delta.content})}\n\n"
    except asyncio.CancelledError:
        await stream.close()  # clean up if the task is cancelled
        raise

@app.post("/ai/generate")
async def generate(request: Request, body: GenerateRequest):
    return StreamingResponse(
        cancellable_stream(request, body.prompt),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )

# Pattern 2: Using asyncio.Event for manual cancellation
cancel_events: dict[str, asyncio.Event] = {}

async def stream_with_cancel_token(generation_id: str, prompt: str):
    cancel = asyncio.Event()
    cancel_events[generation_id] = cancel
    try:
        stream = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            if cancel.is_set():  # someone called /ai/cancel/{id}
                await stream.close()
                yield "event: cancelled\ndata: {}\n\n"
                return
            if chunk.choices and chunk.choices[0].delta.content:
                yield f"data: {json.dumps({'token': chunk.choices[0].delta.content})}\n\n"
    finally:
        cancel_events.pop(generation_id, None)  # clean up on every exit path

@app.post("/ai/cancel/{generation_id}")
async def cancel_generation(generation_id: str):
    if generation_id in cancel_events:
        cancel_events[generation_id].set()  # signal cancellation
        return {"cancelled": True}
    return {"cancelled": False, "reason": "not found"}
```
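The cancel-token idea in Pattern 2 can be exercised without any LLM. Here is a stdlib sketch in which the `CancelRegistry` class, the `fake_tokens` source, and the `cancellable` wrapper are hypothetical stand-ins. The registry creates the generation id on the server (a random UUID, so other clients can't easily guess it) and hands out one `asyncio.Event` per id. The stream checks that event between tokens, and `try/finally` removes the entry however the stream ends.

```python
import asyncio
import uuid
from typing import AsyncIterator

class CancelRegistry:
    def __init__(self) -> None:
        self._events: dict[str, asyncio.Event] = {}

    def register(self) -> tuple[str, asyncio.Event]:
        gen_id = str(uuid.uuid4())          # server-generated, hard to guess
        event = asyncio.Event()
        self._events[gen_id] = event
        return gen_id, event

    def cancel(self, gen_id: str) -> bool:
        event = self._events.get(gen_id)
        if event is None:
            return False
        event.set()
        return True

    def remove(self, gen_id: str) -> None:
        self._events.pop(gen_id, None)

async def fake_tokens(n: int) -> AsyncIterator[str]:
    for i in range(n):
        await asyncio.sleep(0)
        yield f"t{i}"

async def cancellable(registry: CancelRegistry, gen_id: str, cancel: asyncio.Event,
                      source: AsyncIterator[str]) -> AsyncIterator[str]:
    try:
        async for token in source:
            if cancel.is_set():             # checked between tokens
                yield "event: cancelled\ndata: {}\n\n"
                return
            yield f"data: {token}\n\n"
    finally:
        registry.remove(gen_id)             # always clean up, even on early exit
```

A cancel endpoint would then just call `registry.cancel(gen_id)` and report whether the id was found.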
JavaScript: implementing a Stop button
JavaScript — Stop Button Pattern
```javascript
let abortController = null;

async function startGeneration(prompt) {
  abortController = new AbortController();  // create an abort signal
  try {
    const response = await fetch("/ai/generate", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ prompt }),
      signal: abortController.signal,  // attach it to fetch
    });
    // When the user clicks Stop, abortController.abort() cancels the fetch
    // → the browser closes the HTTP connection
    // → request.is_disconnected() returns True on the server
    // → the server stops the LLM stream
    const reader = response.body.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // handle chunk...
    }
  } catch (e) {
    if (e.name === "AbortError") console.log("User stopped");
    else throw e;
  }
}

// Stop button
document.getElementById("stop-btn").onclick = () => {
  if (abortController) abortController.abort();
};
```
When `fetch` is aborted, the browser closes the TCP connection. FastAPI detects this via `await request.is_disconnected()` returning True. Always check this in long-running generators to save resources.
Complete production AI streaming endpoint:
Python — Full Production AI Streaming
```python
import json
import uuid

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()

class Message(BaseModel):
    role: str  # "user" | "assistant" | "system"
    content: str

class ChatRequest(BaseModel):
    messages: list[Message]
    model: str = "gpt-4o"
    max_tokens: int = 2048
    temperature: float = 0.7

async def production_stream(request: Request, chat: ChatRequest):
    # Send a generation ID; to make it cancellable, register it as in Pattern 2
    gen_id = str(uuid.uuid4())
    yield f"event: start\ndata: {json.dumps({'id': gen_id})}\n\n"

    full_text = ""
    try:
        # Inside try, so API errors (auth, rate limits) also become SSE error events
        stream = await client.chat.completions.create(
            model=chat.model,
            messages=[m.model_dump() for m in chat.messages],
            stream=True,
            max_tokens=chat.max_tokens,
            temperature=chat.temperature,
        )
        async for chunk in stream:
            if await request.is_disconnected():
                await stream.close()
                return
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            finish = chunk.choices[0].finish_reason
            if delta.content:
                full_text += delta.content
                yield f"event: token\ndata: {json.dumps({'t': delta.content})}\n\n"
            if finish:
                final = {"finish_reason": finish, "chars": len(full_text)}
                yield f"event: done\ndata: {json.dumps(final)}\n\n"
    except Exception as e:
        err = {"error": str(e), "type": type(e).__name__}
        yield f"event: error\ndata: {json.dumps(err)}\n\n"

@app.post("/v1/chat/stream")
async def chat_completions_stream(request: Request, chat: ChatRequest):
    return StreamingResponse(
        production_stream(request, chat),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            # For real CORS handling, prefer FastAPI's CORSMiddleware
            "Access-Control-Allow-Origin": "*",
        },
    )
```
📌 Topic 15 Summary
| Technique | Use Case | media_type | Client API |
|---|---|---|---|
| StreamingResponse + generator | Files, CSV export, binary data | application/octet-stream | fetch + getReader() |
| Async generator | DB row streaming, API proxying | application/x-ndjson | fetch + getReader() |
| SSE (basic) | Live dashboards, notifications | text/event-stream | EventSource |
| SSE (with reconnect) | Reliable live feeds | text/event-stream | EventSource (auto-reconnects) |
| SSE (keep-alive) | Long-lived streams behind proxies | text/event-stream | EventSource |
| LLM Token Streaming | ChatGPT-style AI apps | text/event-stream | fetch + AbortController |