FastAPI Mastery
Topic 15 of 22 — Streaming
Streaming 🌊
Instead of waiting for a full response to be ready before sending it, streaming lets your server push data to the client chunk by chunk — the moment each piece is ready. Essential for large file downloads, live logs, and AI token streaming from LLMs.
Three Streaming Patterns
🌊 StreamingResponse

General-purpose. Stream any bytes/text via a Python generator. Good for files, CSV exports, logs.

📡 SSE

Server-Sent Events — one-way server push over HTTP. Perfect for live dashboards, notifications, progress bars.

🤖 AI Streaming

Stream LLM tokens as they're generated. Users see words appear word-by-word instead of waiting seconds.

Normal Response                        Streaming Response
─────────────────────────────────────────────────────────
Client              Server             Client              Server
  │                    │                 │                    │
  │── GET /data ──────▶│                 │── GET /stream ────▶│
  │                    │ generating      │                    │ chunk 1 ready
  │                    │ ...             │◀── "Hello " ───────│
  │                    │ done            │◀── "World" ────────│
  │◀── full response ──│                 │◀── "!" ────────────│
  │                    │                 │◀── [close] ────────│
User waits 3s then sees everything     User sees output immediately
15.1 StreamingResponse
⚙️
Generators
A generator is a Python function that uses yield instead of return. It produces values one at a time, pausing execution between each one. FastAPI's StreamingResponse takes a generator and sends each yielded chunk to the client immediately — no buffering.
How generators work (quick refresher)
Python — Generator Basics
# Normal function — returns everything at once
def normal():
    return ["chunk1", "chunk2", "chunk3"]  # all in memory

# Generator — yields one at a time, lazy
def gen():
    yield "chunk1"   # pauses here, sends to caller
    yield "chunk2"   # resumes, sends next
    yield "chunk3"   # done

# Generators are memory-efficient — great for large data
for chunk in gen():
    print(chunk)   # chunk1, chunk2, chunk3
StreamingResponse with a sync generator
Python — FastAPI StreamingResponse
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import time

app = FastAPI()

# 1. Simple text streaming
def text_generator():
    words = ["Hello", " ", "from", " ", "FastAPI", "!"]
    for word in words:
        yield word
        time.sleep(0.3)  # simulate delay between chunks

@app.get("/stream/text")
def stream_text():
    return StreamingResponse(
        text_generator(),
        media_type="text/plain"
    )

# 2. Streaming a large CSV (memory efficient)
def csv_generator():
    yield "id,name,score\n"             # header
    for i in range(1, 1000001):         # 1 million rows!
        yield f"{i},user_{i},{i * 0.5}\n"  # one row at a time

@app.get("/export/csv")
def export_csv():
    return StreamingResponse(
        csv_generator(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=export.csv"}
    )
# Without streaming: would load all 1M rows into RAM first!
# With streaming: constant ~1KB memory usage regardless of size

# 3. Streaming a file from disk
def file_generator(path: str, chunk_size: int = 8192):
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):  # walrus operator
            yield chunk

@app.get("/download/video")
def download_video():
    return StreamingResponse(
        file_generator("/data/movie.mp4"),
        media_type="video/mp4"
    )
💡
StreamingResponse signature: StreamingResponse(content, status_code=200, headers=None, media_type=None, background=None). The content can be any iterator or async iterator (typically a sync or async generator) that yields str or bytes.
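The CSV generator above builds each row with an f-string, which is only safe while no field contains a comma, quote, or newline. The following is a minimal sketch of the same one-row-at-a-time pattern using the standard library's csv.writer for quoting. The helper name csv_rows and the sample data are illustrative assumptions, not part of the original example.

```python
import csv
import io
import itertools
from typing import Iterable, Iterator, Sequence


def csv_rows(header: Sequence, rows: Iterable[Sequence]) -> Iterator[str]:
    """Yield one properly quoted CSV line per row (hypothetical helper)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    for row in itertools.chain([header], rows):
        writer.writerow(row)       # csv takes care of commas and quotes
        yield buffer.getvalue()    # hand exactly one line to the caller
        buffer.seek(0)
        buffer.truncate(0)         # reuse the buffer so memory stays flat


# Sample data (made up) with a comma and a quote inside the name field
sample = [(1, "Smith, Alice", 0.5), (2, 'Bob "B"', 1.0)]
lines = list(csv_rows(("id", "name", "score"), sample))
```

A generator like this could be handed to StreamingResponse in place of csv_generator(); note that csv.writer ends lines with \r\n by default.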
Async Generators
When your data source is itself async (database queries, HTTP API calls, async file reads), use an async generator. It uses async def + yield and lets you await inside. FastAPI handles both sync and async generators in StreamingResponse.
Async generator syntax
Python — Async Generator Pattern
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

# async generator — can use await inside
async def async_text_generator():
    words = ["Hello", " ", "async", " ", "world", "!"]
    for word in words:
        yield word
        await asyncio.sleep(0.2)  # non-blocking sleep

@app.get("/stream/async")
async def stream_async():
    return StreamingResponse(
        async_text_generator(),
        media_type="text/plain"
    )

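# Real-world: streaming rows from a database
# (User and get_db are assumed to be defined elsewhere in your app)
import json
from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def stream_db_rows(db_session: AsyncSession):
    # SQLAlchemy async streaming query. Select plain columns so each
    # row maps to values json.dumps can serialize (User.name is an
    # assumed column; adjust to your model).
    result = await db_session.stream(
        select(User.id, User.name).order_by(User.id)
    )
    async for row in result:
        # serialize each row to a JSON line
        yield json.dumps(row._asdict()) + "\n"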

@app.get("/stream/users")
async def stream_users(db: AsyncSession = Depends(get_db)):
    return StreamingResponse(
        stream_db_rows(db),
        media_type="application/x-ndjson"  # newline-delimited JSON
    )

# Real-world: streaming an HTTP response from another service
import httpx

async def proxy_stream(url: str):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            async for chunk in response.aiter_bytes():
                yield chunk

@app.get("/proxy/download")
async def proxy_download(url: str):
    return StreamingResponse(
        proxy_stream(url),
        media_type="application/octet-stream"
    )
Sync vs Async generator — when to use which?
Situation | Use | Why
Reading from disk (sync file IO) | Sync | Sync file ops are fast; use open()
CPU computation (no IO) | Sync | No async benefit here
Async DB queries | Async | Must await async DB calls
Calling external APIs / HTTP | Async | Use httpx.AsyncClient
LLM token streaming | Async | LLM SDKs provide async iterators
FastAPI automatically runs sync generators in a thread pool so they don't block the event loop. But async generators are preferred for IO-heavy work.
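To make the thread-pool idea concrete, here is a conceptual sketch in plain asyncio (Python 3.9+ for asyncio.to_thread). It is not the framework's actual implementation; the names iterate_in_thread and slow_words are hypothetical. Each blocking next() call runs in a worker thread, so the event loop stays free while the sync generator does its work.

```python
import asyncio
import time
from typing import AsyncIterator, Iterator

_DONE = object()  # sentinel marking the end of the sync iterator


def slow_words() -> Iterator[str]:
    """A blocking generator; time.sleep stands in for blocking work."""
    for word in ["Hello", " ", "world"]:
        time.sleep(0.01)
        yield word


async def iterate_in_thread(iterator: Iterator[str]) -> AsyncIterator[str]:
    """Pull each item from a blocking iterator in a worker thread."""
    while True:
        # next(iterator, _DONE) runs off the event loop thread
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            break
        yield item


async def collect() -> list:
    return [word async for word in iterate_in_thread(slow_words())]


words = asyncio.run(collect())
```

The trade-off is one thread hop per chunk, which is why an async generator tends to be the better fit when the underlying data source already offers an async API.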
15.2 Server-Sent Events (SSE)
📋
SSE Event Format
Server-Sent Events (SSE) is a simple web standard (originally a W3C specification, now part of the WHATWG HTML standard) for one-way server-to-client streaming over regular HTTP. The browser has a built-in EventSource API that handles it. No WebSocket is needed, just a long-lived HTTP GET connection with the text/event-stream content type.
SSE vs WebSocket — when to use SSE?
✅ Use SSE when

You only need server → client push. Live scores, progress bars, notifications, LLM token streaming, log tailing.

✅ Use WebSocket when

You need bidirectional communication. Chat apps, collaborative editing, multiplayer games.

The SSE wire format — what actually travels over the network:
SSE Wire Format (plain text)
# Each "event" has optional fields: id, event, data, retry
# Fields are separated by newlines, events by blank lines

# Simple data event
data: Hello World

# With event type and id
id: 1
event: message
data: {"user": "alice", "text": "hi!"}

# Keep-alive comment (ignored by browser)
: ping

# Retry interval (browser reconnects after 5s if disconnected)
retry: 5000
data: reconnect in 5s

# Multi-line data (each line prefixed with "data:")
data: line 1
data: line 2
data: line 3
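The multi-line rule above matters when formatting events in code: a payload containing a newline has to be split into one data: line per line, otherwise the rest of the payload is parsed as an unknown field. A small sketch of a formatter that follows the wire format shown here; the function name format_sse and its parameter names are assumptions made for illustration.

```python
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry_ms: Optional[int] = None,
) -> str:
    """Build one SSE event, emitting a data: line per payload line."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # The trailing blank line is what terminates the event
    return "\n".join(lines) + "\n\n"


single = format_sse("Hello World")
multi = format_sse("line 1\nline 2", event="log", event_id="7")
```

On the receiving side the browser joins the data: lines back together with newlines, so the handler sees the original multi-line string.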
Building SSE in FastAPI:
Python — SSE with StreamingResponse
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio, json

app = FastAPI()

# Helper to format SSE events
def sse_event(
    data: str,
    event: str | None = None,
    id: str | None = None
) -> str:
    msg = ""
    if id:
        msg += f"id: {id}\n"
    if event:
        msg += f"event: {event}\n"
    msg += f"data: {data}\n\n"  # blank line ends the event
    return msg

# Simple number stream (1, 2, 3 ... every second)
async def number_stream():
    for i in range(1, 11):
        yield sse_event(data=str(i), event="count", id=str(i))
        await asyncio.sleep(1)
    yield sse_event(data="done", event="done")  # signal end

@app.get("/sse/numbers")
async def sse_numbers():
    return StreamingResponse(
        number_stream(),
        media_type="text/event-stream",        # REQUIRED for SSE
        headers={
            "Cache-Control": "no-cache",           # no caching
            "X-Accel-Buffering": "no",             # disable nginx buffering
        }
    )

# JavaScript client (browser)
"""
const source = new EventSource("/sse/numbers");

source.addEventListener("count", (e) => {
    console.log("Count:", e.data);  // 1, 2, 3...
    console.log("ID:", e.lastEventId);
});

source.addEventListener("done", () => {
    console.log("Stream ended!");
    source.close();  // close the connection
});

source.onerror = (e) => console.error("SSE error", e);
"""
⚠️
Always set Cache-Control: no-cache and X-Accel-Buffering: no. Without these, proxies (nginx, CDNs) may buffer your stream and the client won't receive events until the buffer fills!
🔄
Reconnection
The browser's built-in EventSource automatically reconnects if the connection drops. When it reconnects, it sends the Last-Event-ID header, set to the id of the last event it received (so the server must include id fields), and your server can resume from where it left off.
Implementing resumable SSE with Last-Event-ID:
Python — Resumable SSE
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

# Simulated event store (use Redis or DB in production)
events = [
    {"id": "1", "data": "Event 1"},
    {"id": "2", "data": "Event 2"},
    {"id": "3", "data": "Event 3"},
]

async def resumable_stream(last_id: str | None):
    # Figure out where to start based on last received ID
    start_index = 0
    if last_id:
        for i, event in enumerate(events):
            if event["id"] == last_id:
                start_index = i + 1  # resume AFTER the last received
                break

    # Send missed events first (catch-up)
    for event in events[start_index:]:
        yield f"id: {event['id']}\ndata: {event['data']}\n\n"
        await asyncio.sleep(0.1)

    # Continue with live events...
    counter = len(events) + 1
    while True:
        yield f"id: {counter}\ndata: Live event {counter}\n\n"
        counter += 1
        await asyncio.sleep(2)

@app.get("/sse/resumable")
async def sse_resumable(request: Request):
    # Browser sends Last-Event-ID header on reconnect
    last_id = request.headers.get("Last-Event-ID")
    return StreamingResponse(
        resumable_stream(last_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"}
    )
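One edge case in the lookup above: if the client's last ID is not in the store (for example the ID of a live event), the search finds nothing and every stored event is replayed. A possible alternative, sketched under the assumption that event IDs are increasing integers, is to compare IDs numerically instead of searching for an exact match. The helper name events_after and the sample store are hypothetical.

```python
from typing import Optional

# Stand-in for the event store; IDs are assumed to be increasing integers
EVENTS = [
    {"id": "1", "data": "Event 1"},
    {"id": "2", "data": "Event 2"},
    {"id": "3", "data": "Event 3"},
]


def events_after(events: list, last_id: Optional[str]) -> list:
    """Return the stored events the client has not seen yet."""
    if last_id is None:
        return list(events)        # first connection: send everything
    try:
        last = int(last_id)
    except ValueError:
        return list(events)        # unparseable ID: fall back to full replay
    return [e for e in events if int(e["id"]) > last]


missed = events_after(EVENTS, "2")      # only event 3 is newer
caught_up = events_after(EVENTS, "99")  # client is ahead of the store
```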
Controlling reconnect timing with retry:
Python — Custom Retry Interval
async def stream_with_retry():
    # Tell the browser to wait 3 seconds before reconnecting
    yield "retry: 3000\n\n"

    for i in range(5):
        yield f"id: {i}\ndata: Message {i}\n\n"
        await asyncio.sleep(1)

# If no retry field is sent, the browser uses its own default delay
# (implementation-defined, typically a few seconds)
💡
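EventSource reconnects automatically whenever the connection ends, including when the server simply finishes the response; an event named close has no special meaning to the browser. To stop reconnection, call source.close() in JavaScript (for example in the handler for your final event), or have the server answer the reconnect attempt with HTTP 204 No Content.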
💓
Keep Alive
Long-lived HTTP connections are often killed by proxies and load balancers after ~60–90 seconds of inactivity. To keep the connection alive, send periodic comment lines (: ping) — they're ignored by the browser but reset the idle timer.
Python — SSE with Keep-Alive Pings
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def stream_with_keepalive(request: Request, data_source):
    async def generate():
        # Interleave data events and keep-alive pings through one queue
        queue: asyncio.Queue = asyncio.Queue()

        async def send_pings():
            while True:
                await asyncio.sleep(15)                   # every 15s
                await queue.put(": ping\n\n")             # comment line

        async def send_data(source):
            async for item in source:
                await queue.put(f"data: {item}\n\n")
            await queue.put(None)                         # sentinel: done

        # Start the producers only after the queue and helpers exist
        ping_task = asyncio.create_task(send_pings())
        data_task = asyncio.create_task(send_data(data_source))
        try:
            while True:
                if await request.is_disconnected():       # client gone?
                    break
                item = await queue.get()
                if item is None:                          # done signal
                    break
                yield item
        finally:
            # Stop both background tasks however the stream ends
            ping_task.cancel()
            data_task.cancel()

    return StreamingResponse(generate(), media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})

# Simple version: one update every 30s, with pings in between
async def simple_keepalive_stream(request: Request):
    counter = 0
    while not await request.is_disconnected():
        counter += 1
        yield f"data: update {counter}\n\n"
        # Stay idle for 30s until the next update, pinging every 10s
        for _ in range(2):
            await asyncio.sleep(10)
            yield ": ping\n\n"   # keep alive comment
        await asyncio.sleep(10)

@app.get("/sse/live")
async def live_feed(request: Request):
    return StreamingResponse(
        simple_keepalive_stream(request),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )
Use request.is_disconnected() to detect when the client closes the browser tab or navigates away. This prevents your generator from running forever in the background.
SSE comment syntax for keep-alive:
SSE — Comment (Keep-Alive)
# Lines starting with ":" are comments — browser ignores them
# but they keep the TCP connection alive through proxies
: ping

: heartbeat

: keep-alive

# Send one every 15-30 seconds
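The keep-alive idea can also be packaged as a reusable wrapper around any async source: wait for the next item, and emit a comment line whenever the wait exceeds the ping interval. The sketch below uses only asyncio; the names with_keepalive, slow_source, and the short intervals are assumptions chosen so the demo finishes quickly. It uses asyncio.wait rather than asyncio.wait_for so that a timeout does not cancel the pending read from the source.

```python
import asyncio
from typing import AsyncIterator

_END = object()  # sentinel: the source is exhausted


async def _next_or_end(iterator):
    """Return the next item, or _END once the source finishes."""
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _END


async def with_keepalive(source: AsyncIterator[str], interval: float) -> AsyncIterator[str]:
    """Wrap items as SSE data events, sending ': ping' while idle."""
    iterator = source.__aiter__()
    pending = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(_next_or_end(iterator))
            # A timeout here leaves the pending read running
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ": ping\n\n"
                continue
            item, pending = pending.result(), None
            if item is _END:
                break
            yield f"data: {item}\n\n"
    finally:
        if pending is not None:
            pending.cancel()   # stop the read if the stream is closed early


async def slow_source():
    await asyncio.sleep(0.2)   # idle period longer than the ping interval
    yield "first"


async def demo() -> list:
    return [frame async for frame in with_keepalive(slow_source(), interval=0.05)]


frames = asyncio.run(demo())
```

In a real endpoint the interval would be in the 15 to 30 second range mentioned above, and the wrapped generator would be passed to StreamingResponse with media_type="text/event-stream".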
15.3 AI Streaming
🔤
Token Streaming
LLMs generate text one token at a time. A token is roughly a word or part of a word. Without streaming, users wait 5–10 seconds staring at a blank screen. With token streaming, they see text appearing word-by-word — just like ChatGPT — which dramatically improves perceived responsiveness.
Without Streaming                      With Token Streaming
──────────────────────────────────────────────────────────
t=0s  User: "Write a poem"             t=0s    User: "Write a poem"
t=0s  [spinner spinning...]            t=0.1s  "Roses "
t=0s  [spinner spinning...]            t=0.2s  "are "
t=0s  [spinner spinning...]            t=0.3s  "red, "
t=4s  Full poem appears!               t=0.4s  "violets "
                                       t=0.5s  "are "
                                       t=0.6s  "blue..."
User frustrated after 4s               User engaged from first word
Understanding tokens:
Python — What are tokens?
# Roughly 1 token ≈ 0.75 words or 4 characters
# "Hello, world!" → ["Hello", ",", " world", "!"] (4 tokens)
# "FastAPI is awesome" → ["Fast", "API", " is", " awesome"] (4 tokens)
# Python code is tokenized similarly

# LLM generates tokens sequentially:
# Token 1: "The"
# Token 2: " quick"
# Token 3: " brown"
# Token 4: " fox"
# ...

# Each token takes ~50-200ms to generate
# 100-token response = 5-20 seconds without streaming!
# With streaming, user sees content after first token (50-200ms)
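The latency figures above follow from simple arithmetic, which the short sketch below makes explicit. It assumes a constant per-token generation time and ignores network and queueing delay; the function name streaming_latency is made up for illustration.

```python
def streaming_latency(n_tokens: int, ms_per_token: float) -> tuple:
    """Return (seconds until the full response, seconds until the first token)."""
    total_s = n_tokens * ms_per_token / 1000   # what a non-streaming client waits
    first_token_s = ms_per_token / 1000        # what a streaming client waits
    return total_s, first_token_s


fast = streaming_latency(100, 50)    # lower bound from the text: 50 ms/token
slow = streaming_latency(100, 200)   # upper bound from the text: 200 ms/token
```

The total generation time is identical in both modes; streaming only changes when the user first sees output.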
Basic token streaming with OpenAI:
Python — OpenAI Token Streaming
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel

app = FastAPI()
client = AsyncOpenAI()  # reads OPENAI_API_KEY from env

class ChatRequest(BaseModel):
    message: str

async def stream_openai_tokens(message: str):
    # stream=True enables token-by-token streaming
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": message}],
        stream=True,      # ← this enables streaming
        max_tokens=500
    )
    # Iterate over streamed chunks
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:           # some chunks have no content
            yield delta.content     # yield each token

@app.post("/ai/stream")
async def ai_stream(req: ChatRequest):
    return StreamingResponse(
        stream_openai_tokens(req.message),
        media_type="text/plain"
    )
🤖
LLM Streaming (SSE Format)
In production, token streaming is commonly delivered as SSE so the frontend can use EventSource (GET requests only) or fetch with a streaming body (required for POST endpoints such as the one below). ChatGPT-style chat frontends typically work this way. The server sends each token as an SSE event.
Python — LLM Streaming over SSE (Production Pattern)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import json

app = FastAPI()
client = AsyncOpenAI()

class ChatRequest(BaseModel):
    messages: list[dict]   # full conversation history
    model: str = "gpt-4o"

async def llm_sse_stream(request: Request, chat_req: ChatRequest):
    stream = await client.chat.completions.create(
        model=chat_req.model,
        messages=chat_req.messages,
        stream=True
    )
    async for chunk in stream:
        # Check if client disconnected
        if await request.is_disconnected():
            await stream.close()      # cancel LLM request
            break

        delta = chunk.choices[0].delta
        finish = chunk.choices[0].finish_reason

        if delta.content:
            # Send token as SSE event
            payload = json.dumps({"token": delta.content})
            yield f"event: token\ndata: {payload}\n\n"

        if finish == "stop":
            # Send done event. chunk.usage is None unless usage reporting
            # is enabled on the request, so "usage" is often {} here.
            usage = chunk.usage
            done_data = json.dumps({
                "finish_reason": "stop",
                "usage": usage.model_dump() if usage else {}
            })
            yield f"event: done\ndata: {done_data}\n\n"
        elif finish == "length":
            yield 'event: error\ndata: {"error":"max_tokens reached"}\n\n'

@app.post("/chat/stream")
async def chat_stream(request: Request, chat_req: ChatRequest):
    return StreamingResponse(
        llm_sse_stream(request, chat_req),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

# JavaScript client
"""
// Using fetch with readable stream (more control than EventSource)
const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ messages: [{ role: "user", content: "Hello!" }] })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n\n");
    buffer = lines.pop();  // keep incomplete last line
    
    for (const chunk of lines) {
        if (chunk.startsWith("event: token")) {
            const dataLine = chunk.split("\n").find(l => l.startsWith("data:"));
            const { token } = JSON.parse(dataLine.slice(5));
            document.getElementById("output").textContent += token;
        }
    }
}
"""
Anthropic Claude streaming (alternative LLM):
Python — Anthropic Claude Streaming
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
claude = anthropic.AsyncAnthropic()  # reads ANTHROPIC_API_KEY from env

async def stream_claude(prompt: str):
    async with claude.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        # text_stream is an attribute (an async iterator), not a method
        async for text in stream.text_stream:
            yield text  # yields text tokens directly

@app.get("/claude/stream")
async def claude_stream(prompt: str):
    return StreamingResponse(
        stream_claude(prompt),
        media_type="text/plain"
    )
📦
Chunk Handling
Streaming responses don't always arrive as clean individual tokens. Network buffering, LLM SDK behavior, and proxy settings can cause chunks to be merged or split. Robust chunk handling means your code works correctly regardless of how bytes arrive.
Python — Robust Chunk Handling
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json, asyncio

app = FastAPI()

# Pattern 1: Buffer until complete JSON objects
async def json_lines_stream(raw_stream):
    """Handles partial JSON lines across chunks"""
    buffer = ""
    async for chunk in raw_stream:
        buffer += chunk
        # Process all complete lines
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            line = line.strip()
            if line:
                try:
                    data = json.loads(line)
                    yield json.dumps(data) + "\n"
                except json.JSONDecodeError:
                    pass  # skip malformed lines

# Pattern 2: Accumulate entire response for post-processing
async def stream_with_accumulator(llm_client, prompt: str):
    full_response = ""
    token_count = 0
    async for chunk in llm_client.stream(prompt):
        token = chunk.content if chunk.content else ""
        full_response += token
        token_count += 1
        # Send progress update every 10 tokens
        if token_count % 10 == 0:
            progress = {"token": token, "total_so_far": token_count}
            yield f"data: {json.dumps(progress)}\n\n"
        else:
            yield f"data: {json.dumps({'token': token})}\n\n"
    # Final event with full response
    final = {"done": True, "full_text": full_response, "tokens": token_count}
    yield f"event: complete\ndata: {json.dumps(final)}\n\n"

# Pattern 3: Chunked binary streaming with size control
async def controlled_chunk_stream(data_source, chunk_size: int = 1024):
    """Ensures chunks are always exactly chunk_size bytes (except last)"""
    buffer = b""
    async for piece in data_source:
        buffer += piece
        while len(buffer) >= chunk_size:
            yield buffer[:chunk_size]
            buffer = buffer[chunk_size:]
    if buffer:  # yield remaining bytes
        yield buffer
⚠️
Client-side buffering issue: an SSE client dispatches an event only after it receives the blank line that terminates it, so always end SSE events with \n\n (double newline). For plain streaming, flush explicitly if possible.
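The same buffering logic applies to any client that reads the stream by hand: network chunks can end mid-event, so the reader has to hold back the incomplete tail until the terminating blank line arrives. A minimal Python sketch of such an incremental parser follows, useful for scripts or tests that consume an SSE endpoint. The class name SSEParser is an assumption, and the sketch only handles \n line endings and the id, event, and data fields.

```python
class SSEParser:
    """Incrementally parse SSE text that may arrive split across chunks."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, text: str) -> list:
        """Add a chunk and return the events completed so far."""
        self._buffer += text
        # Everything before the last blank line is complete; keep the rest
        *blocks, self._buffer = self._buffer.split("\n\n")
        events = []
        for block in blocks:
            event = {"event": "message"}   # default event type
            data_lines = []
            for line in block.split("\n"):
                if not line or line.startswith(":"):
                    continue               # blank or comment (keep-alive) line
                field, _, value = line.partition(":")
                if value.startswith(" "):
                    value = value[1:]      # strip the single optional space
                if field == "data":
                    data_lines.append(value)
                elif field in ("event", "id"):
                    event[field] = value
            if data_lines:                 # events without data are not dispatched
                event["data"] = "\n".join(data_lines)
                events.append(event)
        return events


parser = SSEParser()
first = parser.feed('event: token\ndata: {"tok')          # event still incomplete
second = parser.feed('en": "Hi"}\n\n: ping\n\ndata: x\n\n')
```

Feeding the chunks in any other split produces the same events, which is the property that makes the parser robust to proxy and network buffering.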
Handling streaming errors gracefully:
Python — Error Handling in Streams
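import json
from openai import APITimeoutError, RateLimitError

async def safe_stream(prompt: str):
    # llm_generate is a placeholder for your own async token generator
    try:
        async for token in llm_generate(prompt):
            yield f"data: {json.dumps({'token': token})}\n\n"
    except RateLimitError:
        yield 'event: error\ndata: {"code":"rate_limit","retry":true}\n\n'
    except APITimeoutError:
        yield 'event: error\ndata: {"code":"timeout","retry":true}\n\n'
    except Exception as e:
        error = {"code": "unknown", "message": str(e)}
        yield f'event: error\ndata: {json.dumps(error)}\n\n'
    # Sent after normal completion or a handled error. This is not in a
    # `finally` block: yielding while the generator is being closed
    # (e.g. after a client disconnect) raises RuntimeError.
    yield 'event: close\ndata: {}\n\n'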
Cancellation
When a user clicks "Stop generating" or navigates away, you should cancel the LLM request immediately. An LLM request that keeps running after the user has left still costs money and wastes resources. FastAPI lets you detect disconnection via request.is_disconnected() and cancel async tasks.
Python — Cancellation Patterns
from fastapi import Body, FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import asyncio, json

app = FastAPI()
client = AsyncOpenAI()

# Pattern 1: Check disconnection on every chunk
async def cancellable_stream(request: Request, prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )
    try:
        async for chunk in stream:
            # Check client disconnected before sending each token
            if await request.is_disconnected():
                print("Client disconnected, stopping LLM")
                await stream.close()   # cancel OpenAI streaming
                return                 # exit generator
            delta = chunk.choices[0].delta
            if delta.content:
                yield f"data: {json.dumps({'token': delta.content})}\n\n"
    except asyncio.CancelledError:
        await stream.close()           # clean up if task is cancelled
        raise

@app.post("/ai/generate")
async def generate(request: Request, prompt: str = Body(..., embed=True)):
    # embed=True reads prompt from a JSON body like {"prompt": "..."}
    return StreamingResponse(
        cancellable_stream(request, prompt),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"}
    )

# Pattern 2: Using asyncio.Event for manual cancellation
cancel_events: dict[str, asyncio.Event] = {}

async def stream_with_cancel_token(generation_id: str, prompt: str):
    cancel = asyncio.Event()
    cancel_events[generation_id] = cancel
    try:
        stream = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            if cancel.is_set():        # someone called /ai/cancel/{id}
                await stream.close()
                yield 'event: cancelled\ndata: {}\n\n'
                return
            delta = chunk.choices[0].delta
            if delta.content:
                yield f"data: {json.dumps({'token': delta.content})}\n\n"
    finally:
        cancel_events.pop(generation_id, None)   # clean up on every exit path

@app.post("/ai/cancel/{generation_id}")
async def cancel_generation(generation_id: str):
    if generation_id in cancel_events:
        cancel_events[generation_id].set()       # signal cancellation
        return {"cancelled": True}
    return {"cancelled": False, "reason": "not found"}
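Whichever pattern is used, cleanup of the upstream request is most reliable inside try/finally, because the finally block runs when the generator is closed early as well as when it finishes. The plain-asyncio sketch below illustrates that mechanism only: fake_llm_tokens stands in for an LLM stream, and the explicit aclose() call plays the role of the server abandoning the response. Exactly how and when a given server stack closes your generator on disconnect is an assumption to verify in your own setup.

```python
import asyncio


async def fake_llm_tokens(log: list):
    """Stand-in for an LLM stream; records when cleanup runs."""
    try:
        for token in ["Roses ", "are ", "red"]:
            yield token
            await asyncio.sleep(0)
    finally:
        # In a real endpoint this is where the upstream stream would be closed
        log.append("upstream closed")


async def consume_until_stopped():
    log = []
    received = []
    gen = fake_llm_tokens(log)
    async for token in gen:
        received.append(token)
        if token == "are ":      # simulate the user pressing Stop
            break
    await gen.aclose()           # closing the generator triggers its finally block
    return received, log


received, log = asyncio.run(consume_until_stopped())
```

Note that the finally block only records cleanup and does not yield; yielding while a generator is being closed is an error.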
JavaScript: implementing a Stop button
JavaScript — Stop Button Pattern
let abortController = null;

async function startGeneration(prompt) {
    abortController = new AbortController();  // create abort signal

    const response = await fetch("/ai/generate", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ prompt }),
        signal: abortController.signal  // attach to fetch
    });

    // When user clicks Stop, abortController.abort() cancels the fetch
    // → browser closes the HTTP connection
    // → request.is_disconnected() returns True on server
    // → server cancels LLM stream
    const reader = response.body.getReader();
    try {
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            // handle chunk...
        }
    } catch (e) {
        if (e.name === "AbortError") console.log("User stopped");
    }
}

// Stop button
document.getElementById("stop-btn").onclick = () => {
    if (abortController) abortController.abort();
};
💡
When fetch is aborted, the browser closes the TCP connection. FastAPI detects this via await request.is_disconnected() returning True. Always check this in long-running generators to save resources.
Complete production AI streaming endpoint:
Python — Full Production AI Streaming
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel
import asyncio, json, uuid

app = FastAPI()
client = AsyncOpenAI()

class Message(BaseModel):
    role: str      # "user" | "assistant" | "system"
    content: str

class ChatRequest(BaseModel):
    messages: list[Message]
    model: str = "gpt-4o"
    max_tokens: int = 2048
    temperature: float = 0.7

async def production_stream(request: Request, chat: ChatRequest):
    # Send generation ID so client can cancel
    gen_id = str(uuid.uuid4())
    yield f"event: start\ndata: {json.dumps({'id': gen_id})}\n\n"

    stream = await client.chat.completions.create(
        model=chat.model,
        messages=[m.model_dump() for m in chat.messages],
        stream=True,
        max_tokens=chat.max_tokens,
        temperature=chat.temperature,
    )
    full_text = ""
    try:
        async for chunk in stream:
            if await request.is_disconnected():
                await stream.close()
                return
            delta = chunk.choices[0].delta
            finish = chunk.choices[0].finish_reason
            if delta.content:
                full_text += delta.content
                yield f"event: token\ndata: {json.dumps({'t': delta.content})}\n\n"
            if finish:
                final = {"finish_reason": finish, "chars": len(full_text)}
                yield f"event: done\ndata: {json.dumps(final)}\n\n"
    except Exception as e:
        err = {"error": str(e), "type": type(e).__name__}
        yield f"event: error\ndata: {json.dumps(err)}\n\n"

@app.post("/v1/chat/stream")
async def chat_completions_stream(request: Request, chat: ChatRequest):
    return StreamingResponse(
        production_stream(request, chat),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Access-Control-Allow-Origin": "*",
        }
    )
📌 Topic 15 Summary
Technique | Use Case | media_type | Client API
StreamingResponse + generator | Files, CSV export, binary data | application/octet-stream | fetch + getReader()
Async generator | DB row streaming, API proxying | application/x-ndjson | fetch + getReader()
SSE (basic) | Live dashboards, notifications | text/event-stream | EventSource
SSE (with reconnect) | Reliable live feeds | text/event-stream | EventSource (auto-reconnects)
SSE (keep-alive) | Long-lived streams behind proxies | text/event-stream | EventSource
LLM Token Streaming | ChatGPT-style AI apps | text/event-stream | fetch + abort controller