Set up FastAPI email verification and password reset with Redis and PostgreSQL

Intermediate 45 min May 31, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Build secure user authentication with email verification and password reset features in FastAPI using Redis for session management and PostgreSQL for user storage.

Prerequisites

  • Python 3.8+
  • PostgreSQL 12+
  • Redis 6+
  • SMTP server access
  • Basic Python knowledge

What this solves

Email verification and password reset functionality is essential for secure web applications. This tutorial shows you how to implement these features in FastAPI with Redis for session management and PostgreSQL for user storage. You'll build a complete authentication system with email verification, password reset tokens, rate limiting, and proper security middleware.

Step-by-step installation

Update system packages

Start by updating your package manager and installing required system dependencies.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y
sudo apt install -y python3 python3-pip python3-venv redis-server postgresql postgresql-contrib

# AlmaLinux / Rocky Linux
sudo dnf update -y
sudo dnf install -y python3 python3-pip redis postgresql postgresql-server postgresql-contrib

Configure PostgreSQL database

Initialize PostgreSQL and create a database and user for your FastAPI application.

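# Ubuntu / Debian
sudo systemctl enable --now postgresql

# AlmaLinux / Rocky Linux (the data directory must be initialized first)
sudo postgresql-setup --initdb
sudo systemctl enable --now postgresql

# All distributions: create the application role
sudo -u postgres createuser --interactive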

Enter username: fastapi_user

Shall the new role be a superuser? n

Shall the new role be allowed to create databases? y

Shall the new role be allowed to create more new roles? n

sudo -u postgres createdb fastapi_db -O fastapi_user
sudo -u postgres psql -c "ALTER USER fastapi_user PASSWORD 'your_secure_password';"
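
The role, password, and database created above are later combined into a single connection URL. If the password contains characters such as @, : or /, it has to be percent-encoded before it goes into that URL. A minimal sketch using only the standard library; build_database_url is a hypothetical helper and not part of the tutorial's code:

```python
from urllib.parse import quote


def build_database_url(user: str, password: str, host: str,
                       database: str, port: int = 5432) -> str:
    """Assemble a PostgreSQL URL, percent-encoding the credentials."""
    # safe="" makes quote() encode every reserved character, including "/".
    encoded_user = quote(user, safe="")
    encoded_password = quote(password, safe="")
    return f"postgresql://{encoded_user}:{encoded_password}@{host}:{port}/{database}"


if __name__ == "__main__":
    # The password here is an illustrative value, not a recommendation.
    print(build_database_url("fastapi_user", "p@ss:w/rd", "localhost", "fastapi_db"))
```

With a password made only of letters, digits, and underscores, the encoded URL is identical to the hand-written one.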

Configure Redis server

Enable Redis and configure it for session storage with proper security settings.

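# Ubuntu / Debian (on AlmaLinux / Rocky Linux the service is named redis)
sudo systemctl enable --now redis-server

Then edit the Redis configuration file (typically /etc/redis/redis.conf) and set the following directives:

# Bind to localhost only for security
bind 127.0.0.1

# Set a password (uncomment and change)
requirepass your_redis_password

# Configure memory policy
maxmemory 256mb
maxmemory-policy allkeys-lru

# Enable persistence
save 900 1
save 300 10
save 60 10000

Restart Redis to apply the changes:

sudo systemctl restart redis-server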

Create Python virtual environment

Set up an isolated Python environment for your FastAPI application dependencies.

mkdir fastapi-auth && cd fastapi-auth
python3 -m venv venv
source venv/bin/activate

Install FastAPI dependencies

Install all required Python packages for authentication, database access, and email functionality.

# Quote the extras so that shells such as zsh do not expand the brackets
pip install "fastapi[all]" uvicorn sqlalchemy psycopg2-binary redis "python-jose[cryptography]" "passlib[bcrypt]" python-multipart email-validator jinja2 aiosmtplib slowapi python-dotenv

Create database models

Define the user model and database schema with SQLAlchemy for PostgreSQL integration.

# models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime
import os

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=False)
    is_verified = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class PasswordResetToken(Base):
    __tablename__ = "password_reset_tokens"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, nullable=False)
    token = Column(String, unique=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    used = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

class EmailVerificationToken(Base):
    __tablename__ = "email_verification_tokens"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, nullable=False)
    token = Column(String, unique=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    used = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

# Database connection (read from the environment, with a local fallback)
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://fastapi_user:your_secure_password@localhost/fastapi_db",
)
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create tables
Base.metadata.create_all(bind=engine)
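
Both token tables carry an expires_at timestamp and a used flag, and the endpoints later accept a token only when it is unused and not yet expired. The rule can be sketched without a database; TokenRecord, is_redeemable, and redeem are hypothetical names used for illustration only:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class TokenRecord:
    """In-memory stand-in for a row in either token table."""
    email: str
    token: str
    expires_at: datetime
    used: bool = False


def is_redeemable(record: TokenRecord, now: datetime) -> bool:
    # Same condition as the query filters: unused and strictly before expiry.
    return (not record.used) and record.expires_at > now


def redeem(record: TokenRecord, now: datetime) -> bool:
    """Mark the token as used if it is still valid; report whether it was."""
    if not is_redeemable(record, now):
        return False
    record.used = True
    return True


if __name__ == "__main__":
    now = datetime(2026, 1, 1, 12, 0, 0)
    record = TokenRecord("test@example.com", "abc", now + timedelta(hours=24))
    print(redeem(record, now))  # first use succeeds
    print(redeem(record, now))  # second use is rejected
```

Marking the record as used in the same step that accepts it is what makes each link single-use.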

Configure authentication utilities

Set up password hashing, JWT tokens, and Redis session management.

# auth_utils.py
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
import secrets
import redis
import json
import os

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# JWT settings (read from the environment; the fallback is for local testing only)
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-this-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Redis connection
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    password=os.getenv("REDIS_PASSWORD", "your_redis_password"),
    decode_responses=True,
)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        # Fall back to the configured default lifetime
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str) -> Optional[str]:
    """Return the email stored in the "sub" claim, or None if the token is invalid."""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        if email is None:
            return None
        return email
    except JWTError:
        return None

def generate_verification_token() -> str:
    return secrets.token_urlsafe(32)

def store_session(session_id: str, user_data: dict, expire_seconds: int = 1800):
    # SETEX stores the value and its time-to-live in one call
    redis_client.setex(session_id, expire_seconds, json.dumps(user_data))

def get_session(session_id: str) -> Optional[dict]:
    session_data = redis_client.get(session_id)
    if session_data:
        return json.loads(session_data)
    return None

def delete_session(session_id: str):
    redis_client.delete(session_id)

Configure email functionality

Set up SMTP email sending for verification and password reset emails.

import aiosmtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from jinja2 import Template
import os

# SMTP configuration
SMTP_HOST = os.getenv("SMTP_HOST", "smtp.gmail.com")
SMTP_PORT = int(os.getenv("SMTP_PORT", 587))
SMTP_USERNAME = os.getenv("SMTP_USERNAME", "your-email@example.com")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "your-app-password")
FROM_EMAIL = os.getenv("FROM_EMAIL", "noreply@example.com")

# Email verification template
VERIFICATION_EMAIL_TEMPLATE = """
<html>
  <head><title>Verify Your Email</title></head>
  <body>
    <h2>Email Verification Required</h2>
    <p>Hello,</p>
    <p>Please click the link below to verify your email address:</p>
    <p><a href="{{ verification_url }}">Verify Email</a></p>
    <p>This link will expire in 24 hours.</p>
    <p>If you didn't create an account, please ignore this email.</p>
  </body>
</html>
"""

# Password reset template
PASSWORD_RESET_TEMPLATE = """
<html>
  <head><title>Reset Your Password</title></head>
  <body>
    <h2>Password Reset Request</h2>
    <p>Hello,</p>
    <p>You requested a password reset. Click the link below to reset your password:</p>
    <p><a href="{{ reset_url }}">Reset Password</a></p>
    <p>This link will expire in 1 hour.</p>
    <p>If you didn't request this, please ignore this email.</p>
  </body>
</html>
"""

async def send_email(to_email: str, subject: str, html_content: str):
    message = MIMEMultipart("alternative")
    message["Subject"] = subject
    message["From"] = FROM_EMAIL
    message["To"] = to_email

    html_part = MIMEText(html_content, "html")
    message.attach(html_part)

    try:
        await aiosmtplib.send(
            message,
            hostname=SMTP_HOST,
            port=SMTP_PORT,
            username=SMTP_USERNAME,
            password=SMTP_PASSWORD,
            # Port 587 expects STARTTLS; use_tls=True is for implicit TLS (usually port 465)
            start_tls=True,
        )
        return True
    except Exception as e:
        print(f"Failed to send email: {e}")
        return False

async def send_verification_email(email: str, token: str):
    verification_url = f"http://localhost:8000/verify-email?token={token}"
    template = Template(VERIFICATION_EMAIL_TEMPLATE)
    html_content = template.render(verification_url=verification_url)
    return await send_email(
        to_email=email,
        subject="Verify Your Email Address",
        html_content=html_content
    )

async def send_password_reset_email(email: str, token: str):
    reset_url = f"http://localhost:8000/reset-password?token={token}"
    template = Template(PASSWORD_RESET_TEMPLATE)
    html_content = template.render(reset_url=reset_url)
    return await send_email(
        to_email=email,
        subject="Reset Your Password",
        html_content=html_content
    )

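The email functions above hard-code http://localhost:8000 in the links they send, which only works on a development machine. One possible approach, sketched here under the assumption of a hypothetical APP_BASE_URL environment variable and a hypothetical build_action_url helper, is to derive the link from configuration and let the standard library encode the token:

```python
import os
from typing import Optional
from urllib.parse import urlencode


def build_action_url(path: str, token: str, base_url: Optional[str] = None) -> str:
    """Join a base URL, an endpoint path, and a URL-encoded token parameter."""
    # APP_BASE_URL is an assumed variable name; it is not defined by the tutorial.
    base = base_url or os.getenv("APP_BASE_URL", "http://localhost:8000")
    query = urlencode({"token": token})
    return f"{base.rstrip('/')}/{path.lstrip('/')}?{query}"


if __name__ == "__main__":
    print(build_action_url("/verify-email", "abc-_123", base_url="https://example.com/"))
```

Tokens from secrets.token_urlsafe contain only URL-safe characters, so encoding leaves them unchanged; it matters if the token format ever changes.
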
Create main FastAPI application

Build the main application with all authentication endpoints and security middleware.

from fastapi import FastAPI, HTTPException, Depends, Request, Response
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from pydantic import BaseModel, EmailStr
import uuid

from models import User, PasswordResetToken, EmailVerificationToken, SessionLocal
from auth_utils import (
    verify_password, get_password_hash, create_access_token, 
    verify_token, generate_verification_token, store_session, 
    get_session, delete_session
)
from email_service import send_verification_email, send_password_reset_email

# Initialize FastAPI app
app = FastAPI(title="FastAPI Authentication API", version="1.0.0")

# Rate limiting
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # Add your frontend URL
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Security
security = HTTPBearer()

# Pydantic models
class UserCreate(BaseModel):
    email: EmailStr
    password: str

class UserLogin(BaseModel):
    email: EmailStr
    password: str

class PasswordReset(BaseModel):
    email: EmailStr

class PasswordResetConfirm(BaseModel):
    token: str
    new_password: str

class UserResponse(BaseModel):
    id: int
    email: str
    is_active: bool
    is_verified: bool

# Database dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Authentication dependency
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db)):
    token = credentials.credentials
    email = verify_token(token)
    if email is None:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")

    user = db.query(User).filter(User.email == email).first()
    if user is None:
        raise HTTPException(status_code=401, detail="User not found")
    return user

@app.post("/register", response_model=dict)
@limiter.limit("5/minute")
async def register_user(request: Request, user_data: UserCreate, db: Session = Depends(get_db)):
    # Check if user already exists
    existing_user = db.query(User).filter(User.email == user_data.email).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create new user
    hashed_password = get_password_hash(user_data.password)
    user = User(
        email=user_data.email,
        hashed_password=hashed_password,
        is_active=True,
        is_verified=False
    )
    db.add(user)
    db.commit()
    db.refresh(user)

    # Generate verification token
    verification_token = generate_verification_token()
    expires_at = datetime.utcnow() + timedelta(hours=24)
    token_record = EmailVerificationToken(
        email=user_data.email,
        token=verification_token,
        expires_at=expires_at
    )
    db.add(token_record)
    db.commit()

    # Send verification email
    email_sent = await send_verification_email(user_data.email, verification_token)

    return {
        "message": "User registered successfully",
        "email_sent": email_sent,
        "user_id": user.id
    }

@app.post("/login", response_model=dict)
@limiter.limit("10/minute")
async def login(request: Request, response: Response, user_credentials: UserLogin, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.email == user_credentials.email).first()
    if not user or not verify_password(user_credentials.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Incorrect email or password")

    if not user.is_active:
        raise HTTPException(status_code=401, detail="Account is deactivated")

    if not user.is_verified:
        raise HTTPException(status_code=401, detail="Please verify your email address")

    # Create access token
    access_token_expires = timedelta(minutes=30)
    access_token = create_access_token(
        data={"sub": user.email},
        expires_delta=access_token_expires
    )

    # Create session
    session_id = str(uuid.uuid4())
    session_data = {
        "user_id": user.id,
        "email": user.email,
        "is_verified": user.is_verified
    }
    store_session(session_id, session_data, expire_seconds=1800)

    # Set session cookie
    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=1800
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "user": {
            "id": user.id,
            "email": user.email,
            "is_verified": user.is_verified
        }
    }

@app.get("/verify-email")
@limiter.limit("5/minute")
async def verify_email(request: Request, token: str, db: Session = Depends(get_db)):
    token_record = db.query(EmailVerificationToken).filter(
        EmailVerificationToken.token == token,
        EmailVerificationToken.used == False,
        EmailVerificationToken.expires_at > datetime.utcnow()
    ).first()

    if not token_record:
        raise HTTPException(status_code=400, detail="Invalid or expired verification token")

    # Update user verification status
    user = db.query(User).filter(User.email == token_record.email).first()
    if user:
        user.is_verified = True
        token_record.used = True
        db.commit()
        return {"message": "Email verified successfully"}

    raise HTTPException(status_code=404, detail="User not found")

@app.post("/request-password-reset")
@limiter.limit("3/minute")
async def request_password_reset(request: Request, reset_data: PasswordReset, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.email == reset_data.email).first()
    if not user:
        # Don't reveal if email exists
        return {"message": "If the email exists, a reset link has been sent"}

    # Generate reset token
    reset_token = generate_verification_token()
    expires_at = datetime.utcnow() + timedelta(hours=1)
    token_record = PasswordResetToken(
        email=reset_data.email,
        token=reset_token,
        expires_at=expires_at
    )
    db.add(token_record)
    db.commit()

    # Send reset email
    await send_password_reset_email(reset_data.email, reset_token)

    return {"message": "If the email exists, a reset link has been sent"}

@app.post("/reset-password")
@limiter.limit("5/minute")
async def reset_password(request: Request, reset_data: PasswordResetConfirm, db: Session = Depends(get_db)):
    token_record = db.query(PasswordResetToken).filter(
        PasswordResetToken.token == reset_data.token,
        PasswordResetToken.used == False,
        PasswordResetToken.expires_at > datetime.utcnow()
    ).first()

    if not token_record:
        raise HTTPException(status_code=400, detail="Invalid or expired reset token")

    # Update user password
    user = db.query(User).filter(User.email == token_record.email).first()
    if user:
        user.hashed_password = get_password_hash(reset_data.new_password)
        token_record.used = True
        db.commit()
        return {"message": "Password reset successfully"}

    raise HTTPException(status_code=404, detail="User not found")

@app.get("/me", response_model=UserResponse)
async def get_current_user_info(current_user: User = Depends(get_current_user)):
    return UserResponse(
        id=current_user.id,
        email=current_user.email,
        is_active=current_user.is_active,
        is_verified=current_user.is_verified
    )

@app.post("/logout")
async def logout(request: Request, response: Response):
    session_id = request.cookies.get("session_id")
    if session_id:
        delete_session(session_id)
    response.delete_cookie("session_id")
    return {"message": "Logged out successfully"}

@app.get("/")
async def root():
    return {"message": "FastAPI Authentication API", "version": "1.0.0"}
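
The login and logout endpoints rely on session entries that disappear on their own after 1800 seconds. To experiment with that behaviour without a running Redis server, an in-memory stand-in can mimic the three operations used here (setex, get, delete). InMemorySessionStore is a hypothetical class for illustration and local tests only; it is not a replacement for Redis in production:

```python
import json
import time
from typing import Callable, Dict, Optional, Tuple


class InMemorySessionStore:
    """Tiny stand-in for the Redis calls used by the session helpers."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._items: Dict[str, Tuple[float, str]] = {}

    def setex(self, key: str, ttl_seconds: int, value: str) -> None:
        # Remember the absolute expiry time next to the value.
        self._items[key] = (self._clock() + ttl_seconds, value)

    def get(self, key: str) -> Optional[str]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            # Expired entries are dropped lazily on read.
            del self._items[key]
            return None
        return value

    def delete(self, key: str) -> None:
        self._items.pop(key, None)


if __name__ == "__main__":
    store = InMemorySessionStore()
    store.setex("session-1", 1800, json.dumps({"user_id": 1}))
    print(json.loads(store.get("session-1")))
    store.delete("session-1")
    print(store.get("session-1"))
```

The injectable clock makes expiry testable without waiting for real time to pass.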

Configure environment variables

Create a .env file in the project root to hold the configuration for production deployment. Replace every placeholder value with your own.

# Database
DATABASE_URL=postgresql://fastapi_user:your_secure_password@localhost/fastapi_db

# Redis
REDIS_URL=redis://localhost:6379
REDIS_PASSWORD=your_redis_password

# JWT
SECRET_KEY=your-secret-key-change-this-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

# Email SMTP
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USERNAME=your-email@example.com
SMTP_PASSWORD=your-app-password
FROM_EMAIL=noreply@example.com

# Application
APP_NAME="FastAPI Auth"
APP_VERSION=1.0.0
DEBUG=False
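
One way to consume these variables is to read them once at startup and fail immediately when a required value is missing, rather than falling back to a placeholder secret. A minimal sketch; the Settings dataclass and load_settings function are hypothetical names, and the choice of which variables are mandatory is an assumption:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    database_url: str
    secret_key: str
    algorithm: str
    access_token_expire_minutes: int
    debug: bool


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read configuration from a mapping (os.environ by default)."""
    env = os.environ if env is None else env
    # Assumption: these two values must always be set explicitly.
    missing = [name for name in ("DATABASE_URL", "SECRET_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return Settings(
        database_url=env["DATABASE_URL"],
        secret_key=env["SECRET_KEY"],
        algorithm=env.get("ALGORITHM", "HS256"),
        access_token_expire_minutes=int(env.get("ACCESS_TOKEN_EXPIRE_MINUTES", "30")),
        debug=env.get("DEBUG", "False").strip().lower() == "true",
    )


if __name__ == "__main__":
    demo_env = {"DATABASE_URL": "postgresql://user:pw@localhost/db", "SECRET_KEY": "demo"}
    print(load_settings(demo_env))
```

A fresh secret can be generated with python3 -c "import secrets; print(secrets.token_hex(32))".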

Add rate limiting and security middleware

Configure additional security headers and advanced rate limiting for production use.

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import time

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        
        # Security headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        
        return response

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, calls: int = 100, period: int = 60):
        super().__init__(app)
        self.calls = calls
        self.period = period
        self.clients = {}
    
    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host
        now = time.time()
        
        if client_ip not in self.clients:
            self.clients[client_ip] = []
        
        # Remove old requests
        self.clients[client_ip] = [
            req_time for req_time in self.clients[client_ip]
            if now - req_time < self.period
        ]
        
        # Check rate limit
        if len(self.clients[client_ip]) >= self.calls:
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded"}
            )
        
        # Add current request
        self.clients[client_ip].append(now)
        
        response = await call_next(request)
        return response
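
The RateLimitMiddleware above is a sliding-window counter: it keeps recent request timestamps per client and rejects a request once the window is full. The same logic can be isolated from the web framework so that it is easy to test. SlidingWindowLimiter is a hypothetical class written for this illustration, with an injectable clock as an added assumption:

```python
import time
from collections import defaultdict, deque
from typing import Callable, DefaultDict, Deque


class SlidingWindowLimiter:
    """Allow at most `calls` requests per `period` seconds for each key."""

    def __init__(self, calls: int = 100, period: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.calls = calls
        self.period = period
        self._clock = clock
        self._hits: DefaultDict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have left the window (oldest are on the left).
        while hits and now - hits[0] >= self.period:
            hits.popleft()
        if len(hits) >= self.calls:
            return False
        hits.append(now)
        return True


if __name__ == "__main__":
    limiter = SlidingWindowLimiter(calls=2, period=60)
    print([limiter.allow("203.0.113.5") for _ in range(3)])
```

Like the middleware, this keeps its state in process memory, so each worker process counts separately and the counters reset on restart.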

Create startup script

Build a production-ready startup script with proper configuration loading.

import uvicorn
import os
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

if __name__ == "__main__":
    # Production configuration
    uvicorn.run(
        "main:app",
        host=os.getenv("HOST", "0.0.0.0"),
        port=int(os.getenv("PORT", 8000)),
        reload=os.getenv("DEBUG", "False").lower() == "true",
        workers=int(os.getenv("WORKERS", 1)),
        log_level=os.getenv("LOG_LEVEL", "info"),
        access_log=True
    )
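
The startup script passes both reload and workers to uvicorn, but uvicorn ignores the workers setting while reload is enabled. A small sketch of how the options could be derived so that the two never conflict; build_uvicorn_options and parse_bool are hypothetical helpers, and the accepted truthy strings are an assumption:

```python
from typing import Any, Dict, Mapping


def parse_bool(value: str) -> bool:
    """Interpret common truthy strings; everything else counts as False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def build_uvicorn_options(env: Mapping[str, str]) -> Dict[str, Any]:
    """Translate environment values into keyword arguments for uvicorn.run()."""
    reload_enabled = parse_bool(env.get("DEBUG", "False"))
    workers = int(env.get("WORKERS", "1"))
    return {
        "host": env.get("HOST", "0.0.0.0"),
        "port": int(env.get("PORT", "8000")),
        "reload": reload_enabled,
        # Keep a single process in reload mode, where extra workers are ignored anyway.
        "workers": 1 if reload_enabled else workers,
        "log_level": env.get("LOG_LEVEL", "info"),
    }


if __name__ == "__main__":
    print(build_uvicorn_options({"DEBUG": "false", "WORKERS": "4"}))
```

The resulting dictionary could be unpacked into the call as uvicorn.run("main:app", **options).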

Start the application

Run the FastAPI application and test the authentication endpoints.

python start.py
Note: The application will start on http://localhost:8000. You can view the interactive API documentation at http://localhost:8000/docs.

Verify your setup

Test the authentication endpoints to ensure everything is working correctly.

# Test user registration
curl -X POST "http://localhost:8000/register" \
  -H "Content-Type: application/json" \
  -d '{"email": "test@example.com", "password": "testpassword123"}'

# Test login (after email verification)
curl -X POST "http://localhost:8000/login" \
  -H "Content-Type: application/json" \
  -d '{"email": "test@example.com", "password": "testpassword123"}'

# Test password reset request
curl -X POST "http://localhost:8000/request-password-reset" \
  -H "Content-Type: application/json" \
  -d '{"email": "test@example.com"}'

# Check database connections (-h localhost forces a password-authenticated TCP connection)
psql -h localhost -U fastapi_user -d fastapi_db -c "SELECT count(*) FROM users;"
redis-cli -a your_redis_password ping
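
The curl checks above can also be scripted with the standard library, which is convenient for a repeatable smoke test. This sketch only builds the requests; build_json_post is a hypothetical helper, and sending is left as a commented-out call because it needs the application to be running:

```python
import json
from urllib.request import Request


def build_json_post(url: str, payload: dict) -> Request:
    """Create a POST request carrying a JSON body, like the curl examples."""
    body = json.dumps(payload).encode("utf-8")
    return Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_json_post(
        "http://localhost:8000/register",
        {"email": "test@example.com", "password": "testpassword123"},
    )
    print(request.get_method(), request.full_url)
    # To actually send it while the server is running:
    # from urllib.request import urlopen
    # with urlopen(request) as response:
    #     print(response.status, response.read().decode())
```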

Common issues

Symptom | Cause | Fix
Email not sending | SMTP credentials incorrect | Verify SMTP settings in .env file and test with email provider
Database connection failed | PostgreSQL not running or wrong credentials | Check service with sudo systemctl status postgresql and verify credentials
Redis connection error | Redis password or connection issues | Test with redis-cli -a your_password ping
Token verification fails | Secret key mismatch | Ensure SECRET_KEY is consistent across restarts
Rate limiting too aggressive | Default limits too low | Adjust rate limits in @limiter.limit() decorators
CORS errors in browser | Frontend URL not in allowed origins | Add your frontend URL to allow_origins in CORS middleware
