A chat endpoint accepts a conversation history (a list of messages with roles like user, assistant, system) and returns the next assistant reply. This mirrors how OpenAI, Anthropic, and other LLM providers work under the hood.
[Diagram: client sends messages[] → /chat endpoint → OpenAI / Claude → endpoint formats response → client gets reply]
```python
from typing import Literal

import openai
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
client = openai.AsyncOpenAI()  # uses OPENAI_API_KEY env var


# --- Pydantic schemas ---
class Message(BaseModel):
    role: Literal["user", "assistant", "system"]
    content: str


class ChatRequest(BaseModel):
    messages: list[Message]
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1024


class ChatResponse(BaseModel):
    reply: str
    model: str
    usage: dict


# --- Endpoint ---
@app.post("/chat", response_model=ChatResponse)
async def chat(body: ChatRequest):
    response = await client.chat.completions.create(
        model=body.model,
        messages=[m.model_dump() for m in body.messages],
        temperature=body.temperature,
        max_tokens=body.max_tokens,
    )
    return ChatResponse(
        # content can be None, so fall back to an empty string
        reply=response.choices[0].message.content or "",
        model=response.model,
        # usage is a Pydantic model; model_dump() also converts nested fields
        usage=response.usage.model_dump() if response.usage else {},
    )
```
A completion endpoint takes a plain text prompt and returns generated text, with no conversation history needed. It is a good fit for one-shot tasks: summarization, classification, extraction, code generation. Think of it as "prompt in → answer out".
```python
from enum import Enum

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

# app and client are the objects created in the chat endpoint example


class TaskType(str, Enum):
    summarize = "summarize"
    classify = "classify"
    extract = "extract"
    translate = "translate"


class CompletionRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=10000)
    task: TaskType = TaskType.summarize
    target_language: str | None = None  # for translate task


# System prompts per task: defined server-side, not by the client!
SYSTEM_PROMPTS = {
    TaskType.summarize: "Summarize the given text concisely in 2-3 sentences.",
    TaskType.classify: "Classify the sentiment as POSITIVE, NEGATIVE, or NEUTRAL.",
    TaskType.extract: "Extract key facts as a JSON list of {fact, source} objects.",
    TaskType.translate: "Translate the text to {language}. Output only the translation.",
}


@app.post("/complete")
async def complete(body: CompletionRequest):
    system = SYSTEM_PROMPTS[body.task]
    if body.task == TaskType.translate:
        if not body.target_language:
            raise HTTPException(400, "target_language is required for translate")
        system = system.format(language=body.target_language)

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": body.prompt},
        ],
    )
    return {"result": response.choices[0].message.content}
```
By mapping task enum values to pre-defined prompts, you get consistent, safe behavior. Your API is the single source of truth for how the LLM behaves.

Embeddings convert text into a fixed-size vector of numbers (e.g. 1536 floats for text-embedding-3-small). Semantically similar texts have vectors that are close together. Embeddings power search, recommendations, clustering, and RAG.
```python
import numpy as np
from pydantic import BaseModel


class EmbedRequest(BaseModel):
    texts: list[str]  # send one or many at once
    model: str = "text-embedding-3-small"


class EmbedResponse(BaseModel):
    embeddings: list[list[float]]  # one vector per input text
    dimensions: int
    model: str


@app.post("/embed", response_model=EmbedResponse)
async def embed(body: EmbedRequest):
    # OpenAI accepts up to 2048 texts in one call
    response = await client.embeddings.create(
        model=body.model,
        input=body.texts,
    )
    vectors = [item.embedding for item in response.data]
    return EmbedResponse(
        embeddings=vectors,
        dimensions=len(vectors[0]) if vectors else 0,  # avoid IndexError on no vectors
        model=response.model,
    )


# Usage: check similarity between two texts
def cosine_similarity(a: list[float], b: list[float]) -> float:
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
```
| Model | Dimensions | Use Case |
|---|---|---|
| text-embedding-3-small | 1536 | Fast, cheap, great for most tasks |
| text-embedding-3-large | 3072 | Higher accuracy tasks |
| nomic-embed-text (local) | 768 | Free, runs on your own server |
Without streaming, users wait for the entire LLM response before seeing anything, often 5-30 seconds. With token streaming, the LLM sends each token as it is generated. Users see output appear in real time, which dramatically improves perceived responsiveness.
The LLM API sends chunks via a chunked HTTP response. FastAPI uses StreamingResponse with an async generator to forward those chunks immediately to the client.
```python
import json

from fastapi.responses import StreamingResponse


@app.post("/chat/stream")
async def chat_stream(body: ChatRequest):
    # Async generator: yields each chunk as it arrives from OpenAI
    async def generate():
        stream = await client.chat.completions.create(
            model=body.model,
            messages=[m.model_dump() for m in body.messages],
            stream=True,  # this enables streaming
        )
        async for chunk in stream:
            if not chunk.choices:  # a chunk may carry no choices; skip it
                continue
            delta = chunk.choices[0].delta.content
            if delta is not None:
                # Send each token as a JSON line (NDJSON format)
                yield json.dumps({"token": delta}) + "\n"
        # Signal stream end
        yield json.dumps({"done": True}) + "\n"

    return StreamingResponse(
        generate(),
        media_type="application/x-ndjson",  # newline-delimited JSON
    )


# --- JavaScript client-side consumption ---
# const response = await fetch("/chat/stream", { method: "POST", body: ... })
# const reader = response.body.getReader();
# const decoder = new TextDecoder();
# let buffer = "";
# while (true) {
#   const { done, value } = await reader.read();
#   if (done) break;
#   buffer += decoder.decode(value, { stream: true });
#   const lines = buffer.split("\n");
#   buffer = lines.pop();  // keep an incomplete trailing line for the next read
#   for (const line of lines.filter(Boolean)) {
#     const { token } = JSON.parse(line);
#     if (token) appendToUI(token);
#   }
# }
```
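A network read can end in the middle of a JSON line, so an NDJSON client should buffer until it sees a newline. The sketch below isolates that buffering logic in Python. The helper name iter_ndjson is hypothetical, and the byte chunks are made-up stand-ins for what a streaming HTTP client would deliver.

```python
import json
from typing import Iterable, Iterator


def iter_ndjson(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one parsed object per complete NDJSON line.

    `chunks` stands in for the raw byte chunks a streaming HTTP client
    delivers; a chunk boundary may fall anywhere, even mid-line.
    """
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last newline consists of complete lines.
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.strip():
                yield json.loads(line)
    # A well-formed stream ends with a newline, but parse a leftover if present.
    if buffer.strip():
        yield json.loads(buffer)


# Hypothetical chunks: the second JSON line is split across two reads.
chunks = [b'{"token": "Hel"}\n{"tok', b'en": "lo"}\n', b'{"done": true}\n']
events = list(iter_ndjson(chunks))
text = "".join(e.get("token", "") for e in events)
```

Buffering raw bytes rather than decoded text also keeps multi-byte UTF-8 characters intact when a chunk boundary falls inside one.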
Server-Sent Events (SSE) is a browser-native streaming format. The client uses the EventSource API and the server sends data: ...\n\n formatted messages. SSE is the format used by OpenAI's own API and most chat interfaces (ChatGPT, Claude.ai).
NDJSON works with fetch() (full control, works with POST). SSE works with EventSource (browser-native reconnect, GET only). For AI chat, NDJSON + fetch is usually better because you need to send POST bodies with messages.

```python
import json

from fastapi import Request
from fastapi.responses import StreamingResponse


def sse_event(data: str, event: str | None = None) -> str:
    """Format a Server-Sent Event string."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append(f"data: {data}")
    lines.append("")  # blank line terminates the event
    return "\n".join(lines) + "\n"


@app.get("/chat/sse")
async def chat_sse(prompt: str, request: Request):
    async def generate():
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            # Check if client disconnected
            if await request.is_disconnected():
                break
            if not chunk.choices:  # a chunk may carry no choices; skip it
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                yield sse_event(data=json.dumps({"token": delta}), event="token")
        yield sse_event(data="[DONE]", event="done")

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )


# Client: const source = new EventSource("/chat/sse?prompt=Hello");
# source.addEventListener("token", e => appendText(JSON.parse(e.data).token));
# source.addEventListener("done", () => source.close());
```
Behind Nginx, add X-Accel-Buffering: no to response headers. Without it, Nginx may buffer the response and only send it once the stream is complete, completely defeating the purpose of streaming!

For interactive AI applications (voice interfaces, real-time collaboration, multi-turn chat with interruption), WebSockets are usually the best choice. Unlike SSE, WebSockets are bidirectional: the client can send new messages while the AI is still responding.
```python
import json

from fastapi import FastAPI, WebSocket, WebSocketDisconnect


@app.websocket("/ws/chat")
async def ws_chat(websocket: WebSocket):
    await websocket.accept()

    # Keep track of conversation history
    history: list[dict] = []

    try:
        while True:
            # 1. Receive user message
            data = await websocket.receive_text()
            user_msg = json.loads(data)["message"]
            history.append({"role": "user", "content": user_msg})

            # 2. Stream LLM response back token by token
            full_response = ""
            stream = await client.chat.completions.create(
                model="gpt-4o-mini",
                messages=history,
                stream=True,
            )
            async for chunk in stream:
                if not chunk.choices:  # a chunk may carry no choices; skip it
                    continue
                delta = chunk.choices[0].delta.content
                if delta:
                    full_response += delta
                    await websocket.send_json({"type": "token", "content": delta})

            # 3. Signal end; add assistant reply to history
            await websocket.send_json({"type": "done"})
            history.append({"role": "assistant", "content": full_response})
    except WebSocketDisconnect:
        pass  # Client disconnected cleanly
```
| Method | Direction | Best For | Client API |
|---|---|---|---|
| NDJSON via fetch | Server → Client | Most AI chat UIs | fetch() + ReadableStream |
| SSE (EventSource) | Server → Client | Dashboard feeds, GET streams | new EventSource(url) |
| WebSocket | Bidirectional | Voice, real-time collab | new WebSocket(url) |
Tool calling (also called "function calling") lets the LLM request execution of real functions. You define tools with JSON schemas, the model decides when to use them, and your FastAPI backend runs the actual code and returns the result. This is how AI agents take actions in the real world.
```python
import json

from fastapi import HTTPException

MAX_TOOL_ROUNDS = 5  # safety cap so the loop cannot run forever

# --- Define tools as JSON schemas ---
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "City name"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["city"],
            },
        },
    }
]


# --- Actual tool implementations ---
async def get_weather(city: str, unit: str = "celsius") -> dict:
    # In production: call a real weather API
    return {"city": city, "temp": 22, "unit": unit, "condition": "sunny"}


TOOL_MAP = {"get_weather": get_weather}


# --- Agent loop ---
@app.post("/agent")
async def agent(body: ChatRequest):
    messages = [m.model_dump() for m in body.messages]

    # Loop until the LLM gives a final answer (no more tool calls)
    for _ in range(MAX_TOOL_ROUNDS):
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=TOOLS,
            tool_choice="auto",
        )
        msg = response.choices[0].message

        # No tool calls: this is the final answer, return it
        if not msg.tool_calls:
            return {"reply": msg.content}

        # Append assistant's tool-call message to history
        messages.append(msg)

        # Execute each requested tool and append results
        for tc in msg.tool_calls:
            fn_name = tc.function.name
            fn_args = json.loads(tc.function.arguments)
            result = await TOOL_MAP[fn_name](**fn_args)
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": json.dumps(result),
            })

    raise HTTPException(500, "Agent exceeded the maximum number of tool rounds")
```
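Tool calls produced by a model are untrusted input: the tool name may not exist, and the arguments may be malformed JSON or fail to match the function signature. The following is a minimal sketch of a defensive dispatcher, assuming the same get_weather stub. The helper run_tool is hypothetical, and returning the error as the tool result (so the model can see it and try again) is one possible design choice, not the only one.

```python
import asyncio
import json


async def get_weather(city: str, unit: str = "celsius") -> dict:
    # Stub implementation with the same shape as the example tool
    return {"city": city, "temp": 22, "unit": unit, "condition": "sunny"}


TOOL_MAP = {"get_weather": get_weather}


async def run_tool(name: str, raw_args: str) -> str:
    """Run one tool call and always return a JSON string for the 'tool' message."""
    fn = TOOL_MAP.get(name)
    if fn is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        args = json.loads(raw_args)
        result = await fn(**args)
    except (json.JSONDecodeError, TypeError) as exc:
        # Bad JSON, or arguments that do not match the function signature
        return json.dumps({"error": str(exc)})
    return json.dumps(result)


ok = asyncio.run(run_tool("get_weather", '{"city": "Paris"}'))
missing = asyncio.run(run_tool("delete_everything", "{}"))
bad_args = asyncio.run(run_tool("get_weather", '{"town": "Paris"}'))
```

Note that catching TypeError this broadly would also hide a TypeError raised inside a tool body; a stricter version could validate arguments against the tool's JSON schema before calling it.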
Long-running agents need persistent state. The agent may need to pause, resume, or be queried mid-task. Agent State APIs give clients visibility and control over what the agent is doing, and persist conversation + tool history across multiple API calls.
```python
import json
import uuid
from enum import Enum

import redis.asyncio as redis
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


class AgentStatus(str, Enum):
    idle = "idle"
    running = "running"
    waiting = "waiting"  # waiting for human input
    done = "done"
    error = "error"


class AgentSession(BaseModel):
    session_id: str
    status: AgentStatus
    messages: list[dict]  # full history
    current_task: str | None = None


redis_client = redis.Redis(host="localhost", decode_responses=True)


# Create a new agent session
@app.post("/agent/sessions")
async def create_session(initial_prompt: str):
    session_id = str(uuid.uuid4())
    session = AgentSession(
        session_id=session_id,
        status=AgentStatus.idle,
        messages=[{"role": "user", "content": initial_prompt}],
    )
    await redis_client.setex(
        f"session:{session_id}",
        3600,  # 1hr TTL
        session.model_dump_json(),
    )
    return {"session_id": session_id}


# Get current agent state
@app.get("/agent/sessions/{session_id}")
async def get_session(session_id: str):
    data = await redis_client.get(f"session:{session_id}")
    if not data:
        raise HTTPException(404, "Session not found or expired")
    return AgentSession.model_validate_json(data)


# Send a message to an existing session
@app.post("/agent/sessions/{session_id}/messages")
async def send_message(session_id: str, message: str):
    session = await get_session(session_id)
    session.messages.append({"role": "user", "content": message})
    session.status = AgentStatus.running
    # Persist and dispatch to background task ...
    return {"status": "queued"}
```
A workflow is a sequence of AI steps where each step's output feeds into the next. Unlike a single agent loop, workflows have a defined structure: Step 1 → Step 2 → Step 3. They're great for complex multi-stage tasks: research → outline → draft → review.
```python
from fastapi import FastAPI
from pydantic import BaseModel


class BlogRequest(BaseModel):
    topic: str
    target_audience: str
    word_count: int = 800


class BlogResponse(BaseModel):
    research: str  # Step 1 output
    outline: str  # Step 2 output
    draft: str  # Step 3 output
    final: str  # Step 4 output


async def llm(system: str, prompt: str) -> str:
    """Thin helper to call LLM with a system + user message."""
    r = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
    )
    return r.choices[0].message.content


@app.post("/workflow/blog", response_model=BlogResponse)
async def blog_workflow(body: BlogRequest):
    # Step 1: Research key points
    research = await llm(
        "You are a research assistant. List 5 key facts about the topic.",
        f"Topic: {body.topic} | Audience: {body.target_audience}",
    )

    # Step 2: Create outline using research
    outline = await llm(
        "You are a content strategist. Create a blog outline.",
        f"Topic: {body.topic}\nResearch:\n{research}",
    )

    # Step 3: Write draft based on outline
    draft = await llm(
        f"Write a {body.word_count}-word blog post for {body.target_audience}.",
        f"Outline:\n{outline}\nResearch:\n{research}",
    )

    # Step 4: Polish the draft
    final = await llm(
        "Polish this blog post: fix grammar, improve flow, add a strong CTA.",
        draft,
    )

    return BlogResponse(research=research, outline=outline, draft=draft, final=final)
```
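The four blog steps depend on each other, so they must run in order. Steps that do not need each other's output can instead run concurrently with asyncio.gather. A minimal sketch, assuming a stand-in fake_llm coroutine that sleeps instead of calling a model (the function name, the delay, and the three step labels are hypothetical):

```python
import asyncio
import time


async def fake_llm(label: str, delay: float = 0.2) -> str:
    """Stand-in for an LLM call: waits, then returns a canned string."""
    await asyncio.sleep(delay)
    return f"{label} done"


async def run_parallel() -> list[str]:
    # gather() runs the three coroutines concurrently and keeps argument order
    return await asyncio.gather(
        fake_llm("keywords"),
        fake_llm("title ideas"),
        fake_llm("audience notes"),
    )


start = time.perf_counter()
results = asyncio.run(run_parallel())
elapsed = time.perf_counter() - start  # roughly one delay, not the sum of three
```

Because the waits overlap, total time is governed by the slowest step rather than by the sum of all steps.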
When steps are independent, run them in parallel with asyncio.gather(step1(), step2(), step3()). This can cut workflow time by 2-3×.

RAG (Retrieval-Augmented Generation) addresses one of the LLM's biggest limitations: it doesn't know your private data or recent events. RAG works in two phases: indexing (embed your documents and store the vectors) and retrieval + generation (find relevant chunks, inject them into the prompt, get an answer grounded in them).
The first step is getting documents into your system. Users upload PDFs, text files, or raw text. The document gets stored in object storage (S3) or locally, and its content is extracted for the next pipeline step.
```python
import io
import uuid

import pypdf
from fastapi import BackgroundTasks, File, HTTPException, UploadFile
from pydantic import BaseModel


class UploadResponse(BaseModel):
    document_id: str
    filename: str
    status: str  # "queued": indexing happens in background


@app.post("/documents", response_model=UploadResponse)
async def upload_document(
    background_tasks: BackgroundTasks,  # injected by FastAPI; no default needed
    file: UploadFile = File(...),
):
    contents = await file.read()
    doc_id = str(uuid.uuid4())

    # Extract text based on file type
    if file.content_type == "application/pdf":
        reader = pypdf.PdfReader(io.BytesIO(contents))
        # Fall back to "" for pages with no extractable text
        text = " ".join(page.extract_text() or "" for page in reader.pages)
    elif file.content_type == "text/plain":
        text = contents.decode()
    else:
        raise HTTPException(400, "Unsupported file type")

    # Index in background so upload feels instant
    # (index_document is defined in the chunking step)
    background_tasks.add_task(index_document, doc_id, file.filename, text)

    return UploadResponse(
        document_id=doc_id,
        filename=file.filename,
        status="queued",
    )
```
Embedding models and LLMs have input limits. You can't embed a 100-page PDF as one unit. Chunking splits the document into smaller pieces that fit in an embedding model. The art is finding the right chunk size and overlap: chunks that are too small lose context, and chunks that are too large lose precision.
```python
from dataclasses import dataclass


@dataclass
class Chunk:
    doc_id: str
    chunk_id: str
    text: str
    start: int  # character position in the whitespace-normalized text
    metadata: dict


def split_into_chunks(
    text: str,
    doc_id: str,
    chunk_size: int = 500,  # words per chunk (a rough proxy for tokens)
    overlap: int = 50,  # words shared with the previous chunk to preserve context
) -> list[Chunk]:
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")

    words = text.split()
    chunks = []
    step = chunk_size - overlap
    i = 0
    char_pos = 0  # offset of words[i] within " ".join(words)
    while i < len(words):
        window = words[i : i + chunk_size]
        chunks.append(Chunk(
            doc_id=doc_id,
            chunk_id=f"{doc_id}_{i}",
            text=" ".join(window),
            start=char_pos,
            metadata={"word_index": i, "word_count": len(window)},
        ))
        if i + chunk_size >= len(words):
            break  # this window already reached the end of the text
        # Advance past the skipped words (each is followed by one space)
        char_pos += sum(len(w) + 1 for w in words[i : i + step])
        i += step  # slide window with overlap
    return chunks


async def index_document(doc_id: str, filename: str, text: str):
    # 1. Split into chunks
    chunks = split_into_chunks(text, doc_id)

    # 2. Embed ALL chunks in one API call (batch)
    texts_to_embed = [c.text for c in chunks]
    embed_response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=texts_to_embed,
    )
    vectors = [item.embedding for item in embed_response.data]

    # 3. Store in vector DB (pseudocode: vector_db depends on your DB choice)
    for chunk, vector in zip(chunks, vectors):
        await vector_db.upsert(
            id=chunk.chunk_id,
            values=vector,
            metadata={"text": chunk.text, "doc_id": doc_id, "filename": filename},
        )
```
| Chunk Strategy | When to Use |
|---|---|
| Fixed-size (500 words + overlap) | General purpose, most documents |
| Sentence / paragraph boundaries | When semantic units matter (articles, books) |
| Recursive splitting | Code files (split by function/class) |
| Semantic chunking | Group sentences by topic shift (most accurate, slowest) |
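As one illustration of the sentence / paragraph row, the sketch below packs whole paragraphs into chunks up to a word budget, so no paragraph is cut in the middle. The function name chunk_by_paragraph and the max_words default are assumptions made for this example. A paragraph longer than the budget is kept whole rather than split.

```python
def chunk_by_paragraph(text: str, max_words: int = 200) -> list[str]:
    """Group consecutive paragraphs into chunks of at most max_words words.

    Paragraphs are separated by blank lines. A single paragraph that exceeds
    max_words becomes its own oversized chunk.
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks: list[str] = []
    current: list[str] = []
    current_words = 0
    for para in paragraphs:
        n = len(para.split())
        # Close the current chunk if adding this paragraph would exceed the budget
        if current and current_words + n > max_words:
            chunks.append("\n\n".join(current))
            current, current_words = [], 0
        current.append(para)
        current_words += n
    if current:
        chunks.append("\n\n".join(current))
    return chunks


doc = "one two three\n\nfour five\n\nsix seven eight nine"
parts = chunk_by_paragraph(doc, max_words=5)
```

Here the first two paragraphs (five words in total) share a chunk, and the third starts a new one because adding it would exceed the budget.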
The retrieval endpoint takes a search query, embeds it, and finds the most similar chunks in the vector database using cosine similarity. It's the "search" engine of your RAG system.
```python
from pydantic import BaseModel


class SearchResult(BaseModel):
    chunk_id: str
    text: str
    score: float  # cosine similarity (higher = more similar)
    doc_id: str
    filename: str


class SearchResponse(BaseModel):
    query: str
    results: list[SearchResult]


@app.get("/search", response_model=SearchResponse)
async def search(q: str, top_k: int = 5, doc_id: str | None = None):
    # 1. Embed the search query
    embed_response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=[q],
    )
    query_vector = embed_response.data[0].embedding

    # 2. Search vector DB for nearest neighbors
    #    (vector_db is pseudocode: the call depends on your DB choice)
    metadata_filter = {"doc_id": doc_id} if doc_id else {}
    matches = await vector_db.query(
        vector=query_vector,
        top_k=top_k,
        include_metadata=True,
        filter=metadata_filter,
    )

    # 3. Format results
    results = [
        SearchResult(
            chunk_id=m.id,
            text=m.metadata["text"],
            score=m.score,
            doc_id=m.metadata["doc_id"],
            filename=m.metadata["filename"],
        )
        for m in matches
    ]
    return SearchResponse(query=q, results=results)
```
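What the nearest-neighbor query does can be pictured as a brute-force scan: score every stored vector against the query vector and keep the best top_k. The sketch below is pure Python with made-up 3-dimensional vectors. The name top_k_search and the toy index are assumptions, and real vector databases typically use approximate nearest-neighbor indexes rather than scanning every vector.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_search(
    query: list[float], index: dict[str, list[float]], top_k: int = 2
) -> list[tuple[str, float]]:
    """Return the top_k (chunk_id, score) pairs, best match first."""
    scored = [(chunk_id, cosine(query, vec)) for chunk_id, vec in index.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


# Toy index: hypothetical chunk ids mapped to tiny hand-written vectors
index = {
    "doc1_0": [1.0, 0.0, 0.0],
    "doc1_1": [0.9, 0.1, 0.0],
    "doc2_0": [0.0, 1.0, 0.0],
}
hits = top_k_search([1.0, 0.0, 0.0], index, top_k=2)
```

The scan costs one similarity computation per stored vector, which is fine for prototypes and is the reason production systems move to indexed search.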
Libraries like rank-bm25 handle the keyword side of hybrid (keyword + vector) search.

The RAG chat endpoint ties it all together: retrieve relevant chunks, inject them as context into the system prompt, then let the LLM answer the user's question based on that context. This is how "Chat with your PDF" products work.
```python
from pydantic import BaseModel


class RAGRequest(BaseModel):
    question: str
    doc_id: str | None = None  # restrict to one document
    top_k: int = 5


class RAGResponse(BaseModel):
    answer: str
    sources: list[dict]  # chunks used to generate the answer


@app.post("/rag/chat", response_model=RAGResponse)
async def rag_chat(body: RAGRequest):
    # --- Step 1: Retrieve relevant chunks ---
    embed_r = await client.embeddings.create(
        model="text-embedding-3-small",
        input=[body.question],
    )
    query_vec = embed_r.data[0].embedding
    matches = await vector_db.query(
        vector=query_vec,
        top_k=body.top_k,
        include_metadata=True,
        filter={"doc_id": body.doc_id} if body.doc_id else {},
    )

    # --- Step 2: Build context from retrieved chunks ---
    context_parts = []
    sources = []
    for i, match in enumerate(matches, 1):
        context_parts.append(f"[Source {i}] {match.metadata['text']}")
        sources.append({
            "rank": i,
            "filename": match.metadata["filename"],
            "score": round(match.score, 3),
            "excerpt": match.metadata["text"][:200] + "...",
        })
    context = "\n\n".join(context_parts)

    # --- Step 3: Call LLM with context in system prompt ---
    system = f"""You are a helpful assistant. Answer the user's question
ONLY using the provided context below. If the answer is not in the context,
say "I don't have enough information in the provided documents."

CONTEXT:
{context}"""

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": body.question},
        ],
    )
    return RAGResponse(
        answer=response.choices[0].message.content,
        sources=sources,
    )
```
Vector database options: pgvector (Postgres extension, great if you already use Postgres), Pinecone (managed, scales to billions of vectors), Weaviate (open source, hybrid search built-in), Chroma (local-first, great for development/prototyping).

You've now covered all 22 topics from Python internals to production AI systems. You can build:
- High-performance async APIs with full type safety
- Secure authentication & authorization systems
- Production deployments with Nginx + Gunicorn + Uvicorn
- Real-time WebSocket & streaming applications
- AI-powered backends: LLM APIs, agent systems, and RAG pipelines