Setup DuckDB with Apache Airflow for automated data pipelines

Intermediate 45 min May 11, 2026 66 views
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Configure DuckDB as a high-performance analytical database backend for Apache Airflow workflows. Build automated data pipelines that process files, APIs, and databases using DuckDB's columnar engine.

Prerequisites

  • Root or sudo access
  • Python 3.8 or higher
  • 4GB RAM minimum
  • 10GB free disk space

What this solves

DuckDB provides a fast, embedded analytical database that works perfectly with Apache Airflow for data pipeline automation. This setup gives you columnar analytics on CSV files, Parquet data, and database connections without managing a separate database cluster. You'll configure the DuckDB provider for Airflow and build DAGs that automate data ingestion, transformation, and analysis.

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you get the latest versions of Python and system libraries.

sudo apt update && sudo apt upgrade -y
sudo apt install -y python3 python3-pip python3-venv build-essential
sudo dnf update -y
sudo dnf install -y python3 python3-pip python3-devel gcc gcc-c++ make

Create dedicated user for Airflow

Run Airflow as a dedicated user for better security isolation and file permission management.

sudo useradd -m -s /bin/bash airflow
sudo usermod -aG sudo airflow
sudo su - airflow

Create Python virtual environment

Isolate Airflow and DuckDB dependencies from the system Python installation to avoid package conflicts.

python3 -m venv ~/airflow-venv
source ~/airflow-venv/bin/activate
pip install --upgrade pip setuptools wheel

Install Apache Airflow with DuckDB provider

Install Airflow with the DuckDB provider package and required dependencies for data pipeline operations.

export AIRFLOW_VERSION=2.8.1
export PYTHON_VERSION="$(python --version | cut -d" " -f 2 | cut -d"." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-duckdb
pip install duckdb pandas pyarrow

Initialize Airflow database

Set up the Airflow metadata database and create the default admin user for the web interface.

export AIRFLOW_HOME=~/airflow
airflow db init
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password SecurePassword123

Configure Airflow settings

Optimize Airflow configuration for DuckDB workflows and enable parallel task execution.

[core]
executor = LocalExecutor
max_active_runs_per_dag = 3
max_active_tasks_per_dag = 8
parallelism = 16

[scheduler]
dag_dir_list_interval = 300
max_threads = 2

[webserver]
expose_config = False
web_server_port = 8080
base_url = http://localhost:8080

Create DuckDB connection in Airflow

Configure a reusable DuckDB connection that your DAGs can reference for database operations.

mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins ~/data
airflow connections add 'duckdb_default' \
    --conn-type 'duckdb' \
    --conn-host '/home/airflow/data/analytics.duckdb' \
    --conn-description 'Default DuckDB connection for analytics'

Create systemd service for Airflow scheduler

Set up Airflow scheduler to run automatically as a system service with proper logging and restart policies.

sudo tee /etc/systemd/system/airflow-scheduler.service > /dev/null << 'EOF'
[Unit]
Description=Airflow Scheduler
After=network.target
Wants=network.target

[Service]
User=airflow
Group=airflow
Type=simple
ExecStart=/home/airflow/airflow-venv/bin/airflow scheduler
Environment=AIRFLOW_HOME=/home/airflow/airflow
Restart=always
RestartSec=10
WorkingDirectory=/home/airflow
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
EOF

Create systemd service for Airflow webserver

Set up the Airflow web interface as a system service for DAG monitoring and management.

sudo tee /etc/systemd/system/airflow-webserver.service > /dev/null << 'EOF'
[Unit]
Description=Airflow Webserver
After=network.target
Wants=network.target

[Service]
User=airflow
Group=airflow
Type=simple
ExecStart=/home/airflow/airflow-venv/bin/airflow webserver --port 8080
Environment=AIRFLOW_HOME=/home/airflow/airflow
Restart=always
RestartSec=10
WorkingDirectory=/home/airflow
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
EOF

Enable and start Airflow services

Start both Airflow services and enable them to start automatically on system boot.

sudo systemctl daemon-reload
sudo systemctl enable --now airflow-scheduler airflow-webserver
sudo systemctl status airflow-scheduler airflow-webserver

Create your first DuckDB data pipeline

