Install and configure DuckDB for analytical workloads with Python integration

Beginner 25 min Apr 03, 2026 23 views
Ubuntu 24.04 Ubuntu 22.04 Debian 12 AlmaLinux 9 Rocky Linux 9 Fedora 41

Set up DuckDB, the high-performance analytical database, with CLI tools and Python integration for fast OLAP queries and data analytics workloads.

Prerequisites

  • Python 3.8 or later
  • 4GB RAM minimum (8GB+ recommended)
  • 2GB free disk space

What this solves

DuckDB is an embedded analytical database designed for OLAP workloads, data analytics, and business intelligence applications. Unlike traditional databases like PostgreSQL or MySQL that excel at transactional workloads, DuckDB specializes in analytical queries over large datasets with columnar storage and vectorized query execution.

This tutorial shows you how to install DuckDB CLI tools and Python client libraries, configure memory settings for analytical workloads, integrate with pandas for data science workflows, and optimize performance for large-scale data processing tasks.

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you get the latest versions of dependencies.

# Debian/Ubuntu
sudo apt update && sudo apt upgrade -y

# Fedora/AlmaLinux/Rocky
sudo dnf upgrade -y

Install Python development packages

Install Python and development tools needed for DuckDB Python integration and data analytics libraries.

# Debian/Ubuntu
sudo apt install -y python3 python3-pip python3-venv python3-dev build-essential wget curl

# Fedora/AlmaLinux/Rocky
sudo dnf install -y python3 python3-pip python3-devel gcc gcc-c++ make wget curl

Download and install DuckDB CLI

Download the latest DuckDB CLI binary directly from GitHub releases for optimal performance and latest features.

cd /tmp
wget https://github.com/duckdb/duckdb/releases/latest/download/duckdb_cli-linux-amd64.zip
unzip duckdb_cli-linux-amd64.zip
sudo mv duckdb /usr/local/bin/
sudo chmod 755 /usr/local/bin/duckdb

Create Python virtual environment

Create an isolated Python environment for DuckDB and analytics libraries to avoid conflicts with system packages.

mkdir -p ~/duckdb-analytics
cd ~/duckdb-analytics
python3 -m venv duckdb_env
source duckdb_env/bin/activate

Install DuckDB Python client and analytics libraries

Install DuckDB Python library along with pandas, numpy, and other data science packages for comprehensive analytics workflows.

pip install --upgrade pip
pip install duckdb pandas numpy pyarrow matplotlib seaborn jupyter

Create DuckDB configuration directory

Set up directories for DuckDB databases, configuration, and analytics projects with proper permissions.

