Configure FastAPI database connection pooling with PostgreSQL 17 for high-performance applications

Intermediate 25 min Apr 29, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up optimized async database connection pooling in FastAPI using SQLAlchemy and asyncpg with PostgreSQL 17. Learn connection pool tuning, session management patterns, and performance optimization for production workloads.

Prerequisites

  • Python 3.8+
  • PostgreSQL 17
  • Root access
  • 4GB RAM minimum

What this solves

FastAPI applications need efficient database connections to handle high traffic without exhausting PostgreSQL resources. Connection pooling reuses database connections instead of creating new ones for each request, dramatically improving performance and reducing server load. This tutorial configures SQLAlchemy async connection pooling with asyncpg for production FastAPI applications.
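To make the idea of reuse concrete, here is a minimal toy pool built on asyncio.Queue. It is only an illustration, not how SQLAlchemy implements pooling: FakeConnection and TinyPool are hypothetical names, and the "connection" is just a numbered object.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for a real database connection (hypothetical)."""

    def __init__(self, conn_id: int) -> None:
        self.id = conn_id


class TinyPool:
    """Toy pool: opens connections lazily up to `size`, then reuses them."""

    def __init__(self, size: int) -> None:
        self._idle: asyncio.Queue = asyncio.Queue()
        self._size = size
        self.opened = 0  # how many connections were ever opened

    @asynccontextmanager
    async def acquire(self):
        if self._idle.empty() and self.opened < self._size:
            self.opened += 1
            conn = FakeConnection(self.opened)  # open a new connection
        else:
            conn = await self._idle.get()       # wait for a returned one
        try:
            yield conn
        finally:
            self._idle.put_nowait(conn)         # hand it to the next request


async def handle_request(pool: TinyPool) -> int:
    async with pool.acquire() as conn:
        await asyncio.sleep(0)  # pretend to run a query
        return conn.id


async def main() -> int:
    pool = TinyPool(size=3)
    await asyncio.gather(*(handle_request(pool) for _ in range(100)))
    return pool.opened


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch 100 concurrent requests are served by three connections, because each request returns its connection to the queue when it finishes.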

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you have the latest package information and security updates.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y

# AlmaLinux / Rocky Linux
sudo dnf update -y

Install PostgreSQL 17

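Install the PostgreSQL 17 server and the development headers needed to build Python database drivers. The packages come from the PostgreSQL (PGDG) repository.

# Ubuntu / Debian
sudo apt install -y curl ca-certificates lsb-release
sudo install -d /usr/share/postgresql-common/pgdg
sudo curl -o /usr/share/postgresql-common/pgdg/apt.postgresql.org.asc --fail https://www.postgresql.org/media/keys/ACCC4CF8.asc
echo "deb [signed-by=/usr/share/postgresql-common/pgdg/apt.postgresql.org.asc] https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" | sudo tee /etc/apt/sources.list.d/pgdg.list
sudo apt update
sudo apt install -y postgresql-17 postgresql-server-dev-17 python3-dev python3-venv build-essential

# AlmaLinux / Rocky Linux
sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-9-x86_64/pgdg-redhat-repo-latest.noarch.rpm
sudo dnf -qy module disable postgresql
sudo dnf install -y postgresql17-server postgresql17-devel python3-devel gcc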

Initialize and start PostgreSQL

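On Ubuntu and Debian the package creates a database cluster automatically, so you only need to enable the service. On AlmaLinux and Rocky Linux, initialize the cluster first and then start the service.

# Ubuntu / Debian
sudo systemctl enable --now postgresql
sudo systemctl status postgresql

# AlmaLinux / Rocky Linux
sudo /usr/pgsql-17/bin/postgresql-17-setup initdb
sudo systemctl enable --now postgresql-17
sudo systemctl status postgresql-17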

Create application database and user

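Create a dedicated user and a database owned by that user. Since PostgreSQL 15, roles other than the database owner cannot create objects in the public schema by default, and GRANT ALL PRIVILEGES ON DATABASE does not change that. Making the application role the owner is what lets it create its tables.

sudo -u postgres psql -c "CREATE USER fastapi_user WITH ENCRYPTED PASSWORD 'SecurePass123!';"
sudo -u postgres psql -c "CREATE DATABASE fastapi_app OWNER fastapi_user;"
sudo -u postgres psql -c "GRANT ALL PRIVILEGES ON DATABASE fastapi_app TO fastapi_user;"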
sudo -u postgres psql -c "ALTER USER fastapi_user CREATEDB;"

Configure PostgreSQL for connection pooling

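Edit postgresql.conf (/etc/postgresql/17/main/postgresql.conf on Ubuntu and Debian, /var/lib/pgsql/17/data/postgresql.conf on AlmaLinux and Rocky Linux) and set the values below. max_connections must cover every pooled connection from every application process. The memory settings are conservative starting points for the 4GB minimum listed in the prerequisites, and random_page_cost and effective_io_concurrency assume SSD storage.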

max_connections = 200
shared_buffers = 256MB
effective_cache_size = 1GB
maintenance_work_mem = 64MB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 4MB
min_wal_size = 1GB
max_wal_size = 4GB
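A quick sanity check on these numbers is to estimate how much memory the server could use if every connection ran a memory-hungry query at once. The sketch below is a rough upper bound under a simplifying assumption: each connection uses work_mem a fixed number of times (ops_per_query is a hypothetical knob, since PostgreSQL applies work_mem per sort or hash operation rather than per connection).

```python
def worst_case_memory_mb(
    shared_buffers_mb: int,
    work_mem_mb: int,
    max_connections: int,
    ops_per_query: int = 1,
) -> int:
    """Rough upper bound: shared_buffers plus work_mem for every connection."""
    return shared_buffers_mb + max_connections * work_mem_mb * ops_per_query


if __name__ == "__main__":
    # Values from the configuration above: 256MB, 4MB, 200 connections
    print(worst_case_memory_mb(256, 4, 200))     # one sort/hash per query
    print(worst_case_memory_mb(256, 4, 200, 2))  # two sorts/hashes per query
```

With the values above the estimate is about 1GB for one operation per query, which leaves headroom on a 4GB host. Raising work_mem or max_connections scales the second term linearly.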

Restart PostgreSQL

Restart PostgreSQL to apply the configuration changes.

sudo systemctl restart postgresql
sudo systemctl restart postgresql-17

Install Python dependencies

Create a virtual environment and install FastAPI, SQLAlchemy, and asyncpg for async database operations, plus aiohttp for the load test script used later.

python3 -m venv fastapi_env
source fastapi_env/bin/activate
pip install --upgrade pip
pip install "fastapi[all]" "sqlalchemy[asyncio]" asyncpg psycopg2-binary alembic aiohttp

Configure SQLAlchemy connection pooling

Create database configuration module

Create database.py with the engine, the session factory, and the FastAPI dependency. An async engine uses AsyncAdaptedQueuePool by default, the asyncio-compatible counterpart of QueuePool. Passing poolclass=QueuePool to create_async_engine raises an error, so the pool class is left unset here.

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base

# Database configuration
DATABASE_URL = "postgresql+asyncpg://fastapi_user:SecurePass123!@localhost/fastapi_app"

# Create async engine with connection pooling
# (no poolclass argument: async engines default to AsyncAdaptedQueuePool)
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # Number of connections to maintain in pool
    max_overflow=30,     # Additional connections beyond pool_size
    pool_pre_ping=True,  # Validate connections before use
    pool_recycle=3600,   # Recreate connections after 1 hour
    pool_timeout=30,     # Timeout when getting connection from pool
    echo=False,          # Set to True for SQL debugging
)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False,
    autoflush=False,
)

Base = declarative_base()

# Database dependency for FastAPI
async def get_database_session():
    # The async with block closes the session and returns its
    # connection to the pool when the request finishes.
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
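Hard-coding credentials in the connection URL is convenient for a tutorial but awkward in production. One possible approach, sketched below, is to assemble the URL from environment variables and percent-encode the credentials so that characters such as "@" or "/" in a password cannot break URL parsing. The DB_* variable names and the build_database_url helper are assumptions for illustration, not part of the module above.

```python
import os
from typing import Mapping
from urllib.parse import quote


def build_database_url(env: Mapping[str, str] = os.environ) -> str:
    """Assemble an asyncpg SQLAlchemy URL from hypothetical DB_* variables."""
    user = env.get("DB_USER", "fastapi_user")
    password = env["DB_PASSWORD"]  # no default: fail fast if it is missing
    host = env.get("DB_HOST", "localhost")
    port = env.get("DB_PORT", "5432")
    name = env.get("DB_NAME", "fastapi_app")
    # safe="" makes quote() encode every reserved character, including "/"
    return (
        f"postgresql+asyncpg://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{name}"
    )


if __name__ == "__main__":
    print(build_database_url({"DB_PASSWORD": "p@ss/word!"}))
```

The returned string can be passed to create_async_engine in place of the literal DATABASE_URL.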

Create database models

Define the SQLAlchemy models in models.py. This example creates a User model with unique indexes on email and username, plus composite indexes for lookups that also filter on is_active.

from sqlalchemy import Column, Integer, String, DateTime, Boolean, Index
from sqlalchemy.sql import func
from database import Base

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(100), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    
    # Composite index for common queries
    __table_args__ = (
        Index('idx_user_email_active', 'email', 'is_active'),
        Index('idx_user_username_active', 'username', 'is_active'),
    )

Create FastAPI application with async endpoints

Build the FastAPI application in main.py. Each endpoint receives a session from get_database_session, which borrows a connection from the pool for the duration of the request.

from typing import List

from fastapi import Depends, FastAPI, HTTPException, status
from pydantic import BaseModel, ConfigDict, EmailStr
from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from database import engine, get_database_session
from models import Base, User

app = FastAPI(title="FastAPI Connection Pooling Demo")

# Pydantic models
class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    # Pydantic v2 style: allow building the response from ORM objects
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    email: str
    is_active: bool

# Create tables on startup
@app.on_event("startup")
async def create_tables():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# Health check endpoint
@app.get("/health")
async def health_check(db: AsyncSession = Depends(get_database_session)):
    try:
        # Test database connectivity
        result = await db.execute(select(func.count()).select_from(User))
        user_count = result.scalar()
        return {
            "status": "healthy",
            "database": "connected",
            "total_users": user_count
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Database connection failed: {str(e)}"
        )

# Create user endpoint
@app.post("/users/", response_model=UserResponse)
async def create_user(
    user: UserCreate,
    db: AsyncSession = Depends(get_database_session)
):
    try:
        # Placeholder only: hash the password properly in production
        hashed_password = f"hashed_{user.password}"
        db_user = User(
            username=user.username,
            email=user.email,
            hashed_password=hashed_password
        )
        db.add(db_user)
        await db.flush()  # Get the ID without committing
        await db.refresh(db_user)
        return db_user
    except IntegrityError:
        await db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User with this email or username already exists"
        )

# Get users with pagination
@app.get("/users/", response_model=List[UserResponse])
async def get_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_database_session)
):
    query = select(User).where(User.is_active.is_(True)).offset(skip).limit(limit)
    result = await db.execute(query)
    users = result.scalars().all()
    return users

# Get user by ID
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_database_session)
):
    query = select(User).where(User.id == user_id, User.is_active.is_(True))
    result = await db.execute(query)
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user

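# Connection pool stats endpoint
@app.get("/pool-stats")
async def get_pool_stats():
    pool = engine.pool
    return {
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        # overflow() is negative until the pool has opened pool_size connections
        "overflow": pool.overflow(),
        # The queue pool has no invalid() counter; status() gives a text summary
        "status": pool.status()
    }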

Optimize connection pool parameters

Create environment-specific configurations

Set up different connection pool parameters for development, testing, and production environments.

import os
from typing import Dict, Any

class DatabaseConfig:
    def __init__(self, environment: str = "development"):
        self.environment = environment
        self.base_url = "postgresql+asyncpg://fastapi_user:SecurePass123!@localhost/fastapi_app"
        
    def get_engine_config(self) -> Dict[str, Any]:
        configs = {
            "development": {
                "pool_size": 5,
                "max_overflow": 10,
                "pool_timeout": 30,
                "pool_recycle": 1800,
                "echo": True
            },
            "testing": {
                "pool_size": 2,
                "max_overflow": 5,
                "pool_timeout": 10,
                "pool_recycle": 600,
                "echo": False
            },
            "production": {
                "pool_size": 20,
                "max_overflow": 30,
                "pool_timeout": 30,
                "pool_recycle": 3600,
                "pool_pre_ping": True,
                "echo": False
            }
        }
        return configs.get(self.environment, configs["development"])

# Usage
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
db_config = DatabaseConfig(ENVIRONMENT)
engine_config = db_config.get_engine_config()
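The per-environment profiles can also be combined with per-deployment overrides, so an operator can adjust a single pool setting without editing code. The following is a minimal sketch of that idea; the DB_POOL_* variable names and the apply_env_overrides helper are assumptions for illustration and are not part of the configuration class above.

```python
import os
from typing import Any, Dict, Mapping

# Hypothetical environment variable names mapped to engine options
ENV_OVERRIDES = {
    "DB_POOL_SIZE": "pool_size",
    "DB_MAX_OVERFLOW": "max_overflow",
    "DB_POOL_TIMEOUT": "pool_timeout",
    "DB_POOL_RECYCLE": "pool_recycle",
}


def apply_env_overrides(
    config: Dict[str, Any],
    env: Mapping[str, str] = os.environ,
) -> Dict[str, Any]:
    """Return a copy of config with integer overrides taken from env."""
    merged = dict(config)
    for var, key in ENV_OVERRIDES.items():
        raw = env.get(var)
        if raw is None:
            continue
        value = int(raw)  # raises ValueError on non-numeric input
        if value < 0:
            raise ValueError(f"{var} must not be negative, got {value}")
        merged[key] = value
    return merged


if __name__ == "__main__":
    base = {"pool_size": 5, "max_overflow": 10, "echo": True}
    print(apply_env_overrides(base, {"DB_POOL_SIZE": "8"}))
```

The resulting dictionary can be unpacked into the engine factory, for example create_async_engine(db_config.base_url, **config).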

Add connection pool monitoring

Create middleware in middleware.py to monitor connection pool usage and detect potential issues before they impact performance.

import time
import logging
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from database import engine

logger = logging.getLogger(__name__)

class DatabasePoolMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        
        # Log pool stats before request
        pool = engine.pool
        logger.info(
            f"Pool stats before request: "
            f"size={pool.size()}, checked_out={pool.checkedout()}, "
            f"overflow={pool.overflow()}"
        )
        
        response = await call_next(request)
        
        # Log pool stats and timing after request
        process_time = time.time() - start_time
        logger.info(
            f"Pool stats after request: "
            f"size={pool.size()}, checked_out={pool.checkedout()}, "
            f"process_time={process_time:.3f}s"
        )
        
        # Warn if pool utilization is high. overflow() is negative until the
        # pool has opened pool_size connections, so clamp it at zero to avoid
        # dividing by zero on a fresh pool.
        capacity = pool.size() + max(pool.overflow(), 0)
        utilization = pool.checkedout() / capacity if capacity else 0.0
        if utilization > 0.8:
            logger.warning(
                f"High pool utilization: {utilization:.2%} "
                f"({pool.checkedout()}/{capacity})"
            )
        
        response.headers["X-Process-Time"] = str(process_time)
        return response
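The overflow counter is easy to misread, so a few worked numbers may help. As far as SQLAlchemy's queue pool reports it, overflow() starts at minus pool_size and counts upward as connections are opened, reaching zero when the base pool is full and turning positive only for overflow connections. The pool_utilization helper below is a hypothetical pure function that mirrors that arithmetic without needing a live pool.

```python
def pool_utilization(checked_out: int, pool_size: int, overflow: int) -> float:
    """Fraction of the currently available capacity that is checked out.

    `overflow` follows the pool's convention: negative while fewer than
    pool_size connections exist, positive when overflow connections are open.
    """
    capacity = pool_size + max(overflow, 0)
    if capacity <= 0:
        return 0.0
    return checked_out / capacity


if __name__ == "__main__":
    print(pool_utilization(0, 20, -20))   # fresh pool, nothing opened yet
    print(pool_utilization(10, 20, -10))  # 10 connections opened, all in use
    print(pool_utilization(25, 20, 5))    # base pool full plus 5 overflow
```

Dividing by pool_size plus the raw overflow value instead would divide by zero on a fresh pool, which is why the value is clamped.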

Add middleware to FastAPI app

Register the pool monitoring middleware to track connection usage across requests.

# Add these imports at the top
import logging

from middleware import DatabasePoolMiddleware

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Add middleware to the app
app.add_middleware(DatabasePoolMiddleware)

# Add this after creating the FastAPI app
@app.on_event("shutdown")
async def shutdown_event():
    await engine.dispose()

Create performance testing script

Build a script, test_performance.py, to test connection pool performance under concurrent load.

import asyncio
import aiohttp
import time
from typing import List

async def make_request(session: aiohttp.ClientSession, url: str) -> dict:
    """Make a single HTTP request and return timing info."""
    start_time = time.time()
    try:
        async with session.get(url) as response:
            await response.json()
            return {
                "success": True,
                "status": response.status,
                "duration": time.time() - start_time
            }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "duration": time.time() - start_time
        }

async def run_load_test(
    base_url: str = "http://localhost:8000",
    concurrent_requests: int = 50,
    total_requests: int = 1000
):
    """Run concurrent requests to test connection pool performance."""
    
    connector = aiohttp.TCPConnector(
        limit=concurrent_requests,
        limit_per_host=concurrent_requests
    )
    
    async with aiohttp.ClientSession(connector=connector) as session:
        # Test endpoints
        endpoints = [
            f"{base_url}/health",
            f"{base_url}/users/?limit=10",
            f"{base_url}/pool-stats"
        ]
        
        tasks = []
        start_time = time.time()
        
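        for i in range(total_requests):
            endpoint = endpoints[i % len(endpoints)]
            # create_task starts the request right away; a bare coroutine would
            # not run until gather() and the delay below would have no effect
            task = asyncio.create_task(make_request(session, endpoint))
            tasks.append(task)
            
            # Pause between batches to simulate realistic load
            if i and i % concurrent_requests == 0:
                await asyncio.sleep(0.1)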
        
        # Execute all requests
        results = await asyncio.gather(*tasks)
        total_time = time.time() - start_time
        
        # Analyze results
        successful_requests = [r for r in results if r["success"]]
        failed_requests = [r for r in results if not r["success"]]
        
        if successful_requests:
            avg_duration = sum(r["duration"] for r in successful_requests) / len(successful_requests)
            max_duration = max(r["duration"] for r in successful_requests)
            min_duration = min(r["duration"] for r in successful_requests)
        else:
            avg_duration = max_duration = min_duration = 0
        
        print(f"\n=== Load Test Results ===")
        print(f"Total requests: {total_requests}")
        print(f"Concurrent requests: {concurrent_requests}")
        print(f"Total time: {total_time:.2f}s")
        print(f"Requests per second: {total_requests / total_time:.2f}")
        print(f"Successful requests: {len(successful_requests)}")
        print(f"Failed requests: {len(failed_requests)}")
        print(f"Success rate: {len(successful_requests) / total_requests:.1%}")
        print(f"Average response time: {avg_duration:.3f}s")
        print(f"Min response time: {min_duration:.3f}s")
        print(f"Max response time: {max_duration:.3f}s")
        
        if failed_requests:
            print(f"\nFirst few errors:")
            for error in failed_requests[:5]:
                print(f"  {error['error']}")

if __name__ == "__main__":
    asyncio.run(run_load_test())
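Average, minimum, and maximum can hide tail latency, which is usually where pool exhaustion shows up first (requests waiting for a free connection). As an optional extension, the sketch below computes percentiles from the collected durations with the standard library. The latency_percentiles helper is hypothetical and is not called by the script above.

```python
import statistics
from typing import Dict, List


def latency_percentiles(durations: List[float]) -> Dict[str, float]:
    """Return the p50, p95 and p99 of the given durations (in seconds)."""
    if len(durations) < 2:
        raise ValueError("need at least two samples")
    # quantiles(n=100) returns the 99 cut points p1..p99
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}


if __name__ == "__main__":
    samples = [float(i) for i in range(101)]  # 0.0, 1.0, ..., 100.0
    print(latency_percentiles(samples))
```

In the load test it could be fed with [r["duration"] for r in successful_requests]. A p99 that is far above the p50 while the pool shows many checked-out connections suggests requests are queuing for a connection.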

Start the FastAPI application

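Run the FastAPI application with uvicorn. Each worker process creates its own connection pool, so four workers with pool_size=20 and max_overflow=30 can open up to 4 × 50 = 200 connections, which is the entire max_connections budget configured earlier. Lower the pool settings or the worker count so the total stays below max_connections, with room left for administrative sessions.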

uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop --http httptools
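Because every uvicorn worker is a separate process with its own pool, the number of connections the application can open is the per-process maximum multiplied by the worker count. The helper below is a small sketch of that arithmetic. The function names and the reserved headroom of 10 connections are assumptions for illustration.

```python
def max_app_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)


def fits_budget(
    workers: int,
    pool_size: int,
    max_overflow: int,
    max_connections: int,
    reserved: int = 10,  # hypothetical headroom for admin and maintenance sessions
) -> bool:
    """True if the application leaves `reserved` connections free."""
    total = max_app_connections(workers, pool_size, max_overflow)
    return total <= max_connections - reserved


if __name__ == "__main__":
    # Settings from this tutorial: 4 workers, pool_size=20, max_overflow=30
    print(max_app_connections(4, 20, 30))
    print(fits_budget(4, 20, 30, max_connections=200))
    # A smaller pool per worker stays within the budget
    print(fits_budget(4, 10, 10, max_connections=200))
```

With the tutorial's values the application alone can reach 200 connections, so either the per-worker pool or the worker count needs to come down, or max_connections needs to go up.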

Verify your setup

Test the FastAPI application and connection pool configuration to ensure everything works correctly.

# Test basic connectivity
curl http://localhost:8000/health

# Check connection pool stats
curl http://localhost:8000/pool-stats

# Create a test user
curl -X POST "http://localhost:8000/users/" \
  -H "Content-Type: application/json" \
  -d '{"username": "testuser", "email": "test@example.com", "password": "testpass"}'

# Get users list
curl http://localhost:8000/users/

# Run performance test
python test_performance.py

Note: The connection pool is working correctly if you see consistent response times under load and the pool stats show efficient connection reuse without exhaustion.

Common issues

| Symptom | Cause | Fix |
| --- | --- | --- |
| "QueuePool limit of size N overflow M reached, connection timed out" | All pooled connections stayed checked out for longer than pool_timeout | Increase pool_size and max_overflow, or reduce concurrent requests |
| Slow database responses | Connection pool exhaustion | Monitor pool stats and tune pool_timeout and pool_size |
| "asyncpg.exceptions.TooManyConnectionsError" | PostgreSQL max_connections exceeded | Increase PostgreSQL max_connections or reduce the application pool size and worker count |
| Memory usage keeps growing | Connections not being returned | Ensure proper session handling with async with or try/finally blocks |
| "SSL connection lost" errors | Long-lived connections timing out | Reduce pool_recycle time or enable pool_pre_ping |
| High CPU usage in database | Too many short-lived connections | Increase pool_size to improve connection reuse |

Production considerations: Monitor connection pool metrics in production. High pool utilization (>80%) or frequent overflow usage indicates the need for pool tuning. Consider implementing PgBouncer connection pooling for additional connection management.