Create sample data processing DAG

Build a complete data pipeline that demonstrates DuckDB's capabilities with file processing, data transformation, and analytics.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.duckdb.operators.duckdb import DuckDBOperator
from airflow.operators.python import PythonOperator
from airflow.providers.duckdb.hooks.duckdb import DuckDBHook
import pandas as pd
import os

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'duckdb_analytics_pipeline',
    default_args=default_args,
    description='DuckDB data analytics pipeline',
    schedule_interval=timedelta(hours=6),
    catchup=False,
    max_active_runs=1
)

def generate_sample_data(**context):
    """Generate sample sales data for processing"""
    import random
    from datetime import datetime, timedelta
    
    # Create sample data
    data = []
    base_date = datetime.now() - timedelta(days=30)
    
    for i in range(1000):
        data.append({
            'order_id': f'ORD-{i:06d}',
            'customer_id': f'CUST-{random.randint(1, 100):03d}',
            'product_id': f'PROD-{random.randint(1, 50):03d}',
            'quantity': random.randint(1, 10),
            'price': round(random.uniform(10, 500), 2),
            'order_date': (base_date + timedelta(days=random.randint(0, 30))).strftime('%Y-%m-%d'),
            'region': random.choice(['North', 'South', 'East', 'West'])
        })
    
    df = pd.DataFrame(data)
    os.makedirs('/home/airflow/data/raw', exist_ok=True)
    df.to_csv('/home/airflow/data/raw/sales_data.csv', index=False)
    print(f"Generated {len(data)} sales records")

generate_data = PythonOperator(
    task_id='generate_sample_data',
    python_callable=generate_sample_data,
    dag=dag
)

create_tables = DuckDBOperator(
    task_id='create_analytics_tables',
    duckdb_conn_id='duckdb_default',
    sql="""
        -- Create raw sales table
        CREATE TABLE IF NOT EXISTS raw_sales (
            order_id VARCHAR,
            customer_id VARCHAR,
            product_id VARCHAR,
            quantity INTEGER,
            price DECIMAL(10,2),
            order_date DATE,
            region VARCHAR
        );
        
        -- Create aggregated daily sales table
        CREATE TABLE IF NOT EXISTS daily_sales_summary (
            order_date DATE,
            region VARCHAR,
            total_orders INTEGER,
            total_revenue DECIMAL(12,2),
            avg_order_value DECIMAL(10,2),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """,
    dag=dag
)

load_data = DuckDBOperator(
    task_id='load_sales_data',
    duckdb_conn_id='duckdb_default',
    sql="""
        -- Clear existing data
        DELETE FROM raw_sales;
        
        -- Load CSV data into DuckDB
        INSERT INTO raw_sales 
        SELECT * FROM read_csv_auto('/home/airflow/data/raw/sales_data.csv');
        
        SELECT COUNT(*) as loaded_records FROM raw_sales;
    """,
    dag=dag
)

analyze_data = DuckDBOperator(
    task_id='analyze_sales_data',
    duckdb_conn_id='duckdb_default',
    sql="""
        -- Clear previous analysis
        DELETE FROM daily_sales_summary;
        
        -- Generate daily sales analytics
        INSERT INTO daily_sales_summary (order_date, region, total_orders, total_revenue, avg_order_value)
        SELECT 
            order_date,
            region,
            COUNT(*) as total_orders,
            ROUND(SUM(quantity * price), 2) as total_revenue,
            ROUND(AVG(quantity * price), 2) as avg_order_value
        FROM raw_sales
        GROUP BY order_date, region
        ORDER BY order_date DESC, region;
        
        -- Show top performing regions
        SELECT 
            region,
            SUM(total_orders) as total_orders,
            ROUND(SUM(total_revenue), 2) as total_revenue
        FROM daily_sales_summary
        GROUP BY region
        ORDER BY total_revenue DESC;
    """,
    dag=dag
)

