Set up production-ready FastAPI applications with Redis-backed rate limiting, security middleware, and JWT authentication to protect against abuse and secure your API endpoints.
Prerequisites
- Python 3.8 or higher
- Redis server
- Root or sudo access
- Basic Python knowledge
What this solves
FastAPI applications need protection against abuse, brute force attacks, and unauthorized access in production environments. This tutorial shows you how to implement Redis-backed rate limiting, security middleware with CORS and security headers, and JWT authentication to create a robust, secure API that can handle real-world traffic patterns and security threats.
Step-by-step installation
Update system packages and install Redis
Start by updating your system and installing Redis server which will store rate limiting data and session information.
# Refresh the package index and apply pending upgrades before installing anything.
sudo apt update && sudo apt upgrade -y
# Redis server plus the pip and venv tooling used to build the virtual environment.
sudo apt install -y redis-server python3-pip python3-venv
Configure Redis for production use
Secure Redis by requiring authentication and binding to localhost only for security.
# Bind to localhost only
bind 127.0.0.1
# Set a strong password
requirepass your_strong_redis_password_here
# Enable persistence
save 900 1
save 300 10
save 60 10000
# Set max memory policy
maxmemory 256mb
maxmemory-policy allkeys-lru
# Security settings
protected-mode yes
port 6379
Start and enable Redis service
Enable Redis to start automatically on boot and verify it's running correctly.
# Restart so the edited redis.conf (bind, requirepass, ...) is actually loaded;
# "enable --now" leaves an already-running server on its old configuration.
# NOTE(review): the apt package's unit is presumably redis-server.service, with
# "redis" only an alias that systemctl enable may refuse to operate on - confirm.
sudo systemctl restart redis-server
sudo systemctl enable redis-server
sudo systemctl status redis-server --no-pager
# requirepass is now set, so the ping has to authenticate to get PONG back.
redis-cli -a 'your_strong_redis_password_here' ping
Create Python virtual environment and install dependencies
Set up an isolated Python environment for your FastAPI application with all required security and rate limiting packages.
# /opt is root-owned, so create the directory with sudo and hand it to the
# current user for the duration of the setup.
sudo mkdir -p /opt/fastapi-secure
sudo chown "$USER" /opt/fastapi-secure
cd /opt/fastapi-secure
python3 -m venv venv
source venv/bin/activate
# Quote the extras so the shell never treats [..] as a glob pattern
# (zsh aborts with "no matches found" otherwise).
pip install 'fastapi[all]' redis slowapi 'python-jose[cryptography]' 'passlib[bcrypt]' python-multipart
Create rate limiting middleware
Implement Redis-backed rate limiting using SlowAPI to prevent abuse and control request frequency per client.
import redis
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import Request, HTTPException
import os
# Redis connection with authentication.
# The password is read from the REDIS_PASSWORD environment variable so the real
# secret never has to live in source; the tutorial placeholder stays as fallback.
redis_client = redis.Redis(
    host='127.0.0.1',
    port=6379,
    password=os.getenv('REDIS_PASSWORD', 'your_strong_redis_password_here'),
    decode_responses=True,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True,
)
# Direct peers that are allowed to supply X-Forwarded-For (the local reverse proxy).
_TRUSTED_PROXIES = frozenset({"127.0.0.1", "::1"})


# Custom key function that combines IP and user ID if authenticated
def get_rate_limit_key(request: "Request"):
    """Return the rate-limit bucket key for ``request``.

    The key is ``rate_limit:<client_ip>`` or, when ``request.state.user_id``
    is set, ``rate_limit:<client_ip>:<user_id>``.

    X-Forwarded-For is honoured only when the direct peer is a trusted proxy,
    and then the right-most entry is used: that is the address the proxy
    itself appended, whereas the left-most entries are whatever the client
    sent. Taking the left-most entry from any peer would let a caller pick a
    fresh bucket on every request simply by forging the header.

    NOTE(review): with more than one proxy hop (e.g. CDN in front of Nginx)
    the right-most entry is the previous proxy, not the end user - extend
    ``_TRUSTED_PROXIES`` handling if that topology applies.
    """
    client = request.client
    peer_ip = client.host if client and client.host else None

    # Loopback fallback when the server reports no peer address.
    client_ip = peer_ip or "127.0.0.1"

    if peer_ip in _TRUSTED_PROXIES:
        forwarded = request.headers.get('X-Forwarded-For')
        if forwarded:
            hops = [hop.strip() for hop in forwarded.split(',') if hop.strip()]
            if hops:
                client_ip = hops[-1]

    # If user is authenticated, include user ID in key
    user_id = getattr(request.state, 'user_id', None)
    if user_id:
        return f"rate_limit:{client_ip}:{user_id}"
    return f"rate_limit:{client_ip}"
