Authentication & Authorization
Authentication answers "who are you?" — verifying the caller's identity. Authorization answers "what can you do?" — checking their permissions. FastAPI handles both elegantly through its dependency injection system.
Authentication (AuthN):
- Who is this caller?
- Verifies identity
- API Key / Password / Token
- Comes first in the pipeline
- Returns: a user or identity

Authorization (AuthZ):
- What can this caller do?
- Verifies permissions
- Roles / Scopes / Policies
- Comes after AuthN
- Returns: allowed or 403 Forbidden
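The ordering of the two checks can be sketched without any framework. This is a minimal illustration, not FastAPI's own mechanism: `AuthError`, `User`, the `TOKENS` table, and `handle_request` are hypothetical names invented for the example.

```python
from dataclasses import dataclass, field
from typing import Optional


class AuthError(Exception):
    """Carries the HTTP status a web framework would send back."""

    def __init__(self, status_code: int, detail: str):
        super().__init__(detail)
        self.status_code = status_code


@dataclass
class User:
    id: int
    roles: set = field(default_factory=set)


# Hypothetical token table standing in for a real credential check
TOKENS = {"tok-alice": User(1, {"admin"}), "tok-bob": User(2, {"viewer"})}


def authenticate(token: Optional[str]) -> User:
    """AuthN: resolve the caller's identity, or fail with 401."""
    user = TOKENS.get(token) if token else None
    if user is None:
        raise AuthError(401, "Not authenticated")
    return user


def authorize(user: User, required_role: str) -> User:
    """AuthZ: the identity is known; check what it may do, or fail with 403."""
    if required_role not in user.roles:
        raise AuthError(403, "Forbidden")
    return user


def handle_request(token: Optional[str], required_role: str) -> str:
    user = authenticate(token)      # step 1: who are you?
    authorize(user, required_role)  # step 2: what can you do?
    return f"ok:{user.id}"
```

A missing or unknown token fails at step 1 with 401, while a known caller without the role fails at step 2 with 403.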
Authentication
FastAPI supports multiple authentication mechanisms, all implemented as dependencies. Pick the one that matches your use case.
| Method | Best for | Token lives in |
|---|---|---|
| API Keys | Server-to-server, simple integrations | Header / Query param |
| JWT (Bearer) | SPAs, mobile apps, stateless APIs | Authorization header |
| OAuth2 | Third-party logins, delegated access | Header (after token exchange) |
| Cookies | Traditional web apps, SSR | Cookie header |
| Sessions | Stateful web apps | Cookie (session ID) |
The simplest auth method — a shared secret string. The client sends it in a
header (preferred) or query parameter. FastAPI provides APIKeyHeader and
APIKeyQuery security scheme classes.
```python
from fastapi import FastAPI, Security, HTTPException, status
from fastapi.security import APIKeyHeader, APIKeyQuery
import os

app = FastAPI()

# Define where the API key comes from
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)

VALID_KEYS = {"secret-key-123", "another-key-456"}  # store in DB in production

async def get_api_key(
    header_key: str = Security(api_key_header),
    query_key: str = Security(api_key_query),
):
    key = header_key or query_key  # accept from either location
    if key not in VALID_KEYS:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or missing API key"
        )
    return key

# ── Protected route ──────────────────────────────
@app.get("/data")
async def get_data(api_key: str = Security(get_api_key)):
    return {"message": "Access granted", "key_used": api_key[:6] + "..."}

# Client sends:  GET /data   Header: X-API-Key: secret-key-123
# or:            GET /data?api_key=secret-key-123
```
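The example above keeps raw keys in a set and notes that production keys belong in a database. One possible way to do that, sketched here with only the standard library, is to store a hash of each key and compare in constant time. `hash_key`, `generate_api_key`, `is_valid_key`, and `STORED_HASHES` are hypothetical names for this illustration.

```python
import hashlib
import hmac
import secrets
from typing import Optional


def hash_key(raw_key: str) -> str:
    # API keys are long random strings, so a fast hash is assumed acceptable here
    return hashlib.sha256(raw_key.encode()).hexdigest()


def generate_api_key() -> tuple:
    """Return (raw_key, key_hash): show the raw key once, persist only the hash."""
    raw_key = secrets.token_urlsafe(32)
    return raw_key, hash_key(raw_key)


# Stand-in for a database table of key hashes
STORED_HASHES = {hash_key("secret-key-123")}


def is_valid_key(candidate: Optional[str]) -> bool:
    if not candidate:
        return False
    candidate_hash = hash_key(candidate)
    # compare_digest avoids leaking how many leading characters matched
    return any(hmac.compare_digest(candidate_hash, h) for h in STORED_HASHES)
```

With this layout a leaked database exposes only hashes, and `is_valid_key` could replace the `key not in VALID_KEYS` check in the dependency.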
JWT (JSON Web Token) is one of the most common auth mechanisms for APIs. The client receives a
signed token on login and sends it with every request as
Authorization: Bearer <token>.
FastAPI provides OAuth2PasswordBearer as the security scheme.
Flow: client sends username+password → server signs a token with its secret key → client sends Authorization: Bearer <token> on later requests
```python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import jwt  # pip install PyJWT

app = FastAPI()

# Tells FastAPI where tokens come from (shown in Swagger UI)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "your-secret-key"  # use env var in production!
ALGORITHM = "HS256"

async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id = payload.get("sub")
        if user_id is None:
            raise ValueError("No sub claim")
    except (jwt.PyJWTError, ValueError):
        # ValueError is caught too, so a missing "sub" yields a 401 instead of a 500
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"id": user_id}  # or fetch user from DB here

@app.get("/me")
async def read_me(current_user = Depends(get_current_user)):
    return current_user
```
OAuth2 is a standard authorization framework, not an auth method itself.
FastAPI ships with OAuth2PasswordBearer (Resource Owner Password flow)
and you can integrate any OAuth2 provider (Google, GitHub, etc.) using the
Authorization Code flow.
Password flow:
- Client sends username + password
- Server returns access token
- Good for first-party apps
- FastAPI: OAuth2PasswordBearer
- Also: OAuth2PasswordRequestForm

Authorization Code flow:
- Redirect user to provider
- Provider returns auth code
- Server exchanges for token
- Use authlib or httpx-oauth
- Best for third-party login
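The first two steps of the Authorization Code flow (redirecting the user, then receiving the code) can be sketched with the standard library alone. The provider URL, client ID, redirect URI, and scope below are placeholders, not values from any real provider. The token exchange itself is left out because it depends on the provider and on the HTTP client you choose.

```python
import hmac
import secrets
from urllib.parse import parse_qs, urlencode, urlparse

# Hypothetical values; a real provider documents its own endpoints
AUTHORIZE_URL = "https://provider.example/oauth/authorize"
CLIENT_ID = "my-client-id"
REDIRECT_URI = "https://myapp.example/auth/callback"


def build_authorization_url() -> tuple:
    """Return (url, state). Keep state server-side to check it on the callback."""
    state = secrets.token_urlsafe(16)  # random value that ties callback to request
    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": "openid email",  # assumed scope names
        "state": state,
    }
    return f"{AUTHORIZE_URL}?{urlencode(params)}", state


def extract_code(callback_url: str, expected_state: str) -> str:
    """Validate the state echoed back by the provider and return the auth code."""
    query = parse_qs(urlparse(callback_url).query)
    returned_state = query.get("state", [""])[0]
    if not hmac.compare_digest(returned_state.encode(), expected_state.encode()):
        raise ValueError("State mismatch: possible CSRF")
    if "code" not in query:
        raise ValueError("No authorization code in callback")
    return query["code"][0]
```

The server would then exchange the returned code for a token at the provider's token endpoint, which is the step libraries such as authlib handle for you.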
```python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
import jwt, hashlib, hmac, datetime

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET = "supersecret"
ALGO = "HS256"

# Simulated user store. Plain SHA-256 keeps the demo short; real passwords
# need a slow, salted hash (bcrypt, scrypt, Argon2).
USERS_DB = {
    "alice": {"id": 1, "password_hash": hashlib.sha256(b"password123").hexdigest()}
}

class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

def create_access_token(user_id: int, expires_minutes: int = 30) -> str:
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.datetime.now(datetime.timezone.utc)
    expire = now + datetime.timedelta(minutes=expires_minutes)
    payload = {"sub": str(user_id), "exp": expire}
    return jwt.encode(payload, SECRET, algorithm=ALGO)

# ── Login endpoint: POST /token ───────────────────
@app.post("/token", response_model=Token)
async def login(form: OAuth2PasswordRequestForm = Depends()):
    # OAuth2PasswordRequestForm parses username + password from form data
    user = USERS_DB.get(form.username)
    given_hash = hashlib.sha256(form.password.encode()).hexdigest()
    # One error for both cases, so responses don't reveal which usernames exist
    if not user or not hmac.compare_digest(given_hash, user["password_hash"]):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Incorrect username or password")
    token = create_access_token(user["id"])
    return {"access_token": token}

# ── Protected route ───────────────────────────────
@app.get("/me")
async def read_me(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET, algorithms=[ALGO])
    except jwt.PyJWTError:
        # Bad or expired tokens become a 401 instead of an unhandled 500
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid or expired token")
    return {"user_id": payload["sub"]}
```
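The login example hashes passwords with a single unsalted SHA-256 pass, which is fine for a demo but fast to brute-force. As one possible replacement, here is a sketch using PBKDF2 from the standard library. `hash_password`, `verify_password`, the iteration count, and the `salt$digest` storage format are assumptions for this illustration; dedicated libraries for bcrypt or Argon2 are common alternatives.

```python
import hashlib
import hmac
import os
from typing import Optional

ITERATIONS = 600_000  # assumed work factor; tune it for your hardware


def hash_password(password: str, salt: Optional[bytes] = None) -> str:
    """Return 'salt_hex$digest_hex' using PBKDF2-HMAC-SHA256."""
    salt = os.urandom(16) if salt is None else salt  # fresh random salt per user
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), ITERATIONS
    )
    # Constant-time comparison of the recomputed digest with the stored one
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

In the login endpoint, `verify_password(form.password, user["password_hash"])` would take the place of the SHA-256 comparison.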
Cookie-based auth is ideal for traditional web apps and browser clients where you
control the frontend. FastAPI reads cookies via Cookie() and sets
them via Response.set_cookie(). Sessions store a
session ID in the cookie while keeping data server-side.
```python
from fastapi import FastAPI, Cookie, Response, HTTPException, Depends
from typing import Optional
import secrets, datetime

app = FastAPI()

# Simulated session store (use Redis/DB in production)
SESSIONS: dict[str, dict] = {}

# ── Login: set cookie ────────────────────────────
@app.post("/login")
async def login(response: Response):
    session_id = secrets.token_urlsafe(32)  # random session ID
    SESSIONS[session_id] = {"user_id": 1, "created": "now"}
    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,   # not accessible via JS (prevents XSS theft)
        secure=True,     # only sent over HTTPS
        samesite="lax",  # CSRF protection
        max_age=3600     # 1 hour
    )
    return {"message": "Logged in"}

# ── Read cookie in dependency ─────────────────────
async def get_session_user(session_id: Optional[str] = Cookie(default=None)):
    if not session_id or session_id not in SESSIONS:
        raise HTTPException(401, "Not authenticated")
    return SESSIONS[session_id]

@app.get("/dashboard")
async def dashboard(user = Depends(get_session_user)):
    return {"user": user}

# ── Logout: delete cookie ─────────────────────────
@app.post("/logout")
async def logout(response: Response, session_id: Optional[str] = Cookie(default=None)):
    if session_id:
        SESSIONS.pop(session_id, None)
    response.delete_cookie("session_id")
    return {"message": "Logged out"}
```
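In the example above the cookie expires after an hour through `max_age`, but the server-side entry never does, so a copied session ID would keep working. A minimal sketch of server-side expiry follows, using an in-memory dict as the example does. `SESSION_TTL_SECONDS`, `create_session`, and `get_session` are hypothetical names, and the `now` parameter exists only to make the logic easy to exercise.

```python
import secrets
import time
from typing import Optional

SESSION_TTL_SECONDS = 3600  # assumed to mirror the cookie's max_age
SESSIONS: dict = {}


def create_session(user_id: int, now: Optional[float] = None) -> str:
    now = time.time() if now is None else now
    session_id = secrets.token_urlsafe(32)
    SESSIONS[session_id] = {"user_id": user_id, "expires_at": now + SESSION_TTL_SECONDS}
    return session_id


def get_session(session_id: Optional[str], now: Optional[float] = None) -> Optional[dict]:
    """Return the session, or None if it is missing or past its expiry."""
    now = time.time() if now is None else now
    session = SESSIONS.get(session_id) if session_id else None
    if session is None:
        return None
    if session["expires_at"] <= now:
        SESSIONS.pop(session_id, None)  # drop stale entries as they are seen
        return None
    return session
```

A store such as Redis can do the same job with a key TTL, which removes the need for the manual expiry check.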
Use httponly=True, secure=True, and
samesite="lax" on auth cookies. Together they keep scripts from reading the cookie (limiting XSS token theft),
restrict it to HTTPS, and limit cross-site sends (mitigating CSRF).
JWT Deep Dive
A JSON Web Token is a compact, self-contained token that carries
signed claims about a user. It has three Base64URL-encoded parts separated by dots:
header.payload.signature
Header (algo+type) · Payload (claims) · Signature (HMAC-SHA256)
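To make the three-part layout concrete, here is a standard-library sketch that builds and checks an HS256 token by hand. It is for illustration only; in an application a maintained JWT library should do this work. The helper names (`b64url_encode`, `b64url_decode`, `sign_hs256`, `inspect_unverified`, `verify_hs256`) are invented for the example.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    # JWTs use URL-safe Base64 with the trailing "=" padding removed
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def b64url_decode(part: str) -> bytes:
    # Restore the stripped padding before decoding
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))


def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    signing_input = ".".join(
        b64url_encode(json.dumps(obj, separators=(",", ":")).encode())
        for obj in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def inspect_unverified(token: str) -> tuple:
    """Decode header and payload WITHOUT checking the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    return json.loads(b64url_decode(header_b64)), json.loads(b64url_decode(payload_b64))


def verify_hs256(token: str, secret: bytes) -> bool:
    signing_input, _, signature_b64 = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(expected, b64url_decode(signature_b64))
```

`inspect_unverified` needs no secret at all, which shows that the payload is only encoded, not encrypted. Anything placed in the claims is readable by whoever holds the token.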
Claims are key-value pairs in the JWT payload. There are standard (registered) claims defined by RFC 7519, and custom claims you add for your application needs.
| Claim | Name | Contains |
|---|---|---|
| sub | Subject | User ID: who the token is about |
| exp | Expiration | Unix timestamp; token invalid after this |
| iat | Issued At | When the token was created |
| nbf | Not Before | Token invalid before this time |
| jti | JWT ID | Unique ID (used for revocation) |
| roles | Custom | User's roles (your own claim) |
| email | Custom | User email (your own claim) |
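The time-based claims in the table reduce to a pair of comparisons against the current time. JWT libraries normally perform these checks during decoding; the sketch below only illustrates the rule. `validate_time_claims` and its `leeway` parameter (tolerance for clock skew, in seconds) are hypothetical names.

```python
import time
from typing import Optional


def validate_time_claims(
    claims: dict, now: Optional[float] = None, leeway: float = 0
) -> None:
    """Raise ValueError if the token is expired or not yet valid."""
    now = time.time() if now is None else now
    # exp: the current time must be strictly before the expiration time
    if "exp" in claims and now >= claims["exp"] + leeway:
        raise ValueError("Token has expired")
    # nbf: the token must not be accepted before this time
    if "nbf" in claims and now < claims["nbf"] - leeway:
        raise ValueError("Token is not yet valid")
```

Both claims are Unix timestamps in seconds, so the function works on the decoded payload as-is.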
```python
import jwt, datetime, uuid
from pydantic import BaseModel

SECRET_KEY = "your-256-bit-secret"
ALGORITHM = "HS256"

class TokenData(BaseModel):
    user_id: int
    email: str
    roles: list[str] = []

def create_token(data: TokenData, expires_minutes: int = 30) -> str:
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.datetime.now(datetime.timezone.utc)
    expire = now + datetime.timedelta(minutes=expires_minutes)
    payload = {
        # Standard claims
        "sub": str(data.user_id),   # subject, MUST be a string
        "exp": expire,              # expiration
        "iat": now,                 # issued at
        "jti": str(uuid.uuid4()),   # unique ID (for revocation)
        # Custom claims
        "email": data.email,
        "roles": data.roles,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    # Raises jwt.ExpiredSignatureError if the token has expired
    # Raises jwt.InvalidTokenError (or a subclass) for a bad signature
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

# Usage:
token = create_token(TokenData(user_id=1, email="alice@example.com", roles=["admin"]))
claims = decode_token(token)
# claims = {"sub": "1", "email": "alice@example.com", "roles": ["admin"], ...}
```
Access tokens should be short-lived (15-60 minutes). Rather than asking the user to log in again when one expires, also issue a refresh token at login: a long-lived (days/weeks) token, stored securely, that the client exchanges for a new access token.
Flow: access token expires (15–60 min) → client sends refresh token → server issues a new access token without re-login
```python
import jwt, datetime, uuid
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

SECRET = "access-secret"
REF_SECRET = "refresh-secret"  # different secret for refresh tokens!
ALGO = "HS256"
app = FastAPI()

def utc_now() -> datetime.datetime:
    # Timezone-aware replacement for the deprecated datetime.utcnow()
    return datetime.datetime.now(datetime.timezone.utc)

def create_access_token(user_id: int) -> str:
    payload = {
        "sub": str(user_id),
        "exp": utc_now() + datetime.timedelta(minutes=15),
        "type": "access"
    }
    return jwt.encode(payload, SECRET, algorithm=ALGO)

def create_refresh_token(user_id: int) -> str:
    payload = {
        "sub": str(user_id),
        "exp": utc_now() + datetime.timedelta(days=30),
        "jti": str(uuid.uuid4()),  # track in DB for revocation
        "type": "refresh"
    }
    return jwt.encode(payload, REF_SECRET, algorithm=ALGO)

class RefreshRequest(BaseModel):
    refresh_token: str

@app.post("/refresh")
async def refresh(body: RefreshRequest):
    try:
        payload = jwt.decode(body.refresh_token, REF_SECRET, algorithms=[ALGO])
        if payload.get("type") != "refresh":
            raise ValueError("Wrong token type")
        user_id = int(payload["sub"])
    except (jwt.PyJWTError, ValueError, KeyError):
        # Wrong type or a missing/bad "sub" also yields a 401 instead of a 500
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid refresh token")
    return {
        "access_token": create_access_token(user_id),
        "refresh_token": create_refresh_token(user_id),  # rotate!
        "token_type": "bearer"
    }
```
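The refresh endpoint above rotates the refresh token but never invalidates the old one, so a stolen refresh token stays usable until it expires. One way to close that gap is to track each refresh token's `jti` and treat reuse of an already rotated one as a sign of theft. The sketch below uses in-memory dicts where a real system would use a database. `issue_refresh_jti`, `rotate_refresh_jti`, and `revoke_all_for_user` are hypothetical helpers.

```python
import uuid

ACTIVE_REFRESH: dict = {}  # jti -> user_id, tokens that may still be used
USED_REFRESH: dict = {}    # jti -> user_id, tokens already rotated or revoked


def issue_refresh_jti(user_id: int) -> str:
    jti = str(uuid.uuid4())
    ACTIVE_REFRESH[jti] = user_id
    return jti


def revoke_all_for_user(user_id: int) -> None:
    for jti in [j for j, uid in ACTIVE_REFRESH.items() if uid == user_id]:
        USED_REFRESH[jti] = ACTIVE_REFRESH.pop(jti)


def rotate_refresh_jti(presented_jti: str) -> str:
    """Swap a refresh token's jti for a new one; each jti works only once."""
    if presented_jti in USED_REFRESH:
        # An already rotated token came back: assume theft, revoke the user's tokens
        revoke_all_for_user(USED_REFRESH[presented_jti])
        raise PermissionError("Refresh token reuse detected")
    user_id = ACTIVE_REFRESH.pop(presented_jti, None)
    if user_id is None:
        raise PermissionError("Unknown refresh token")
    USED_REFRESH[presented_jti] = user_id
    return issue_refresh_jti(user_id)
```

In the endpoint, the `jti` from the decoded refresh token would pass through `rotate_refresh_jti` before new tokens are issued, and the returned value would become the `jti` of the new refresh token.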
JWTs are stateless — once issued, you can't "cancel" them until they expire. Revocation requires keeping a server-side record. Two common patterns:
Blocklist:
- Store revoked jti in Redis
- Check blocklist on every request
- Delete entries after token expires
- Minimal storage (only revoked tokens)
- Best for logout / token theft

Allowlist:
- Store all valid jtis in DB/Redis
- Check allowlist on every request
- More storage, more control
- Can revoke entire user's tokens
- Best for high-security apps
```python
import redis, jwt, time
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Redis client (pip install redis)
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
SECRET = "your-secret"
ALGO = "HS256"

def revoke_token(jti: str, expires_in_seconds: int):
    # Store jti in Redis with TTL matching token expiry
    r.setex(f"revoked:{jti}", expires_in_seconds, "1")

def is_revoked(jti: str) -> bool:
    return r.exists(f"revoked:{jti}") == 1

async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET, algorithms=[ALGO])
    except jwt.PyJWTError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
    # Check if this specific token has been revoked
    jti = payload.get("jti")
    if jti and is_revoked(jti):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Token has been revoked")
    return payload

@app.post("/logout")
async def logout(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET, algorithms=[ALGO])
    except jwt.PyJWTError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
    jti = payload.get("jti")
    exp = payload.get("exp")
    if jti and exp:
        # time.time() is a true Unix timestamp; a naive utcnow().timestamp()
        # would be skewed by the machine's local UTC offset
        ttl = int(exp - time.time())
        if ttl > 0:
            revoke_token(jti, ttl)  # only store while token is still valid
    return {"message": "Logged out successfully"}
```
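The code above implements the blocklist pattern. For comparison, here is a rough sketch of the allowlist pattern, with an in-memory dict standing in for the DB or Redis store. `register_token`, `is_allowed`, and `revoke_user` are hypothetical names, and the `now` parameter exists only to make the expiry logic easy to exercise.

```python
import time
import uuid
from typing import Optional

# jti -> (user_id, expires_at); only tokens listed here are accepted
ALLOWED: dict = {}


def register_token(user_id: int, expires_at: float) -> str:
    """Record a newly issued token and return the jti to embed in it."""
    jti = str(uuid.uuid4())
    ALLOWED[jti] = (user_id, expires_at)
    return jti


def is_allowed(jti: str, now: Optional[float] = None) -> bool:
    now = time.time() if now is None else now
    entry = ALLOWED.get(jti)
    if entry is None:
        return False  # never issued, or already revoked
    if entry[1] <= now:
        ALLOWED.pop(jti, None)  # expired: clean up the stale entry
        return False
    return True


def revoke_user(user_id: int) -> int:
    """Revoke every token belonging to a user; return how many were removed."""
    doomed = [jti for jti, (uid, _) in ALLOWED.items() if uid == user_id]
    for jti in doomed:
        del ALLOWED[jti]
    return len(doomed)
```

The trade-off matches the lists above: every issued token costs a stored entry, but revoking all of a user's tokens becomes a single lookup by user.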
Authorization
Once we know who the user is (AuthN), we need to decide what they can do (AuthZ). FastAPI handles authorization through dependency functions that check the current user's roles, permissions, or attributes.
In role-based access control (RBAC), users are assigned roles (e.g. admin,
editor, viewer), and each role has a set of
permissions. Routes check whether the current user has the required role.
editor → can: read, write
viewer → can: read
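The role-to-permission mapping can be written down as plain data. In this sketch the editor and viewer rows follow the lines above, while the admin row and the `delete` permission are assumptions. `Permission`, `ROLE_PERMISSIONS`, `permissions_for`, and `has_permission` are hypothetical names.

```python
from enum import Enum
from typing import Iterable


class Permission(str, Enum):
    read = "read"
    write = "write"
    delete = "delete"  # assumed admin-only permission


ROLE_PERMISSIONS = {
    "admin": {Permission.read, Permission.write, Permission.delete},  # assumption
    "editor": {Permission.read, Permission.write},
    "viewer": {Permission.read},
}


def permissions_for(roles: Iterable[str]) -> set:
    """Union of the permissions granted by each of the user's roles."""
    granted: set = set()
    for role in roles:
        granted |= ROLE_PERMISSIONS.get(role, set())  # unknown roles grant nothing
    return granted


def has_permission(roles: Iterable[str], permission: Permission) -> bool:
    return permission in permissions_for(roles)
```

Checking permissions rather than role names lets a route stay unchanged when a new role is added; only the mapping needs a new row.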
```python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
import jwt
from enum import Enum

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET = "secret"

# ── Define roles as an enum ───────────────────────
class Role(str, Enum):
    admin = "admin"
    editor = "editor"
    viewer = "viewer"

# ── Current user dependency ───────────────────────
class CurrentUser(BaseModel):
    id: int
    email: str
    roles: list[Role]

async def get_current_user(token: str = Depends(oauth2_scheme)) -> CurrentUser:
    try:
        payload = jwt.decode(token, SECRET, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
    return CurrentUser(
        id=int(payload["sub"]),
        email=payload["email"],
        roles=payload.get("roles", [])
    )

# ── Role checker — returns a dependency function ──
def require_roles(*roles: Role):
    """Factory that creates a dependency checking for any of the given roles."""
    async def checker(user: CurrentUser = Depends(get_current_user)):
        if not any(role in user.roles for role in roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Required roles: {[r.value for r in roles]}"
            )
        return user
    return checker

# ── Routes with role requirements ─────────────────
@app.get("/reports")  # anyone logged in
async def get_reports(user = Depends(get_current_user)):
    return {"reports": []}

@app.post("/articles")  # editors and admins only
async def create_article(user = Depends(require_roles(Role.editor, Role.admin))):
    return {"created_by": user.email}

@app.delete("/users/{user_id}")  # admins only
async def delete_user(user_id: int, user = Depends(require_roles(Role.admin))):
    return {"deleted": user_id}
```
The require_roles(*roles) factory pattern is powerful: it returns
a fresh dependency function each time, so you can write
Depends(require_roles(Role.admin)) inline without defining a new
function for every combination.
Attribute-based access control (ABAC) is more fine-grained than RBAC. Access decisions are based on attributes of the user, the resource, and the environment. Example: "a user can edit an article only if they are the author". RBAC alone can't express this; ABAC can.
RBAC: user.role == "admin"
- Coarse-grained
- Simple to implement
- Good for most apps
- Hard to express ownership

ABAC: user.id == article.author_id
- Fine-grained
- More complex
- Good for resource ownership
- Can combine with roles
```python
from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel

app = FastAPI()

class Article(BaseModel):
    id: int
    title: str
    author_id: int
    published: bool

# Simulated data store
ARTICLES = {
    1: Article(id=1, title="Hello World", author_id=1, published=False),
    2: Article(id=2, title="FastAPI Rocks", author_id=2, published=True),
}

# Stand-in so this block runs on its own; in a real app, use the JWT-based
# get_current_user dependency and return the authenticated user
async def get_current_user() -> dict:
    return {"id": 1, "roles": []}

# ── ABAC Policy functions ─────────────────────────
def can_edit_article(user: dict, article: Article) -> bool:
    # A user can edit if: they are the author, OR they are an admin
    return user["id"] == article.author_id or "admin" in user.get("roles", [])

def can_delete_article(user: dict, article: Article) -> bool:
    # Admins can delete anything
    if "admin" in user.get("roles", []):
        return True
    # Authors can delete only their own unpublished drafts
    if user["id"] == article.author_id and not article.published:
        return True
    return False

# ── Routes using ABAC policies ────────────────────
@app.put("/articles/{article_id}")
async def update_article(article_id: int, current_user = Depends(get_current_user)):
    article = ARTICLES.get(article_id)
    if not article:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Article not found")
    if not can_edit_article(current_user, article):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to edit this article"
        )
    return {"message": "Article updated"}

@app.delete("/articles/{article_id}")
async def delete_article(article_id: int, current_user = Depends(get_current_user)):
    article = ARTICLES.get(article_id)
    if not article:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Article not found")
    if not can_delete_article(current_user, article):
        raise HTTPException(status.HTTP_403_FORBIDDEN, "Cannot delete this article")
    return {"message": "Article deleted"}
```
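The policies above use user and resource attributes. ABAC decisions can also draw on the environment, the third attribute source mentioned earlier. The sketch below adds that dimension with an invented rule: authors may publish their own drafts only during business hours and from an internal network, while admins are unrestricted. The rule, the hours, and the names `Subject`, `Resource`, `Environment`, and `can_publish` are all assumptions made for illustration.

```python
from dataclasses import dataclass
from datetime import time


@dataclass(frozen=True)
class Subject:
    id: int
    roles: frozenset = frozenset()


@dataclass(frozen=True)
class Resource:
    author_id: int
    published: bool


@dataclass(frozen=True)
class Environment:
    local_time: time             # time of the request
    from_internal_network: bool  # e.g. derived from the client IP


def can_publish(subject: Subject, resource: Resource, env: Environment) -> bool:
    # User attribute: admins bypass the remaining checks
    if "admin" in subject.roles:
        return True
    # Resource attributes: must be the author's own unpublished draft
    owns_draft = subject.id == resource.author_id and not resource.published
    # Environment attributes: hypothetical business-hours and network rule
    in_hours = time(9, 0) <= env.local_time < time(18, 0)
    return owns_draft and in_hours and env.from_internal_network
```

Keeping the policy a pure function of three explicit inputs makes each rule easy to unit-test apart from the route that calls it.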
| Concept | Tool / Pattern | Key Note |
|---|---|---|
| API Keys | APIKeyHeader, APIKeyQuery | Use header, always HTTPS |
| JWT Bearer | OAuth2PasswordBearer + PyJWT | Short-lived, stateless |
| OAuth2 Login | OAuth2PasswordRequestForm | Password flow for first-party |
| Cookies | Cookie(), response.set_cookie() | Use httponly + secure + samesite |
| JWT Claims | sub, exp, iat, jti + custom | Payload is not encrypted |
| Refresh Tokens | Separate secret, long-lived, rotate | Store jti for revocation |
| RBAC | require_roles() dependency factory | Roles embedded in JWT |
| ABAC | Policy functions per action | Check user + resource attributes |