Configure DuckDB as a high-performance analytical database backend for Apache Airflow workflows. Build automated data pipelines that process files, APIs, and databases using DuckDB's columnar engine.
Prerequisites
- Root or sudo access
- Python 3.8 to 3.11 (Airflow 2.8.1 does not support Python 3.12)
- 4GB RAM minimum
- 10GB free disk space
What this solves
DuckDB is a fast, embedded analytical database that fits well into Apache Airflow pipelines. It runs inside the task process, which gives you columnar analytics on CSV files, Parquet data, and database connections without managing a separate database cluster. You'll configure the DuckDB provider for Airflow and build DAGs that automate data ingestion, transformation, and analysis.
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you get the latest versions of Python and system libraries.
sudo apt update && sudo apt upgrade -y
sudo apt install -y python3 python3-pip python3-venv build-essential
Create dedicated user for Airflow
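Run Airflow as a dedicated user to keep its files and processes separate from other accounts. The commands below also add this user to the sudo group and give it a password, because later steps run sudo from the airflow shell. An account created with useradd has no password, so sudo would otherwise refuse to run. On a shared or production host, skip the usermod and passwd commands and run the sudo steps from your own administrative account instead.
sudo useradd -m -s /bin/bash airflow
sudo usermod -aG sudo airflow
sudo passwd airflow
sudo su - airflow
Create Python virtual environment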
Isolate Airflow and DuckDB dependencies from the system Python installation to avoid package conflicts.
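Once the environment is activated, Python reports it through sys.prefix, which points at the venv, while sys.base_prefix keeps pointing at the system installation. Below is a minimal sketch for helper scripts that should not run outside the venv. The helper name in_virtualenv is hypothetical.

```python
import sys


def in_virtualenv(prefix: str = sys.prefix, base_prefix: str = sys.base_prefix) -> bool:
    """Return True when the interpreter runs inside a virtual environment.

    A venv sets sys.prefix to the environment directory, while
    sys.base_prefix keeps pointing at the base Python installation.
    """
    return prefix != base_prefix


if not in_virtualenv():
    print("Warning: not inside a virtual environment; "
          "run 'source ~/airflow-venv/bin/activate' first.")
```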
python3 -m venv ~/airflow-venv
source ~/airflow-venv/bin/activate
pip install --upgrade pip setuptools wheel
Install Apache Airflow with DuckDB provider
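Install Airflow with the official constraints file, then add DuckDB support and the data libraries the DAGs use. Apache Airflow does not publish an official DuckDB provider. DuckDB support comes from the community-maintained airflow-provider-duckdb package, which adds a DuckDBHook and the duckdb connection type.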
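The commands below derive the constraints file from the Airflow and Python versions. Airflow 2.8.x supports Python 3.8 through 3.11. On a newer interpreter, for example Python 3.12 (the default on Ubuntu 24.04), there is no matching constraints file and the install fails.

This sketch mirrors the shell logic so you can check before installing. The helper names are hypothetical.

```python
import sys

# Python versions that Apache Airflow 2.8.x supports
SUPPORTED_PYTHON = {(3, 8), (3, 9), (3, 10), (3, 11)}


def is_supported(version=sys.version_info) -> bool:
    """True if Airflow 2.8.x publishes a constraints file for this Python."""
    return tuple(version[:2]) in SUPPORTED_PYTHON


def constraint_url(airflow_version: str, version=sys.version_info) -> str:
    """Build the same URL as the CONSTRAINT_URL export in the shell commands."""
    major, minor = version[:2]
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{major}.{minor}.txt"
    )
```

For example, is_supported() returns False under Python 3.12. In that case, install a supported interpreter and recreate the virtual environment with it.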
export AIRFLOW_VERSION=2.8.1
export PYTHON_VERSION="$(python --version | cut -d" " -f 2 | cut -d"." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install airflow-provider-duckdb
pip install duckdb pandas pyarrow
Initialize Airflow database
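Set up the Airflow metadata database and create an admin user for the web interface. On Airflow 2.7 and later, airflow db migrate replaces the deprecated airflow db init. Replace the example password with one of your own.
export AIRFLOW_HOME=~/airflow
airflow db migrate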
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password SecurePassword123
Configure Airflow settings
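Edit ~/airflow/airflow.cfg (Airflow creates it the first time it runs) and set these keys in their existing sections to tune concurrency for DuckDB workflows.

LocalExecutor runs tasks in parallel, but it cannot use the default SQLite metadata database: Airflow exits with the error "cannot use SQLite with the LocalExecutor". Before switching, point sql_alchemy_conn in the [database] section at a PostgreSQL or MySQL database. For a single-machine trial, keep executor = SequentialExecutor instead, which runs one task at a time.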
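Two rules trip up configurations like the one below:

- Airflow refuses to start LocalExecutor with the default SQLite metadata database.
- The [scheduler] max_threads key is a deprecated name for parsing_processes.

A small stdlib check can catch both before Airflow loads the file. This is a sketch, assuming you pass it the text of airflow.cfg. The function name is hypothetical, and it models only these two rules. It does not reproduce Airflow's full validation and ignores AIRFLOW__SECTION__KEY environment-variable overrides.

```python
import configparser


def check_airflow_cfg(cfg_text: str) -> list:
    """Return warnings for two settings that stop this guide's config from working."""
    # interpolation=None reads values raw, so '%%' in log formats is left alone
    cfg = configparser.ConfigParser(interpolation=None)
    cfg.read_string(cfg_text)
    warnings = []

    executor = cfg.get("core", "executor", fallback="SequentialExecutor")
    # Without sql_alchemy_conn, Airflow falls back to a SQLite file in AIRFLOW_HOME
    db_url = cfg.get("database", "sql_alchemy_conn", fallback="sqlite:///airflow.db")
    if executor == "LocalExecutor" and db_url.startswith("sqlite"):
        warnings.append("LocalExecutor needs a PostgreSQL or MySQL metadata database")

    if cfg.has_option("scheduler", "max_threads"):
        warnings.append("[scheduler] max_threads is deprecated; use parsing_processes")
    return warnings
```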
[core]
executor = LocalExecutor
max_active_runs_per_dag = 3
max_active_tasks_per_dag = 8
parallelism = 16
[scheduler]
dag_dir_list_interval = 300
parsing_processes = 2
[webserver]
expose_config = False
web_server_port = 8080
base_url = http://localhost:8080
Create DuckDB connection in Airflow
Configure a reusable DuckDB connection that your DAGs can reference for database operations.
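The CLI command below stores the connection in the metadata database. Airflow can also read a connection from an AIRFLOW_CONN_<CONN_ID> environment variable, and since Airflow 2.3 the value may be a JSON object.

This sketch builds that variable for the same DuckDB file. The helper name is hypothetical. A connection defined this way is not stored in the metadata database, so it does not appear in the web UI. It also takes precedence over a database connection with the same ID.

```python
import json


def duckdb_conn_env(conn_id: str, db_path: str) -> tuple:
    """Return (variable name, JSON value) for an environment-variable connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({
        "conn_type": "duckdb",
        # Same field the CLI fills with --conn-host: the DuckDB database file
        "host": db_path,
    })
    return name, value


name, value = duckdb_conn_env("duckdb_default", "/home/airflow/data/analytics.duckdb")
print(f"export {name}='{value}'")
```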
mkdir -p ~/airflow/dags ~/airflow/logs ~/airflow/plugins ~/data
airflow connections add 'duckdb_default' \
--conn-type 'duckdb' \
--conn-host '/home/airflow/data/analytics.duckdb' \
--conn-description 'Default DuckDB connection for analytics'
Create systemd service for Airflow scheduler
Set up the Airflow scheduler to run automatically as a systemd service, with its output sent to the journal and automatic restarts on failure.
sudo tee /etc/systemd/system/airflow-scheduler.service > /dev/null << 'EOF'
[Unit]
Description=Airflow Scheduler
After=network.target
Wants=network.target
[Service]
User=airflow
Group=airflow
Type=simple
ExecStart=/home/airflow/airflow-venv/bin/airflow scheduler
Environment=AIRFLOW_HOME=/home/airflow/airflow
Restart=always
RestartSec=10
WorkingDirectory=/home/airflow
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
Create systemd service for Airflow webserver
Set up the Airflow web interface as a system service for DAG monitoring and management.
sudo tee /etc/systemd/system/airflow-webserver.service > /dev/null << 'EOF'
[Unit]
Description=Airflow Webserver
After=network.target
Wants=network.target
[Service]
User=airflow
Group=airflow
Type=simple
ExecStart=/home/airflow/airflow-venv/bin/airflow webserver --port 8080
Environment=AIRFLOW_HOME=/home/airflow/airflow
Restart=always
RestartSec=10
WorkingDirectory=/home/airflow
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
Enable and start Airflow services
Start both Airflow services and enable them to start automatically on system boot.
sudo systemctl daemon-reload
sudo systemctl enable --now airflow-scheduler airflow-webserver
sudo systemctl status airflow-scheduler airflow-webserver
Create your first DuckDB data pipeline
Create sample data processing DAG
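Build a complete data pipeline that generates sample sales data, loads the CSV into DuckDB, aggregates it, and exports reports. Save it in the DAGs folder, for example as /home/airflow/airflow/dags/duckdb_analytics_pipeline.py (the file name is only a suggestion).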
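Here is a stdlib-only sketch of the aggregation that the analyze_sales_data task performs in SQL. It makes the query concrete and gives you something to check DuckDB's output against in a data-quality test. It reads CSV text with the columns generate_sample_data writes. The function name is hypothetical, and results can differ from DuckDB's in the last cent because of rounding differences.

```python
import csv
import io
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP


def daily_summary(csv_text: str) -> dict:
    """Recompute daily_sales_summary rows from sales CSV text.

    Mirrors the SQL: COUNT(*), SUM(quantity * price) and AVG(quantity * price)
    grouped by (order_date, region), rounded to 2 decimal places.
    """
    groups = defaultdict(list)
    for row in csv.DictReader(io.StringIO(csv_text)):
        line_total = int(row["quantity"]) * Decimal(row["price"])
        groups[(row["order_date"], row["region"])].append(line_total)

    cents = Decimal("0.01")
    summary = {}
    for key, totals in groups.items():
        revenue = sum(totals)
        summary[key] = {
            "total_orders": len(totals),
            "total_revenue": revenue.quantize(cents, rounding=ROUND_HALF_UP),
            "avg_order_value": (revenue / len(totals)).quantize(cents, rounding=ROUND_HALF_UP),
        }
    return summary
```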
from datetime import datetime, timedelta
import os

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator
# The generic SQL operator runs statements through the DuckDBHook that
# airflow-provider-duckdb registers for the 'duckdb' connection type
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from duckdb_provider.hooks.duckdb_hook import DuckDBHook

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'duckdb_analytics_pipeline',
    default_args=default_args,
    description='DuckDB data analytics pipeline',
    schedule=timedelta(hours=6),
    catchup=False,
    max_active_runs=1
)


def generate_sample_data(**context):
    """Generate sample sales data for processing"""
    import random

    # Create sample data
    data = []
    base_date = datetime.now() - timedelta(days=30)
    for i in range(1000):
        data.append({
            'order_id': f'ORD-{i:06d}',
            'customer_id': f'CUST-{random.randint(1, 100):03d}',
            'product_id': f'PROD-{random.randint(1, 50):03d}',
            'quantity': random.randint(1, 10),
            'price': round(random.uniform(10, 500), 2),
            'order_date': (base_date + timedelta(days=random.randint(0, 30))).strftime('%Y-%m-%d'),
            'region': random.choice(['North', 'South', 'East', 'West'])
        })
    df = pd.DataFrame(data)
    os.makedirs('/home/airflow/data/raw', exist_ok=True)
    df.to_csv('/home/airflow/data/raw/sales_data.csv', index=False)
    print(f"Generated {len(data)} sales records")


generate_data = PythonOperator(
    task_id='generate_sample_data',
    python_callable=generate_sample_data,
    dag=dag
)

# split_statements=True runs each ;-terminated statement separately
create_tables = SQLExecuteQueryOperator(
    task_id='create_analytics_tables',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    -- Create raw sales table
    CREATE TABLE IF NOT EXISTS raw_sales (
        order_id VARCHAR,
        customer_id VARCHAR,
        product_id VARCHAR,
        quantity INTEGER,
        price DECIMAL(10,2),
        order_date DATE,
        region VARCHAR
    );
    -- Create aggregated daily sales table
    CREATE TABLE IF NOT EXISTS daily_sales_summary (
        order_date DATE,
        region VARCHAR,
        total_orders INTEGER,
        total_revenue DECIMAL(12,2),
        avg_order_value DECIMAL(10,2),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """,
    dag=dag
)

load_data = SQLExecuteQueryOperator(
    task_id='load_sales_data',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    -- Clear existing data
    DELETE FROM raw_sales;
    -- Load CSV data into DuckDB
    INSERT INTO raw_sales
    SELECT * FROM read_csv_auto('/home/airflow/data/raw/sales_data.csv');
    SELECT COUNT(*) as loaded_records FROM raw_sales;
    """,
    dag=dag
)

analyze_data = SQLExecuteQueryOperator(
    task_id='analyze_sales_data',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    -- Clear previous analysis
    DELETE FROM daily_sales_summary;
    -- Generate daily sales analytics
    INSERT INTO daily_sales_summary (order_date, region, total_orders, total_revenue, avg_order_value)
    SELECT
        order_date,
        region,
        COUNT(*) as total_orders,
        ROUND(SUM(quantity * price), 2) as total_revenue,
        ROUND(AVG(quantity * price), 2) as avg_order_value
    FROM raw_sales
    GROUP BY order_date, region
    ORDER BY order_date DESC, region;
    -- Show top performing regions
    SELECT
        region,
        SUM(total_orders) as total_orders,
        ROUND(SUM(total_revenue), 2) as total_revenue
    FROM daily_sales_summary
    GROUP BY region
    ORDER BY total_revenue DESC;
    """,
    dag=dag
)


def export_results(**context):
    """Export analysis results to files"""
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    # Export daily summary
    daily_results = hook.get_pandas_df(
        "SELECT * FROM daily_sales_summary ORDER BY order_date DESC"
    )
    # Export regional summary
    regional_results = hook.get_pandas_df("""
        SELECT
            region,
            SUM(total_orders) as total_orders,
            ROUND(SUM(total_revenue), 2) as total_revenue,
            ROUND(AVG(avg_order_value), 2) as avg_order_value
        FROM daily_sales_summary
        GROUP BY region
        ORDER BY total_revenue DESC
    """)
    os.makedirs('/home/airflow/data/reports', exist_ok=True)
    daily_results.to_csv('/home/airflow/data/reports/daily_sales.csv', index=False)
    regional_results.to_csv('/home/airflow/data/reports/regional_summary.csv', index=False)
    print(f"Exported {len(daily_results)} daily records and {len(regional_results)} regional summaries")


export_data = PythonOperator(
    task_id='export_analysis_results',
    python_callable=export_results,
    dag=dag
)

# Set task dependencies
generate_data >> create_tables >> load_data >> analyze_data >> export_data
Create advanced DuckDB analytics DAG
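Build a more complex pipeline that demonstrates DuckDB's JSON processing, hourly time bucketing with DATE_TRUNC, session-level aggregation, and report export. Save it next to the first DAG, for example as /home/airflow/airflow/dags/duckdb_advanced_analytics.py.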
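Each line of user_events.json is a standalone JSON object (newline-delimited JSON), so the file has to be read line by line rather than as one JSON document. Below is a stdlib sketch of how one line maps onto the user_events columns. The function and constant names are hypothetical.

It also handles a subtle formatting issue. datetime.isoformat() omits the fractional part when microseconds are zero, so a parser that insists on a '.%f' suffix rejects some timestamps. datetime.fromisoformat accepts both forms.

```python
import json
from datetime import datetime

# Column order of the user_events table created by create_event_tables
USER_EVENT_COLUMNS = ["event_id", "timestamp", "user_id", "event_type",
                      "page", "browser", "device", "value", "session_id"]


def flatten_event(line: str) -> dict:
    """Flatten one NDJSON line into the user_events column layout."""
    event = json.loads(line)
    props = event.get("properties", {})
    return {
        "event_id": event["event_id"],
        # fromisoformat accepts timestamps with or without fractional seconds
        "timestamp": datetime.fromisoformat(event["timestamp"]),
        "user_id": event["user_id"],
        "event_type": event["event_type"],
        "page": props.get("page"),
        "browser": props.get("browser"),
        "device": props.get("device"),
        "value": props.get("value"),  # None when the generator left it empty
        "session_id": event["session_id"],
    }
```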
from datetime import datetime, timedelta
import json
import os

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from duckdb_provider.hooks.duckdb_hook import DuckDBHook

default_args = {
    'owner': 'analytics-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=3)
}

dag = DAG(
    'duckdb_advanced_analytics',
    default_args=default_args,
    description='Advanced DuckDB analytics with JSON and time-series',
    schedule=timedelta(hours=12),
    catchup=False
)


def create_json_data(**context):
    """Generate sample JSON event data, one JSON object per line"""
    import random

    events = []
    base_time = datetime.now() - timedelta(hours=24)
    for i in range(500):
        event = {
            'event_id': f'evt_{i:06d}',
            'timestamp': (base_time + timedelta(minutes=random.randint(0, 1440))).isoformat(),
            'user_id': f'user_{random.randint(1, 100):03d}',
            'event_type': random.choice(['page_view', 'click', 'purchase', 'signup']),
            'properties': {
                'page': random.choice(['/home', '/products', '/cart', '/checkout']),
                'browser': random.choice(['chrome', 'firefox', 'safari', 'edge']),
                'device': random.choice(['desktop', 'mobile', 'tablet']),
                'value': round(random.uniform(0, 100), 2) if random.random() > 0.7 else None
            },
            'session_id': f'sess_{random.randint(1, 50):03d}'
        }
        events.append(event)
    os.makedirs('/home/airflow/data/events', exist_ok=True)
    with open('/home/airflow/data/events/user_events.json', 'w') as f:
        for event in events:
            f.write(json.dumps(event) + '\n')
    print(f"Generated {len(events)} event records")


generate_events = PythonOperator(
    task_id='generate_event_data',
    python_callable=create_json_data,
    dag=dag
)

create_event_tables = SQLExecuteQueryOperator(
    task_id='create_event_tables',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    -- Install and load JSON extension
    INSTALL json;
    LOAD json;
    -- Create events table
    CREATE TABLE IF NOT EXISTS user_events (
        event_id VARCHAR,
        timestamp TIMESTAMP,
        user_id VARCHAR,
        event_type VARCHAR,
        page VARCHAR,
        browser VARCHAR,
        device VARCHAR,
        value DECIMAL(10,2),
        session_id VARCHAR
    );
    -- Create hourly analytics table
    CREATE TABLE IF NOT EXISTS hourly_analytics (
        hour_bucket TIMESTAMP,
        event_type VARCHAR,
        device VARCHAR,
        event_count INTEGER,
        unique_users INTEGER,
        total_value DECIMAL(12,2),
        avg_value DECIMAL(10,2)
    );
    -- Create user session analytics
    CREATE TABLE IF NOT EXISTS session_analytics (
        session_id VARCHAR,
        user_id VARCHAR,
        session_start TIMESTAMP,
        session_end TIMESTAMP,
        session_duration_minutes INTEGER,
        page_views INTEGER,
        total_events INTEGER,
        conversion_value DECIMAL(10,2)
    );
    """,
    dag=dag
)

process_json_events = SQLExecuteQueryOperator(
    task_id='process_json_events',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    -- Clear existing event data
    DELETE FROM user_events;
    -- Load and parse JSON events. read_ndjson_objects returns one row per
    -- line; the alias names its single column json_data.
    INSERT INTO user_events
    SELECT
        json_extract_string(json_data, '$.event_id') as event_id,
        -- CAST accepts ISO 8601 timestamps with or without fractional seconds
        CAST(json_extract_string(json_data, '$.timestamp') AS TIMESTAMP) as timestamp,
        json_extract_string(json_data, '$.user_id') as user_id,
        json_extract_string(json_data, '$.event_type') as event_type,
        json_extract_string(json_data, '$.properties.page') as page,
        json_extract_string(json_data, '$.properties.browser') as browser,
        json_extract_string(json_data, '$.properties.device') as device,
        CAST(json_extract_string(json_data, '$.properties.value') AS DECIMAL(10,2)) as value,
        json_extract_string(json_data, '$.session_id') as session_id
    FROM read_ndjson_objects('/home/airflow/data/events/user_events.json') AS t(json_data)
    WHERE json_data IS NOT NULL;
    SELECT COUNT(*) as loaded_events FROM user_events;
    """,
    dag=dag
)

generate_hourly_analytics = SQLExecuteQueryOperator(
    task_id='generate_hourly_analytics',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    DELETE FROM hourly_analytics;
    INSERT INTO hourly_analytics
    SELECT
        DATE_TRUNC('hour', timestamp) as hour_bucket,
        event_type,
        device,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users,
        COALESCE(SUM(value), 0) as total_value,
        ROUND(AVG(value), 2) as avg_value
    FROM user_events
    GROUP BY DATE_TRUNC('hour', timestamp), event_type, device
    ORDER BY hour_bucket DESC, event_count DESC;
    -- Show top analytics. Summing per-group distinct counts can count the
    -- same user more than once, so total_unique_users is an upper bound.
    SELECT
        hour_bucket,
        SUM(event_count) as total_events,
        SUM(unique_users) as total_unique_users,
        ROUND(SUM(total_value), 2) as total_revenue
    FROM hourly_analytics
    GROUP BY hour_bucket
    ORDER BY hour_bucket DESC
    LIMIT 10;
    """,
    dag=dag
)

analyze_user_sessions = SQLExecuteQueryOperator(
    task_id='analyze_user_sessions',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    DELETE FROM session_analytics;
    INSERT INTO session_analytics
    SELECT
        session_id,
        user_id,
        MIN(timestamp) as session_start,
        MAX(timestamp) as session_end,
        CAST(EXTRACT(EPOCH FROM (MAX(timestamp) - MIN(timestamp)))/60 AS INTEGER) as session_duration_minutes,
        SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as page_views,
        COUNT(*) as total_events,
        COALESCE(SUM(CASE WHEN event_type = 'purchase' THEN value ELSE 0 END), 0) as conversion_value
    FROM user_events
    GROUP BY session_id, user_id
    HAVING COUNT(*) > 1
    ORDER BY conversion_value DESC, session_duration_minutes DESC;
    -- Show session insights
    SELECT
        'Total Sessions' as metric,
        COUNT(*) as value
    FROM session_analytics
    UNION ALL
    SELECT
        'Avg Session Duration (min)' as metric,
        ROUND(AVG(session_duration_minutes), 1) as value
    FROM session_analytics
    UNION ALL
    SELECT
        'Sessions with Purchases' as metric,
        COUNT(*) as value
    FROM session_analytics
    WHERE conversion_value > 0;
    """,
    dag=dag
)


def export_analytics_reports(**context):
    """Export comprehensive analytics reports"""
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    # Export hourly trends
    hourly_data = hook.get_pandas_df("""
        SELECT
            hour_bucket,
            event_type,
            device,
            event_count,
            unique_users,
            total_value
        FROM hourly_analytics
        ORDER BY hour_bucket DESC, event_count DESC
    """)
    # Export session analysis
    session_data = hook.get_pandas_df("""
        SELECT
            session_id,
            user_id,
            session_start,
            session_duration_minutes,
            page_views,
            total_events,
            conversion_value
        FROM session_analytics
        ORDER BY conversion_value DESC
    """)
    # Export device performance (total_users sums per-hour distinct counts,
    # so a user active in several hours is counted more than once)
    device_data = hook.get_pandas_df("""
        SELECT
            device,
            SUM(event_count) as total_events,
            SUM(unique_users) as total_users,
            ROUND(SUM(total_value), 2) as total_value,
            ROUND(AVG(avg_value), 2) as avg_event_value
        FROM hourly_analytics
        GROUP BY device
        ORDER BY total_value DESC
    """)
    os.makedirs('/home/airflow/data/analytics', exist_ok=True)
    hourly_data.to_csv('/home/airflow/data/analytics/hourly_trends.csv', index=False)
    session_data.to_csv('/home/airflow/data/analytics/session_analysis.csv', index=False)
    device_data.to_csv('/home/airflow/data/analytics/device_performance.csv', index=False)
    print(f"Exported analytics: {len(hourly_data)} hourly records, {len(session_data)} sessions, {len(device_data)} device summaries")


export_reports = PythonOperator(
    task_id='export_analytics_reports',
    python_callable=export_analytics_reports,
    dag=dag
)

# Set dependencies. DuckDB lets only one process open the database file for
# writing, so the two analytics tasks run one after the other, not in parallel.
generate_events >> create_event_tables >> process_json_events >> generate_hourly_analytics >> analyze_user_sessions >> export_reports
Set proper file permissions
Configure correct ownership and permissions for Airflow directories and data files.
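The octal modes below work as follows:

- 755 lets only the owner write.
- 775 also lets members of the airflow group write to /home/airflow/data.
- 644 makes the DAG files read-only for everyone except the owner.

Note that chmod -R with 755 also marks regular files as executable. The stdlib stat module can print any mode in ls -l notation. Below is a small sketch; the helper name symbolic is hypothetical.

```python
import stat


def symbolic(mode: int, is_dir: bool = False) -> str:
    """Render an octal permission mode the way `ls -l` shows it."""
    file_type = stat.S_IFDIR if is_dir else stat.S_IFREG
    return stat.filemode(file_type | mode)


for mode, is_dir in [(0o755, True), (0o775, True), (0o644, False)]:
    print(oct(mode), symbolic(mode, is_dir))
```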
sudo chown -R airflow:airflow /home/airflow
sudo chmod -R 755 /home/airflow/airflow
sudo chmod -R 775 /home/airflow/data
sudo chmod 644 /home/airflow/airflow/dags/*.py
Configure monitoring and optimization
Create DuckDB performance monitoring DAG
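Build a monitoring pipeline that records the DuckDB file size, table count, and estimated row count every hour and prints simple threshold alerts. The query timing columns are placeholders for real measurements. Save it in the DAGs folder as well, for example as /home/airflow/airflow/dags/duckdb_monitoring.py.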
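Keeping the alert rules in a pure function makes them easy to unit-test outside Airflow. Below is a sketch of that split, assuming the database size is computed from pragma_database_size() block counts (total_blocks multiplied by block_size).

The function names are hypothetical. The growth rule is also a hypothetical addition: it uses the 24 samples that analyze_performance fetches but never inspects.

```python
def size_mb(total_blocks: int, block_size: int) -> float:
    """Database size in MiB from pragma_database_size() block counts."""
    return round(total_blocks * block_size / 1024 / 1024, 2)


def build_alerts(sizes_mb: list, table_count: int,
                 size_limit_mb: float = 1000, table_limit: int = 20,
                 growth_limit_mb: float = 100) -> list:
    """Return alert messages; sizes_mb is ordered newest first."""
    alerts = []
    if not sizes_mb:
        return alerts
    latest = sizes_mb[0]
    if latest > size_limit_mb:
        alerts.append(f"Database size is {latest:.2f} MB")
    if table_count > table_limit:
        alerts.append(f"High table count: {table_count} tables")
    # Hypothetical rule: growth between the oldest and newest sample
    growth = latest - sizes_mb[-1]
    if len(sizes_mb) > 1 and growth > growth_limit_mb:
        alerts.append(f"Database grew {growth:.2f} MB over {len(sizes_mb)} samples")
    return alerts
```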
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from duckdb_provider.hooks.duckdb_hook import DuckDBHook

default_args = {
    'owner': 'ops-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

dag = DAG(
    'duckdb_monitoring',
    default_args=default_args,
    description='Monitor DuckDB performance and usage',
    schedule=timedelta(hours=1),
    catchup=False
)

create_monitoring_tables = SQLExecuteQueryOperator(
    task_id='create_monitoring_tables',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    CREATE TABLE IF NOT EXISTS db_metrics (
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        database_size_mb DECIMAL(10,2),
        table_count INTEGER,
        total_rows BIGINT,
        memory_usage_mb DECIMAL(10,2)
    );
    CREATE TABLE IF NOT EXISTS query_performance (
        timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        query_type VARCHAR,
        execution_time_ms INTEGER,
        rows_processed BIGINT
    );
    """,
    dag=dag
)

collect_db_metrics = SQLExecuteQueryOperator(
    task_id='collect_database_metrics',
    conn_id='duckdb_default',
    split_statements=True,
    sql="""
    -- Collect current database metrics. File size comes from block counts;
    -- duckdb_tables().estimated_size is an estimated ROW count, not bytes.
    INSERT INTO db_metrics (database_size_mb, table_count, total_rows, memory_usage_mb)
    SELECT
        ROUND(db.total_blocks * db.block_size / 1024.0 / 1024.0, 2) as database_size_mb,
        t.table_count,
        t.total_rows,
        64.0 as memory_usage_mb -- Placeholder, not a measured value
    FROM pragma_database_size() AS db,
        (SELECT COUNT(*) AS table_count, SUM(estimated_size) AS total_rows
         FROM duckdb_tables()
         WHERE database_name = current_database()) AS t
    WHERE db.database_name = current_database();
    -- Row counts for two reference tables; execution_time_ms is a placeholder (0)
    INSERT INTO query_performance (query_type, execution_time_ms, rows_processed)
    SELECT 'table_scan', 0, COUNT(*) FROM raw_sales;
    INSERT INTO query_performance (query_type, execution_time_ms, rows_processed)
    SELECT 'aggregate_query', 0, COUNT(*) FROM daily_sales_summary;
    """,
    dag=dag
)


def analyze_performance(**context):
    """Analyze DuckDB performance trends and generate alerts"""
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    # Get recent metrics
    metrics = hook.get_pandas_df("""
        SELECT
            timestamp,
            database_size_mb,
            table_count,
            total_rows
        FROM db_metrics
        ORDER BY timestamp DESC
        LIMIT 24
    """)
    # Check for performance issues
    latest_metrics = hook.get_pandas_df("""
        SELECT
            database_size_mb,
            table_count
        FROM db_metrics
        ORDER BY timestamp DESC
        LIMIT 1
    """).iloc[0]
    # Simple alerting logic
    alerts = []
    if latest_metrics['database_size_mb'] > 1000:
        alerts.append(f"Database size is {latest_metrics['database_size_mb']:.2f} MB")
    if latest_metrics['table_count'] > 20:
        alerts.append(f"High table count: {latest_metrics['table_count']} tables")
    print(f"Performance analysis complete. Database: {latest_metrics['database_size_mb']:.2f} MB")
    if alerts:
        print("Alerts:", "; ".join(alerts))


analyze_perf = PythonOperator(
    task_id='analyze_performance',
    python_callable=analyze_performance,
    dag=dag
)

# Task dependencies
create_monitoring_tables >> collect_db_metrics >> analyze_perf
Configure Airflow logging
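Add these keys to the [logging] section of ~/airflow/airflow.cfg to get more detailed logs when troubleshooting DuckDB tasks. Then restart both services with sudo systemctl restart airflow-scheduler airflow-webserver.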
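The percent signs in the configuration below are doubled because Airflow reads airflow.cfg with Python's configparser interpolation, where %% stands for a literal %. This stdlib sketch shows what the log_format value becomes once parsed and what a formatted line looks like. The log record values are made up for illustration.

```python
import configparser
import logging

CFG_TEXT = """
[logging]
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
"""

# ConfigParser's default BasicInterpolation turns each '%%' into a literal '%'
parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)
log_format = parser.get("logging", "log_format")

# Format a made-up record to see what a log line looks like
formatter = logging.Formatter(log_format)
record = logging.LogRecord("demo", logging.INFO, "/opt/dags/demo_dag.py", 42,
                           "task started", None, None)
line = formatter.format(record)
print(line)
```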
[logging]
logging_level = INFO
fab_logging_level = WARN
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/airflow/airflow/logs/dag_processor_manager/dag_processor_manager.log
Verify your setup
Check that both Airflow services are running and the web interface is accessible.
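The curl command below only greps the status fields out of the /health response. To see which component is unhealthy, you can decode the JSON instead. This sketch assumes the response maps component names (metadatabase, scheduler, and so on) to objects with a status field.

Components this guide does not start, such as the triggerer, may report no status and are skipped. The function names are hypothetical.

```python
import json
import urllib.request


def unhealthy_components(health: dict) -> list:
    """Return names of components whose status is reported but not 'healthy'."""
    problems = []
    for name, info in health.items():
        status = info.get("status") if isinstance(info, dict) else None
        # A missing/None status is treated as "component not deployed"
        if status is not None and status != "healthy":
            problems.append(name)
    return problems


def fetch_health(url: str = "http://localhost:8080/health") -> dict:
    """Fetch and decode the webserver's /health JSON."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)
```

For example, print(unhealthy_components(fetch_health())) prints an empty list when every running component reports healthy.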
sudo systemctl status airflow-scheduler airflow-webserver
curl -s http://localhost:8080/health | grep -o '"status":"[^"]*"'
ls -la ~/airflow/dags/
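ls -la ~/data/
Verify the DuckDB connection and the data pipeline. Since Airflow 2.7, airflow connections test is disabled unless test_connection = Enabled is set in the [core] section of airflow.cfg. airflow connections get works either way and shows the stored settings.
source ~/airflow-venv/bin/activate
airflow connections get duckdb_default
airflow dags list | grep duckdb
airflow tasks test duckdb_analytics_pipeline generate_sample_data 2024-01-01
You can now access the Airflow web interface at http://your-server-ip:8080 with the username admin and the password you set (SecurePassword123 in the example). New DAG files appear in the interface after the next DAG folder scan (every 300 seconds with dag_dir_list_interval = 300). New DAGs start paused, so unpause them in the UI or with airflow dags unpause duckdb_analytics_pipeline. Then trigger them manually or wait for their scheduled runs.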
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| DAG import errors | Python syntax errors or missing imports | airflow dags list-import-errors to check syntax |
| DuckDB connection fails | Incorrect file path or permissions | Check connection config: airflow connections get duckdb_default |
| Tasks stuck in running state | Insufficient executor capacity | Increase parallelism in airflow.cfg and restart services |
| Memory errors during data processing | Large datasets exceed available RAM | Process data in chunks or increase system memory |
| Permission denied on data files | Wrong file ownership | sudo chown -R airflow:airflow /home/airflow/data |
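| Scheduler not picking up DAGs | New files are scanned only every dag_dir_list_interval (300 seconds in this configuration), or the DAG directory is not readable | Wait for the next scan; if needed, fix permissions with sudo chmod -R 755 /home/airflow/airflow/dags |
| IO Error: Could not set lock on file | Two tasks opened the same DuckDB file for writing at the same time | Run DuckDB tasks one after another, or assign them to an Airflow pool with a single slot |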
Performance optimization
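Configure DuckDB-specific settings for large datasets and complex analytical queries. memory_limit caps how much memory DuckDB may use (larger operations can spill to temporary files), and threads sets how many cores a query may use. DuckDB does not store these settings in the database file. They apply only while the database is open in the process that set them, so run them on the same connection as the queries they should affect.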
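One way to make such settings reach every task is to prepend them to the SQL that each SQL task runs. The statements then execute in the same session as the queries. Below is a stdlib sketch of that idea.

The helper and the SETTINGS dict are hypothetical. Values are inserted into the SQL verbatim, so use it only with trusted constants.

```python
# Hypothetical settings; values are spliced into SQL verbatim, so keep them constant
SETTINGS = {
    "memory_limit": "'2GB'",
    "threads": "4",
}


def with_settings(sql: str, settings: dict = SETTINGS) -> str:
    """Prefix SQL with SET statements so they run in the same DuckDB session."""
    prelude = "".join(f"SET {name} = {value};\n" for name, value in settings.items())
    return prelude + sql
```

For example, a SQL task could pass sql=with_settings("SELECT COUNT(*) FROM raw_sales;"), so the SET statements run just before the query on the same connection.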
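from duckdb_provider.hooks.duckdb_hook import DuckDBHook


def optimize_duckdb_settings(sql):
    """Apply DuckDB performance settings, then run `sql` on the same connection.

    DuckDB does not persist these settings in the database file, so they
    only affect queries that run while this connection is open.
    """
    hook = DuckDBHook(duckdb_conn_id='duckdb_default')
    with hook.get_conn() as conn:
        # Configure memory and threading
        conn.execute("SET memory_limit = '2GB'")
        conn.execute("SET threads = 4")
        conn.execute("SET enable_progress_bar = false")
        conn.execute("SET enable_object_cache = true")
        conn.execute("SET checkpoint_threshold = '256MB'")
        # Inspect the schema. DuckDB keeps min-max statistics for every column
        # automatically, so explicit indexes are rarely needed for analytical scans.
        print(conn.execute("PRAGMA table_info('raw_sales')").fetchall())
        result = conn.execute(sql).fetchall()
    print("DuckDB optimization settings applied")
    return result
For production workloads, store large inputs as Parquet rather than CSV, so DuckDB reads only the columns and row groups a query needs. Back up your .duckdb files on a schedule, for example with EXPORT DATABASE or by copying the file while no task has it open.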
Next steps
- Scale Airflow with CeleryExecutor for distributed processing
- Set up comprehensive Airflow monitoring with Prometheus and Grafana
- Integrate DuckDB with Apache Spark for larger datasets
- Monitor DuckDB performance metrics with custom dashboards
- Implement data quality checks in your Airflow pipelines
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Configuration
AIRFLOW_VERSION="2.8.1"
AIRFLOW_USER="airflow"
AIRFLOW_HOME="/home/${AIRFLOW_USER}/airflow"
VENV_PATH="/home/${AIRFLOW_USER}/airflow-venv"
# Print colored output
print_status() { echo -e "${GREEN}[INFO]${NC} $1"; }
print_warning() { echo -e "${YELLOW}[WARN]${NC} $1"; }
print_error() { echo -e "${RED}[ERROR]${NC} $1"; }
# Cleanup function for rollback
cleanup() {
print_error "Installation failed. Cleaning up..."
systemctl stop airflow-scheduler airflow-webserver 2>/dev/null || true
systemctl disable airflow-scheduler airflow-webserver 2>/dev/null || true
rm -f /etc/systemd/system/airflow-*.service
# Only delete the user (and its home directory) if this script created it
if [[ "${CREATED_USER:-false}" == "true" ]]; then
userdel -r ${AIRFLOW_USER} 2>/dev/null || true
fi
exit 1
}
trap cleanup ERR
# Check prerequisites
if [[ $EUID -ne 0 ]]; then
print_error "This script must be run as root"
exit 1
fi
# Detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update && apt upgrade -y"
PKG_INSTALL="apt install -y"
BUILD_DEPS="python3 python3-pip python3-venv build-essential"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
BUILD_DEPS="python3 python3-pip python3-devel gcc gcc-c++ make"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
BUILD_DEPS="python3 python3-pip python3-devel gcc gcc-c++ make"
;;
*)
print_error "Unsupported distribution: $ID"
exit 1
;;
esac
else
print_error "Cannot detect distribution"
exit 1
fi
print_status "[1/10] Updating system packages..."
eval "${PKG_UPDATE}"
${PKG_INSTALL} ${BUILD_DEPS}
print_status "[2/10] Creating dedicated airflow user..."
if id "${AIRFLOW_USER}" &>/dev/null; then
print_warning "User ${AIRFLOW_USER} already exists"
else
useradd -m -s /bin/bash ${AIRFLOW_USER}
CREATED_USER=true
usermod -aG wheel ${AIRFLOW_USER} 2>/dev/null || usermod -aG sudo ${AIRFLOW_USER} 2>/dev/null || true
fi
print_status "[3/10] Creating Python virtual environment..."
sudo -u ${AIRFLOW_USER} python3 -m venv ${VENV_PATH}
print_status "[4/10] Installing Apache Airflow with DuckDB provider..."
sudo -u ${AIRFLOW_USER} bash -c "
source ${VENV_PATH}/bin/activate
pip install --upgrade pip setuptools wheel
export PYTHON_VERSION=\$(python --version | cut -d' ' -f 2 | cut -d'.' -f 1-2)
export CONSTRAINT_URL=\"https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-\${PYTHON_VERSION}.txt\"
pip install \"apache-airflow==${AIRFLOW_VERSION}\" --constraint \"\${CONSTRAINT_URL}\"
pip install airflow-provider-duckdb duckdb pandas pyarrow
"
print_status "[5/10] Initializing Airflow database..."
sudo -u ${AIRFLOW_USER} bash -c "
source ${VENV_PATH}/bin/activate
export AIRFLOW_HOME=${AIRFLOW_HOME}
airflow db migrate
"
print_status "[6/10] Creating Airflow admin user..."
sudo -u ${AIRFLOW_USER} bash -c "
source ${VENV_PATH}/bin/activate
export AIRFLOW_HOME=${AIRFLOW_HOME}
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password SecurePassword123
"
print_status "[7/10] Configuring Airflow settings..."
sudo -u ${AIRFLOW_USER} tee ${AIRFLOW_HOME}/airflow.cfg > /dev/null << 'EOF'
[core]
# LocalExecutor cannot use the default SQLite metadata database that this script sets up
executor = SequentialExecutor
max_active_runs_per_dag = 3
max_active_tasks_per_dag = 8
parallelism = 16
[scheduler]
dag_dir_list_interval = 300
parsing_processes = 2
[webserver]
expose_config = False
web_server_port = 8080
base_url = http://localhost:8080
EOF
print_status "[8/10] Setting up DuckDB connection and directories..."
sudo -u ${AIRFLOW_USER} bash -c "
mkdir -p ${AIRFLOW_HOME}/{dags,logs,plugins} /home/${AIRFLOW_USER}/data
source ${VENV_PATH}/bin/activate
export AIRFLOW_HOME=${AIRFLOW_HOME}
airflow connections add 'duckdb_default' \
--conn-type 'duckdb' \
--conn-host '/home/${AIRFLOW_USER}/data/analytics.duckdb' \
--conn-description 'Default DuckDB connection for analytics'
"
print_status "[9/10] Creating systemd services..."
# Airflow Scheduler Service
tee /etc/systemd/system/airflow-scheduler.service > /dev/null << EOF
[Unit]
Description=Airflow Scheduler
After=network.target
[Service]
Type=simple
User=${AIRFLOW_USER}
Group=${AIRFLOW_USER}
WorkingDirectory=/home/${AIRFLOW_USER}
Environment=PATH=${VENV_PATH}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
Environment=AIRFLOW_HOME=${AIRFLOW_HOME}
ExecStart=${VENV_PATH}/bin/airflow scheduler
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
# Airflow Webserver Service
tee /etc/systemd/system/airflow-webserver.service > /dev/null << EOF
[Unit]
Description=Airflow Webserver
After=network.target
[Service]
Type=simple
User=${AIRFLOW_USER}
Group=${AIRFLOW_USER}
WorkingDirectory=/home/${AIRFLOW_USER}
Environment=PATH=${VENV_PATH}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
Environment=AIRFLOW_HOME=${AIRFLOW_HOME}
ExecStart=${VENV_PATH}/bin/airflow webserver --port 8080
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
# Set proper permissions
chmod 644 /etc/systemd/system/airflow-*.service
chown root:root /etc/systemd/system/airflow-*.service
print_status "[10/10] Enabling and starting services..."
systemctl daemon-reload
systemctl enable airflow-scheduler airflow-webserver
systemctl start airflow-scheduler airflow-webserver
# Configure firewall if active
if systemctl is-active --quiet firewalld; then
firewall-cmd --permanent --add-port=8080/tcp
firewall-cmd --reload
print_status "Firewall configured for port 8080"
elif systemctl is-active --quiet ufw; then
ufw allow 8080/tcp
print_status "UFW configured for port 8080"
fi
print_status "Verifying installation..."
sleep 10
# Verification checks
if systemctl is-active --quiet airflow-scheduler; then
print_status "✓ Airflow Scheduler is running"
else
print_error "✗ Airflow Scheduler is not running"
exit 1
fi
if systemctl is-active --quiet airflow-webserver; then
print_status "✓ Airflow Webserver is running"
else
print_error "✗ Airflow Webserver is not running"
exit 1
fi
if sudo -u ${AIRFLOW_USER} ${VENV_PATH}/bin/python -c "import duckdb; print('DuckDB version:', duckdb.__version__)"; then
print_status "✓ DuckDB is properly installed"
else
print_error "✗ DuckDB installation failed"
exit 1
fi
print_status "Installation completed successfully!"
print_status "Airflow web interface: http://localhost:8080"
print_status "Default credentials: admin / SecurePassword123"
print_status "DuckDB data directory: /home/${AIRFLOW_USER}/data"
print_status "Airflow home: ${AIRFLOW_HOME}"
Review the script before running it. It must run as root: sudo bash install.sh