mkdir -p ~/duckdb-analytics/{databases,config,scripts,exports}
chmod 755 ~/duckdb-analytics
chmod 755 ~/duckdb-analytics/*

Create DuckDB configuration file

Save the following as ~/duckdb-analytics/config/duckdb.sql. It configures DuckDB with settings suited to analytical workloads, including a memory limit and performance tuning.

-- DuckDB Configuration for Analytical Workloads
-- Memory configuration (adjust based on your system RAM)
SET memory_limit = '4GB';  -- note: max_memory is an alias for this same setting

-- Performance optimization
SET threads = 4;
SET enable_progress_bar = true;
SET enable_profiling = 'json';
SET profiling_output = '/tmp/duckdb_profile.json';

-- File format optimization
SET default_order = 'ASC';
SET preserve_insertion_order = false;

-- Analytics-specific settings
SET enable_http_metadata_cache = true;
SET enable_object_cache = true;

Create DuckDB startup script

Create a convenience script at ~/duckdb-analytics/scripts/start_duckdb.sh that starts DuckDB with your configuration and connects to your analytics database.

#!/bin/bash
# DuckDB Analytics Environment Startup Script

DB_PATH="$HOME/duckdb-analytics/databases/analytics.duckdb"
CONFIG_PATH="$HOME/duckdb-analytics/config/duckdb.sql"

echo "Starting DuckDB Analytics Environment..."
echo "Database: $DB_PATH"
echo "Configuration: $CONFIG_PATH"

# Activate Python virtual environment
source "$HOME/duckdb-analytics/duckdb_env/bin/activate"

# Start DuckDB CLI with configuration
duckdb "$DB_PATH" -init "$CONFIG_PATH"

Make startup script executable

Set proper permissions on the startup script so it can be executed directly.

chmod 755 ~/duckdb-analytics/scripts/start_duckdb.sh

Create Python integration example

Create a sample script at ~/duckdb-analytics/scripts/example_analytics.py demonstrating DuckDB integration with pandas for data analytics workflows.

#!/usr/bin/env python3
"""
DuckDB Analytics Example
Demonstrates DuckDB integration with pandas for data analytics
"""

import duckdb
import pandas as pd
import numpy as np
from pathlib import Path

def main():
    # Connect to DuckDB database
    db_path = Path.home() / 'duckdb-analytics' / 'databases' / 'analytics.duckdb'
    conn = duckdb.connect(str(db_path))
    
    # Configure DuckDB for analytics
    conn.execute("SET memory_limit = '4GB'")
    conn.execute("SET threads = 4")
    
    # Create sample data
    print("Creating sample sales data...")
    sample_data = pd.DataFrame({
        'date': pd.date_range('2024-01-01', periods=1000),
        'product': np.random.choice(['A', 'B', 'C', 'D'], 1000),
        'region': np.random.choice(['North', 'South', 'East', 'West'], 1000),
        'sales_amount': np.random.uniform(100, 1000, 1000),
        'quantity': np.random.randint(1, 50, 1000)
    })
    
    # Insert data into DuckDB
    conn.execute("CREATE TABLE IF NOT EXISTS sales AS SELECT * FROM sample_data")
    
    # Perform analytical queries
    print("\nRunning analytical queries...")
    
    # Query 1: Sales by product
    result1 = conn.execute("""
        SELECT product, 
               SUM(sales_amount) as total_sales,
               AVG(sales_amount) as avg_sales,
               COUNT(*) as transaction_count
        FROM sales 
        GROUP BY product 
        ORDER BY total_sales DESC
    """).fetchdf()
    
    print("\nSales by Product:")
    print(result1)
    
    # Query 2: Monthly sales trends
    result2 = conn.execute("""
        SELECT DATE_TRUNC('month', date) as month,
               SUM(sales_amount) as monthly_sales,
               COUNT(*) as transactions
        FROM sales 
        GROUP BY month 
        ORDER BY month
    """).fetchdf()
    
    print("\nMonthly Sales Trends:")
    print(result2.head())
    
    # Export results
    export_path = Path.home() / 'duckdb-analytics' / 'exports'
    result1.to_csv(export_path / 'sales_by_product.csv', index=False)
    result2.to_csv(export_path / 'monthly_trends.csv', index=False)
    
    print(f"\nResults exported to {export_path}")
    
    # Close connection
    conn.close()
    print("\nAnalysis complete!")

if __name__ == '__main__':
    main()

Make Python script executable

Set proper permissions on the Python analytics script.

chmod 755 ~/duckdb-analytics/scripts/example_analytics.py

Create systemd user service (optional)

Create a systemd user service to run DuckDB analytics jobs on schedule or on-demand.

[Unit]
Description=DuckDB Analytics Service
After=network.target

[Service]
Type=oneshot
WorkingDirectory=%h/duckdb-analytics
Environment=PATH=%h/duckdb-analytics/duckdb_env/bin:/usr/local/bin:/usr/bin:/bin
ExecStart=%h/duckdb-analytics/duckdb_env/bin/python %h/duckdb-analytics/scripts/example_analytics.py

[Install]
WantedBy=default.target

Enable systemd user service

Save the unit file above as ~/.config/systemd/user/duckdb-analytics.service, then reload the daemon so systemd recognizes the new service.

mkdir -p ~/.config/systemd/user
systemctl --user daemon-reload
systemctl --user enable duckdb-analytics.service

Configure DuckDB for analytical workloads

Optimize memory settings

Configure DuckDB memory limits based on your system RAM and expected dataset sizes.

-- Memory configuration for different system sizes
-- (max_memory is an alias for memory_limit; set one or the other, not both)
-- For systems with 8GB RAM:
SET memory_limit = '4GB';

-- For systems with 16GB RAM:
-- SET memory_limit = '8GB';

-- For systems with 32GB+ RAM:
-- SET memory_limit = '16GB';

-- Temporary file (spill-to-disk) settings
SET max_temp_directory_size = '10GB';
SET temp_directory = '/tmp/duckdb_temp';

Configure performance settings

Set up DuckDB for optimal analytical query performance with parallel processing and caching.

-- Threading and parallelism
-- DuckDB already uses all available CPU cores by default;
-- set threads explicitly only to restrict parallelism
SET threads = 4;
SET external_threads = 1;

-- Query optimization
SET enable_optimizer = true;
SET force_compression = 'AUTO';
SET enable_progress_bar = true;

-- Caching for better performance
SET enable_http_metadata_cache = true;
SET enable_object_cache = true;
SET http_timeout = 30000;

-- File format optimization
SET binary_as_string = false;
SET calendar = 'gregorian';

Set up data import/export configuration

Configure DuckDB for efficient data loading from various sources including CSV, Parquet, and JSON files.

-- CSV parsing options are supplied per call, not as global settings:
-- SELECT * FROM read_csv('data.csv', delim=',', quote='"', escape='"');

-- Parquet/Arrow optimization
SET arrow_large_buffer_size = true;

-- HTTP/S3 settings for remote data (requires the httpfs extension)
SET s3_region = 'us-east-1';
SET s3_url_style = 'vhost';

-- JSON is likewise configured per call:
-- SELECT * FROM read_json('data.json', format='auto');

Set up Python integration with pandas

Create pandas integration module

Create a Python module at ~/duckdb-analytics/scripts/duckdb_pandas.py that provides convenient functions for DuckDB and pandas integration.

#!/usr/bin/env python3
"""
DuckDB Pandas Integration Module
Provides convenient functions for data analytics workflows
"""

import duckdb
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from typing import Optional, Union, Dict, Any

class DuckDBAnalytics:
    def __init__(self, db_path: Optional[str] = None):
        """Initialize DuckDB connection with analytics configuration"""
        if db_path is None:
            db_path = str(Path.home() / 'duckdb-analytics' / 'databases' / 'analytics.duckdb')
        
        self.db_path = db_path
        self.conn = duckdb.connect(db_path)
        self._configure_for_analytics()
    
    def _configure_for_analytics(self):
        """Apply optimal configuration for analytical workloads"""
        config_queries = [
            "SET memory_limit = '4GB'",
            "SET threads = 0",
            "SET enable_progress_bar = true",
            "SET enable_optimizer = true",
            "SET enable_object_cache = true"
        ]
        
        for query in config_queries:
            self.conn.execute(query)
    
    def load_csv(self, file_path: str, table_name: str, **kwargs) -> None:
        """Load CSV file into DuckDB table"""
        df = pd.read_csv(file_path, **kwargs)
        self.conn.execute(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM df")
        print(f"Loaded {len(df)} rows into table '{table_name}'")
    
    def load_parquet(self, file_path: str, table_name: str) -> None:
        """Load Parquet file into DuckDB table"""
        query = f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM read_parquet('{file_path}')"
        self.conn.execute(query)
        count = self.conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
        print(f"Loaded {count} rows into table '{table_name}'")
    
    def query_to_df(self, query: str) -> pd.DataFrame:
        """Execute query and return results as pandas DataFrame"""
        return self.conn.execute(query).fetchdf()
    
    def df_to_table(self, df: pd.DataFrame, table_name: str, if_exists: str = 'replace') -> None:
        """Store pandas DataFrame as DuckDB table"""
        if if_exists == 'replace':
            self.conn.execute(f"DROP TABLE IF EXISTS {table_name}")
        
        self.conn.execute(f"CREATE TABLE {table_name} AS SELECT * FROM df")
        print(f"Stored DataFrame with {len(df)} rows as table '{table_name}'")
    
    def export_to_csv(self, table_name: str, file_path: str) -> None:
        """Export DuckDB table to CSV file"""
        query = f"COPY {table_name} TO '{file_path}' (HEADER, DELIMITER ',')"
        self.conn.execute(query)
        print(f"Exported table '{table_name}' to {file_path}")
    
    def export_to_parquet(self, table_name: str, file_path: str) -> None:
        """Export DuckDB table to Parquet file"""
        query = f"COPY {table_name} TO '{file_path}' (FORMAT PARQUET)"
        self.conn.execute(query)
        print(f"Exported table '{table_name}' to {file_path}")
    
    def get_table_info(self, table_name: str) -> pd.DataFrame:
        """Get schema information for a table"""
        return self.conn.execute(f"DESCRIBE {table_name}").fetchdf()
    
    def list_tables(self) -> pd.DataFrame:
        """List all tables in the database"""
        return self.conn.execute("SHOW TABLES").fetchdf()
    
    def close(self):
        """Close database connection"""
        self.conn.close()
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

Create Jupyter notebook example

Save the following as ~/duckdb-analytics/scripts/create_notebook.py. It generates a Jupyter notebook demonstrating DuckDB analytics workflows.

#!/usr/bin/env python3
"""
Create DuckDB Analytics Jupyter Notebook
"""

import nbformat as nbf
from pathlib import Path

# Create notebook
nb = nbf.v4.new_notebook()

# Add cells (outer strings use ''' so the SQL inside the cells can keep """)
cells = [
    nbf.v4.new_markdown_cell('''# DuckDB Analytics Tutorial

This notebook demonstrates DuckDB integration with pandas for data analytics workloads.'''),
    nbf.v4.new_code_cell('''# Import required libraries
import sys
from pathlib import Path
sys.path.append(str(Path.home() / 'duckdb-analytics' / 'scripts'))
from duckdb_pandas import DuckDBAnalytics
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Set up plotting
plt.style.use('seaborn-v0_8')
sns.set_palette('husl')'''),
    nbf.v4.new_code_cell('''# Initialize DuckDB Analytics
db = DuckDBAnalytics()

# Create sample dataset
np.random.seed(42)
sample_data = pd.DataFrame({
    'date': pd.date_range('2023-01-01', periods=5000),
    'category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Home'], 5000),
    'product_id': np.random.randint(1000, 9999, 5000),
    'sales_amount': np.random.lognormal(mean=5, sigma=1, size=5000),
    'quantity': np.random.randint(1, 20, 5000),
    'customer_segment': np.random.choice(['Premium', 'Standard', 'Budget'], 5000, p=[0.2, 0.5, 0.3])
})
print(f"Created sample dataset with {len(sample_data)} records")
sample_data.head()'''),
    nbf.v4.new_code_cell('''# Load data into DuckDB
db.df_to_table(sample_data, 'sales_data')

# Show table info
print("Table Schema:")
print(db.get_table_info('sales_data'))'''),
    nbf.v4.new_code_cell('''# Analytical Query 1: Sales by Category
category_sales = db.query_to_df("""
    SELECT category,
           COUNT(*) as transaction_count,
           SUM(sales_amount) as total_sales,
           AVG(sales_amount) as avg_sales,
           SUM(quantity) as total_quantity
    FROM sales_data
    GROUP BY category
    ORDER BY total_sales DESC
""")
print("Sales by Category:")
print(category_sales)

# Visualization
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.bar(category_sales['category'], category_sales['total_sales'])
plt.title('Total Sales by Category')
plt.xlabel('Category')
plt.ylabel('Sales Amount')
plt.xticks(rotation=45)
plt.subplot(1, 2, 2)
plt.bar(category_sales['category'], category_sales['transaction_count'])
plt.title('Transaction Count by Category')
plt.xlabel('Category')
plt.ylabel('Transaction Count')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()'''),
    nbf.v4.new_markdown_cell('''## Advanced Analytics

DuckDB excels at complex analytical queries with window functions, aggregations, and time-series analysis.'''),
    nbf.v4.new_code_cell('''# Advanced Query: Monthly trends with window functions
monthly_trends = db.query_to_df("""
    SELECT DATE_TRUNC('month', date) as month,
           category,
           SUM(sales_amount) as monthly_sales,
           LAG(SUM(sales_amount)) OVER (
               PARTITION BY category ORDER BY DATE_TRUNC('month', date)
           ) as previous_month_sales,
           (SUM(sales_amount) - LAG(SUM(sales_amount)) OVER (
               PARTITION BY category ORDER BY DATE_TRUNC('month', date)
           )) / LAG(SUM(sales_amount)) OVER (
               PARTITION BY category ORDER BY DATE_TRUNC('month', date)
           ) * 100 as growth_rate
    FROM sales_data
    GROUP BY DATE_TRUNC('month', date), category
    ORDER BY month, category
""")
print("Monthly Growth Analysis:")
print(monthly_trends.head(10))'''),
    nbf.v4.new_code_cell('''# Export results
from pathlib import Path
export_path = Path.home() / 'duckdb-analytics' / 'exports'
db.export_to_csv('sales_data', str(export_path / 'sales_data_export.csv'))
db.export_to_parquet('sales_data', str(export_path / 'sales_data_export.parquet'))
print("Data exported successfully!")
print(f"Exports saved to: {export_path}")

# Clean up
db.close()'''),
]

# Add cells to notebook
nb.cells = cells

# Save notebook
notebook_path = Path.home() / 'duckdb-analytics' / 'DuckDB_Analytics_Tutorial.ipynb'
with open(notebook_path, 'w') as f:
    nbf.write(nb, f)

print(f"Jupyter notebook created: {notebook_path}")
print("To start Jupyter: cd ~/duckdb-analytics && source duckdb_env/bin/activate && jupyter notebook")

Performance optimization and memory tuning

Create performance monitoring script

Set up monitoring to track DuckDB query performance and resource usage for optimization.

#!/usr/bin/env python3
"""
DuckDB Performance Monitoring and Optimization
"""

import duckdb
import psutil
import time
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any

class DuckDBPerformanceMonitor:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = duckdb.connect(db_path)
        self._configure_profiling()
    
    def _configure_profiling(self):
        """Enable DuckDB profiling for performance analysis"""
        self.conn.execute("SET enable_profiling = 'json'")
        self.conn.execute("SET profiling_output = '/tmp/duckdb_profile.json'")
        self.conn.execute("SET enable_progress_bar = true")
    
    def get_system_info(self) -> Dict[str, Any]:
        """Get system resource information"""
        memory = psutil.virtual_memory()
        cpu_count = psutil.cpu_count()
        
        return {
            'timestamp': datetime.now().isoformat(),
            'total_memory_gb': round(memory.total / (1024**3), 2),
            'available_memory_gb': round(memory.available / (1024**3), 2),
            'memory_usage_percent': memory.percent,
            'cpu_cores': cpu_count,
            'cpu_usage_percent': psutil.cpu_percent(interval=1)
        }
    
    def benchmark_query(self, query: str, iterations: int = 3) -> Dict[str, Any]:
        """Benchmark query performance"""
        execution_times = []
        
        for i in range(iterations):
            start_time = time.time()
            result = self.conn.execute(query)
            rows = result.fetchall()
            end_time = time.time()
            
            execution_times.append(end_time - start_time)
        
        return {
            'query': query[:100] + '...' if len(query) > 100 else query,
            'iterations': iterations,
            'execution_times': execution_times,
            'avg_time': sum(execution_times) / len(execution_times),
            'min_time': min(execution_times),
            'max_time': max(execution_times),
            'result_count': len(rows)
        }
    
    def analyze_memory_usage(self) -> Dict[str, Any]:
        """Analyze DuckDB memory usage"""
        # Get DuckDB memory info
        memory_info = self.conn.execute("SELECT current_setting('memory_limit') as memory_limit").fetchone()
        
        # Get system memory info
        system_info = self.get_system_info()
        
        return {
            'duckdb_memory_limit': memory_info[0],
            'system_memory_info': system_info
        }
    
    def optimize_settings(self) -> Dict[str, str]:
        """Suggest optimal settings based on system resources"""
        memory = psutil.virtual_memory()
        cpu_count = psutil.cpu_count()
        
        # Target ~60% of currently available RAM, with a 1GB floor
        # (max_memory is an alias for memory_limit, so only one value is suggested)
        optimal_memory_gb = max(1, int((memory.available / (1024**3)) * 0.6))
        
        recommendations = {
            'memory_limit': f'{optimal_memory_gb}GB',
            'threads': str(cpu_count),
            'temp_directory': '/tmp/duckdb_temp'
        }
        
        return recommendations
    
    def apply_optimizations(self):
        """Apply recommended performance optimizations"""
        recommendations = self.optimize_settings()
        
        optimization_queries = [
            f"SET memory_limit = '{recommendations['memory_limit']}'",
            f"SET threads = {recommendations['threads']}",
            f"SET temp_directory = '{recommendations['temp_directory']}'",
            "SET enable_optimizer = true",
            "SET enable_object_cache = true",
            "SET force_compression = 'AUTO'"
        ]
        
        for query in optimization_queries:
            self.conn.execute(query)
            print(f"Applied: {query}")
    
    def generate_report(self, output_path: str):
        """Generate performance analysis report"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'system_info': self.get_system_info(),
            'memory_analysis': self.analyze_memory_usage(),
            'recommendations': self.optimize_settings()
        }
        
        with open(output_path, 'w') as f:
            json.dump(report, f, indent=2)
        
        print(f"Performance report saved to: {output_path}")
    
    def close(self):
        self.conn.close()

if __name__ == '__main__':
    db_path = str(Path.home() / 'duckdb-analytics' / 'databases' / 'analytics.duckdb')
    monitor = DuckDBPerformanceMonitor(db_path)
    
    # Apply optimizations
    monitor.apply_optimizations()
    
    # Generate report
    report_path = Path.home() / 'duckdb-analytics' / 'exports' / 'performance_report.json'
    monitor.generate_report(str(report_path))
    
    monitor.close()

Create memory optimization configuration

Set up DuckDB memory management for different workload patterns and system configurations.

-- Memory optimization for different workload patterns

-- Large dataset analytics (adjust based on your RAM;
-- max_memory is an alias for memory_limit, so one setting suffices)
SET memory_limit = '8GB';              -- Primary memory limit
SET max_temp_directory_size = '20GB';  -- Temporary file space

-- Compression
SET force_compression = 'AUTO';     -- Let DuckDB pick the compression scheme

-- Threading optimization
-- (DuckDB uses all available CPU cores by default)
SET threads = 8;                    -- Adjust to your core count
SET external_threads = 1;

-- Query optimization
SET enable_optimizer = true;
SET enable_progress_bar = true;

-- Caching for repeated queries
SET enable_object_cache = true;
SET enable_http_metadata_cache = true;

-- Disk spill configuration for large datasets
SET temp_directory = '/tmp/duckdb_temp';
SET preserve_insertion_order = false;  -- Lower memory use, better compression

Backup and data export strategies

Create backup automation script

Save the following as ~/duckdb-analytics/scripts/backup_duckdb.sh. It automates DuckDB database backups with compression and rotation.

#!/bin/bash
# DuckDB Backup Automation Script

set -euo pipefail

# Configuration
DB_PATH="$HOME/duckdb-analytics/databases/analytics.duckdb"
BACKUP_DIR="$HOME/duckdb-analytics/backups"
DATE=$(date +"%Y%m%d_%H%M%S")
BACKUP_NAME="analytics_backup_${DATE}"
RETENTION_DAYS=30

# Logging
LOG_FILE="$HOME/duckdb-analytics/logs/backup.log"
mkdir -p "$(dirname "$LOG_FILE")"

log_message() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# Create backup directory
mkdir -p "$BACKUP_DIR"
log_message "Starting DuckDB backup process..."

# Check if database exists
if [[ ! -f "$DB_PATH" ]]; then
    log_message "ERROR: Database file not found: $DB_PATH"
    exit 1
fi

# Get database size
DB_SIZE=$(du -h "$DB_PATH" | cut -f1)
log_message "Database size: $DB_SIZE"

# Export database to SQL format
log_message "Creating database backup..."
duckdb "$DB_PATH" <<SQL
EXPORT DATABASE '${BACKUP_DIR}/${BACKUP_NAME}_export';
SQL

# Create compressed binary backup
log_message "Creating compressed backup..."
cp "$DB_PATH" "${BACKUP_DIR}/${BACKUP_NAME}.duckdb"
gzip "${BACKUP_DIR}/${BACKUP_NAME}.duckdb"

# Compress SQL export if it exists
if [[ -d "${BACKUP_DIR}/${BACKUP_NAME}_export" ]]; then
    tar -czf "${BACKUP_DIR}/${BACKUP_NAME}_export.tar.gz" -C "$BACKUP_DIR" "${BACKUP_NAME}_export"
    rm -rf "${BACKUP_DIR}/${BACKUP_NAME}_export"
fi

# Calculate backup size
BACKUP_SIZE=$(du -h "${BACKUP_DIR}/${BACKUP_NAME}.duckdb.gz" | cut -f1)
log_message "Compressed backup size: $BACKUP_SIZE"

# Clean up old backups
log_message "Cleaning up old backups (retention: $RETENTION_DAYS days)..."
find "$BACKUP_DIR" -name "analytics_backup_*" -type f -mtime +$RETENTION_DAYS -exec rm -f {} \;

# Verify backup integrity
log_message "Verifying backup integrity..."
if gunzip -t "${BACKUP_DIR}/${BACKUP_NAME}.duckdb.gz" 2>/dev/null; then
    log_message "Backup verification successful"
else
    log_message "ERROR: Backup verification failed"
    exit 1
fi

# List current backups
log_message "Current backups:"
ls -lh "$BACKUP_DIR"/analytics_backup_* | tee -a "$LOG_FILE"

log_message "Backup process completed successfully"
log_message "Backup location: ${BACKUP_DIR}/${BACKUP_NAME}.duckdb.gz"

Create data export utilities

Save the following as ~/duckdb-analytics/scripts/data_export.py. It provides comprehensive data export functions for various formats and destinations.

#!/usr/bin/env python3
"""
DuckDB Data Export Utilities
Provides comprehensive export capabilities for analytical data
"""

import duckdb
import pandas as pd
import json
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Any

class DuckDBExporter:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self.conn = duckdb.connect(db_path)
        self.export_dir = Path.home() / 'duckdb-analytics' / 'exports'
        self.export_dir.mkdir(exist_ok=True)
    
    def export_table_to_csv(self, table_name: str, output_file: Optional[str] = None) -> str:
        """Export table to CSV format"""
        if output_file is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = str(self.export_dir / f'{table_name}_{timestamp}.csv')
        
        query = f"COPY {table_name} TO '{output_file}' (HEADER, DELIMITER ',')"
        self.conn.execute(query)
        
        print(f"Exported table '{table_name}' to {output_file}")
        return output_file
    
    def export_table_to_parquet(self, table_name: str, output_file: Optional[str] = None) -> str:
        """Export table to Parquet format"""
        if output_file is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = str(self.export_dir / f'{table_name}_{timestamp}.parquet')
        
        query = f"COPY {table_name} TO '{output_file}' (FORMAT PARQUET)"
        self.conn.execute(query)
        
        print(f"Exported table '{table_name}' to {output_file}")
        return output_file
    
    def export_query_results(self, query: str, output_file: str, format: str = 'csv') -> str:
        """Export query results to specified format"""
        if format.lower() == 'csv':
            export_query = f"COPY ({query}) TO '{output_file}' (HEADER, DELIMITER ',')"
        elif format.lower() == 'parquet':
            export_query = f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)"
        elif format.lower() == 'json':
            # For JSON, we'll use pandas
            df = self.conn.execute(query).fetchdf()
            df.to_json(output_file, orient='records', indent=2)
            return output_file
        else:
            raise ValueError(f"Unsupported format: {format}")
        
        self.conn.execute(export_query)
        print(f"Exported query results to {output_file}")
        return output_file
    
    def export_database_schema(self, output_file: Optional[str] = None) -> str:
        """Export complete database schema"""
        if output_file is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = str(self.export_dir / f'schema_export_{timestamp}.sql')
        
        # Get all tables
        tables = self.conn.execute("SHOW TABLES").fetchall()
        
        with open(output_file, 'w') as f:
            f.write(f"-- DuckDB Schema Export\n-- Generated: {datetime.now()}\n\n")
            
            for table in tables:
                table_name = table[0]
                
                # Get CREATE statement
                create_stmt = self.conn.execute(f"SHOW CREATE TABLE {table_name}").fetchone()[1]
                f.write(f"-- Table: {table_name}\n")
                f.write(f"{create_stmt};\n\n")
        
        print(f"Exported database schema to {output_file}")
        return output_file
    
    def export_table_statistics(self, output_file: Optional[str] = None) -> str:
        """Export table statistics and metadata"""
        if output_file is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = str(self.export_dir / f'table_stats_{timestamp}.json')
        
        tables = self.conn.execute("SHOW TABLES").fetchall()
        stats = {
            'export_timestamp': datetime.now().isoformat(),
            'database_path': self.db_path,
            'tables': {}
        }
        
        for table in tables:
            table_name = table[0]
            
            # Get row count
            row_count = self.conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
            
            # Get column information
            columns = self.conn.execute(f"DESCRIBE {table_name}").fetchdf().to_dict('records')
            
            stats['tables'][table_name] = {
                'row_count': row_count,
                'columns': columns
            }
        
        with open(output_file, 'w') as f:
            json.dump(stats, f, indent=2, default=str)
        
        print(f"Exported table statistics to {output_file}")
        return output_file
    
    def bulk_export_tables(self, tables: List[str], formats: List[str] = ['csv', 'parquet']) -> Dict[str, List[str]]:
        """Export multiple tables in multiple formats"""
        results = {}
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        
        for table_name in tables:
            results[table_name] = []
            
            for format in formats:
                if format == 'csv':
                    output_file = str(self.export_dir / f'{table_name}_{timestamp}.csv')
                    self.export_table_to_csv(table_name, output_file)
                elif format == 'parquet':
                    output_file = str(self.export_dir / f'{table_name}_{timestamp}.parquet')
                    self.export_table_to_parquet(table_name, output_file)
                
                results[table_name].append(output_file)
        
        return results
    
    def close(self):
        """Close database connection"""
        self.conn.close()

if __name__ == '__main__':
    db_path = str(Path.home() / 'duckdb-analytics' / 'databases' / 'analytics.duckdb')
    exporter = DuckDBExporter(db_path)
    
    # Export database schema
    exporter.export_database_schema()
    
    # Export table statistics
    exporter.export_table_statistics()
    
    print("Export operations completed!")
    exporter.close()

Set up automated backup with systemd timer

Configure systemd timer to run automated backups on a regular schedule.

[Unit]
Description=DuckDB Analytics Backup Timer
Requires=duckdb-backup.service

[Timer]
OnCalendar=daily
Persistent=true
RandomizedDelaySec=1h

[Install]
WantedBy=timers.target

Create backup service unit

Create the systemd service unit that performs the actual backup operation.

[Unit]
Description=DuckDB Analytics Backup Service
After=network.target

[Service]
Type=oneshot
WorkingDirectory=%h/duckdb-analytics
ExecStart=%h/duckdb-analytics/scripts/backup_duckdb.sh

[Install]
WantedBy=default.target

Enable and start backup timer

Save the units above as ~/.config/systemd/user/duckdb-backup.timer and ~/.config/systemd/user/duckdb-backup.service, make the backup scripts executable, and enable the timer for automated backups.

chmod +x ~/duckdb-analytics/scripts/backup_duckdb.sh
chmod +x ~/duckdb-analytics/scripts/data_export.py
chmod +x ~/duckdb-analytics/scripts/create_notebook.py

systemctl --user daemon-reload
systemctl --user enable --now duckdb-backup.timer
systemctl --user list-timers | grep duckdb

Verify your setup

# Check DuckDB CLI installation
duckdb --version

# Activate Python environment and check DuckDB Python client
source ~/duckdb-analytics/duckdb_env/bin/activate
python -c "import duckdb; print(f'DuckDB Python version: {duckdb.__version__}')"

# Test database creation and basic operations
duckdb ~/duckdb-analytics/databases/test.duckdb <<'SQL'
SELECT 'DuckDB is working' AS status;
SQL

# Run the example analytics script
cd ~/duckdb-analytics
source duckdb_env/bin/activate
python scripts/example_analytics.py

# Check backup timer status
systemctl --user status duckdb-backup.timer

# Verify directory structure
ls -la ~/duckdb-analytics/
ls -la ~/duckdb-analytics/databases/
ls -la ~/duckdb-analytics/exports/
Note: a DuckDB database is a single file that can be copied, moved, and shared as-is. For production workloads, include it in the automated backup rotation configured above.

Common issues

Symptom | Cause | Fix
"Memory limit exceeded" error | Dataset larger than the configured memory limit | Increase the memory_limit setting or restrict the query with LIMIT clauses
Python "import duckdb" fails | DuckDB not installed in the current Python environment | Activate the virtual environment and run pip install duckdb
Permission denied on database file | Incorrect file ownership or permissions | Run chown $USER:$USER ~/duckdb-analytics/databases/*.duckdb
Slow query performance | Suboptimal memory or threading configuration | Run the performance monitor script and apply the recommended settings
"No space left on device" during large queries | Insufficient disk space for temporary files | Point temp_directory at a location with more space or add disk capacity
Backup script fails | Database file locked or insufficient permissions | Ensure database connections are closed and run chmod +x backup_duckdb.sh
Jupyter notebook kernel crashes | Memory exhaustion during large data operations | Reduce the dataset size, use LIMIT clauses, or add system RAM
Important: DuckDB is designed for analytical workloads, not high-concurrency transactional applications. For OLTP workloads, consider PostgreSQL instead.

Next steps

Automated install script

Run this to automate the entire setup

#duckdb #analytical database #embedded database #duckdb python #olap database
