Set up optimized async database connection pooling in FastAPI using SQLAlchemy and asyncpg with PostgreSQL 17. Learn connection pool tuning, session management patterns, and performance optimization for production workloads.
Prerequisites
- Python 3.8+
- PostgreSQL 17
- Root or sudo access on a Debian or Ubuntu host (the manual steps below use apt)
- 4GB RAM minimum
What this solves
FastAPI applications need efficient database connections to handle high traffic without exhausting PostgreSQL resources. Connection pooling reuses database connections instead of creating new ones for each request, dramatically improving performance and reducing server load. This tutorial configures SQLAlchemy async connection pooling with asyncpg for production FastAPI applications.
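To make the idea of connection reuse concrete, here is a minimal, standard-library-only sketch. `ToyPool`, `handle_request`, and `simulate` are hypothetical names invented for illustration; this is not how SQLAlchemy implements its pool, only a model of the behaviour described above, with integers standing in for real connections.

```python
import asyncio


class ToyPool:
    """Illustrative pool: hands out reusable 'connections' from a queue."""

    def __init__(self, size: int) -> None:
        self._idle = asyncio.Queue()
        self._size = size
        self.created = 0  # how many connections were ever opened

    async def acquire(self) -> int:
        # Open a new connection only if none is idle and the limit is not reached.
        if self._idle.empty() and self.created < self._size:
            self.created += 1
            return self.created  # the integer stands in for a real connection
        # Otherwise wait until another request returns its connection.
        return await self._idle.get()

    def release(self, conn: int) -> None:
        self._idle.put_nowait(conn)


async def handle_request(pool: ToyPool) -> None:
    conn = await pool.acquire()
    try:
        await asyncio.sleep(0.001)  # pretend to run a query
    finally:
        pool.release(conn)  # always hand the connection back


async def simulate(requests: int, pool_size: int) -> int:
    pool = ToyPool(pool_size)
    await asyncio.gather(*(handle_request(pool) for _ in range(requests)))
    return pool.created


connections_opened = asyncio.run(simulate(requests=100, pool_size=5))
print(f"100 requests served with {connections_opened} connections")
```

Without a pool, the same 100 requests would each open and close their own connection; with it, the number of connections is bounded by the pool size regardless of request count.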
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you have the latest package information and security updates.
sudo apt update && sudo apt upgrade -y
Install PostgreSQL 17
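Add the PostgreSQL APT repository, then install the PostgreSQL 17 server and the development headers needed to build Python database drivers. The apt-key command is deprecated on current Debian and Ubuntu releases, so the repository key is stored in its own keyring file instead. The server headers package is named postgresql-server-dev-17.
sudo apt install -y curl ca-certificates lsb-release
sudo install -d /usr/share/postgresql-common/pgdg
sudo curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc
echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" | sudo tee /etc/apt/sources.list.d/pgdg.list
sudo apt update
sudo apt install -y postgresql-17 postgresql-server-dev-17 libpq-dev python3-dev python3-venv build-essential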
Initialize and start PostgreSQL
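On Debian and Ubuntu the package creates and initializes the default database cluster during installation, so you only need to enable the service and confirm that it is running.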
sudo systemctl enable --now postgresql
sudo systemctl status postgresql
Create application database and user
Create a dedicated user and a database owned by that user. On PostgreSQL 15 and later, GRANT ALL PRIVILEGES ON DATABASE does not let a user create tables in the public schema, so the application user is made the database owner. Replace the example password with your own.
sudo -u postgres psql -c "CREATE USER fastapi_user WITH ENCRYPTED PASSWORD 'SecurePass123!';"
sudo -u postgres psql -c "CREATE DATABASE fastapi_app OWNER fastapi_user;"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE fastapi_app TO fastapi_user;"
sudo -u postgres psql -c "ALTER USER fastapi_user CREATEDB;"
Configure PostgreSQL for connection pooling
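Tune PostgreSQL for the pooled workload by editing postgresql.conf (on Debian and Ubuntu: /etc/postgresql/17/main/postgresql.conf). max_connections caps the number of client connections the server accepts and must be at least as large as the combined pools of all application processes. The remaining settings size memory, WAL, and planner costs for a small SSD-backed server.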
max_connections = 200
shared_buffers = 256MB
effective_cache_size = 1GB
maintenance_work_mem = 64MB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 4MB
min_wal_size = 1GB
max_wal_size = 4GB
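A quick way to sanity-check these values against the 4GB RAM prerequisite is a back-of-the-envelope memory estimate. The sketch below is an assumption-laden approximation: the function name is hypothetical, it treats work_mem as used once per connection (PostgreSQL can allocate it once per sort or hash operation, so a complex query may use several multiples), and it ignores per-backend overhead.

```python
# All sizes are expressed in megabytes.
settings = {
    "max_connections": 200,
    "shared_buffers": 256,
    "work_mem": 4,
    "maintenance_work_mem": 64,
}


def worst_case_memory_mb(cfg: dict, sorts_per_query: int = 1) -> int:
    """Rough upper bound: shared memory plus work_mem for every connection."""
    per_connection = cfg["work_mem"] * sorts_per_query
    return (
        cfg["shared_buffers"]
        + cfg["max_connections"] * per_connection
        + cfg["maintenance_work_mem"]
    )


estimate = worst_case_memory_mb(settings)
total_ram_mb = 4 * 1024
print(f"Estimated worst case: {estimate} MB of {total_ram_mb} MB "
      f"({estimate / total_ram_mb:.0%})")
```

If every connection ran two memory-hungry operations at once (`sorts_per_query=2`), the same estimate rises to 1920 MB, which is why raising work_mem and max_connections together deserves care.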
Restart PostgreSQL
Restart PostgreSQL to apply the configuration changes.
sudo systemctl restart postgresql
Install Python dependencies
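Create a virtual environment and install FastAPI, SQLAlchemy, and asyncpg for async database operations. aiohttp is included because the load test script later in this tutorial uses it. The quotes keep shells such as zsh from interpreting the square brackets.
python3 -m venv fastapi_env
source fastapi_env/bin/activate
pip install --upgrade pip
pip install "fastapi[all]" "sqlalchemy[asyncio]" asyncpg psycopg2-binary alembic aiohttp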
Configure SQLAlchemy connection pooling
Create database configuration module
Set up the database configuration with tuned connection pool settings and save it as database.py. An async engine cannot use the synchronous QueuePool: SQLAlchemy rejects it and uses AsyncAdaptedQueuePool for async engines by default, so no poolclass argument is needed.
import os

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

# Database configuration (can be overridden with the DATABASE_URL environment variable)
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://fastapi_user:SecurePass123!@localhost/fastapi_app",
)

# Create async engine with connection pooling.
# Async engines default to AsyncAdaptedQueuePool; passing QueuePool raises an error.
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # Number of connections to maintain in pool
    max_overflow=30,     # Additional connections beyond pool_size
    pool_pre_ping=True,  # Validate connections before use
    pool_recycle=3600,   # Recreate connections after 1 hour
    pool_timeout=30,     # Seconds to wait for a connection from the pool
    echo=False,          # Set to True for SQL debugging
)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False,
    autoflush=False,
)

Base = declarative_base()


# Database dependency for FastAPI
async def get_database_session():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        # Leaving the async with block closes the session and
        # returns its connection to the pool
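The example password happens to be harmless inside a URL, but characters such as `@`, `/`, or `:` in a real password must be percent-encoded before they go into DATABASE_URL. The helper below is a sketch using only the standard library; `build_database_url` is a hypothetical name, not part of SQLAlchemy, and the sample password is made up for illustration.

```python
from urllib.parse import quote, unquote, urlsplit


def build_database_url(
    user: str, password: str, host: str, database: str, port: int = 5432
) -> str:
    """Assemble an asyncpg URL with the credentials percent-encoded."""
    # safe="" encodes every reserved character, including "/" and "@"
    return (
        f"postgresql+asyncpg://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


url = build_database_url("fastapi_user", "p@ss/word:1", "localhost", "fastapi_app")
print(url)

# Round trip: the encoded password decodes back to the original value
parts = urlsplit(url)
print(unquote(parts.password))
```

Reading the individual pieces from environment variables and assembling the URL this way also keeps the password out of source control.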
Create database models
Define SQLAlchemy models for your application and save them as models.py. This example creates a User model with indexes on the columns used for lookups.
from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String
from sqlalchemy.sql import func

from database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)  # Primary keys are indexed automatically
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Composite indexes for common queries
    __table_args__ = (
        Index("idx_user_email_active", "email", "is_active"),
        Index("idx_user_username_active", "username", "is_active"),
    )
Create FastAPI application with async endpoints
Build a FastAPI application that uses the async database connection pool and save it as main.py. Every endpoint receives its session through the get_database_session dependency, so each request checks a connection out of the pool only for as long as it needs it.
from typing import List

from fastapi import Depends, FastAPI, HTTPException, status
from pydantic import BaseModel, ConfigDict, EmailStr
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from database import engine, get_database_session
from models import Base, User

app = FastAPI(title="FastAPI Connection Pooling Demo")


# Pydantic models
class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str


class UserResponse(BaseModel):
    # Pydantic v2 style; replaces the deprecated inner Config class
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    email: str
    is_active: bool


# Create tables on startup (every worker runs this; prefer Alembic migrations in production)
@app.on_event("startup")
async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


# Health check endpoint
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_database_session)):
    try:
        # Test database connectivity
        result = await db.execute(select(func.count()).select_from(User))
        user_count = result.scalar()
        return {
            "status": "healthy",
            "database": "connected",
            "total_users": user_count,
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Database connection failed: {str(e)}",
        )


# Create user endpoint
@app.post("/users/", response_model=UserResponse)
async def create_user(
    user: UserCreate,
    db: AsyncSession = Depends(get_database_session),
):
    try:
        # Placeholder only: use a real password hashing function in production
        hashed_password = f"hashed_{user.password}"
        db_user = User(
            username=user.username,
            email=user.email,
            hashed_password=hashed_password,
        )
        db.add(db_user)
        await db.flush()  # Get the ID without committing
        await db.refresh(db_user)
        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with this email or username already exists",
        )


# Get users with pagination
@app.get("/users/", response_model=List[UserResponse])
async def get_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_database_session),
):
    # Order by id so that pages are stable between requests
    query = (
        select(User)
        .where(User.is_active.is_(True))
        .order_by(User.id)
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(query)
    users = result.scalars().all()
    return users


# Get user by ID
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_database_session),
):
    query = select(User).where(User.id == user_id, User.is_active.is_(True))
    result = await db.execute(query)
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return user
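

# Connection pool stats endpoint
# (values describe only the worker process that served the request;
# the pool has no invalid() method, so status() is reported instead)
@app.get("/pool-stats")
async def get_pool_stats():
    pool = engine.pool
    return {
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
        "status": pool.status(),
    }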
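The create_user endpoint above stores a placeholder string rather than a real hash. One way to fill that gap with only the standard library is PBKDF2, sketched below. The function names, the storage format, and the iteration count are assumptions made for illustration; a dedicated password hashing library is a reasonable alternative.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 600_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'scheme$iterations$salt$digest' using PBKDF2-HMAC-SHA256."""
    salt = secrets.token_bytes(16)  # fresh random salt per password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    _scheme, iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)


stored_hash = hash_password("testpass")
print(len(stored_hash), verify_password("testpass", stored_hash))
```

The resulting string fits comfortably in the 255-character hashed_password column. Because hashing is CPU-bound, a production endpoint would likely run it in a thread so the event loop stays free.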
Optimize connection pool parameters
Create environment-specific configurations
Set up different connection pool parameters for development, testing, and production environments. Smaller pools in development and testing keep idle connections low; the production values match the engine settings used earlier. The file name config.py is a suggestion.
import os
from typing import Any, Dict


class DatabaseConfig:
    def __init__(self, environment: str = "development"):
        self.environment = environment
        self.base_url = "postgresql+asyncpg://fastapi_user:SecurePass123!@localhost/fastapi_app"

    def get_engine_config(self) -> Dict[str, Any]:
        configs = {
            "development": {
                "pool_size": 5,
                "max_overflow": 10,
                "pool_timeout": 30,
                "pool_recycle": 1800,
                "echo": True,
            },
            "testing": {
                "pool_size": 2,
                "max_overflow": 5,
                "pool_timeout": 10,
                "pool_recycle": 600,
                "echo": False,
            },
            "production": {
                "pool_size": 20,
                "max_overflow": 30,
                "pool_timeout": 30,
                "pool_recycle": 3600,
                "pool_pre_ping": True,
                "echo": False,
            },
        }
        # Unknown environment names fall back to the development settings
        return configs.get(self.environment, configs["development"])


# Usage
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
db_config = DatabaseConfig(ENVIRONMENT)
engine_config = db_config.get_engine_config()
# Pass the result to the engine: create_async_engine(db_config.base_url, **engine_config)
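Per-environment defaults are often combined with per-deployment overrides, so that a single value can be changed without editing code. The sketch below shows one way to layer environment variables over a settings dictionary. The `DB_<SETTING>` variable names and the `apply_env_overrides` helper are hypothetical; nothing in SQLAlchemy or FastAPI reads them automatically.

```python
import os
from typing import Any, Dict, Mapping

DEFAULTS: Dict[str, Any] = {
    "pool_size": 5,
    "max_overflow": 10,
    "pool_timeout": 30,
    "pool_recycle": 1800,
    "echo": False,
}


def apply_env_overrides(
    config: Dict[str, Any], env: Mapping[str, str] = os.environ
) -> Dict[str, Any]:
    """Return a copy of config where DB_<KEY> variables replace integer settings."""
    merged = dict(config)
    for key, value in config.items():
        raw = env.get(f"DB_{key.upper()}")
        # Only integer settings are overridden; booleans such as echo are left alone
        if raw is not None and isinstance(value, int) and not isinstance(value, bool):
            merged[key] = int(raw)  # raises ValueError on non-numeric input
    return merged


overridden = apply_env_overrides(DEFAULTS, {"DB_POOL_SIZE": "12", "DB_ECHO": "1"})
print(overridden["pool_size"], overridden["max_overflow"], overridden["echo"])
```

Passing the environment as a parameter keeps the function easy to test; in the application you would call it with the default `os.environ`.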
Add connection pool monitoring
Create middleware that logs connection pool usage so you can spot pool pressure before it affects response times. Save it as middleware.py.
import logging
import time

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

from database import engine

logger = logging.getLogger(__name__)

# Keep in sync with max_overflow in database.py
MAX_OVERFLOW = 30


class DatabasePoolMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.perf_counter()

        # Log pool stats before request
        pool = engine.pool
        logger.info(
            f"Pool stats before request: "
            f"size={pool.size()}, checked_out={pool.checkedout()}, "
            f"overflow={pool.overflow()}"
        )

        response = await call_next(request)

        # Log pool stats and timing after request
        process_time = time.perf_counter() - start_time
        logger.info(
            f"Pool stats after request: "
            f"size={pool.size()}, checked_out={pool.checkedout()}, "
            f"process_time={process_time:.3f}s"
        )

        # Warn if pool utilization is high. pool.overflow() is the current
        # overflow and is negative until pool_size connections have been opened,
        # so capacity is computed from the configured limits instead.
        capacity = pool.size() + MAX_OVERFLOW
        utilization = pool.checkedout() / capacity
        if utilization > 0.8:
            logger.warning(
                f"High pool utilization: {utilization:.2%} "
                f"({pool.checkedout()}/{capacity})"
            )

        response.headers["X-Process-Time"] = str(process_time)
        return response
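The utilization check is easier to reason about, and to unit test, when it is separated from the middleware. The sketch below isolates the arithmetic; the function names and the 0.8 default threshold mirror the middleware but are otherwise hypothetical. It measures checked-out connections against the configured maximum (pool_size plus max_overflow) rather than the pool's current overflow counter, which is negative until the pool has opened pool_size connections.

```python
def pool_utilization(checked_out: int, pool_size: int, max_overflow: int) -> float:
    """Fraction of the pool's maximum capacity that is currently in use."""
    capacity = pool_size + max_overflow
    if capacity <= 0:
        return 0.0  # avoid dividing by zero for a misconfigured pool
    return checked_out / capacity


def is_under_pressure(
    checked_out: int, pool_size: int, max_overflow: int, threshold: float = 0.8
) -> bool:
    """True when utilization exceeds the warning threshold."""
    return pool_utilization(checked_out, pool_size, max_overflow) > threshold


# Production settings from this tutorial: pool_size=20, max_overflow=30
print(pool_utilization(10, 20, 30))   # 10 of 50 connections in use
print(is_under_pressure(45, 20, 30))  # 45 of 50 connections in use
```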
Add middleware to FastAPI app
Register the pool monitoring middleware in main.py to track connection usage across requests, and dispose of the engine on shutdown so pooled connections are closed cleanly.
# Add these imports at the top
from middleware import DatabasePoolMiddleware
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Add middleware to the app
app.add_middleware(DatabasePoolMiddleware)


# Add this after creating the FastAPI app
@app.on_event("shutdown")
async def shutdown_event():
    await engine.dispose()
Create performance testing script
Build a script to test connection pool performance under concurrent load and save it as test_performance.py. Requests are started with asyncio.create_task so that the pacing delay actually spreads them out, and HTTP error responses are counted as failures.
import asyncio
import time

import aiohttp


async def make_request(session: aiohttp.ClientSession, url: str) -> dict:
    """Make a single HTTP request and return timing info."""
    start_time = time.perf_counter()
    try:
        async with session.get(url) as response:
            await response.json()
            return {
                "success": response.status < 400,
                "status": response.status,
                "error": f"HTTP {response.status}",
                "duration": time.perf_counter() - start_time,
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "duration": time.perf_counter() - start_time,
        }


async def run_load_test(
    base_url: str = "http://localhost:8000",
    concurrent_requests: int = 50,
    total_requests: int = 1000,
):
    """Run concurrent requests to test connection pool performance."""
    connector = aiohttp.TCPConnector(
        limit=concurrent_requests,
        limit_per_host=concurrent_requests,
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        # Test endpoints
        endpoints = [
            f"{base_url}/health",
            f"{base_url}/users/?limit=10",
            f"{base_url}/pool-stats",
        ]
        tasks = []
        start_time = time.perf_counter()
        for i in range(total_requests):
            endpoint = endpoints[i % len(endpoints)]
            # create_task schedules the request immediately instead of at gather()
            tasks.append(asyncio.create_task(make_request(session, endpoint)))
            # Pause after each batch to simulate a steadier load
            if (i + 1) % concurrent_requests == 0:
                await asyncio.sleep(0.1)
        # Wait for all requests to finish
        results = await asyncio.gather(*tasks)
        total_time = time.perf_counter() - start_time

    # Analyze results
    successful_requests = [r for r in results if r["success"]]
    failed_requests = [r for r in results if not r["success"]]
    if successful_requests:
        durations = [r["duration"] for r in successful_requests]
        avg_duration = sum(durations) / len(durations)
        max_duration = max(durations)
        min_duration = min(durations)
    else:
        avg_duration = max_duration = min_duration = 0

    print("\n=== Load Test Results ===")
    print(f"Total requests: {total_requests}")
    print(f"Concurrent requests: {concurrent_requests}")
    print(f"Total time: {total_time:.2f}s")
    print(f"Requests per second: {total_requests / total_time:.2f}")
    print(f"Successful requests: {len(successful_requests)}")
    print(f"Failed requests: {len(failed_requests)}")
    print(f"Success rate: {len(successful_requests) / total_requests:.1%}")
    print(f"Average response time: {avg_duration:.3f}s")
    print(f"Min response time: {min_duration:.3f}s")
    print(f"Max response time: {max_duration:.3f}s")
    if failed_requests:
        print("\nFirst few errors:")
        for error in failed_requests[:5]:
            print(f"  {error['error']}")


if __name__ == "__main__":
    asyncio.run(run_load_test())
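Average, minimum, and maximum hide the tail behaviour that pool exhaustion usually causes, because requests that wait for a connection show up as a slow tail rather than a shifted average. The sketch below adds percentiles with the standard library; `summarize_latencies` is a hypothetical helper, and the sample data is synthetic rather than measured.

```python
import statistics
from typing import Dict, List


def summarize_latencies(durations: List[float]) -> Dict[str, float]:
    """Return p50, p95, p99, and max for a list of request durations (seconds)."""
    if len(durations) < 2:
        raise ValueError("need at least two samples to compute percentiles")
    # n=100 yields 99 cut points; index k-1 is the k-th percentile
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {
        "p50": cuts[49],
        "p95": cuts[94],
        "p99": cuts[98],
        "max": max(durations),
    }


# Synthetic sample: 100 durations from 1 ms to 100 ms
sample = [i / 1000 for i in range(1, 101)]
summary = summarize_latencies(sample)
for name, value in summary.items():
    print(f"{name}: {value:.4f}s")
```

In the load test, the list passed in would be the durations of the successful requests. A p99 far above p50 while the pool is fully checked out is a hint that requests are queueing for connections.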
Start the FastAPI application
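Run the FastAPI application with uvicorn using multiple workers, uvloop, and httptools. Each worker is a separate process with its own connection pool, so total database connections scale with the worker count; keep workers × (pool_size + max_overflow) below PostgreSQL's max_connections.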
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop --http httptools
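Because every uvicorn worker owns a separate pool, it helps to work the connection budget through with this tutorial's numbers. The helper names below are hypothetical, and the headroom value is an arbitrary assumption; the default of 3 for superuser_reserved_connections is PostgreSQL's default.

```python
def max_app_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Connections the application can open if every pool is fully used."""
    return workers * (pool_size + max_overflow)


def usable_connections(max_connections: int, superuser_reserved: int = 3) -> int:
    """Connections PostgreSQL will actually grant to ordinary users."""
    return max_connections - superuser_reserved


def largest_total_per_worker(
    workers: int, max_connections: int, superuser_reserved: int = 3, headroom: int = 10
) -> int:
    """Largest pool_size + max_overflow per worker that still leaves headroom
    for migrations, monitoring, and psql sessions (headroom is an assumption)."""
    return (max_connections - superuser_reserved - headroom) // workers


demand = max_app_connections(workers=4, pool_size=20, max_overflow=30)
supply = usable_connections(max_connections=200)
print(f"demand={demand}, supply={supply}, fits={demand <= supply}")
print("per-worker budget:", largest_total_per_worker(workers=4, max_connections=200))
```

With the settings used here, four fully loaded workers could ask for 200 connections while only 197 are available to them, so either the per-worker pool should shrink (for example pool_size=20 with max_overflow=26) or max_connections should grow.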
Verify your setup
Test the FastAPI application and connection pool configuration to ensure everything works correctly.
# Test basic connectivity
curl http://localhost:8000/health

# Check connection pool stats
curl http://localhost:8000/pool-stats

# Create a test user
curl -X POST "http://localhost:8000/users/" \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "email": "test@example.com", "password": "testpass"}'

# Get users list
curl http://localhost:8000/users/

# Run performance test
python test_performance.py
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| "QueuePool limit of size 20 overflow 30 reached, connection timed out" | All pooled connections stayed checked out for longer than pool_timeout | Increase pool_size and max_overflow, shorten the time each request holds a session, or reduce concurrent requests |
| Slow database responses | Connection pool exhaustion | Monitor pool stats and tune pool_timeout and pool_size |
| "asyncpg.exceptions.TooManyConnectionsError" | PostgreSQL max_connections exceeded | Increase PostgreSQL max_connections or reduce application pool size |
| Memory usage keeps growing | Connections not being returned | Open every session with async with (or close it in a try/finally block) so it is always returned to the pool |
| "SSL connection lost" errors | Long-lived connections timing out | Reduce pool_recycle time or enable pool_pre_ping |
| High CPU usage in database | Too many short-lived connections | Increase pool_size to improve connection reuse |
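The first row of the table is the one most people meet first, so here is a small simulation of how a pool timeout arises. It models the pool as a semaphore with a fixed number of slots; `PoolTimeout`, `checkout`, `request`, and `simulate` are hypothetical names, and the timings are arbitrary. It only illustrates the mechanism and is not SQLAlchemy's implementation.

```python
import asyncio
from typing import Dict


class PoolTimeout(Exception):
    """Raised when no connection slot frees up within the timeout."""


async def checkout(slots: asyncio.Semaphore, timeout: float) -> None:
    try:
        await asyncio.wait_for(slots.acquire(), timeout)
    except asyncio.TimeoutError:
        raise PoolTimeout("pool limit reached, connection timed out") from None


async def request(slots: asyncio.Semaphore, hold: float, timeout: float) -> str:
    try:
        await checkout(slots, timeout)
    except PoolTimeout:
        return "timeout"
    try:
        await asyncio.sleep(hold)  # the request holds its connection this long
        return "ok"
    finally:
        slots.release()


async def simulate(
    requests: int, capacity: int, hold: float, timeout: float
) -> Dict[str, int]:
    slots = asyncio.Semaphore(capacity)
    results = await asyncio.gather(
        *(request(slots, hold, timeout) for _ in range(requests))
    )
    return {"ok": results.count("ok"), "timeout": results.count("timeout")}


# Six requests, two slots, each held for 0.2 s
impatient = asyncio.run(simulate(requests=6, capacity=2, hold=0.2, timeout=0.1))
patient = asyncio.run(simulate(requests=6, capacity=2, hold=0.2, timeout=1.0))
print("timeout=0.1s:", impatient)
print("timeout=1.0s:", patient)
```

The two runs differ only in how long waiters are willing to queue, which is why both fixes in the table work: a larger pool adds slots, while shorter session hold times free slots sooner.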
Next steps
- Set up PostgreSQL streaming replication with PgBouncer for high availability
- Monitor PostgreSQL performance with Prometheus and Grafana for production insights
- Set up Gunicorn blue-green deployment with NGINX for zero-downtime updates
- Configure FastAPI authentication with JWT and OAuth2 for secure APIs
- Implement FastAPI caching with Redis optimization for improved performance
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail

# FastAPI + PostgreSQL 17 Connection Pooling Setup Script
# Installs PostgreSQL, creates the database and user, and prepares a Python environment
# Auto-detects the distribution

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Configuration variables
DB_NAME="${1:-fastapi_app}"
DB_USER="${2:-fastapi_user}"
# Hex output avoids characters such as / and + that are awkward inside a database URL
DB_PASS="${3:-$(openssl rand -hex 24)}"
APP_USER="${4:-fastapi}"

# Print usage if too many arguments are given
if [[ $# -gt 4 ]]; then
    echo "Usage: $0 [db_name] [db_user] [db_password] [app_user]"
    echo "Example: $0 myapp myuser mypass webapp"
    exit 1
fi

# Logging functions
log_info() { echo -e "${GREEN}[INFO]${NC} $1"; }
log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; }
log_error() { echo -e "${RED}[ERROR]${NC} $1"; }

# Cleanup function for rollback
# Note: this stops PostgreSQL and removes the application user, including its home directory
cleanup() {
    # Capture the failing command's status before anything overwrites it
    local status=$?
    if [[ $status -ne 0 ]]; then
        log_error "Installation failed. Cleaning up..."
        systemctl stop 'postgresql*' 2>/dev/null || true
        userdel -r "$APP_USER" 2>/dev/null || true
    fi
}
trap cleanup ERR

# Check for sudo: later steps run commands as other users with "sudo -u",
# so sudo is required even when the script itself runs as root
if ! command -v sudo &> /dev/null; then
    log_error "This script requires sudo to be installed"
    exit 1
fi
SUDO="sudo"

# Auto-detect distribution
if [[ -f /etc/os-release ]]; then
    . /etc/os-release
    case "$ID" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_INSTALL="apt install -y"
            PKG_UPDATE="apt update && apt upgrade -y"
            PG_SERVICE="postgresql"
            PG_CONFIG_DIR="/etc/postgresql/17/main"
            PG_DATA_DIR="/var/lib/postgresql/17/main"
            ;;
        almalinux|rocky|centos|rhel|ol)
            PKG_MGR="dnf"
            PKG_INSTALL="dnf install -y"
            PKG_UPDATE="dnf update -y"
            PG_SERVICE="postgresql-17"
            PG_CONFIG_DIR="/var/lib/pgsql/17/data"
            PG_DATA_DIR="/var/lib/pgsql/17/data"
            ;;
        fedora)
            PKG_MGR="dnf"
            PKG_INSTALL="dnf install -y"
            PKG_UPDATE="dnf update -y"
            PG_SERVICE="postgresql"
            PG_CONFIG_DIR="/var/lib/pgsql/data"
            PG_DATA_DIR="/var/lib/pgsql/data"
            ;;
        amzn)
            PKG_MGR="yum"
            PKG_INSTALL="yum install -y"
            PKG_UPDATE="yum update -y"
            PG_SERVICE="postgresql"
            PG_CONFIG_DIR="/var/lib/pgsql/data"
            PG_DATA_DIR="/var/lib/pgsql/data"
            ;;
        *)
            log_error "Unsupported distribution: $ID"
            exit 1
            ;;
    esac
else
    log_error "Cannot detect distribution"
    exit 1
fi

log_info "Detected distribution: $PRETTY_NAME"
log_info "Using package manager: $PKG_MGR"

# Step 1: Update system packages
echo "[1/8] Updating system packages..."
# PKG_UPDATE may contain "&&", so it has to be run through a shell
$SUDO bash -c "$PKG_UPDATE"
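
# Step 2: Install PostgreSQL 17
echo "[2/8] Installing PostgreSQL 17..."
case "$ID" in
    ubuntu|debian)
        # apt-key is deprecated; store the repository key in its own keyring file
        $SUDO $PKG_INSTALL curl ca-certificates lsb-release
        $SUDO install -d /usr/share/postgresql-common/pgdg
        $SUDO curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc
        echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" | $SUDO tee /etc/apt/sources.list.d/pgdg.list
        $SUDO apt update
        $SUDO $PKG_INSTALL postgresql-17 postgresql-server-dev-17 libpq-dev python3-dev build-essential python3-venv
        ;;
    almalinux|rocky|centos|rhel|ol)
        # Pick the repo package for this major release and architecture
        $SUDO $PKG_INSTALL "https://download.postgresql.org/pub/repos/yum/reporpms/EL-${VERSION_ID%%.*}-$(uname -m)/pgdg-redhat-repo-latest.noarch.rpm"
        # Disable the distribution's PostgreSQL module so it does not shadow the PGDG packages
        $SUDO dnf -qy module disable postgresql || true
        $SUDO $PKG_INSTALL postgresql17-server postgresql17-devel python3-devel gcc python3-pip
        ;;
    fedora)
        log_warn "Fedora installs the distribution's default PostgreSQL version, which may not be 17"
        $SUDO $PKG_INSTALL postgresql-server postgresql-devel python3-devel gcc python3-pip
        ;;
    amzn)
        log_warn "This branch installs PostgreSQL 13 from amazon-linux-extras, not PostgreSQL 17"
        $SUDO amazon-linux-extras install -y postgresql13
        $SUDO $PKG_INSTALL postgresql-server postgresql-devel python3-devel gcc python3-pip
        ;;
esac

# Step 3: Initialize and start PostgreSQL
echo "[3/8] Initializing and starting PostgreSQL..."
case "$ID" in
    ubuntu|debian)
        # The Debian packages create the cluster during installation
        $SUDO systemctl enable --now postgresql
        ;;
    almalinux|rocky|centos|rhel|ol)
        $SUDO /usr/pgsql-17/bin/postgresql-17-setup initdb
        $SUDO systemctl enable --now postgresql-17
        ;;
    *)
        $SUDO postgresql-setup --initdb
        $SUDO systemctl enable --now postgresql
        ;;
esac
sleep 3
$SUDO systemctl status "$PG_SERVICE" --no-pager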

# Step 4: Create application user
echo "[4/8] Creating application user..."
if ! id "$APP_USER" &>/dev/null; then
    $SUDO useradd -m -s /bin/bash "$APP_USER"
    log_info "Created user: $APP_USER"
fi

# Step 5: Create database and user
echo "[5/8] Creating database and user..."
$SUDO -u postgres psql -c "CREATE USER $DB_USER WITH ENCRYPTED PASSWORD '$DB_PASS';" 2>/dev/null || log_warn "User $DB_USER might already exist"
$SUDO -u postgres psql -c "CREATE DATABASE $DB_NAME OWNER $DB_USER;" 2>/dev/null || log_warn "Database $DB_NAME might already exist"
# On PostgreSQL 15+ a database-level GRANT does not allow creating tables in the
# public schema, so make the application user the database owner
$SUDO -u postgres psql -c "ALTER DATABASE $DB_NAME OWNER TO $DB_USER;"
$SUDO -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE $DB_NAME TO $DB_USER;"
$SUDO -u postgres psql -c "ALTER USER $DB_USER CREATEDB;"

# Step 6: Configure PostgreSQL for connection pooling
echo "[6/8] Configuring PostgreSQL for connection pooling..."
PG_CONF="$PG_CONFIG_DIR/postgresql.conf"
if [[ -f "$PG_CONF" ]]; then
    $SUDO cp "$PG_CONF" "$PG_CONF.bak"
    # Append settings; later entries in postgresql.conf override earlier ones
    $SUDO tee -a "$PG_CONF" > /dev/null << EOF
# Connection pooling optimizations
max_connections = 200
shared_buffers = 256MB
effective_cache_size = 1GB
maintenance_work_mem = 64MB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 4MB
min_wal_size = 1GB
max_wal_size = 4GB
EOF
    $SUDO systemctl restart "$PG_SERVICE"
else
    log_warn "PostgreSQL config file not found at $PG_CONF"
fi

# Step 7: Install Python dependencies
echo "[7/8] Installing Python dependencies..."
$SUDO -u "$APP_USER" bash -c "
    cd /home/$APP_USER
    python3 -m venv fastapi_env
    source fastapi_env/bin/activate
    pip install --upgrade pip
    pip install 'fastapi[all]' 'sqlalchemy[asyncio]' asyncpg psycopg2-binary alembic uvicorn
"

# Create database configuration
$SUDO -u "$APP_USER" tee /home/$APP_USER/database.py > /dev/null << EOF
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

# Database configuration
DATABASE_URL = "postgresql+asyncpg://$DB_USER:$DB_PASS@localhost/$DB_NAME"

# Create async engine with connection pooling.
# Async engines use AsyncAdaptedQueuePool by default; the sync QueuePool is rejected.
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,
    max_overflow=30,
    pool_pre_ping=True,
    pool_recycle=3600,
    echo=False,
)

# Create async session maker
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

Base = declarative_base()


async def get_database_session():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
EOF

$SUDO chown "$APP_USER:$APP_USER" /home/$APP_USER/database.py
# The file contains the database password, so restrict it to the application user
$SUDO chmod 600 /home/$APP_USER/database.py

# Step 8: Verification
echo "[8/8] Running verification checks..."

# Test PostgreSQL connection
if $SUDO -u postgres psql -d "$DB_NAME" -c "SELECT version();" &>/dev/null; then
    log_info "PostgreSQL connection: OK"
else
    log_error "PostgreSQL connection: FAILED"
    exit 1
fi

# Test Python environment
if $SUDO -u "$APP_USER" bash -c "cd /home/$APP_USER && source fastapi_env/bin/activate && python -c 'import fastapi, sqlalchemy, asyncpg'"; then
    log_info "Python dependencies: OK"
else
    log_error "Python dependencies: FAILED"
    exit 1
fi

# Print summary
log_info "Installation completed successfully!"
echo ""
echo "Database Details:"
echo "  Database: $DB_NAME"
echo "  User: $DB_USER"
echo "  Password: $DB_PASS"
echo "  Connection URL: postgresql+asyncpg://$DB_USER:***@localhost/$DB_NAME"
echo ""
echo "Application user: $APP_USER"
echo "Virtual environment: /home/$APP_USER/fastapi_env"
echo "Database config: /home/$APP_USER/database.py"
echo ""
log_warn "Please save the database password securely!"
Review the script before running. Execute with: bash install.sh