Configure Apache Airflow data lineage tracking with OpenLineage for comprehensive workflow observability

Advanced 45 min Apr 23, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up OpenLineage with Apache Airflow to track data lineage across workflows, providing comprehensive observability into data transformations, dependencies, and quality issues in production environments.

Prerequisites

  • Apache Airflow with PostgreSQL backend
  • Docker for Marquez deployment
  • Python 3.8+ with pip
  • Network access for package downloads

What this solves

Data lineage tracking helps you understand how data flows through your Airflow workflows, identifying dependencies between datasets and transformations. OpenLineage provides standardized lineage collection across different data processing tools, while Marquez offers visualization capabilities. This setup enables data governance, impact analysis, and debugging of complex data pipelines.
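To make "impact analysis" concrete, here is a minimal sketch in plain Python. It assumes lineage has already been collected as a mapping from job name to input and output datasets. The `JOBS` dictionary and the `downstream_datasets` helper are hypothetical illustrations that reuse table names from the example DAG in this guide; they are not part of OpenLineage or Marquez.

```python
from collections import deque

# Hypothetical lineage records: each job reads some datasets and writes others.
JOBS = {
    "create_staging_table": {
        "inputs": ["raw.orders"],
        "outputs": ["staging.customer_metrics"],
    },
    "process_customer_segments": {
        "inputs": ["staging.customer_metrics"],
        "outputs": ["analytics.customer_segments"],
    },
    "data_quality_check": {
        "inputs": ["analytics.customer_segments"],
        "outputs": ["data_quality.check_results"],
    },
}


def downstream_datasets(start, jobs):
    """Return every dataset reachable from `start` by following job edges."""
    seen = set()
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for job in jobs.values():
            if current in job["inputs"]:
                for out in job["outputs"]:
                    if out not in seen:
                        seen.add(out)
                        queue.append(out)
    return sorted(seen)


# Which datasets are affected if raw.orders changes?
print(downstream_datasets("raw.orders", JOBS))
```

A lineage backend answers the same question from stored events, so a schema change in one table can be traced to every dataset derived from it.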

Prerequisites

You'll need a working Apache Airflow installation with a PostgreSQL backend. If you don't have this set up yet, follow our Apache Airflow installation guide first.

Ensure your Airflow installation has internet access for downloading OpenLineage packages and connecting to external lineage backends.

Step-by-step configuration

Install OpenLineage Python packages

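Install the OpenLineage Python client and the Airflow integration in your Airflow environment. The openlineage-airflow package is the legacy integration for Airflow releases older than 2.7. On Airflow 2.7 or later, install only openlineage-python here, because the apache-airflow-providers-openlineage package installed in a later step replaces openlineage-airflow.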

pip install openlineage-airflow openlineage-python

Install and configure Marquez backend

Set up Marquez as the OpenLineage backend for storing and visualizing lineage data.

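# The marquezproject/marquez image runs the API server only and needs a reachable PostgreSQL database.
# Host port 5432 is not mapped: the Airflow metadata database already uses it, and this image has no PostgreSQL of its own.
# Assumes the marquez.yml shown later in this guide is saved in the current directory.
docker run -d \
  --name marquez \
  -p 3000:3000 \
  -p 3001:3001 \
  -v "$(pwd)/marquez.yml:/usr/src/app/marquez.yml:ro" \
  -e MARQUEZ_CONFIG=/usr/src/app/marquez.yml \
  marquezproject/marquez:latest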

Create OpenLineage configuration

Configure OpenLineage to send lineage events to your Marquez instance.

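transport:
  type: http
  url: http://localhost:3000
  endpoint: /api/v1/lineage
  # The HTTP transport timeout is given in seconds
  timeout: 5
  # Only needed when the backend enforces authentication
  auth:
    type: api_key
    apiKey: your-api-key-here

# The transport section above is what the Python client documents.
# Confirm that your client version recognizes the facets and dataset
# keys below before relying on them.
facets:
  spark_version: true
  spark_logicalPlan: true
  processing_engine: true
  schema: true
  datasource: true
  lifecycle: true
  ownership: true
  columnLineage: true

dataset:
  namespaceResolvers:
    - type: hostname
      hosts:
        - localhost
        - 127.0.0.1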

Configure Airflow for OpenLineage

Add OpenLineage configuration to your Airflow configuration file to enable automatic lineage collection.

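[openlineage]
# Path to the OpenLineage client configuration file
config_path = /opt/airflow/openlineage.yml
namespace = production_airflow
# Alternative to config_path: define the transport inline as JSON
# transport = {"type": "http", "url": "http://localhost:3000", "endpoint": "api/v1/lineage"}

[core]
dags_folder = /opt/airflow/dags
executor = LocalExecutor

# sql_alchemy_conn lives in the [database] section since Airflow 2.3
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost:5432/airflow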

Install OpenLineage Airflow provider

Install the official OpenLineage provider package. It integrates lineage collection into Airflow 2.7 and later and reads the [openlineage] section of airflow.cfg.

pip install apache-airflow-providers-openlineage

Configure environment variables

Set up environment variables for OpenLineage configuration that Airflow can use.

OPENLINEAGE_CONFIG=/opt/airflow/openlineage.yml
OPENLINEAGE_NAMESPACE=production_airflow
OPENLINEAGE_URL=http://localhost:3000
AIRFLOW__OPENLINEAGE__CONFIG_PATH=/opt/airflow/openlineage.yml
AIRFLOW__OPENLINEAGE__NAMESPACE=production_airflow
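Airflow maps environment variables of the form AIRFLOW__{SECTION}__{KEY} onto options in airflow.cfg, which is why AIRFLOW__OPENLINEAGE__CONFIG_PATH sets config_path in the [openlineage] section. The following sketch only illustrates that naming convention; `airflow_env_to_option` is a hypothetical helper, not an Airflow function.

```python
def airflow_env_to_option(name):
    """Split an AIRFLOW__SECTION__KEY variable name into (section, key).

    Returns None for names that do not follow the convention.
    """
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        return None
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()


print(airflow_env_to_option("AIRFLOW__OPENLINEAGE__CONFIG_PATH"))
# ('openlineage', 'config_path')

# Variables such as OPENLINEAGE_URL are read by the OpenLineage client itself,
# not by Airflow's configuration layer, so they do not match the pattern.
print(airflow_env_to_option("OPENLINEAGE_URL"))
# None
```

A variable whose key does not correspond to a real option in that section is simply never read, so a misspelled name fails silently.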

Create lineage-enabled DAG example

Create a sample DAG that demonstrates data lineage tracking with various operators.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'data_lineage_example',
    default_args=default_args,
    description='DAG demonstrating data lineage tracking',
    schedule_interval=timedelta(days=1),
    catchup=False,
    tags=['lineage', 'example']
)

# SQL transformation with lineage
create_staging_table = PostgresOperator(
    task_id='create_staging_table',
    postgres_conn_id='postgres_default',
    sql="""
        CREATE TABLE IF NOT EXISTS staging.customer_metrics AS
        SELECT
            customer_id,
            COUNT(*) as order_count,
            SUM(total_amount) as total_spent,
            AVG(total_amount) as avg_order_value,
            MAX(order_date) as last_order_date
        FROM raw.orders
        WHERE order_date >= '{{ ds }}'
        GROUP BY customer_id;
    """,
    dag=dag
)

def process_customer_data(**context):
    """
    Python function with explicit lineage metadata
    """
    hook = PostgresHook(postgres_conn_id='postgres_default')

    # Input dataset information for lineage
    input_datasets = [
        {'namespace': 'postgresql://localhost:5432', 'name': 'staging.customer_metrics'}
    ]

    # Output dataset information
    output_datasets = [
        {'namespace': 'postgresql://localhost:5432', 'name': 'analytics.customer_segments'}
    ]

    # Perform data processing. Jinja is not rendered inside a Python callable,
    # so the logical date comes from the task context as a query parameter.
    sql = """
        INSERT INTO analytics.customer_segments
        SELECT
            customer_id,
            CASE
                WHEN total_spent > 1000 THEN 'high_value'
                WHEN total_spent > 500 THEN 'medium_value'
                ELSE 'low_value'
            END as segment,
            order_count,
            total_spent,
            avg_order_value,
            last_order_date,
            %(ds)s as processing_date
        FROM staging.customer_metrics;
    """
    hook.run(sql, parameters={'ds': context['ds']})

    # The returned dict is stored as an XCom. It documents the datasets,
    # but it is not itself emitted as an OpenLineage event.
    return {
        'input_datasets': input_datasets,
        'output_datasets': output_datasets,
        'transformation': 'customer_segmentation'
    }

process_segments = PythonOperator(
    task_id='process_customer_segments',
    python_callable=process_customer_data,
    dag=dag
)

# Data quality check with lineage
quality_check = PostgresOperator(
    task_id='data_quality_check',
    postgres_conn_id='postgres_default',
    sql="""
        INSERT INTO data_quality.check_results
        SELECT
            'customer_segments' as table_name,
            COUNT(*) as record_count,
            COUNT(CASE WHEN segment IS NULL THEN 1 END) as null_segments,
            '{{ ds }}' as check_date
        FROM analytics.customer_segments
        WHERE processing_date = '{{ ds }}';
    """,
    dag=dag
)

create_staging_table >> process_segments >> quality_check
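Every dataset in a lineage event is identified by a namespace and a name. The sketch below shows one way to build such pairs consistently for PostgreSQL tables. It assumes the OpenLineage naming convention for PostgreSQL as I understand it (namespace `postgres://{host}:{port}`, name `{database}.{schema}.{table}`); the example code in this guide uses a `postgresql://` scheme instead, so the scheme is left configurable. Both helper functions and the `warehouse` database name are hypothetical.

```python
def parse_qualified(table_ref, default_schema="public"):
    """Split 'schema.table' (or a bare 'table') into (schema, table)."""
    schema, sep, table = table_ref.rpartition(".")
    return (schema if sep else default_schema, table)


def postgres_dataset(host, port, database, table_ref, scheme="postgres"):
    """Build the namespace/name pair that identifies one table as a dataset."""
    schema, table = parse_qualified(table_ref)
    return {
        "namespace": f"{scheme}://{host}:{port}",
        "name": f"{database}.{schema}.{table}",
    }


# "warehouse" is an assumed database name used only for illustration.
print(postgres_dataset("localhost", 5432, "warehouse", "staging.customer_metrics"))
# {'namespace': 'postgres://localhost:5432', 'name': 'warehouse.staging.customer_metrics'}
```

Whatever scheme you settle on, use it everywhere: two jobs are only linked in the lineage graph when they refer to a dataset by exactly the same namespace and name.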

Configure custom extractors for operators

Create custom lineage extractors for operators that don't have built-in lineage support.

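from typing import List, Optional
# Legacy openlineage-airflow import path. With apache-airflow-providers-openlineage,
# import BaseExtractor and OperatorLineage from
# airflow.providers.openlineage.extractors.base and implement
# _execute_extraction() instead of extract().
from openlineage.airflow.extractors.base import BaseExtractor, OperatorLineage
# The client module is named "facet" (singular)
from openlineage.client.facet import (
    SchemaDatasetFacet,
    SchemaField,
    DataSourceDatasetFacet,
    LifecycleStateChangeDatasetFacet,
    LifecycleStateChange
)
from openlineage.client.run import Dataset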

class CustomPostgresExtractor(BaseExtractor):
    """
    Custom extractor for PostgreSQL operations with enhanced lineage
    """
    
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ['PostgresOperator', 'PostgresInsertOperator']
    
    def extract(self) -> Optional[OperatorLineage]:
        """
        Extract lineage information from PostgreSQL operators
        """
        if not hasattr(self.operator, 'sql'):
            return None
            
        sql = self.operator.sql
        if not sql:
            return None
            
        # Parse SQL to identify input/output tables
        inputs = self._parse_input_tables(sql)
        outputs = self._parse_output_tables(sql)
        
        input_datasets = []
        output_datasets = []
        
        # Create input datasets
        for table in inputs:
            dataset = Dataset(
                namespace=f"postgresql://{self._get_connection_host()}",
                name=table,
                facets={
                    'dataSource': DataSourceDatasetFacet(
                        name='postgresql',
                        uri=f"postgresql://{self._get_connection_host()}"
                    ),
                    'schema': self._get_table_schema(table)
                }
            )
            input_datasets.append(dataset)
            
        # Create output datasets
        for table in outputs:
            dataset = Dataset(
                namespace=f"postgresql://{self._get_connection_host()}",
                name=table,
                facets={
                    'dataSource': DataSourceDatasetFacet(
                        name='postgresql',
                        uri=f"postgresql://{self._get_connection_host()}"
                    ),
                    'lifecycleStateChange': LifecycleStateChangeDatasetFacet(
                        lifecycleStateChange=LifecycleStateChange.CREATE
                    ),
                    'schema': self._get_table_schema(table)
                }
            )
            output_datasets.append(dataset)
            
        return OperatorLineage(
            inputs=input_datasets,
            outputs=output_datasets
        )
    
    def _parse_input_tables(self, sql: str) -> List[str]:
        """Parse SQL to extract input table names"""
        import re
        # Simple regex to find tables in FROM and JOIN clauses
        pattern = r'(?:FROM|JOIN)\s+([\w\.]+)'
        matches = re.findall(pattern, sql, re.IGNORECASE)
        return list(set(matches))
    
    def _parse_output_tables(self, sql: str) -> List[str]:
        """Parse SQL to extract output table names"""
        import re
        # Simple regex to find tables in INSERT INTO and CREATE TABLE
        pattern = r'(?:INSERT\s+INTO|CREATE\s+TABLE(?:\s+IF\s+NOT\s+EXISTS)?)\s+([\w\.]+)'
        matches = re.findall(pattern, sql, re.IGNORECASE)
        return list(set(matches))
    
    def _get_connection_host(self) -> str:
        """Get database host from connection"""
        from airflow.hooks.base import BaseHook
        try:
            conn = BaseHook.get_connection(self.operator.postgres_conn_id)
            return f"{conn.host}:{conn.port or 5432}"
        except Exception:
            return "localhost:5432"
    
    def _get_table_schema(self, table_name: str) -> Optional[SchemaDatasetFacet]:
        """Get table schema information"""
        try:
            from airflow.providers.postgres.hooks.postgres import PostgresHook
            hook = PostgresHook(postgres_conn_id=self.operator.postgres_conn_id)
            
            # Get column information
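            sql = """
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ordinal_position
            """

            # Split off an optional schema prefix so same-named tables in other
            # schemas are not mixed in; unqualified names are assumed to be in "public"
            schema_name, _, table_only = table_name.rpartition('.')
            result = hook.get_records(sql, parameters=[schema_name or 'public', table_only])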
            
            if not result:
                return None
                
            fields = []
            for row in result:
                field = SchemaField(
                    name=row[0],
                    type=row[1],
                    description=f"Column {row[0]} ({'nullable' if row[2] == 'YES' else 'not null'})"
                )
                fields.append(field)
                
            return SchemaDatasetFacet(fields=fields)
        except Exception:
            return None
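The regular expressions in `_parse_input_tables` and `_parse_output_tables` are easy to test outside Airflow. The sketch below copies the two patterns into a standalone `parse_tables` helper (a hypothetical name) and runs them against one simple statement and one statement that exposes their limits.

```python
import re

# Same patterns as in the extractor above
INPUT_PATTERN = re.compile(r'(?:FROM|JOIN)\s+([\w\.]+)', re.IGNORECASE)
OUTPUT_PATTERN = re.compile(
    r'(?:INSERT\s+INTO|CREATE\s+TABLE(?:\s+IF\s+NOT\s+EXISTS)?)\s+([\w\.]+)',
    re.IGNORECASE,
)


def parse_tables(sql):
    """Return (inputs, outputs) as sorted lists of names matched in sql."""
    inputs = sorted(set(INPUT_PATTERN.findall(sql)))
    outputs = sorted(set(OUTPUT_PATTERN.findall(sql)))
    return inputs, outputs


simple = """
INSERT INTO analytics.customer_summary
SELECT c.customer_id FROM raw.customers c
LEFT JOIN raw.orders o ON c.customer_id = o.customer_id
"""
print(parse_tables(simple))
# (['raw.customers', 'raw.orders'], ['analytics.customer_summary'])

# A CTE alias and EXTRACT(... FROM column) are both reported as tables.
tricky = """
WITH recent AS (SELECT * FROM raw.orders)
INSERT INTO analytics.yearly
SELECT EXTRACT(YEAR FROM order_date) FROM recent
"""
print(parse_tables(tricky))
# (['order_date', 'raw.orders', 'recent'], ['analytics.yearly'])
```

The second result shows why regex parsing is only a starting point: anything that follows the keyword FROM is treated as a table. For production use, a real SQL parser is the safer choice.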

Set up Marquez UI configuration

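Create the marquez.yml server configuration that the MARQUEZ_CONFIG variable points to. It sets the API and admin ports, the PostgreSQL connection, and logging for the Marquez service.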

server:
  applicationConnectors:
    - type: http
      port: 3000
      bindHost: 0.0.0.0
  adminConnectors:
    - type: http
      port: 3001
      bindHost: 0.0.0.0

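# Marquez reads its database settings from the "db" key.
# Inside a container, localhost is the container itself, so replace it
# with a PostgreSQL host that the Marquez container can reach.
db:
  driverClass: org.postgresql.Driver
  url: jdbc:postgresql://localhost:5432/marquez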
  user: marquez
  password: marquez_password
  maxWaitForConnection: 1s
  validationQuery: SELECT 1
  validationQueryTimeout: 3s
  minSize: 8
  maxSize: 32
  checkConnectionWhileIdle: false
  evictionInterval: 10s
  minIdleTime: 1m

logging:
  level: INFO
  loggers:
    marquez: INFO
    org.eclipse.jetty: WARN
  appenders:
    - type: console
      threshold: ALL
      timeZone: UTC
      target: stdout
      logFormat: "%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n"

migrateOnStartup: true

lineage:
  writeEnabled: true
  readEnabled: true

Configure lineage for custom operators

Create configuration for tracking lineage in custom operators and external integrations.

import attr
from openlineage.client.run import Dataset, RunEvent, RunState, Job, Run
from openlineage.client.facet import BaseFacet
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone


# Custom facets are attrs classes derived from BaseFacet. The class names
# and fields below are specific to this guide.
@attr.s
class DagOwnerJobFacet(BaseFacet):
    owner: str = attr.ib()
    dag_id: str = attr.ib()


@attr.s
class TagsJobFacet(BaseFacet):
    tags: List[str] = attr.ib()


@attr.s
class ProcessingRunFacet(BaseFacet):
    execution_date: str = attr.ib()
    processing_engine: str = attr.ib(default='airflow')


@attr.s
class PerformanceRunFacet(BaseFacet):
    start_time: Optional[str] = attr.ib(default=None)
    end_time: Optional[str] = attr.ib(default=None)
    duration_seconds: Optional[float] = attr.ib(default=None)


class CustomLineageConfig:
    """
    Configuration class for custom lineage tracking
    """

    def __init__(self, namespace: str = "airflow"):
        self.namespace = namespace

    def create_job_facets(self,
                         task_id: str,
                         dag_id: str,
                         owner: Optional[str] = None,
                         tags: Optional[List[str]] = None) -> Dict[str, BaseFacet]:
        """
        Create job facets with metadata
        """
        facets = {}

        if owner:
            facets['ownership'] = DagOwnerJobFacet(owner=owner, dag_id=dag_id)

        if tags:
            facets['tags'] = TagsJobFacet(tags=tags)

        return facets

    def create_run_facets(self,
                         execution_date: datetime,
                         task_instance: Any = None) -> Dict[str, BaseFacet]:
        """
        Create run facets with execution metadata
        """
        facets = {}

        facets['processing'] = ProcessingRunFacet(
            execution_date=execution_date.isoformat()
        )

        if task_instance:
            start, end = task_instance.start_date, task_instance.end_date
            facets['performance'] = PerformanceRunFacet(
                start_time=start.isoformat() if start else None,
                end_time=end.isoformat() if end else None,
                duration_seconds=(end - start).total_seconds() if start and end else None
            )

        return facets

    def create_lineage_event(self,
                           job_name: str,
                           run_id: str,
                           run_state: RunState,
                           inputs: Optional[List[Dataset]] = None,
                           outputs: Optional[List[Dataset]] = None,
                           job_facets: Optional[Dict[str, BaseFacet]] = None,
                           run_facets: Optional[Dict[str, BaseFacet]] = None) -> RunEvent:
        """
        Create a complete lineage event
        """
        job = Job(
            namespace=self.namespace,
            name=job_name,
            facets=job_facets or {}
        )

        # run_id must be a UUID string
        run = Run(
            runId=run_id,
            facets=run_facets or {}
        )

        event = RunEvent(
            eventType=run_state,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=run,
            job=job,
            inputs=inputs or [],
            outputs=outputs or [],
            producer="custom_airflow_lineage"
        )

        return event

# Global configuration instance
lineage_config = CustomLineageConfig(namespace="production_airflow")
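It can help to see what a run event looks like on the wire. The sketch below assembles START and COMPLETE events as plain dictionaries using only the standard library, with the same top-level fields as the curl test payload in the verification section of this guide. `build_run_event` is a hypothetical helper, and the `dag_id.task_id` job name is an assumption about how Airflow tasks are usually named as jobs.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(event_type, job_namespace, job_name, run_id,
                    inputs=(), outputs=()):
    """Assemble a run event as a plain dict ready for JSON serialization.

    inputs and outputs are iterables of (namespace, name) pairs.
    """
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        "producer": "custom_airflow_lineage",
    }


# START and COMPLETE share one run ID so the backend can pair them.
run_id = str(uuid.uuid4())
job_name = "data_lineage_example.process_customer_segments"
pg = "postgresql://localhost:5432"

start = build_run_event("START", "production_airflow", job_name, run_id)
complete = build_run_event(
    "COMPLETE", "production_airflow", job_name, run_id,
    inputs=[(pg, "staging.customer_metrics")],
    outputs=[(pg, "analytics.customer_segments")],
)

print(json.dumps(complete, indent=2))
```

Reusing the run ID across both events is what lets a backend treat them as one run with a beginning and an end rather than as two unrelated runs.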

Restart Airflow services

Restart Airflow to load the OpenLineage configuration and custom extractors.

sudo systemctl stop airflow-webserver
sudo systemctl stop airflow-scheduler
sudo systemctl start airflow-scheduler
sudo systemctl start airflow-webserver
sudo systemctl status airflow-webserver airflow-scheduler

Configure advanced lineage features

Set up column-level lineage

Configure detailed column-level lineage tracking for better data governance and impact analysis.

import attr
# Fields and InputField are the facet_v2 names; older client releases expose
# differently named classes in openlineage.client.facet.
from openlineage.client.facet_v2.column_lineage_dataset import (
    ColumnLineageDatasetFacet,
    Fields,
    InputField,
)
from typing import List, Dict

def create_column_lineage_facet(
    input_fields: List[Dict[str, str]],
    output_fields: List[Dict[str, str]],
    transformations: Dict[str, List[str]]
) -> ColumnLineageDatasetFacet:
    """
    Create column-level lineage facet

    Args:
        input_fields: List of {"name": "field_name", "namespace": "dataset_namespace",
            "dataset": "dataset_name"}
        output_fields: List of {"name": "field_name"}
        transformations: Dict mapping output fields to input fields
    """

    fields = {}

    for output_field in output_fields:
        field_name = output_field["name"]

        if field_name in transformations:
            input_field_names = transformations[field_name]

            input_fields_lineage = []
            for input_field_name in input_field_names:
                # Find the corresponding input field with namespace
                for input_field in input_fields:
                    if input_field["name"] == input_field_name:
                        input_fields_lineage.append(
                            InputField(
                                namespace=input_field["namespace"],
                                # name identifies the dataset, field the column
                                name=input_field["dataset"],
                                field=input_field_name
                            )
                        )
                        break

            fields[field_name] = Fields(inputFields=input_fields_lineage)

    return ColumnLineageDatasetFacet(fields=fields)

# Example usage in DAG
def create_customer_summary_with_column_lineage(**context):
    """
    Example function demonstrating column-level lineage
    """
    namespace = "postgresql://localhost:5432"

    # Define input and output field mappings
    input_fields = [
        {"name": "customer_id", "namespace": namespace, "dataset": "raw.customers"},
        {"name": "first_name", "namespace": namespace, "dataset": "raw.customers"},
        {"name": "last_name", "namespace": namespace, "dataset": "raw.customers"},
        {"name": "order_id", "namespace": namespace, "dataset": "raw.orders"},
        {"name": "total_amount", "namespace": namespace, "dataset": "raw.orders"},
        {"name": "order_date", "namespace": namespace, "dataset": "raw.orders"}
    ]

    output_fields = [
        {"name": "customer_id"},
        {"name": "full_name"},
        {"name": "total_orders"},
        {"name": "total_spent"},
        {"name": "last_order_date"}
    ]

    # Define transformations (which input fields contribute to each output field)
    transformations = {
        "customer_id": ["customer_id"],
        "full_name": ["first_name", "last_name"],
        "total_orders": ["order_id"],
        "total_spent": ["total_amount"],
        "last_order_date": ["order_date"]
    }

    # Create column lineage facet
    column_lineage_facet = create_column_lineage_facet(
        input_fields=input_fields,
        output_fields=output_fields,
        transformations=transformations
    )

    # Execute the actual SQL transformation. Jinja is not rendered inside a
    # Python callable, so the logical date is passed as a query parameter.
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    hook = PostgresHook(postgres_conn_id='postgres_default')

    sql = """
        INSERT INTO analytics.customer_summary
        SELECT
            c.customer_id,
            CONCAT(c.first_name, ' ', c.last_name) as full_name,
            COUNT(o.order_id) as total_orders,
            COALESCE(SUM(o.total_amount), 0) as total_spent,
            MAX(o.order_date) as last_order_date
        FROM raw.customers c
        LEFT JOIN raw.orders o ON c.customer_id = o.customer_id
        WHERE o.order_date >= %(ds)s
        GROUP BY c.customer_id, c.first_name, c.last_name;
    """
    hook.run(sql, parameters={'ds': context['ds']})

    return {
        # A plain dict keeps the XCom value JSON-serializable
        'column_lineage': attr.asdict(column_lineage_facet),
        'input_datasets': ['raw.customers', 'raw.orders'],
        'output_datasets': ['analytics.customer_summary']
    }
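Column-level lineage pays off when you ask which output columns depend on a given source column. The following sketch models the facet's `fields` mapping as plain dictionaries, so it runs without the OpenLineage client. Both helpers are hypothetical, and the mapping mirrors the customer summary example above.

```python
def column_lineage_fields(namespace, mapping):
    """Build a column lineage 'fields' mapping as plain dicts.

    mapping: output column -> list of (dataset, column) sources.
    """
    return {
        output_column: {
            "inputFields": [
                {"namespace": namespace, "name": dataset, "field": column}
                for dataset, column in sources
            ]
        }
        for output_column, sources in mapping.items()
    }


def affected_outputs(fields, dataset, column):
    """List the output columns that read from dataset.column."""
    return sorted(
        output_column
        for output_column, spec in fields.items()
        if any(f["name"] == dataset and f["field"] == column
               for f in spec["inputFields"])
    )


MAPPING = {
    "customer_id": [("raw.customers", "customer_id")],
    "full_name": [("raw.customers", "first_name"), ("raw.customers", "last_name")],
    "total_orders": [("raw.orders", "order_id")],
    "total_spent": [("raw.orders", "total_amount")],
    "last_order_date": [("raw.orders", "order_date")],
}

fields = column_lineage_fields("postgresql://localhost:5432", MAPPING)
print(affected_outputs(fields, "raw.customers", "last_name"))
# ['full_name']
```

Each input entry names the dataset in `name` and the column in `field`, which is the distinction that makes a lookup like this possible.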

Configure data quality lineage

Set up lineage tracking for data quality checks and validation processes.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# The client module is named "facet" (singular)
from openlineage.client.facet import DataQualityMetricsInputDatasetFacet, DataQualityAssertionsDatasetFacet
from openlineage.client.run import Dataset

def run_data_quality_checks(**context):
    """
    Run data quality checks with lineage tracking
    """
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    hook = PostgresHook(postgres_conn_id='postgres_default')
    
    # Define quality checks
    quality_checks = [
        {
            'name': 'null_customer_id_check',
            'sql': 'SELECT COUNT(*) FROM analytics.customer_summary WHERE customer_id IS NULL',
            'assertion': 'value = 0',
            'description': 'Customer ID should never be null'
        },
        {
            'name': 'negative_total_spent_check',
            'sql': 'SELECT COUNT(*) FROM analytics.customer_summary WHERE total_spent < 0',
            'assertion': 'value = 0',
            'description': 'Total spent should not be negative'
        },
        {
            'name': 'future_order_date_check',
            'sql': 'SELECT COUNT(*) FROM analytics.customer_summary WHERE last_order_date > CURRENT_DATE',
            'assertion': 'value = 0',
            'description': 'Order dates should not be in the future'
        }
    ]
    
    quality_results = []
    
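    for check in quality_checks:
        result = hook.get_first(check['sql'])
        # Every assertion above has the form 'value = N'. Compare directly
        # instead of calling eval(), which rejects '0 = 0' as invalid syntax.
        expected = int(check['assertion'].split('=')[1])
        passed = result[0] == expected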
        
        quality_results.append({
            'check_name': check['name'],
            'result_value': result[0],
            'passed': passed,
            'assertion': check['assertion'],
            'description': check['description']
        })
        
        # Log quality check result
        print(f"Quality check '{check['name']}': {'PASSED' if passed else 'FAILED'} (value: {result[0]})")
    
    # Create data quality facets for lineage
    quality_metrics = {
        'rowCount': hook.get_first('SELECT COUNT(*) FROM analytics.customer_summary')[0],
        'nullValues': {
            'customer_id': hook.get_first('SELECT COUNT(*) FROM analytics.customer_summary WHERE customer_id IS NULL')[0],
            'full_name': hook.get_first('SELECT COUNT(*) FROM analytics.customer_summary WHERE full_name IS NULL')[0]
        },
        'uniqueValues': {
            'customer_id': hook.get_first('SELECT COUNT(DISTINCT customer_id) FROM analytics.customer_summary')[0]
        },
        'qualityScore': sum(1 for r in quality_results if r['passed']) / len(quality_results)
    }
    
    return {
        'quality_results': quality_results,
        'quality_metrics': quality_metrics,
        'dataset': 'analytics.customer_summary'
    }

default_args = {
    'owner': 'data-quality-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'data_quality_lineage',
    default_args=default_args,
    description='Data quality checks with lineage tracking',
    schedule_interval=timedelta(hours=6),
    catchup=False,
    tags=['data-quality', 'lineage']
)

quality_checks = PythonOperator(
    task_id='run_quality_checks',
    python_callable=run_data_quality_checks,
    dag=dag
)

# Store quality results with lineage
store_results = PostgresOperator(
    task_id='store_quality_results',
    postgres_conn_id='postgres_default',
    sql="""
        INSERT INTO data_quality.check_results (
            table_name, check_date, total_checks, passed_checks, quality_score, details
        )
        SELECT
            'analytics.customer_summary' as table_name,
            '{{ ds }}' as check_date,
            {{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')["quality_results"]|length }} as total_checks,
            {{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')["quality_results"]|selectattr('passed')|list|length }} as passed_checks,
            {{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')["quality_metrics"]["qualityScore"] }} as quality_score,
            '{{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')|tojson }}' as details;
    """,
    dag=dag
)

quality_checks >> store_results
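The quality checks store their expectation as a string such as 'value = 0'. One way to evaluate strings of that shape without handing them to eval() is a small operator table. This is a sketch under the assumption that every assertion has the form `value <operator> <number>`; `check_assertion` is a hypothetical helper.

```python
import operator

# Supported comparison operators; '=' is accepted as SQL-style equality.
OPERATORS = {
    "=": operator.eq,
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


def check_assertion(assertion, value):
    """Evaluate an assertion of the form 'value <op> <number>' against value."""
    parts = assertion.split()
    if len(parts) != 3 or parts[0] != "value" or parts[1] not in OPERATORS:
        raise ValueError(f"Unsupported assertion: {assertion!r}")
    return OPERATORS[parts[1]](value, float(parts[2]))


print(check_assertion("value = 0", 0))    # True
print(check_assertion("value = 0", 3))    # False
print(check_assertion("value <= 5", 5))   # True
```

Because only a fixed set of operators is accepted, a malformed or malicious assertion string raises an error instead of being executed.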

Set up lineage monitoring and alerts

Configure monitoring for lineage collection and create alerts for lineage failures.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import requests
import json

def check_lineage_health(**context):
    """
    Check the health of lineage collection and Marquez backend
    """
    health_status = {
        'marquez_api': False,
        'lineage_events': False,
        'recent_datasets': 0,
        'recent_jobs': 0,
        'issues': []
    }
    
    # Check Marquez API health
    try:
        response = requests.get('http://localhost:3000/api/v1/namespaces', timeout=10)
        if response.status_code == 200:
            health_status['marquez_api'] = True
        else:
            health_status['issues'].append(f"Marquez API returned status {response.status_code}")
    except Exception as e:
        health_status['issues'].append(f"Cannot connect to Marquez API: {str(e)}")
    
    # Check recent lineage events
    try:
        # Query Marquez for recent datasets and jobs
        response = requests.get('http://localhost:3000/api/v1/namespaces/production_airflow/datasets', timeout=10)
        if response.status_code == 200:
            datasets = response.json()['datasets']
            # A trailing 'Z' is rewritten because fromisoformat() rejects it before Python 3.11
            health_status['recent_datasets'] = len([d for d in datasets if 
                datetime.fromisoformat(d.get('updatedAt', '2020-01-01T00:00:00').replace('Z', '+00:00')).date() >= 
                (datetime.now() - timedelta(days=1)).date()])
        
        response = requests.get('http://localhost:3000/api/v1/namespaces/production_airflow/jobs', timeout=10)
        if response.status_code == 200:
            jobs = response.json()['jobs']
            health_status['recent_jobs'] = len([j for j in jobs if 
                datetime.fromisoformat(j.get('updatedAt', '2020-01-01T00:00:00').replace('Z', '+00:00')).date() >= 
                (datetime.now() - timedelta(days=1)).date()])
            
        if health_status['recent_datasets'] > 0 and health_status['recent_jobs'] > 0:
            health_status['lineage_events'] = True
        else:
            health_status['issues'].append(f"Low lineage activity: {health_status['recent_datasets']} datasets, {health_status['recent_jobs']} jobs updated in last 24h")
            
    except Exception as e:
        health_status['issues'].append(f"Error checking lineage events: {str(e)}")
    
    # Check for stale datasets (not updated in 48 hours)
    try:
        response = requests.get('http://localhost:3000/api/v1/namespaces/production_airflow/datasets', timeout=10)
        if response.status_code == 200:
            datasets = response.json()['datasets']
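            # A trailing 'Z' is rewritten because fromisoformat() rejects it before Python 3.11
            stale_datasets = [d['name'] for d in datasets if 
                datetime.fromisoformat(d.get('updatedAt', '2020-01-01T00:00:00').replace('Z', '+00:00')).date() < 
                (datetime.now() - timedelta(days=2)).date()]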
            
            if stale_datasets:
                health_status['issues'].append(f"Stale datasets (>48h): {', '.join(stale_datasets[:5])}{'...' if len(stale_datasets) > 5 else ''}")
                
    except Exception as e:
        health_status['issues'].append(f"Error checking stale datasets: {str(e)}")
    
    # Store health check results
    hook = PostgresHook(postgres_conn_id='postgres_default')
    hook.run("""
        INSERT INTO monitoring.lineage_health_checks 
        (check_date, marquez_api_status, lineage_events_status, recent_datasets, recent_jobs, issues)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, parameters=[
        datetime.now().date(),
        health_status['marquez_api'],
        health_status['lineage_events'],
        health_status['recent_datasets'],
        health_status['recent_jobs'],
        json.dumps(health_status['issues'])
    ])
    
    # Determine if we need to send an alert
    critical_issues = not health_status['marquez_api'] or len(health_status['issues']) > 2
    
    return {
        'health_status': health_status,
        'needs_alert': critical_issues,
        'summary': f"Marquez: {'OK' if health_status['marquez_api'] else 'DOWN'}, Events: {'OK' if health_status['lineage_events'] else 'LOW'}, Issues: {len(health_status['issues'])}"
    }

def generate_lineage_report(**context):
    """
    Generate a summary report of lineage collection
    """
    try:
        # Get namespace overview
        response = requests.get('http://localhost:3000/api/v1/namespaces/production_airflow', timeout=10)
        namespace_info = response.json() if response.status_code == 200 else {}
        
        # Get dataset count
        response = requests.get('http://localhost:3000/api/v1/namespaces/production_airflow/datasets', timeout=10)
        datasets = response.json().get('datasets', []) if response.status_code == 200 else []
        
        # Get job count
        response = requests.get('http://localhost:3000/api/v1/namespaces/production_airflow/jobs', timeout=10)
        jobs = response.json().get('jobs', []) if response.status_code == 200 else []
        
        report = {
            'namespace': 'production_airflow',
            'total_datasets': len(datasets),
            'total_jobs': len(jobs),
            # A trailing 'Z' is rewritten because fromisoformat() rejects it before Python 3.11
            'recent_datasets': len([d for d in datasets if 
                datetime.fromisoformat(d.get('updatedAt', '2020-01-01T00:00:00').replace('Z', '+00:00')).date() >= 
                (datetime.now() - timedelta(days=7)).date()]),
            'recent_jobs': len([j for j in jobs if 
                datetime.fromisoformat(j.get('updatedAt', '2020-01-01T00:00:00').replace('Z', '+00:00')).date() >= 
                (datetime.now() - timedelta(days=7)).date()]),
            'top_datasets': sorted(datasets, key=lambda x: x.get('updatedAt', ''), reverse=True)[:10]
        }
        
        return report
        
    except Exception as e:
        return {'error': f"Failed to generate report: {str(e)}"}

default_args = {
    'owner': 'data-ops',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'lineage_monitoring',
    default_args=default_args,
    description='Monitor lineage collection health and generate reports',
    schedule_interval=timedelta(hours=4),
    catchup=False,
    tags=['monitoring', 'lineage']
)

health_check = PythonOperator(
    task_id='check_lineage_health',
    python_callable=check_lineage_health,
    dag=dag
)

generate_report = PythonOperator(
    task_id='generate_lineage_report',
    python_callable=generate_lineage_report,
    dag=dag
)

# Conditional email alert
send_alert = EmailOperator(
    task_id='send_alert_email',
    to=['data-ops@example.com'],
    subject='Lineage System Alert - {{ ds }}',
    html_content="""
    <h2>Lineage System Alert</h2>
    <p>Date: {{ ds }}</p>
    <p>Status: {{ ti.xcom_pull(task_ids='check_lineage_health', key='return_value')["summary"] }}</p>
    <p>Issues Detected:</p>
    <ul>
    {% for issue in ti.xcom_pull(task_ids='check_lineage_health', key='return_value')["health_status"]["issues"] %}
        <li>{{ issue }}</li>
    {% endfor %}
    </ul>
    <p>Lineage Report:</p>
    <ul>
        <li>Total Datasets: {{ ti.xcom_pull(task_ids='generate_lineage_report', key='return_value')["total_datasets"] }}</li>
        <li>Total Jobs: {{ ti.xcom_pull(task_ids='generate_lineage_report', key='return_value')["total_jobs"] }}</li>
        <li>Recent Datasets (7d): {{ ti.xcom_pull(task_ids='generate_lineage_report', key='return_value')["recent_datasets"] }}</li>
        <li>Recent Jobs (7d): {{ ti.xcom_pull(task_ids='generate_lineage_report', key='return_value')["recent_jobs"] }}</li>
    </ul>
    <p>Check the Marquez UI for detailed lineage visualization.</p>
    """,
    dag=dag
)

# Only send the alert if there are critical issues. A placeholder task cannot
# express that condition, so a ShortCircuitOperator skips send_alert unless
# check_lineage_health returned needs_alert=True.
from airflow.operators.python import ShortCircuitOperator

def alert_needed(**context):
    result = context['ti'].xcom_pull(task_ids='check_lineage_health') or {}
    return bool(result.get('needs_alert'))

alert_condition = ShortCircuitOperator(
    task_id='alert_condition',
    python_callable=alert_needed,
    dag=dag
)

[health_check, generate_report] >> alert_condition >> send_alert
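The health check depends on reading `updatedAt` timestamps from the backend correctly. The sketch below isolates that step with the standard library only. It assumes timestamps arrive as ISO 8601 strings that may end in `Z`, and that records without a timestamp should be ignored rather than counted; `parse_timestamp` and `count_recent` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone


def parse_timestamp(value):
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Treat naive timestamps as UTC so comparisons never mix kinds
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def count_recent(items, now, max_age):
    """Count records whose updatedAt falls within max_age before now."""
    cutoff = now - max_age
    return sum(
        1 for item in items
        if "updatedAt" in item and parse_timestamp(item["updatedAt"]) >= cutoff
    )


now = datetime(2024, 1, 3, tzinfo=timezone.utc)
datasets = [
    {"name": "analytics.customer_summary", "updatedAt": "2024-01-02T12:00:00Z"},
    {"name": "raw.orders", "updatedAt": "2023-12-01T00:00:00Z"},
    {"name": "never_updated"},
]
print(count_recent(datasets, now, timedelta(days=1)))
# 1
```

Passing `now` in as an argument keeps the function deterministic, which makes the freshness threshold easy to unit test.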

Verify your setup

Test the lineage configuration by running a sample DAG and checking that events appear in Marquez.

# Check Airflow is running with OpenLineage
sudo systemctl status airflow-scheduler airflow-webserver

# Verify OpenLineage configuration
cat /opt/airflow/openlineage.yml

# Check Marquez is accessible
curl -s http://localhost:3000/api/v1/namespaces | jq .

# Test lineage event submission (the OpenLineage spec defines runId as a UUID)
curl -X POST http://localhost:3000/api/v1/lineage \
  -H "Content-Type: application/json" \
  -d '{"eventType": "START", "eventTime": "2024-01-01T12:00:00.000Z", "run": {"runId": "d46e465b-d358-4d32-83d4-df660ff614dd"}, "job": {"namespace": "test", "name": "test-job"}, "inputs": [], "outputs": [], "producer": "test"}'

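The marquezproject/marquez container serves only the REST API on http://localhost:3000. Lineage graphs are rendered by the separate Marquez web UI (the marquezproject/marquez-web image), so run it on a free port and point it at that API. Once your DAGs have run, they appear there as jobs with input and output datasets.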

Note: If lineage events aren't appearing, check the Airflow logs for OpenLineage errors and verify the Marquez database connection is working properly.

Common issues

Symptom | Cause | Fix
No lineage events in Marquez | OpenLineage config not loaded | Verify the /opt/airflow/openlineage.yml path in airflow.cfg and restart services
Marquez UI shows connection error | Database not initialized | Run docker exec marquez ./entrypoint.sh db migrate
Custom extractors not working | Extractor not registered | Add the extractor's full import path to the extractors option in the [openlineage] section and make sure the module is importable, for example from /opt/airflow/plugins/
Column lineage not appearing | SQL parsing failed | Review custom extractor regex patterns and test SQL parsing logic
Performance issues with lineage | Too many lineage events | Limit collection with the provider's disabled_for_operators or selective_enable options
Authentication errors to Marquez | Missing API key | Add proper authentication to the OpenLineage transport configuration