# Initialize limiter with Redis backend.
# The Redis password comes from REDIS_PASSWORD (placeholder kept as fallback)
# so the credential is not hard-coded in the module.
limiter = Limiter(
    key_func=get_rate_limit_key,
    storage_uri="redis://127.0.0.1:6379",
    storage_options={
        "password": os.getenv("REDIS_PASSWORD", "your_strong_redis_password_here")
    },
)
# Custom rate limit exceeded handler
def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Turn a ``RateLimitExceeded`` error into a JSON 429 response.

    The body carries ``error``, ``message`` and ``retry_after``; the same
    delay is mirrored in the ``Retry-After`` header.

    ``retry_after`` is read defensively: if the exception exposes no such
    attribute, the length of the violated limit's window is used when it can
    be obtained, otherwise 60 seconds.
    NOTE(review): slowapi's exception appears to expose only ``limit`` and
    ``detail`` - confirm against the installed version.
    """
    # Imported here because the module's import block never brings
    # JSONResponse into scope.
    from fastapi.responses import JSONResponse

    retry_after = getattr(exc, "retry_after", None)
    if retry_after is None:
        rate_item = getattr(getattr(exc, "limit", None), "limit", None)
        get_expiry = getattr(rate_item, "get_expiry", None)
        retry_after = get_expiry() if callable(get_expiry) else 60
    retry_after = int(retry_after)

    response = JSONResponse(
        status_code=429,
        content={
            "error": "Rate limit exceeded",
            "message": f"Rate limit exceeded: {exc.detail}",
            "retry_after": retry_after,
        },
    )
    response.headers["Retry-After"] = str(retry_after)
    return response
Create JWT authentication system
Implement secure JWT token handling with proper password hashing and token validation.
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import redis
import json
import os

# JWT Configuration
# Secrets are taken from the environment; the tutorial placeholders remain as
# fallbacks so the module still imports in a fresh checkout.
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-this-in-production")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7"))

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Security scheme
security = HTTPBearer()

# Redis client for token blacklisting
redis_client = redis.Redis(
    host=os.getenv("REDIS_HOST", "127.0.0.1"),
    port=int(os.getenv("REDIS_PORT", "6379")),
    password=os.getenv("REDIS_PASSWORD", "your_strong_redis_password_here"),
    decode_responses=True,
)
class AuthManager:
    """Stateless helpers for password hashing and JWT issue/verify/revoke.

    All methods are static and rely on the module-level ``pwd_context``,
    ``redis_client`` and JWT settings.
    """

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # Local import: the module only imports datetime and timedelta.
        from datetime import timezone

        return datetime.now(timezone.utc)

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Return True when ``plain_password`` matches ``hashed_password``."""
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def get_password_hash(password: str) -> str:
        """Return the hash of ``password`` produced by ``pwd_context``."""
        return pwd_context.hash(password)

    @staticmethod
    def create_access_token(data: dict, expires_delta: timedelta = None):
        """Return a signed access token carrying a copy of ``data``.

        ``exp`` is now + ``expires_delta`` or, when that is falsy, now +
        ``ACCESS_TOKEN_EXPIRE_MINUTES``. ``type`` is set to ``"access"``.
        """
        to_encode = data.copy()
        lifetime = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        to_encode.update({"exp": AuthManager._utcnow() + lifetime, "type": "access"})
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

    @staticmethod
    def create_refresh_token(data: dict):
        """Return a signed refresh token valid for ``REFRESH_TOKEN_EXPIRE_DAYS``."""
        to_encode = data.copy()
        expire = AuthManager._utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
        to_encode.update({"exp": expire, "type": "refresh"})
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

    @staticmethod
    def verify_token(token: str, token_type: str = "access"):
        """Validate ``token`` and return ``{"username", "user_id"}``.

        Raises ``HTTPException`` (401) when the token cannot be decoded, has
        been revoked, has the wrong ``type`` claim, or lacks ``sub`` or
        ``user_id``.
        """
        # Validate the signature first so arbitrary strings never cost a
        # Redis round trip; only the decode call sits inside the try.
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except JWTError as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials"
            ) from exc

        # Check if token is blacklisted
        if redis_client.get(f"blacklist:{token}"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has been revoked"
            )

        if payload.get("type") != token_type:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token type"
            )

        username: str = payload.get("sub")
        user_id: str = payload.get("user_id")
        if username is None or user_id is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Could not validate credentials"
            )
        return {"username": username, "user_id": user_id}

    @staticmethod
    def blacklist_token(token: str, expires_in: int = None):
        """Add token to Redis blacklist.

        ``expires_in`` is the entry's lifetime in seconds and defaults to the
        access-token lifetime. A non-positive value means there is nothing
        left to revoke, so no entry is written.
        """
        if expires_in is None:
            expires_in = ACCESS_TOKEN_EXPIRE_MINUTES * 60
        if expires_in <= 0:
            # Presumably Redis rejects a non-positive expiry; skip the write.
            return
        redis_client.setex(f"blacklist:{token}", expires_in, "true")