def export_results(**context):
    """Export analysis results to files"""
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    
    # Export daily summary
    daily_results = hook.get_pandas_df(
        "SELECT * FROM daily_sales_summary ORDER BY order_date DESC"
    )
    
    # Export regional summary
    regional_results = hook.get_pandas_df("""
        SELECT 
            region,
            SUM(total_orders) as total_orders,
            ROUND(SUM(total_revenue), 2) as total_revenue,
            ROUND(AVG(avg_order_value), 2) as avg_order_value
        FROM daily_sales_summary
        GROUP BY region
        ORDER BY total_revenue DESC
    """)
    
    os.makedirs('/home/airflow/data/reports', exist_ok=True)
    daily_results.to_csv('/home/airflow/data/reports/daily_sales.csv', index=False)
    regional_results.to_csv('/home/airflow/data/reports/regional_summary.csv', index=False)
    
    print(f"Exported {len(daily_results)} daily records and {len(regional_results)} regional summaries")

export_data = PythonOperator(
    task_id='export_analysis_results',
    python_callable=export_results,
    dag=dag
)

Set task dependencies

generate_data >> create_tables >> load_data >> analyze_data >> export_data

Create advanced DuckDB analytics DAG

Build a more complex pipeline that demonstrates DuckDB's advanced features like JSON processing, window functions, and data export.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.duckdb.operators.duckdb import DuckDBOperator
from airflow.operators.python import PythonOperator
from airflow.providers.duckdb.hooks.duckdb import DuckDBHook
import json
import os

default_args = {
    'owner': 'analytics-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=3)
}

dag = DAG(
    'duckdb_advanced_analytics',
    default_args=default_args,
    description='Advanced DuckDB analytics with JSON and time-series',
    schedule_interval=timedelta(hours=12),
    catchup=False
)

def create_json_data(**context):
    """Generate sample JSON event data"""
    import random
    from datetime import datetime, timedelta
    
    events = []
    base_time = datetime.now() - timedelta(hours=24)
    
    for i in range(500):
        event = {
            'event_id': f'evt_{i:06d}',
            'timestamp': (base_time + timedelta(minutes=random.randint(0, 1440))).isoformat(),
            'user_id': f'user_{random.randint(1, 100):03d}',
            'event_type': random.choice(['page_view', 'click', 'purchase', 'signup']),
            'properties': {
                'page': random.choice(['/home', '/products', '/cart', '/checkout']),
                'browser': random.choice(['chrome', 'firefox', 'safari', 'edge']),
                'device': random.choice(['desktop', 'mobile', 'tablet']),
                'value': round(random.uniform(0, 100), 2) if random.random() > 0.7 else None
            },
            'session_id': f'sess_{random.randint(1, 50):03d}'
        }
        events.append(event)
    
    os.makedirs('/home/airflow/data/events', exist_ok=True)
    with open('/home/airflow/data/events/user_events.json', 'w') as f:
        for event in events:
            f.write(json.dumps(event) + '\n')
    
    print(f"Generated {len(events)} event records")

generate_events = PythonOperator(
    task_id='generate_event_data',
    python_callable=create_json_data,
    dag=dag
)

create_event_tables = DuckDBOperator(
    task_id='create_event_tables',
    duckdb_conn_id='duckdb_default',
    sql="""
        -- Install and load JSON extension
        INSTALL json;
        LOAD json;
        
        -- Create events table
        CREATE TABLE IF NOT EXISTS user_events (
            event_id VARCHAR,
            timestamp TIMESTAMP,
            user_id VARCHAR,
            event_type VARCHAR,
            page VARCHAR,
            browser VARCHAR,
            device VARCHAR,
            value DECIMAL(10,2),
            session_id VARCHAR
        );
        
        -- Create hourly analytics table
        CREATE TABLE IF NOT EXISTS hourly_analytics (
            hour_bucket TIMESTAMP,
            event_type VARCHAR,
            device VARCHAR,
            event_count INTEGER,
            unique_users INTEGER,
            total_value DECIMAL(12,2),
            avg_value DECIMAL(10,2)
        );
        
        -- Create user session analytics
        CREATE TABLE IF NOT EXISTS session_analytics (
            session_id VARCHAR,
            user_id VARCHAR,
            session_start TIMESTAMP,
            session_end TIMESTAMP,
            session_duration_minutes INTEGER,
            page_views INTEGER,
            total_events INTEGER,
            conversion_value DECIMAL(10,2)
        );
    """,
    dag=dag
)

