FastAPI Mastery
Topic 10 of 22

Authentication & Authorization

Authentication answers "who are you?" — verifying the caller's identity. Authorization answers "what can you do?" — checking their permissions. FastAPI handles both elegantly through its dependency injection system.

🔑 Authentication (AuthN)
  • Who is this caller?
  • Verifies identity
  • API Key / Password / Token
  • Comes first in the pipeline
  • Returns: a user or identity
🛡️ Authorization (AuthZ)
  • What can this caller do?
  • Verifies permissions
  • Roles / Scopes / Policies
  • Comes after AuthN
  • Returns: allowed or 403 Forbidden
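
The ordering in the two lists above can be sketched as a small pipeline that does not depend on FastAPI. The token table, the permission names, and the helper names below are hypothetical, chosen only to show that a failed identity check yields 401 while a failed permission check yields 403.

```python
from typing import Optional

# Hypothetical in-memory data, for illustration only
TOKENS = {"token-alice": "alice", "token-bob": "bob"}
PERMISSIONS = {"alice": {"read", "write"}, "bob": {"read"}}

def authenticate(token: Optional[str]) -> Optional[str]:
    """AuthN: map a credential to an identity, or None if it is unknown."""
    if token is None:
        return None
    return TOKENS.get(token)

def authorize(user: str, permission: str) -> bool:
    """AuthZ: check whether an already-identified user holds a permission."""
    return permission in PERMISSIONS.get(user, set())

def handle_request(token: Optional[str], permission: str) -> int:
    """Return the HTTP status code the request would receive."""
    user = authenticate(token)
    if user is None:
        return 401  # identity could not be established
    if not authorize(user, permission):
        return 403  # identity is known, but the action is not allowed
    return 200
```

In the FastAPI examples that follow, the same two steps appear as dependencies: one resolves the caller, the next checks what that caller may do.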
10.1 Authentication

FastAPI supports multiple authentication mechanisms, all implemented as dependencies. Pick the one that matches your use case.

Method | Best for | Token lives in
API Keys | Server-to-server, simple integrations | Header / Query param
JWT (Bearer) | SPAs, mobile apps, stateless APIs | Authorization header
OAuth2 | Third-party logins, delegated access | Header (after token exchange)
Cookies | Traditional web apps, SSR | Cookie header
Sessions | Stateful web apps | Cookie (session ID)
🗝️
API Keys

The simplest auth method — a shared secret string. The client sends it in a header (preferred) or query parameter. FastAPI provides APIKeyHeader and APIKeyQuery security scheme classes.

Python — API Key via Header
from fastapi import FastAPI, Security, HTTPException, status
from fastapi.security import APIKeyHeader, APIKeyQuery
import os

app = FastAPI()

# Define where the API key comes from
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query  = APIKeyQuery(name="api_key",   auto_error=False)

VALID_KEYS = {"secret-key-123", "another-key-456"}  # store in DB in production

async def get_api_key(
    # With auto_error=False, each value is None when the key is absent
    header_key: str | None = Security(api_key_header),
    query_key:  str | None = Security(api_key_query),
):
    key = header_key or query_key   # accept from either location
    if key not in VALID_KEYS:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or missing API key"
        )
    return key

# ── Protected route ──────────────────────────────
@app.get("/data")
async def get_data(api_key: str = Security(get_api_key)):
    return {"message": "Access granted", "key_used": api_key[:6] + "..."}

# Client sends:  GET /data   Header: X-API-Key: secret-key-123
#            or: GET /data?api_key=secret-key-123
⚠️
Never put API keys in query params for sensitive data — they appear in server logs and browser history. Use headers instead. Always send over HTTPS.
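
The example above keeps raw keys in a set and notes that production code should store them in a database. One possible approach, sketched here with the standard library only, is to store a hash of each key and compare in constant time. The names hash_key, STORED_HASHES, and is_valid_key are hypothetical, and the sketch assumes keys are long random strings (for short or guessable secrets a plain SHA-256 would not be enough).

```python
import hashlib
import hmac
from typing import Optional

def hash_key(raw_key: str) -> str:
    """Hash an API key so the raw value never needs to be stored."""
    return hashlib.sha256(raw_key.encode("utf-8")).hexdigest()

# Hypothetical stand-in for a database table of hashed keys
STORED_HASHES = [hash_key("secret-key-123"), hash_key("another-key-456")]

def is_valid_key(candidate: Optional[str]) -> bool:
    """Check a presented key against the stored hashes."""
    if not candidate:
        return False
    candidate_hash = hash_key(candidate)
    # Compare against every stored hash (no early exit) using a
    # constant-time comparison to avoid leaking timing information
    results = [hmac.compare_digest(candidate_hash, stored) for stored in STORED_HASHES]
    return any(results)
```

A dependency such as get_api_key could call is_valid_key(key) in place of the set membership test.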
🪙
JWT (Bearer Token) Authentication

JWT (JSON Web Token) is a widely used auth mechanism for APIs. The client receives a signed token on login and sends it with every request as Authorization: Bearer <token>. FastAPI provides OAuth2PasswordBearer as the security scheme; it extracts the token from the header, and verifying the token is the job of your dependency.

POST /login (username + password) → Server creates JWT (signs with secret key) → Client stores token → GET /me (Authorization: Bearer <token>) → Server verifies & returns user
Python — Bearer token dependency
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import jwt  # pip install PyJWT

app = FastAPI()

# Tells FastAPI where tokens come from (shown in Swagger UI)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET_KEY = "your-secret-key"  # use env var in production!
ALGORITHM  = "HS256"

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        raise credentials_error
    user_id = payload.get("sub")  # "sub" is a string
    if user_id is None:
        raise credentials_error   # a missing claim is also a 401, not a 500
    return {"id": user_id}  # or fetch user from DB here

@app.get("/me")
async def read_me(current_user = Depends(get_current_user)):
    return current_user
ℹ️
The full JWT lifecycle (creation, claims, refresh tokens) is covered in detail in section 10.2 below.
🔐
OAuth2

OAuth2 is a standard authorization framework, not an auth method itself. FastAPI ships with OAuth2PasswordBearer (Resource Owner Password flow) and you can integrate any OAuth2 provider (Google, GitHub, etc.) using the Authorization Code flow.

Password Flow (simple)
  • Client sends username + password
  • Server returns access token
  • Good for first-party apps
  • FastAPI: OAuth2PasswordBearer
  • Also: OAuth2PasswordRequestForm
Authorization Code (Google/GitHub)
  • Redirect user to provider
  • Provider returns auth code
  • Server exchanges for token
  • Use authlib or httpx-oauth
  • Best for third-party login
Python — complete OAuth2 Password Flow login endpoint
import hashlib
import hmac
from datetime import datetime, timedelta, timezone

import jwt  # pip install PyJWT
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

SECRET  = "supersecret"  # load from an env var in production
ALGO    = "HS256"

# Simulated user store.
# Demo only: unsalted SHA-256 is too fast for password storage. Use a
# dedicated password hash (bcrypt, scrypt, Argon2) in production.
USERS_DB = {
    "alice": {"id": 1, "password_hash": hashlib.sha256(b"password123").hexdigest()}
}

class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

def create_access_token(user_id: int, expires_minutes: int = 30) -> str:
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    payload = {"sub": str(user_id), "exp": expire}
    return jwt.encode(payload, SECRET, algorithm=ALGO)

# ── Login endpoint: POST /token ───────────────────
@app.post("/token", response_model=Token)
async def login(form: OAuth2PasswordRequestForm = Depends()):
    # OAuth2PasswordRequestForm parses username + password from form data
    user = USERS_DB.get(form.username)
    given_hash = hashlib.sha256(form.password.encode()).hexdigest()

    # One error for both cases, so the response does not reveal which
    # usernames exist; compare_digest avoids a timing side channel
    if not user or not hmac.compare_digest(given_hash, user["password_hash"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = create_access_token(user["id"])
    return {"access_token": token}

# ── Protected route ───────────────────────────────
@app.get("/me")
async def read_me(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET, algorithms=[ALGO])
    except jwt.PyJWTError:
        # Without this, a bad or expired token would surface as a 500
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"user_id": payload["sub"]}
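
The Authorization Code flow listed above starts by redirecting the user to the provider. A minimal sketch of that first step, using only the standard library, is shown here. The endpoint URL, client ID, and redirect URI are hypothetical placeholders; the query parameter names (response_type, client_id, redirect_uri, scope, state) are the standard OAuth2 ones. In practice a library such as authlib or httpx-oauth would handle this and the later token exchange.

```python
import secrets
from typing import Tuple
from urllib.parse import urlencode

# Hypothetical provider endpoint and client settings, for illustration only
AUTHORIZE_URL = "https://provider.example/oauth/authorize"
CLIENT_ID = "my-client-id"
REDIRECT_URI = "https://myapp.example/auth/callback"

def build_authorization_url(scope: str) -> Tuple[str, str]:
    """Return (url, state). Store `state` server-side before redirecting."""
    state = secrets.token_urlsafe(16)  # random value that ties the callback to this login attempt
    query = urlencode({
        "response_type": "code",  # ask for an authorization code
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": scope,
        "state": state,
    })
    return f"{AUTHORIZE_URL}?{query}", state

def state_matches(expected: str, received: str) -> bool:
    """On the callback, reject the request unless the state round-tripped intact."""
    return secrets.compare_digest(expected, received)
```

The state value guards the callback against forged requests: the server only exchanges the returned code for a token when state_matches succeeds.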
🍪
Cookies & Sessions

Cookie-based auth is ideal for traditional web apps and browser clients where you control the frontend. FastAPI reads cookies via Cookie() and sets them via Response.set_cookie(). Sessions store a session ID in the cookie while keeping data server-side.

Python — cookie-based auth
from fastapi import FastAPI, Cookie, Response, HTTPException, Depends
from typing import Optional
import secrets, datetime

app = FastAPI()

# Simulated session store (use Redis/DB in production)
SESSIONS: dict[str, dict] = {}

# ── Login: set cookie ────────────────────────────
@app.post("/login")
async def login(response: Response):
    session_id = secrets.token_urlsafe(32)   # random session ID
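    SESSIONS[session_id] = {
        "user_id": 1,  # demo only: verify credentials and look up the real user first
        "created": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }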

    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,      # not accessible via JS (prevents XSS theft)
        secure=True,        # only sent over HTTPS
        samesite="lax",     # CSRF protection
        max_age=3600         # 1 hour
    )
    return {"message": "Logged in"}

# ── Read cookie in dependency ─────────────────────
async def get_session_user(session_id: Optional[str] = Cookie(default=None)):
    if not session_id or session_id not in SESSIONS:
        raise HTTPException(401, "Not authenticated")
    return SESSIONS[session_id]

@app.get("/dashboard")
async def dashboard(user = Depends(get_session_user)):
    return {"user": user}

# ── Logout: delete cookie ─────────────────────────
@app.post("/logout")
async def logout(response: Response, session_id: Optional[str] = Cookie(default=None)):
    if session_id:
        SESSIONS.pop(session_id, None)
    response.delete_cookie("session_id")
    return {"message": "Logged out"}
💡
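Always set httponly=True, secure=True, and samesite="lax" (or "strict") on auth cookies. httponly keeps scripts from reading the cookie, which limits token theft through XSS; secure restricts the cookie to HTTPS; samesite stops most cross-site requests from carrying it, which mitigates CSRF.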
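
The cookie's max_age only tells the browser when to discard the cookie, so the server-side session record should carry its own expiry as well. The following is a minimal in-memory sketch under that assumption; SessionStore and its methods are hypothetical helpers rather than FastAPI APIs, and a production store would more likely be Redis or a database, as the example above notes.

```python
import secrets
import time
from typing import Dict, Optional

class SessionStore:
    """In-memory session store whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: int = 3600) -> None:
        self.ttl_seconds = ttl_seconds
        self._sessions: Dict[str, dict] = {}

    def create(self, user_id: int, now: Optional[float] = None) -> str:
        """Create a session and return its random ID (the cookie value)."""
        now = time.time() if now is None else now
        session_id = secrets.token_urlsafe(32)
        self._sessions[session_id] = {
            "user_id": user_id,
            "expires_at": now + self.ttl_seconds,
        }
        return session_id

    def get(self, session_id: str, now: Optional[float] = None) -> Optional[dict]:
        """Return the session, or None if it is unknown or expired."""
        now = time.time() if now is None else now
        session = self._sessions.get(session_id)
        if session is None:
            return None
        if now >= session["expires_at"]:
            del self._sessions[session_id]  # drop expired entries lazily
            return None
        return session

    def delete(self, session_id: str) -> None:
        """Remove a session (logout)."""
        self._sessions.pop(session_id, None)
```

The optional now argument exists only to make the expiry logic easy to test; callers would normally omit it.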
10.2 JWT Deep Dive

A JSON Web Token is a compact, self-contained token that carries signed claims about a user. It has three Base64url-encoded parts separated by dots: header.payload.signature

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZXhwIjoxNzAwMDAwMH0.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
Header (algo+type) · Payload (claims) · Signature (HMAC-SHA256)
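
To make the three-part structure concrete, the sketch below builds and checks an HS256 token by hand with the standard library. It is meant only to illustrate the format; the helper names and the demo secret are hypothetical, and real code should rely on a maintained library such as PyJWT, which also validates claims like exp.

```python
import base64
import hashlib
import hmac
import json

def b64url_encode(raw: bytes) -> str:
    """Base64url without padding, as used for JWT segments."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

def b64url_decode(segment: str) -> bytes:
    """Restore the stripped padding, then decode."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)

def sign_hs256(header: dict, payload: dict, secret: bytes) -> str:
    """Build header.payload.signature, signing the first two segments."""
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"

def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    signing_input, _, signature_b64 = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return hmac.compare_digest(expected, b64url_decode(signature_b64))

token = sign_hs256(
    {"alg": "HS256", "typ": "JWT"},
    {"sub": "1", "roles": ["admin"]},
    b"demo-secret",  # hypothetical secret for the demo
)

# Header and payload can be read without the secret
header_b64, payload_b64, signature_b64 = token.split(".")
decoded_header = json.loads(b64url_decode(header_b64))
decoded_payload = json.loads(b64url_decode(payload_b64))
```

Note that decoding the header and payload needs no key at all; only the signature check involves the secret.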
📋
JWT Claims

Claims are key-value pairs in the JWT payload. There are standard (registered) claims defined by RFC 7519, and custom claims you add for your application needs.

Claim | Name | Contains
sub | Subject | User ID (who the token is about)
exp | Expiration | Unix timestamp; the token is invalid after this time
iat | Issued At | When the token was created
nbf | Not Before | The token is invalid before this time
jti | JWT ID | Unique ID (used for revocation)
roles | Custom | User's roles (your own claim)
email | Custom | User email (your own claim)
Python — creating a JWT with claims
import jwt, datetime, uuid
from pydantic import BaseModel

SECRET_KEY = "your-256-bit-secret"
ALGORITHM  = "HS256"

class TokenData(BaseModel):
    user_id: int
    email:   str
    roles:   list[str] = []

def create_token(data: TokenData, expires_minutes: int = 30) -> str:
    now    = datetime.datetime.now(datetime.timezone.utc)  # utcnow() is deprecated since Python 3.12
    expire = now + datetime.timedelta(minutes=expires_minutes)

    payload = {
        # Standard claims
        "sub": str(data.user_id),  # subject — MUST be a string
        "exp": expire,             # expiration
        "iat": now,                # issued at
        "jti": str(uuid.uuid4()),  # unique ID (for revocation)
        # Custom claims
        "email": data.email,
        "roles": data.roles,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    # Raises jwt.ExpiredSignatureError if the token has expired
    # Raises jwt.InvalidTokenError (the base class) for a bad signature or malformed token
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])

# Usage:
token = create_token(TokenData(user_id=1, email="alice@example.com", roles=["admin"]))
claims = decode_token(token)
# claims = {"sub": "1", "email": "alice@example.com", "roles": ["admin"], ...}
🚨
JWT payloads are not encrypted; they are only Base64url-encoded. Never put sensitive data (passwords, credit card numbers) in a JWT. Anyone who holds the token can decode and read the payload.
🔄
Expiration & Refresh Tokens

Access tokens should be short-lived (15-60 minutes). To avoid forcing the user to log in again each time one expires, also issue a refresh token at login: a long-lived (days or weeks) token, stored securely, that the client exchanges for a new access token.

Access token (15-60 min) expires → 401 Unauthorized → POST /refresh (send refresh token) → New access token (without re-login)
Python — access + refresh token pair
import jwt, datetime, uuid
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

SECRET      = "access-secret"
REF_SECRET  = "refresh-secret"  # different secret for refresh tokens!
ALGO        = "HS256"

app = FastAPI()

def create_access_token(user_id: int) -> str:
    payload = {
        "sub": str(user_id),
        # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12
        "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=15),
        "type": "access"
    }
    return jwt.encode(payload, SECRET, algorithm=ALGO)

def create_refresh_token(user_id: int) -> str:
    payload = {
        "sub": str(user_id),
        "exp": datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=30),
        "jti": str(uuid.uuid4()),    # track in DB for revocation
        "type": "refresh"
    }
    return jwt.encode(payload, REF_SECRET, algorithm=ALGO)

class RefreshRequest(BaseModel):
    refresh_token: str

@app.post("/refresh")
async def refresh(body: RefreshRequest):
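    try:
        payload = jwt.decode(body.refresh_token, REF_SECRET, algorithms=[ALGO])
    except jwt.PyJWTError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid refresh token")
    # Only refresh tokens are accepted here; respond with 401 rather than
    # raising an exception that nothing catches
    if payload.get("type") != "refresh":
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid refresh token")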

    user_id = int(payload["sub"])
    return {
        "access_token":  create_access_token(user_id),
        "refresh_token": create_refresh_token(user_id),  # rotate!
        "token_type":    "bearer"
    }
💡
Rotate refresh tokens — issue a new refresh token every time one is used. If an old refresh token is presented twice, it indicates theft; revoke the entire session.
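
The rotation rule in the tip above can be sketched without any JWT machinery by tracking refresh-token IDs per session. The class below is a hypothetical in-memory illustration (a real system would keep this state in a database or Redis): each ID may be used once, and presenting an already-used ID revokes every token in that session.

```python
import secrets
from typing import Dict, Optional

class RefreshTokenStore:
    """Tracks refresh-token IDs (jti) and detects reuse of rotated tokens."""

    def __init__(self) -> None:
        self._active: Dict[str, str] = {}  # jti -> session_id, still usable
        self._used: Dict[str, str] = {}    # jti -> session_id, already rotated

    def issue(self, session_id: str) -> str:
        """Create a new refresh-token ID for a session."""
        jti = secrets.token_urlsafe(16)
        self._active[jti] = session_id
        return jti

    def rotate(self, jti: str) -> Optional[str]:
        """Exchange a refresh-token ID for a new one, or return None on failure."""
        if jti in self._used:
            # A rotated token came back: treat it as theft and end the session
            self.revoke_session(self._used[jti])
            return None
        session_id = self._active.pop(jti, None)
        if session_id is None:
            return None  # unknown or revoked token
        self._used[jti] = session_id
        return self.issue(session_id)

    def revoke_session(self, session_id: str) -> None:
        """Invalidate every active refresh token belonging to a session."""
        self._active = {j: s for j, s in self._active.items() if s != session_id}
```

A /refresh endpoint could call rotate(payload["jti"]) after verifying the token's signature and respond with 401 whenever it returns None.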
🚫
Token Revocation

JWTs are stateless — once issued, you can't "cancel" them until they expire. Revocation requires keeping a server-side record. Two common patterns:

Blocklist (Denylist)
  • Store revoked jti in Redis
  • Check blocklist on every request
  • Delete entries after token expires
  • Minimal storage (only revoked tokens)
  • Best for logout / token theft
Allowlist (Safelist)
  • Store all valid jtis in DB/Redis
  • Check allowlist on every request
  • More storage, more control
  • Can revoke entire user's tokens
  • Best for high-security apps
Python — blocklist revocation with Redis
import redis, jwt, datetime
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Redis client (pip install redis)
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

SECRET = "your-secret"
ALGO   = "HS256"

def revoke_token(jti: str, expires_in_seconds: int):
    # Store jti in Redis with TTL matching token expiry
    r.setex(f"revoked:{jti}", expires_in_seconds, "1")

def is_revoked(jti: str) -> bool:
    return r.exists(f"revoked:{jti}") == 1

async def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET, algorithms=[ALGO])
    except jwt.PyJWTError:
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")

    # Check if this specific token has been revoked
    jti = payload.get("jti")
    if jti and is_revoked(jti):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Token has been revoked")

    return payload

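@app.post("/logout")
async def logout(payload: dict = Depends(get_current_user)):
    # get_current_user has already verified the signature and expiry,
    # so an invalid token yields a 401 here instead of an unhandled error
    jti = payload.get("jti")
    exp = payload.get("exp")  # Unix timestamp (seconds)

    if jti and exp:
        # Use an aware UTC datetime: a naive utcnow().timestamp() is
        # interpreted as local time and gives the wrong TTL off-UTC
        ttl = int(exp - datetime.datetime.now(datetime.timezone.utc).timestamp())
        if ttl > 0:
            revoke_token(jti, ttl)   # only store while token is still valid

    return {"message": "Logged out successfully"}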
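
The example above implements the blocklist pattern. For comparison, the allowlist pattern from the second list can be sketched as follows. This is a hypothetical in-memory version (the class and method names are illustrative, and a real deployment would back it with a database or Redis): every issued jti is registered, a token is accepted only while its jti is present, and all of a user's tokens can be dropped at once.

```python
import time
from typing import Dict, Optional, Tuple

class TokenAllowlist:
    """Accepts only tokens whose jti has been registered and not revoked."""

    def __init__(self) -> None:
        # jti -> (user_id, expires_at as a Unix timestamp)
        self._entries: Dict[str, Tuple[int, float]] = {}

    def register(self, jti: str, user_id: int, expires_at: float) -> None:
        """Record a freshly issued token."""
        self._entries[jti] = (user_id, expires_at)

    def is_allowed(self, jti: str, now: Optional[float] = None) -> bool:
        """True only if the jti is known and has not expired."""
        now = time.time() if now is None else now
        entry = self._entries.get(jti)
        if entry is None:
            return False
        _, expires_at = entry
        if now >= expires_at:
            del self._entries[jti]  # clean up expired entries lazily
            return False
        return True

    def revoke_user(self, user_id: int) -> int:
        """Remove every token of one user; returns how many were revoked."""
        doomed = [jti for jti, (owner, _) in self._entries.items() if owner == user_id]
        for jti in doomed:
            del self._entries[jti]
        return len(doomed)
```

The trade-off matches the lists above: the allowlist stores every live token, but revoking a whole user becomes a single operation.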
10.3 Authorization

Once we know who the user is (AuthN), we need to decide what they can do (AuthZ). FastAPI handles authorization through dependency functions that check the current user's roles, permissions, or attributes.

👥
RBAC — Role-Based Access Control

In RBAC, users are assigned roles (e.g. admin, editor, viewer), and each role has a set of permissions. Routes check if the current user has the required role.

admin → can: read, write, delete, manage_users
editor → can: read, write
viewer → can: read
Python — RBAC with FastAPI dependencies
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel
import jwt
from enum import Enum

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET = "secret"

# ── Define roles as an enum ───────────────────────
class Role(str, Enum):
    admin  = "admin"
    editor = "editor"
    viewer = "viewer"

# ── Current user dependency ───────────────────────
class CurrentUser(BaseModel):
    id:    int
    email: str
    roles: list[Role]

async def get_current_user(token: str = Depends(oauth2_scheme)) -> CurrentUser:
    try:
        payload = jwt.decode(token, SECRET, algorithms=["HS256"])
        # Missing or malformed claims are treated like any other invalid token
        return CurrentUser(
            id=int(payload["sub"]),
            email=payload["email"],
            roles=payload.get("roles", []),
        )
    except (jwt.PyJWTError, KeyError, TypeError, ValueError):
        raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")

# ── Role checker — returns a dependency function ──
def require_roles(*roles: Role):
    """Factory that creates a dependency checking for any of the given roles."""
    async def checker(user: CurrentUser = Depends(get_current_user)):
        if not any(role in user.roles for role in roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Required roles: {[r.value for r in roles]}"
            )
        return user
    return checker

# ── Routes with role requirements ─────────────────
@app.get("/reports")                         # anyone logged in
async def get_reports(user = Depends(get_current_user)):
    return {"reports": []}

@app.post("/articles")                        # editors and admins only
async def create_article(user = Depends(require_roles(Role.editor, Role.admin))):
    return {"created_by": user.email}

@app.delete("/users/{user_id}")               # admins only
async def delete_user(user_id: int, user = Depends(require_roles(Role.admin))):
    return {"deleted": user_id}
💡
The require_roles(*roles) factory pattern is powerful — it returns a fresh dependency function each time, so you can do Depends(require_roles(Role.admin)) inline without defining a new function for every combination.
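
The dependency above checks role names directly. The role-to-permission mapping listed at the start of this subsection can also be expressed as data, so routes ask for a permission rather than a role. The sketch below assumes the same three roles and four permissions; ROLE_PERMISSIONS, permissions_for, and has_permission are hypothetical names.

```python
from enum import Enum
from typing import Iterable, Set

class Role(str, Enum):
    admin = "admin"
    editor = "editor"
    viewer = "viewer"

# Mirrors the mapping: admin > editor > viewer
ROLE_PERMISSIONS = {
    Role.admin: {"read", "write", "delete", "manage_users"},
    Role.editor: {"read", "write"},
    Role.viewer: {"read"},
}

def permissions_for(roles: Iterable[Role]) -> Set[str]:
    """Union of the permissions granted by each of the user's roles."""
    granted: Set[str] = set()
    for role in roles:
        granted |= ROLE_PERMISSIONS.get(role, set())
    return granted

def has_permission(roles: Iterable[Role], permission: str) -> bool:
    """True if any of the roles grants the permission."""
    return permission in permissions_for(roles)
```

A factory analogous to require_roles could then call has_permission(user.roles, "delete") and raise 403 when it returns False.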
🎯
ABAC — Attribute-Based Access Control

ABAC is more fine-grained than RBAC. Access decisions are based on attributes of the user, the resource, and the environment. Example: "a user can edit an article only if they are the author". RBAC can't express this — ABAC can.

RBAC — Role check
  • user.role == "admin"
  • Coarse-grained
  • Simple to implement
  • Good for most apps
  • Hard to express ownership
ABAC — Attribute check
  • user.id == article.author_id
  • Fine-grained
  • More complex
  • Good for resource ownership
  • Can combine with roles
Python — ABAC ownership + policy check
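from fastapi import FastAPI, Depends, HTTPException, status
from pydantic import BaseModel

app = FastAPI()

# Stand-in so this example runs on its own; in a real app, reuse the
# JWT-based get_current_user dependency from section 10.1.
async def get_current_user() -> dict:
    return {"id": 1, "roles": ["editor"]}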

class Article(BaseModel):
    id:        int
    title:     str
    author_id: int
    published: bool

# Simulated data store
ARTICLES = {
    1: Article(id=1, title="Hello World", author_id=1, published=False),
    2: Article(id=2, title="FastAPI Rocks", author_id=2, published=True),
}

# ── ABAC Policy functions ─────────────────────────
def can_edit_article(user: dict, article: Article) -> bool:
    # A user can edit if: they are the author, OR they are an admin
    return user["id"] == article.author_id or "admin" in user.get("roles", [])

def can_delete_article(user: dict, article: Article) -> bool:
    # Admins can delete anything; authors can delete only their own unpublished drafts
    if "admin" in user.get("roles", []):
        return True
    if user["id"] == article.author_id and not article.published:
        return True  # authors can delete unpublished drafts
    return False

# ── Routes using ABAC policies ────────────────────
@app.put("/articles/{article_id}")
async def update_article(article_id: int, current_user = Depends(get_current_user)):
    article = ARTICLES.get(article_id)
    if not article:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Article not found")

    if not can_edit_article(current_user, article):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to edit this article"
        )
    return {"message": "Article updated"}

@app.delete("/articles/{article_id}")
async def delete_article(article_id: int, current_user = Depends(get_current_user)):
    article = ARTICLES.get(article_id)
    if not article:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "Article not found")

    if not can_delete_article(current_user, article):
        raise HTTPException(status.HTTP_403_FORBIDDEN, "Cannot delete this article")
    return {"message": "Article deleted"}
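
The policies above use user and resource attributes. ABAC decisions can also take the environment into account, as the definition at the start of this subsection mentions. The sketch below adds one made-up environment attribute, a maintenance_mode flag, purely for illustration; the dataclasses and the rule itself are hypothetical and not part of the example app.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class User:
    id: int
    roles: List[str] = field(default_factory=list)

@dataclass
class Article:
    id: int
    author_id: int
    published: bool

@dataclass
class Environment:
    maintenance_mode: bool = False  # hypothetical environment attribute

def can_edit(user: User, article: Article, env: Environment) -> bool:
    """Combine user, resource, and environment attributes in one decision."""
    is_admin = "admin" in user.roles
    if env.maintenance_mode:
        return is_admin  # during maintenance, only admins may edit
    return is_admin or user.id == article.author_id
```

Keeping each policy a pure function of its inputs makes it straightforward to unit-test every combination without starting the app.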
📋
Topic 10 — Quick Reference Summary
Concept | Tool / Pattern | Key Note
API Keys | APIKeyHeader, APIKeyQuery | Use header, always HTTPS
JWT Bearer | OAuth2PasswordBearer + PyJWT | Short-lived, stateless
OAuth2 Login | OAuth2PasswordRequestForm | Password flow for first-party
Cookies | Cookie(), response.set_cookie() | Use httponly + secure + samesite
JWT Claims | sub, exp, iat, jti + custom | Payload is not encrypted
Refresh Tokens | Separate secret, long-lived, rotate | Store jti for revocation
RBAC | require_roles() dependency factory | Roles embedded in JWT
ABAC | Policy functions per action | Check user + resource attributes