# Dependency to get current user
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the bearer credentials into the verified user mapping.

    Delegates to ``AuthManager.verify_token`` with its default token type, so
    any ``HTTPException`` raised there propagates to the caller.
    """
    return AuthManager.verify_token(credentials.credentials)
# Dependency for optional authentication
async def get_current_user_optional(
    credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer(auto_error=False)),
):
    """Return the verified user mapping, or ``None`` for anonymous callers.

    A dedicated ``HTTPBearer(auto_error=False)`` scheme is used here: the
    shared ``security`` scheme rejects requests without an Authorization
    header before this function runs, which made the anonymous branch
    unreachable. Invalid, expired or revoked tokens are also treated as
    anonymous rather than as an error.
    """
    if not credentials:
        return None
    try:
        return AuthManager.verify_token(credentials.credentials)
    except HTTPException:
        return None
Create security middleware
Implement comprehensive security headers and CORS configuration to protect against common web vulnerabilities.
from fastapi import Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
import time
import uuid
class _RawHeaders:
    """Minimal case-insensitive ``headers[name] = value`` view over ASGI raw headers."""

    def __init__(self, raw):
        self.raw = list(raw)

    def __setitem__(self, name, value):
        key = name.lower().encode("latin-1")
        # Replace rather than append so a header is never sent twice.
        self.raw = [(k, v) for k, v in self.raw if k.lower() != key]
        self.raw.append((key, value.encode("latin-1")))


class SecurityHeadersMiddleware:
    """ASGI middleware that tags each HTTP request with an ID and adds security headers.

    An ASGI application returns nothing from ``__call__``; the response
    travels through ``send``. The headers are therefore injected by wrapping
    ``send`` and rewriting the ``http.response.start`` message, instead of
    inspecting the awaited return value (which is always ``None``).
    """

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        # Add request ID for tracking.
        # NOTE(review): assumes Starlette's request.state is backed by
        # scope["state"], so handlers see it as request.state.request_id.
        request_id = str(uuid.uuid4())
        scope.setdefault("state", {})["request_id"] = request_id

        async def send_with_security_headers(message):
            if message["type"] == "http.response.start":
                headers = _RawHeaders(message.get("headers") or [])
                self._apply_security_headers(headers)
                # The CORS configuration exposes X-Request-ID, so emit it.
                headers["X-Request-ID"] = request_id
                message["headers"] = headers.raw
            await send(message)

        return await self.app(scope, receive, send_with_security_headers)

    def add_security_headers(self, response: "Response"):
        """Set the security headers on ``response.headers`` in place."""
        self._apply_security_headers(response.headers)

    @staticmethod
    def _apply_security_headers(headers):
        """Write every security header into a ``headers[name] = value`` mapping."""
        # MIME sniffing, clickjacking and legacy XSS-filter protections
        headers["X-Content-Type-Options"] = "nosniff"
        headers["X-Frame-Options"] = "DENY"
        headers["X-XSS-Protection"] = "1; mode=block"
        # HTTPS enforcement
        headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        # Content Security Policy
        headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "font-src 'self'; "
            "connect-src 'self'; "
            "frame-ancestors 'none';"
        )
        # Referrer Policy
        headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        # Permissions Policy (successor of Feature Policy)
        headers["Permissions-Policy"] = (
            "geolocation=(), "
            "microphone=(), "
            "camera=()"
        )
        # Overwrite (not remove) the Server header with a generic value.
        headers["Server"] = "FastAPI"
def configure_cors(app):
    """Configure CORS with secure defaults"""
    # Replace the origins with your own domain(s).
    trusted_origins = ["https://example.com", "https://app.example.com"]
    permitted_methods = ["GET", "POST", "PUT", "DELETE"]
    permitted_headers = [
        "Accept",
        "Accept-Language",
        "Content-Language",
        "Content-Type",
        "Authorization",
    ]

    app.add_middleware(
        CORSMiddleware,
        allow_origins=trusted_origins,
        allow_credentials=True,
        allow_methods=permitted_methods,
        allow_headers=permitted_headers,
        expose_headers=["X-Request-ID"],
    )
def configure_trusted_hosts(app):
    """Configure trusted hosts middleware"""
    # Replace example.com with your own domain; the loopback names keep
    # local access working.
    hosts = ["example.com", "*.example.com", "127.0.0.1", "localhost"]
    app.add_middleware(TrustedHostMiddleware, allowed_hosts=hosts)
Create the main FastAPI application
Integrate all security components into a complete FastAPI application with protected and public endpoints.
from fastapi import FastAPI, HTTPException, Depends, Request, status
from fastapi.responses import JSONResponse
from fastapi.security import HTTPAuthorizationCredentials
from pydantic import BaseModel, EmailStr
from slowapi.errors import RateLimitExceeded
import uvicorn
import logging
from datetime import timedelta
# Import our modules
from rate_limiter import limiter, custom_rate_limit_handler
from auth import AuthManager, get_current_user, get_current_user_optional
from security_middleware import SecurityHeadersMiddleware, configure_cors, configure_trusted_hosts
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create FastAPI app
app = FastAPI(
    title="Secure FastAPI Application",
    description="FastAPI with rate limiting, security middleware, and authentication",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

# Add rate limiter: the limiter is stored on app.state, and rate-limit errors
# are routed to the custom JSON handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)

# Add security middleware
app.add_middleware(SecurityHeadersMiddleware)
configure_cors(app)
configure_trusted_hosts(app)
# Request/Response models

# Body of POST /auth/login.
class LoginRequest(BaseModel):
    username: str
    password: str


# Body of POST /auth/register; the email field is typed as EmailStr.
class RegisterRequest(BaseModel):
    username: str
    email: EmailStr
    password: str


# Response of POST /auth/login.
class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str


# Body of POST /auth/refresh.
class RefreshRequest(BaseModel):
    refresh_token: str
# Mock user database (replace with real database)
# Keyed by username. The demo password is hashed once, when the module is imported.
fake_users_db = {
    "testuser": {
        "user_id": "1",
        "username": "testuser",
        "email": "test@example.com",
        "hashed_password": AuthManager.get_password_hash("testpassword")
    }
}
# Public endpoints with rate limiting
@app.get("/")
@limiter.limit("100/minute")
async def root(request: Request):
    """Landing endpoint: returns a static welcome payload."""
    payload = {"message": "Welcome to Secure FastAPI", "status": "healthy"}
    return payload
@app.get("/health")
@limiter.limit("200/minute")
async def health_check(request: Request):
    """Liveness probe: report ``healthy`` together with the current UTC time.

    The timestamp is generated per request (ISO 8601, second precision,
    ``Z`` suffix) instead of being a fixed literal, so monitors can tell a
    live response from a cached or stale one.
    """
    # Local import: this module only imports timedelta from datetime.
    from datetime import datetime, timezone

    now = datetime.now(timezone.utc).replace(microsecond=0)
    return {"status": "healthy", "timestamp": now.isoformat().replace("+00:00", "Z")}
# Authentication endpoints
@app.post("/auth/register", response_model=dict)
@limiter.limit("5/minute")  # Strict limit for registration
async def register(request: Request, user_data: RegisterRequest):
    """Create a user in the in-memory store.

    Responds 400 when the username is already taken; otherwise stores the
    record with a hashed password and returns a confirmation payload.
    """
    username = user_data.username

    # Reject duplicates up front.
    if username in fake_users_db:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered",
        )

    # Sequential ID derived from the current store size (mock database only).
    record = {
        "user_id": str(len(fake_users_db) + 1),
        "username": username,
        "email": user_data.email,
        "hashed_password": AuthManager.get_password_hash(user_data.password),
    }
    fake_users_db[username] = record

    logger.info(f"New user registered: {user_data.username}")
    return {"message": "User registered successfully", "username": username}
@app.post("/auth/login", response_model=TokenResponse)
@limiter.limit("10/minute")  # Strict limit for login attempts
async def login(request: Request, credentials: LoginRequest):
    """Exchange a username/password pair for an access and a refresh token.

    Responds 401 with a generic message when the user is unknown or the
    password does not match.
    """
    account = fake_users_db.get(credentials.username)
    authenticated = bool(account) and AuthManager.verify_password(
        credentials.password, account["hashed_password"]
    )
    if not authenticated:
        logger.warning(f"Failed login attempt for user: {credentials.username}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )

    # Both tokens carry the same identity claims.
    claims = {"sub": account["username"], "user_id": account["user_id"]}
    issued_access = AuthManager.create_access_token(data=dict(claims))
    issued_refresh = AuthManager.create_refresh_token(data=dict(claims))

    logger.info(f"User logged in: {credentials.username}")
    return {
        "access_token": issued_access,
        "refresh_token": issued_refresh,
        "token_type": "bearer",
    }
@app.post("/auth/refresh", response_model=dict)
@limiter.limit("20/minute")
async def refresh_token(request: Request, refresh_data: RefreshRequest):
    """Issue a new access token for a valid refresh token.

    ``AuthManager.verify_token`` is called with type ``"refresh"``; any
    ``HTTPException`` it raises propagates to the client.
    """
    identity = AuthManager.verify_token(refresh_data.refresh_token, "refresh")
    claims = {"sub": identity["username"], "user_id": identity["user_id"]}
    return {
        "access_token": AuthManager.create_access_token(data=claims),
        "token_type": "bearer",
    }
@app.post("/auth/logout")
@limiter.limit("30/minute")
async def logout(request: Request, current_user: dict = Depends(get_current_user)):
    """Revoke the caller's access token by adding it to the blacklist.

    The Authorization scheme is matched case-insensitively. A case-sensitive
    ``"Bearer "`` prefix check would skip revocation for a header such as
    ``bearer <token>`` while still reporting a successful logout.
    NOTE(review): the bearer dependency presumably accepts any casing of the
    scheme - confirm against the installed FastAPI version.
    """
    # Get token from request. It is deliberately not stripped or otherwise
    # normalised, so the blacklist key is built from the header text as sent.
    auth_header = request.headers.get("authorization") or ""
    scheme, _, token = auth_header.partition(" ")
    if scheme.lower() == "bearer" and token:
        AuthManager.blacklist_token(token)
    logger.info(f"User logged out: {current_user['username']}")
    return {"message": "Successfully logged out"}
# Protected endpoints
@app.get("/protected")
@limiter.limit("50/minute")
async def protected_route(request: Request, current_user: dict = Depends(get_current_user)):
    """Return a greeting payload for the authenticated caller."""
    uid = current_user["user_id"]
    # Store user_id in request state for rate limiting.
    # NOTE(review): this runs inside the handler body; verify whether the
    # limiter computes its key before or after this assignment.
    request.state.user_id = uid
    return {
        "message": "This is a protected route",
        "user": current_user["username"],
        "user_id": uid,
    }
@app.get("/user/profile")
@limiter.limit("30/minute")
async def get_profile(request: Request, current_user: dict = Depends(get_current_user)):
    """Return the stored profile of the authenticated user.

    Responds 404 when the token is valid but the account is no longer in the
    store (e.g. removed after the token was issued); indexing the missing
    record would otherwise raise ``TypeError`` and surface as a 500.
    """
    request.state.user_id = current_user["user_id"]
    user = fake_users_db.get(current_user["username"])
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return {
        "username": user["username"],
        "email": user["email"],
        "user_id": user["user_id"]
    }
# Public API with optional authentication
@app.get("/api/data")
@limiter.limit("100/minute")
async def get_data(request: Request, current_user: dict = Depends(get_current_user_optional)):
    """Serve the public dataset, or the full one when a user is resolved."""
    if not current_user:
        # Public data for unauthenticated users
        return {"data": "Limited public dataset", "premium": False}

    # Authenticated users get more data
    request.state.user_id = current_user["user_id"]
    return {
        "data": "Full dataset for authenticated users",
        "user": current_user["username"],
        "premium": True,
    }
# Development entry point, used when the file is executed directly
# (python main.py). The systemd unit starts uvicorn itself and does not go
# through this block.
if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        # NOTE(review): auto-reload is a development convenience; confirm it
        # is never used for a production start.
        reload=True,
        log_level="info"
    )
Create systemd service file
Set up a systemd service to run your FastAPI application automatically and manage it like other system services.
[Unit]
Description=Secure FastAPI Application
After=network.target redis.service
Requires=redis.service
[Service]
Type=exec
User=www-data
Group=www-data
WorkingDirectory=/opt/fastapi-secure
Environment=PATH=/opt/fastapi-secure/venv/bin
ExecStart=/opt/fastapi-secure/venv/bin/uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
Restart=always
RestartSec=10
# Security settings
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/opt/fastapi-secure
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=fastapi-secure
[Install]
WantedBy=multi-user.target
Set proper file permissions and ownership
Secure your application files with correct ownership and permissions for the web server user.
# Hand the application tree to the service account named in the unit file.
sudo chown -R www-data:www-data /opt/fastapi-secure
# Directories and venv executables need the execute bit; plain sources do not.
sudo chmod -R 755 /opt/fastapi-secure
sudo chmod 644 /opt/fastapi-secure/*.py
# Make systemd pick up the new unit, then enable and start it.
sudo systemctl daemon-reload
sudo systemctl enable --now fastapi-secure
Configure Nginx reverse proxy (optional)
Set up Nginx as a reverse proxy to handle SSL termination and static file serving for better production performance.
# Rate limiting zone.
# limit_req_zone is only valid in the http context and makes "nginx -t" fail
# when placed inside a server block. On the stock Debian/Ubuntu layout the
# files in sites-enabled are included from the http block, so a directive at
# the top level of this file lands in the right context.
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;

server {
    listen 80;
    server_name example.com;

    # Rate limiting (uses the zone declared above)
    limit_req zone=api burst=20 nodelay;

    # Security headers
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header Referrer-Policy "strict-origin-when-cross-origin" always;

    # Proxy to FastAPI
    location / {
        proxy_pass http://127.0.0.1:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
    }

    # Health check endpoint
    location /health {
        proxy_pass http://127.0.0.1:8000/health;
        access_log off;
    }
}
Enable Nginx configuration
Activate the Nginx configuration and restart the service to apply changes.
# Link the site definition into sites-enabled.
sudo ln -s /etc/nginx/sites-available/fastapi-secure /etc/nginx/sites-enabled/
# Validate the configuration before restarting.
sudo nginx -t
sudo systemctl restart nginx
Verify your setup
Test your secure FastAPI application to ensure all components are working correctly.
# Check services are running
sudo systemctl status redis
sudo systemctl status fastapi-secure
sudo systemctl status nginx

# Test the API endpoints
curl -X GET http://localhost:8000/health

# Test rate limiting: "/" allows 100 requests per minute, so send more than
# that and summarise the status codes (expect 200s followed by 429s).
for i in {1..110}; do
  curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8000/
done | sort | uniq -c

# Test authentication
curl -X POST http://localhost:8000/auth/register \
  -H "Content-Type: application/json" \
  -d '{"username": "newuser", "email": "user@example.com", "password": "securepass123"}'
You can also access the interactive API documentation at http://your-server:8000/docs to test all endpoints.
Configure environment variables for production
Create environment configuration
Use environment variables for sensitive configuration instead of hardcoded values.
# JWT Configuration
SECRET_KEY=your-very-long-secret-key-change-this-in-production-min-32-chars
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# Redis Configuration
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD=your_strong_redis_password_here
REDIS_DB=0
# Application Configuration
ENVIRONMENT=production
DEBUG=false
ALLOWED_HOSTS=example.com,api.example.com
# Rate Limiting Configuration
GLOBAL_RATE_LIMIT=100/minute
AUTH_RATE_LIMIT=10/minute
REGISTER_RATE_LIMIT=5/minute
Update application to use environment variables
Modify your FastAPI application to load configuration from environment variables.
import os

try:
    # pydantic 2.x moved BaseSettings into the separate pydantic-settings
    # package; importing it from pydantic fails there.
    from pydantic_settings import BaseSettings
except ImportError:
    # pydantic 1.x still ships BaseSettings itself.
    from pydantic import BaseSettings


class Settings(BaseSettings):
    """Application configuration loaded from the environment and ``.env``.

    Each default is read with ``os.getenv`` when the class is defined.
    """

    # JWT Settings
    secret_key: str = os.getenv("SECRET_KEY", "dev-key-change-in-production")
    algorithm: str = os.getenv("ALGORITHM", "HS256")
    access_token_expire_minutes: int = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
    refresh_token_expire_days: int = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7"))

    # Redis Settings
    redis_host: str = os.getenv("REDIS_HOST", "127.0.0.1")
    redis_port: int = int(os.getenv("REDIS_PORT", "6379"))
    redis_password: str = os.getenv("REDIS_PASSWORD", "")
    redis_db: int = int(os.getenv("REDIS_DB", "0"))

    # Application Settings
    environment: str = os.getenv("ENVIRONMENT", "development")
    debug: bool = os.getenv("DEBUG", "false").lower() == "true"
    # NOTE(review): settings loaders may try to JSON-decode list fields, so a
    # comma-separated ALLOWED_HOSTS value may be rejected - verify with the
    # installed version before relying on this field.
    allowed_hosts: list = os.getenv("ALLOWED_HOSTS", "localhost").split(",")

    # Rate Limiting
    global_rate_limit: str = os.getenv("GLOBAL_RATE_LIMIT", "100/minute")
    auth_rate_limit: str = os.getenv("AUTH_RATE_LIMIT", "10/minute")
    register_rate_limit: str = os.getenv("REGISTER_RATE_LIMIT", "5/minute")

    class Config:
        env_file = ".env"


settings = Settings()
Set secure file permissions
Protect your environment file from unauthorized access.
# Only the service account may read the secrets file.
sudo chown www-data:www-data /opt/fastapi-secure/.env
sudo chmod 600 /opt/fastapi-secure/.env
# Restart so the service picks up the new configuration.
sudo systemctl restart fastapi-secure
Never commit .env files to version control. Consider using a secrets management system like HashiCorp Vault for production deployments.
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| FastAPI service won't start | Missing Python dependencies or permission errors | sudo -u www-data /opt/fastapi-secure/venv/bin/python -m pip list to check deps |
| Redis connection failed | Redis not running or wrong password | sudo systemctl status redis and check password in config |
| Rate limiting not working | Redis not accessible or wrong configuration | redis-cli -a your_password ping to test Redis connection |
| JWT tokens invalid | Clock skew or wrong secret key | Check system time with timedatectl status and verify SECRET_KEY |
| CORS errors in browser | Wrong allowed origins in CORS configuration | Update allowed_origins in security_middleware.py with your domain |
| High memory usage | Too many uvicorn workers for available RAM | Reduce workers in systemd service file or increase server memory |
Next steps
- Setup FastAPI email verification and password reset functionality with Redis and PostgreSQL for complete user management
- Monitor FastAPI applications with Prometheus and Grafana for production observability to track performance and security metrics
- Configure NGINX SSL termination with Certbot for Let's Encrypt certificates to enable HTTPS
- Implement FastAPI background tasks with Celery and Redis for async processing
- Setup FastAPI database migrations with Alembic and PostgreSQL for data persistence
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output (escape sequences are expanded later by "echo -e")
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# Configuration
APP_DIR="/opt/fastapi-secure"
# Left empty here; filled from the command line or auto-generated further down.
REDIS_PASSWORD=""
JWT_SECRET=""
# Functions
# Colourised log helpers; %b expands the backslash escapes held in the colour
# variables and in the message.
log_info() {
    printf '%b\n' "${GREEN}[INFO]${NC} $1"
}
log_warn() {
    printf '%b\n' "${YELLOW}[WARN]${NC} $1"
}
log_error() {
    printf '%b\n' "${RED}[ERROR]${NC} $1"
}
cleanup() {
    # Must be the first statement: capture the status of the command that
    # triggered the trap before anything overwrites $?.
    local rc=$?
    if (( rc == 0 )); then
        return 0
    fi
    log_error "Installation failed. Cleaning up..."
    if [[ -d "$APP_DIR" ]]; then
        rm -rf "$APP_DIR"
    fi
    systemctl stop redis 2>/dev/null || true
}
# Print a random 25-character secret: 32 random bytes, base64-encoded, with
# the "=", "+" and "/" characters removed.
generate_password() {
    openssl rand -base64 32 | tr -d "=+/" | cut -c1-25
}
# Print the help text and exit.
# $1: exit status. Defaults to 1 so calls on invalid input still fail; the
#     explicit --help path passes 0 because asking for help is not an error.
usage() {
    echo "Usage: $0 [OPTIONS]"
    echo "Options:"
    echo " --redis-password PASSWORD Set custom Redis password (auto-generated if not provided)"
    echo " --jwt-secret SECRET Set custom JWT secret (auto-generated if not provided)"
    echo " -h, --help Show this help message"
    exit "${1:-1}"
}

# Parse arguments
while [[ $# -gt 0 ]]; do
    case $1 in
        --redis-password)
            # Under "set -u" a missing value would abort with an opaque
            # "unbound variable" error; report it explicitly instead.
            if [[ $# -lt 2 ]]; then
                log_error "--redis-password requires a value"
                usage
            fi
            REDIS_PASSWORD="$2"
            shift 2
            ;;
        --jwt-secret)
            if [[ $# -lt 2 ]]; then
                log_error "--jwt-secret requires a value"
                usage
            fi
            JWT_SECRET="$2"
            shift 2
            ;;
        -h|--help)
            usage 0
            ;;
        *)
            log_error "Unknown option: $1"
            usage
            ;;
    esac
done
# Generate passwords if not provided
[[ -z "$REDIS_PASSWORD" ]] && REDIS_PASSWORD=$(generate_password)
[[ -z "$JWT_SECRET" ]] && JWT_SECRET=$(generate_password)
# From here on, a failing command runs cleanup before the script exits.
trap cleanup ERR
# Check prerequisites
if [[ $EUID -ne 0 ]]; then
    log_error "This script must be run as root"
    exit 1
fi

# Detect distribution
if [[ ! -f /etc/os-release ]]; then
    log_error "Cannot detect OS distribution"
    exit 1
fi
# Sourcing os-release defines ID and PRETTY_NAME, which are used below.
. /etc/os-release

# Select package-manager commands and Redis names for the detected family.
case "$ID" in
    ubuntu|debian)
        PKG_MGR="apt"
        PKG_UPDATE="apt update && apt upgrade -y"
        PKG_INSTALL="apt install -y"
        REDIS_SERVICE="redis-server"
        REDIS_CONF="/etc/redis/redis.conf"
        ;;
    almalinux|rocky|centos|rhel|ol|fedora)
        PKG_MGR="dnf"
        PKG_UPDATE="dnf update -y"
        PKG_INSTALL="dnf install -y"
        REDIS_SERVICE="redis"
        REDIS_CONF="/etc/redis/redis.conf"
        # Fall back to yum when dnf is not installed.
        if ! command -v dnf &> /dev/null; then
            PKG_MGR="yum"
            PKG_UPDATE="yum update -y"
            PKG_INSTALL="yum install -y"
        fi
        ;;
    amzn)
        PKG_MGR="yum"
        PKG_UPDATE="yum update -y"
        PKG_INSTALL="yum install -y"
        REDIS_SERVICE="redis"
        REDIS_CONF="/etc/redis.conf"
        ;;
    *)
        log_error "Unsupported distribution: $ID"
        exit 1
        ;;
esac
log_info "Detected OS: $PRETTY_NAME"
# Install packages
echo "[1/7] Updating system packages..."
# PKG_UPDATE can contain shell operators ("apt update && apt upgrade -y").
# Expanding it unquoted would hand "&&" to the package manager as a literal
# argument, so evaluate it as a command line. The string is set by this
# script only, never from user input.
eval "$PKG_UPDATE"

echo "[2/7] Installing Redis and Python dependencies..."
if [[ "$ID" == "ubuntu" || "$ID" == "debian" ]]; then
    $PKG_INSTALL redis-server python3-pip python3-venv
else
    # Enable EPEL for RHEL-based systems if needed. This has to happen before
    # Redis is requested, otherwise the package cannot be resolved.
    if [[ "$ID" == "centos" || "$ID" == "rhel" ]]; then
        $PKG_INSTALL epel-release || true
    fi
    # python3-venv is a Debian package name.
    # NOTE(review): on RPM-based systems the venv module presumably ships
    # with python3 itself - confirm per distribution.
    $PKG_INSTALL redis python3 python3-pip
fi

# The Redis config location differs between releases; if the expected file
# is missing, fall back to the other common path.
if [[ ! -f "$REDIS_CONF" ]]; then
    for candidate in /etc/redis/redis.conf /etc/redis.conf; do
        if [[ -f "$candidate" ]]; then
            REDIS_CONF="$candidate"
            break
        fi
    done
fi
echo "[3/7] Configuring Redis..."
# Keep the first backup only, so a re-run does not replace the pristine
# distribution config with the file this script generated earlier.
if [[ ! -f "${REDIS_CONF}.backup" ]]; then
    cp "$REDIS_CONF" "${REDIS_CONF}.backup"
fi

# Configure Redis
# NOTE(review): "daemonize yes" may conflict with how the distribution's
# systemd unit supervises Redis - confirm against the packaged unit file.
cat > "$REDIS_CONF" << EOF
bind 127.0.0.1
port 6379
requirepass $REDIS_PASSWORD
save 900 1
save 300 10
save 60 10000
maxmemory 256mb
maxmemory-policy allkeys-lru
protected-mode yes
daemonize yes
EOF

# Set proper permissions for Redis config
chown redis:redis "$REDIS_CONF" 2>/dev/null || chown root:root "$REDIS_CONF"
chmod 640 "$REDIS_CONF"

systemctl enable "$REDIS_SERVICE"
# Restart instead of "enable --now": a package install may already have
# started Redis with the default config, and merely starting a running
# service would leave it without requirepass.
systemctl restart "$REDIS_SERVICE"
echo "[4/7] Creating FastAPI application directory..."
mkdir -p "$APP_DIR"
# The remaining steps use paths relative to the application directory.
cd "$APP_DIR"

# Create virtual environment
python3 -m venv venv
source venv/bin/activate

echo "[5/7] Installing Python dependencies..."
pip install --upgrade pip
pip install fastapi[all] redis slowapi python-jose[cryptography] passlib[bcrypt] python-multipart
echo "[6/7] Creating application files..."
# Create rate limiting middleware
# Changes from the previous generated module: X-Forwarded-For is only trusted
# from the local proxy (right-most hop), and the 429 handler no longer depends
# on an exception attribute that may not exist.
cat > rate_limiting.py << 'EOF'
import redis
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import Request
from fastapi.responses import JSONResponse
import os

redis_client = redis.Redis(
    host='127.0.0.1',
    port=6379,
    password=os.getenv('REDIS_PASSWORD', ''),
    decode_responses=True,
    socket_connect_timeout=5,
    socket_timeout=5,
    retry_on_timeout=True
)

# Direct peers that are allowed to supply X-Forwarded-For (local reverse proxy).
TRUSTED_PROXIES = frozenset({"127.0.0.1", "::1"})


def get_rate_limit_key(request: Request):
    """Return the rate-limit bucket key: client IP plus user ID when known."""
    client = request.client
    peer_ip = client.host if client and client.host else None
    # Loopback fallback when the server reports no peer address.
    client_ip = peer_ip or "127.0.0.1"

    # Only a trusted proxy may override the address, and only its own
    # (right-most) entry is used; the left-most entries are client-supplied.
    if peer_ip in TRUSTED_PROXIES:
        forwarded = request.headers.get('X-Forwarded-For')
        if forwarded:
            hops = [hop.strip() for hop in forwarded.split(',') if hop.strip()]
            if hops:
                client_ip = hops[-1]

    user_id = getattr(request.state, 'user_id', None)
    if user_id:
        return f"rate_limit:{client_ip}:{user_id}"
    return f"rate_limit:{client_ip}"


limiter = Limiter(
    key_func=get_rate_limit_key,
    storage_uri="redis://127.0.0.1:6379",
    storage_options={"password": os.getenv('REDIS_PASSWORD', '')}
)


def custom_rate_limit_handler(request: Request, exc: RateLimitExceeded):
    """Return a JSON 429 response with a Retry-After header."""
    # Read retry_after defensively; fall back to the limit's window length
    # when available, else 60 seconds.
    retry_after = getattr(exc, "retry_after", None)
    if retry_after is None:
        rate_item = getattr(getattr(exc, "limit", None), "limit", None)
        get_expiry = getattr(rate_item, "get_expiry", None)
        retry_after = get_expiry() if callable(get_expiry) else 60
    retry_after = int(retry_after)

    response = JSONResponse(
        status_code=429,
        content={
            "error": "Rate limit exceeded",
            "message": f"Rate limit exceeded: {exc.detail}",
            "retry_after": retry_after
        }
    )
    response.headers["Retry-After"] = str(retry_after)
    return response
EOF
# Create authentication module
# Changes from the previous generated module: the default token lifetime
# follows ACCESS_TOKEN_EXPIRE_MINUTES, timestamps are timezone-aware, and a
# warning is emitted when the insecure fallback secret is in use.
cat > auth.py << 'EOF'
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
import warnings

SECRET_KEY = os.getenv('JWT_SECRET')
if not SECRET_KEY:
    # Keep the app importable, but make the insecure state visible.
    warnings.warn(
        "JWT_SECRET is not set; using an insecure fallback secret. "
        "Start uvicorn with --env-file .env or export JWT_SECRET.",
        RuntimeWarning,
    )
    SECRET_KEY = 'fallback-secret-key'
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()


def verify_password(plain_password, hashed_password):
    """Return True when the plain password matches the stored hash."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the bcrypt hash of the password."""
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: timedelta = None):
    """Return a signed JWT carrying a copy of ``data`` plus an ``exp`` claim."""
    to_encode = data.copy()
    lifetime = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the ``sub`` claim of a valid bearer token, else raise 401."""
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials"
        )
    username: str = payload.get("sub")
    if username is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials"
        )
    return username
EOF
# Create main application
# Changes from the previous generated module: every rate-limited endpoint
# annotates "request" as Request (an untyped parameter is not injected as the
# request object), and the demo credential check uses a constant-time compare.
cat > main.py << 'EOF'
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from rate_limiting import limiter, custom_rate_limit_handler
from auth import verify_token, create_access_token, get_password_hash
import secrets

app = FastAPI(title="Secure FastAPI with Rate Limiting")
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, custom_rate_limit_handler)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)


@app.middleware("http")
async def security_headers(request, call_next):
    """Add baseline security headers to every response."""
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    return response


@app.get("/")
@limiter.limit("100/minute")
async def root(request: Request):
    return {"message": "Secure FastAPI with Redis rate limiting"}


@app.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: HTTPBasicCredentials = Depends(HTTPBasic())):
    # Demo user - replace with real authentication.
    # compare_digest avoids leaking, via timing, how much of a value matched.
    username_ok = secrets.compare_digest(credentials.username.encode("utf-8"), b"admin")
    password_ok = secrets.compare_digest(credentials.password.encode("utf-8"), b"password")
    if username_ok and password_ok:
        access_token = create_access_token(data={"sub": credentials.username})
        return {"access_token": access_token, "token_type": "bearer"}
    raise HTTPException(status_code=401, detail="Invalid credentials")


@app.get("/protected")
@limiter.limit("50/minute")
async def protected_route(request: Request, current_user: str = Depends(verify_token)):
    return {"message": f"Hello {current_user}", "data": "This is protected content"}
EOF
# Create environment file
# Written inside a subshell with a restrictive umask so the secrets are never
# world-readable, not even between creation and the chmod below.
(
    umask 077
    cat > .env << EOF
REDIS_PASSWORD=$REDIS_PASSWORD
JWT_SECRET=$JWT_SECRET
EOF
)

# Set proper permissions
chown -R root:root "$APP_DIR"
chmod 755 "$APP_DIR"
chmod 644 "$APP_DIR"/*.py
chmod 600 "$APP_DIR"/.env
echo "[7/7] Verifying installation..."
sleep 2

# Test Redis connection
# First prove that authentication is enforced.
# NOTE(review): an authenticated ping alone presumably still answers PONG
# when the server runs without a password, which would hide a missing
# requirepass - hence this separate unauthenticated probe.
if redis-cli ping 2>/dev/null | grep -q PONG; then
    log_error "Redis accepts unauthenticated commands; requirepass is not active"
    exit 1
fi
if redis-cli -a "$REDIS_PASSWORD" ping 2>/dev/null | grep -q PONG; then
    log_info "Redis is running and accessible"
else
    log_error "Redis connection failed"
    exit 1
fi

# Test Python imports
if source venv/bin/activate && python -c "import fastapi, redis, slowapi; print('All imports successful')"; then
    log_info "Python dependencies installed successfully"
else
    log_error "Python dependency check failed"
    exit 1
fi
log_info "FastAPI security setup completed successfully!"
echo ""
echo "Configuration:"
echo "- Application directory: $APP_DIR"
# The secrets are not echoed: terminal scrollback and captured logs would
# keep them. They are available in the root-only .env file.
echo "- Redis password and JWT secret: stored in $APP_DIR/.env (mode 600)"
echo ""
echo "To start the application:"
echo "cd $APP_DIR"
echo "source venv/bin/activate"
# --env-file loads REDIS_PASSWORD and JWT_SECRET from .env; without it the
# generated modules fall back to an empty Redis password and a default JWT secret.
echo "uvicorn main:app --host 0.0.0.0 --port 8000 --env-file .env"
Review the script before running. Execute with: bash install.sh