process_json_events = DuckDBOperator(
    task_id='process_json_events',
    duckdb_conn_id='duckdb_default',
    sql="""
        -- Clear existing event data
        DELETE FROM user_events;
        
        -- Load and parse JSON events
        INSERT INTO user_events
        SELECT 
            json_extract_string(json_data, '$.event_id') as event_id,
            STRPTIME(json_extract_string(json_data, '$.timestamp'), '%Y-%m-%dT%H:%M:%S.%f') as timestamp,
            json_extract_string(json_data, '$.user_id') as user_id,
            json_extract_string(json_data, '$.event_type') as event_type,
            json_extract_string(json_data, '$.properties.page') as page,
            json_extract_string(json_data, '$.properties.browser') as browser,
            json_extract_string(json_data, '$.properties.device') as device,
            CAST(json_extract_string(json_data, '$.properties.value') AS DECIMAL(10,2)) as value,
            json_extract_string(json_data, '$.session_id') as session_id
        FROM (
            SELECT json(line) as json_data 
            FROM read_text('/home/airflow/data/events/user_events.json')
        ) WHERE json_data IS NOT NULL;
        
        SELECT COUNT(*) as loaded_events FROM user_events;
    """,
    dag=dag
)

generate_hourly_analytics = DuckDBOperator(
    task_id='generate_hourly_analytics',
    duckdb_conn_id='duckdb_default',
    sql="""
        DELETE FROM hourly_analytics;
        
        INSERT INTO hourly_analytics
        SELECT 
            DATE_TRUNC('hour', timestamp) as hour_bucket,
            event_type,
            device,
            COUNT(*) as event_count,
            COUNT(DISTINCT user_id) as unique_users,
            COALESCE(SUM(value), 0) as total_value,
            ROUND(AVG(value), 2) as avg_value
        FROM user_events
        GROUP BY DATE_TRUNC('hour', timestamp), event_type, device
        ORDER BY hour_bucket DESC, event_count DESC;
        
        -- Show top analytics
        SELECT 
            hour_bucket,
            SUM(event_count) as total_events,
            SUM(unique_users) as total_unique_users,
            ROUND(SUM(total_value), 2) as total_revenue
        FROM hourly_analytics
        GROUP BY hour_bucket
        ORDER BY hour_bucket DESC
        LIMIT 10;
    """,
    dag=dag
)

analyze_user_sessions = DuckDBOperator(
    task_id='analyze_user_sessions',
    duckdb_conn_id='duckdb_default',
    sql="""
        DELETE FROM session_analytics;
        
        INSERT INTO session_analytics
        SELECT 
            session_id,
            user_id,
            MIN(timestamp) as session_start,
            MAX(timestamp) as session_end,
            CAST(EXTRACT(EPOCH FROM (MAX(timestamp) - MIN(timestamp)))/60 AS INTEGER) as session_duration_minutes,
            SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as page_views,
            COUNT(*) as total_events,
            COALESCE(SUM(CASE WHEN event_type = 'purchase' THEN value ELSE 0 END), 0) as conversion_value
        FROM user_events
        GROUP BY session_id, user_id
        HAVING COUNT(*) > 1
        ORDER BY conversion_value DESC, session_duration_minutes DESC;
        
        -- Show session insights
        SELECT 
            'Total Sessions' as metric,
            COUNT(*) as value
        FROM session_analytics
        UNION ALL
        SELECT 
            'Avg Session Duration (min)' as metric,
            ROUND(AVG(session_duration_minutes), 1) as value
        FROM session_analytics
        UNION ALL
        SELECT 
            'Sessions with Purchases' as metric,
            COUNT(*) as value
        FROM session_analytics
        WHERE conversion_value > 0;
    """,
    dag=dag
)

def export_analytics_reports(**context):
    """Export comprehensive analytics reports"""
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    
    # Export hourly trends
    hourly_data = hook.get_pandas_df("""
        SELECT 
            hour_bucket,
            event_type,
            device,
            event_count,
            unique_users,
            total_value
        FROM hourly_analytics
        ORDER BY hour_bucket DESC, event_count DESC
    """)
    
    # Export session analysis
    session_data = hook.get_pandas_df("""
        SELECT 
            session_id,
            user_id,
            session_start,
            session_duration_minutes,
            page_views,
            total_events,
            conversion_value
        FROM session_analytics
        ORDER BY conversion_value DESC
    """)
    
    # Export device performance
    device_data = hook.get_pandas_df("""
        SELECT 
            device,
            SUM(event_count) as total_events,
            SUM(unique_users) as total_users,
            ROUND(SUM(total_value), 2) as total_value,
            ROUND(AVG(avg_value), 2) as avg_event_value
        FROM hourly_analytics
        GROUP BY device
        ORDER BY total_value DESC
    """)
    
    os.makedirs('/home/airflow/data/analytics', exist_ok=True)
    hourly_data.to_csv('/home/airflow/data/analytics/hourly_trends.csv', index=False)
    session_data.to_csv('/home/airflow/data/analytics/session_analysis.csv', index=False)
    device_data.to_csv('/home/airflow/data/analytics/device_performance.csv', index=False)
    
    print(f"Exported analytics: {len(hourly_data)} hourly records, {len(session_data)} sessions, {len(device_data)} device summaries")

export_reports = PythonOperator(
    task_id='export_analytics_reports',
    python_callable=export_analytics_reports,
    dag=dag
)

Set dependencies

generate_events >> create_event_tables >> process_json_events >> [generate_hourly_analytics, analyze_user_sessions] >> export_reports

Set proper file permissions

Configure correct ownership and permissions for Airflow directories and data files.

sudo chown -R airflow:airflow /home/airflow
sudo chmod -R 755 /home/airflow/airflow
sudo chmod -R 775 /home/airflow/data
sudo chmod 644 /home/airflow/airflow/dags/*.py
Never use chmod 777. It gives every user on the system full access to your files. Instead, fix ownership with chown and use minimal permissions like 755 for directories and 644 for files.

Configure monitoring and optimization

Create DuckDB performance monitoring DAG

Build a monitoring pipeline that tracks DuckDB performance metrics and identifies optimization opportunities.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.duckdb.operators.duckdb import DuckDBOperator
from airflow.operators.python import PythonOperator
from airflow.providers.duckdb.hooks.duckdb import DuckDBHook
import os

default_args = {
    'owner': 'ops-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

dag = DAG(
    'duckdb_monitoring',
    default_args=default_args,
    description='Monitor DuckDB performance and usage',
    schedule_interval=timedelta(hours=1),
    catchup=False
)

create_monitoring_tables = DuckDBOperator(
    task_id='create_monitoring_tables',
    duckdb_conn_id='duckdb_default',
    sql="""
        CREATE TABLE IF NOT EXISTS db_metrics (
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            database_size_mb DECIMAL(10,2),
            table_count INTEGER,
            total_rows BIGINT,
            memory_usage_mb DECIMAL(10,2)
        );
        
        CREATE TABLE IF NOT EXISTS query_performance (
            timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            query_type VARCHAR,
            execution_time_ms INTEGER,
            rows_processed BIGINT
        );
    """,
    dag=dag
)

collect_db_metrics = DuckDBOperator(
    task_id='collect_database_metrics',
    duckdb_conn_id='duckdb_default',
    sql="""
        -- Collect current database metrics
        INSERT INTO db_metrics (database_size_mb, table_count, total_rows, memory_usage_mb)
        SELECT 
            ROUND(SUM(estimated_size) / 1024.0 / 1024.0, 2) as database_size_mb,
            COUNT(DISTINCT table_name) as table_count,
            SUM(estimated_size) as total_rows,
            64.0 as memory_usage_mb  -- Placeholder for actual memory monitoring
        FROM duckdb_tables();
        
        -- Performance test queries
        INSERT INTO query_performance (query_type, execution_time_ms, rows_processed)
        SELECT 'table_scan', 0, COUNT(*) FROM raw_sales;
        
        INSERT INTO query_performance (query_type, execution_time_ms, rows_processed)
        SELECT 'aggregate_query', 0, COUNT(*) FROM daily_sales_summary;
    """,
    dag=dag
)

def analyze_performance(**context):
    """Analyze DuckDB performance trends and generate alerts"""
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    
    # Get recent metrics
    metrics = hook.get_pandas_df("""
        SELECT 
            timestamp,
            database_size_mb,
            table_count,
            total_rows
        FROM db_metrics
        ORDER BY timestamp DESC
        LIMIT 24
    """)
    
    # Check for performance issues
    latest_metrics = hook.get_pandas_df("""
        SELECT 
            database_size_mb,
            table_count
        FROM db_metrics
        ORDER BY timestamp DESC
        LIMIT 1
    """).iloc[0]
    
    # Simple alerting logic
    alerts = []
    if latest_metrics['database_size_mb'] > 1000:
        alerts.append(f"Database size is {latest_metrics['database_size_mb']:.2f} MB")
    
    if latest_metrics['table_count'] > 20:
        alerts.append(f"High table count: {latest_metrics['table_count']} tables")
    
    print(f"Performance analysis complete. Database: {latest_metrics['database_size_mb']:.2f} MB")
    if alerts:
        print("Alerts:", "; ".join(alerts))

analyze_perf = PythonOperator(
    task_id='analyze_performance',
    python_callable=analyze_performance,
    dag=dag
)

Task dependencies

create_monitoring_tables >> collect_db_metrics >> analyze_perf

Configure Airflow logging

Enhance Airflow logging configuration for better troubleshooting and monitoring of DuckDB operations.

[logging]
logging_level = INFO
fab_logging_level = WARN
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/airflow/airflow/logs/dag_processor_manager/dag_processor_manager.log

Verify your setup

Check that both Airflow services are running and the web interface is accessible.

sudo systemctl status airflow-scheduler airflow-webserver
curl -s http://localhost:8080/health | grep -o '"status":"[^"]*"'
ls -la ~/airflow/dags/
ls -la ~/data/

Verify DuckDB connection and data pipeline functionality.

source ~/airflow-venv/bin/activate
airflow connections test duckdb_default
airflow dags list | grep duckdb
airflow tasks test duckdb_analytics_pipeline generate_sample_data 2024-01-01

You can now access the Airflow web interface at http://your-server-ip:8080 with username admin and password SecurePassword123. The DAGs should appear in the interface and you can trigger them manually or wait for their scheduled runs.

Common issues

SymptomCauseFix
DAG import errorsPython syntax errors or missing importsairflow dags list-import-errors to check syntax
DuckDB connection failsIncorrect file path or permissionsCheck connection config: airflow connections get duckdb_default
Tasks stuck in running stateInsufficient executor capacityIncrease parallelism in airflow.cfg and restart services
Memory errors during data processingLarge datasets exceed available RAMProcess data in chunks or increase system memory
Permission denied on data filesWrong file ownershipsudo chown -R airflow:airflow /home/airflow/data
Scheduler not picking up DAGsDAG directory permissionssudo chmod -R 755 ~/airflow/dags

Performance optimization

Configure DuckDB-specific optimizations for better performance with large datasets and complex analytical queries. These settings help DuckDB handle concurrent operations and memory management more efficiently.

from airflow.providers.duckdb.hooks.duckdb import DuckDBHook

def optimize_duckdb_settings():
    """Apply DuckDB performance optimizations"""
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    
    # Configure memory and threading
    hook.run("""
        SET memory_limit = '2GB';
        SET threads = 4;
        SET enable_progress_bar = false;
        SET enable_object_cache = true;
        SET checkpoint_threshold = '256MB';
    """)
    
    # Create indexes for frequently queried columns
    hook.run("""
        PRAGMA table_info('raw_sales');
        -- DuckDB automatically optimizes queries, but you can hint with ORDER BY
    """)
    
    print("DuckDB optimization settings applied")

For production workloads, consider using DuckDB's columnar storage optimizations and implementing automated backup strategies for your analytical databases.

Next steps

Running this in production?

Want this handled for you? Setting up Airflow once is straightforward. Keeping it patched, monitored, backed up and tuned across environments is the harder part. See how we run infrastructure like this for European SaaS and e-commerce teams.

Automated install script

Run this to automate the entire setup

Need help?

Don't want to manage this yourself?

We handle high availability infrastructure for businesses that depend on uptime. From initial setup to ongoing operations.