Set up OpenLineage with Apache Airflow to track data lineage across workflows, providing comprehensive observability into data transformations, dependencies, and quality issues in production environments.
Prerequisites
- Apache Airflow with PostgreSQL backend
- Docker for Marquez deployment
- Python 3.8+ with pip
- Network access for package downloads
What this solves
Data lineage tracking helps you understand how data flows through your Airflow workflows, identifying dependencies between datasets and transformations. OpenLineage provides standardized lineage collection across different data processing tools, while Marquez offers visualization capabilities. This setup enables data governance, impact analysis, and debugging of complex data pipelines.
Prerequisites
You'll need a working Apache Airflow installation with PostgreSQL backend. If you don't have this setup, follow our Apache Airflow installation guide first.
Ensure your Airflow installation has internet access for downloading OpenLineage packages and connecting to external lineage backends.
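Once Marquez is running (next section), you can confirm from the Airflow host that the lineage backend answers. Below is a minimal standard-library probe. It assumes the API address used throughout this guide (http://localhost:3000), and the function name is illustrative.

```python
import json
import urllib.error
import urllib.request


def marquez_reachable(base_url: str = "http://localhost:3000", timeout: float = 5.0) -> bool:
    """Return True if GET {base_url}/api/v1/namespaces answers with HTTP 200 and valid JSON."""
    url = base_url.rstrip("/") + "/api/v1/namespaces"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            if response.status != 200:
                return False
            json.load(response)  # Marquez lists namespaces as a JSON document
            return True
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, DNS failure, timeout, HTTP error status or non-JSON body
        return False


# Usage from the Airflow host once Marquez is up:
# print(marquez_reachable())
```

A `False` result usually means the container is not running or its port is not reachable from where Airflow runs.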
Step-by-step configuration
Install OpenLineage Python packages
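Install the OpenLineage Python client in your Airflow environment. On Airflow 2.7 and later, the Airflow integration itself comes from the apache-airflow-providers-openlineage provider installed later in this guide. The older openlineage-airflow package is only for Airflow versions before 2.7.
pip install openlineage-python
Install and configure Marquez backend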
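Run the Marquez API server in Docker to store lineage events. Marquez keeps its data in its own PostgreSQL database, which is configured in the marquez.yml file shown later in this guide. Mount that file into the container; the host path below is an example. Port 3000 serves the API and port 3001 serves the admin endpoints. Port 5432 is not published, because the Marquez image does not include PostgreSQL and Airflow's metadata database already uses that port on the host.
docker run -d \
  --name marquez \
  -p 3000:3000 \
  -p 3001:3001 \
  -v /opt/marquez/marquez.yml:/usr/src/app/marquez.yml:ro \
  -e MARQUEZ_CONFIG=/usr/src/app/marquez.yml \
  marquezproject/marquez:latest
Create OpenLineage configuration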
Configure the OpenLineage client to send lineage events to your Marquez instance. Save this as /opt/airflow/openlineage.yml. The timeout is in seconds. The auth block is only needed when something in front of Marquez checks API keys, so remove it otherwise. Some OpenLineage examples include Spark-specific facet toggles and dataset namespace resolvers. Those apply to the Spark/Java integration, not to Airflow, so they are omitted here.
transport:
  type: http
  url: http://localhost:3000
  endpoint: api/v1/lineage
  timeout: 5
  auth:
    type: api_key
    apiKey: your-api-key-here
Configure Airflow for OpenLineage
Add OpenLineage configuration to your Airflow configuration file to enable automatic lineage collection.
[openlineage]
# Transport settings (URL, endpoint, auth) are read from this file
config_path = /opt/airflow/openlineage.yml
namespace = production_airflow
# Full import path of the custom extractor shown later; adjust it to where you save the module
extractors = custom_extractors.CustomPostgresExtractor
[core]
dags_folder = /opt/airflow/dags
executor = LocalExecutor
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost:5432/airflow
Install OpenLineage Airflow provider
Install the official OpenLineage provider package. It is the supported integration for Airflow 2.7 and later, and it reads the [openlineage] settings above.
pip install apache-airflow-providers-openlineage
Configure environment variables
Set up environment variables for OpenLineage configuration that Airflow can use.
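Airflow maps each airflow.cfg option to an environment variable named AIRFLOW__<SECTION>__<KEY>. The provider's [openlineage] transport option, an alternative to config_path, takes the transport settings as a JSON string. The sketch below builds both. The helper names are hypothetical, and the output is meant for a shell `export`.

```python
import json


def airflow_env_var(section: str, key: str) -> str:
    """Environment variable that overrides option `key` in section [section] of airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def http_transport_json(url: str, endpoint: str = "api/v1/lineage", timeout: float = 5.0) -> str:
    """JSON value for the provider's [openlineage] transport option (HTTP transport)."""
    return json.dumps({"type": "http", "url": url, "endpoint": endpoint, "timeout": timeout})


# Print shell export lines; single quotes keep the JSON intact
print(f"export {airflow_env_var('openlineage', 'namespace')}=production_airflow")
print(f"export {airflow_env_var('openlineage', 'transport')}='{http_transport_json('http://localhost:3000')}'")
```

The variables used in this guide: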
OPENLINEAGE_CONFIG=/opt/airflow/openlineage.yml
OPENLINEAGE_NAMESPACE=production_airflow
OPENLINEAGE_URL=http://localhost:3000
AIRFLOW__OPENLINEAGE__CONFIG_PATH=/opt/airflow/openlineage.yml
AIRFLOW__OPENLINEAGE__NAMESPACE=production_airflow
Create lineage-enabled DAG example
Create a sample DAG that demonstrates data lineage tracking with various operators.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_lineage_example',
    default_args=default_args,
    description='DAG demonstrating data lineage tracking',
    schedule=timedelta(days=1),
    catchup=False,
    tags=['lineage', 'example'],
)

# SQL transformation with lineage: inputs and outputs are derived from this SQL
create_staging_table = PostgresOperator(
    task_id='create_staging_table',
    postgres_conn_id='postgres_default',
    sql="""
        CREATE TABLE IF NOT EXISTS staging.customer_metrics AS
        SELECT
            customer_id,
            COUNT(*) AS order_count,
            SUM(total_amount) AS total_spent,
            AVG(total_amount) AS avg_order_value,
            MAX(order_date) AS last_order_date
        FROM raw.orders
        WHERE order_date >= '{{ ds }}'
        GROUP BY customer_id;
    """,
    dag=dag,
)


def process_customer_data(**context):
    """
    Segment customers. Jinja ({{ ds }}) is not rendered inside a Python
    callable, so the logical date is passed as a query parameter instead.
    """
    hook = PostgresHook(postgres_conn_id='postgres_default')

    # Dataset information for reference: the returned dict is stored in XCom and
    # is not read by OpenLineage. To report lineage for a PythonOperator,
    # declare it on the operator (for example via inlets/outlets).
    input_datasets = [
        {'namespace': 'postgresql://localhost:5432', 'name': 'staging.customer_metrics'}
    ]
    output_datasets = [
        {'namespace': 'postgresql://localhost:5432', 'name': 'analytics.customer_segments'}
    ]

    # Perform data processing
    sql = """
        INSERT INTO analytics.customer_segments
        SELECT
            customer_id,
            CASE
                WHEN total_spent > 1000 THEN 'high_value'
                WHEN total_spent > 500 THEN 'medium_value'
                ELSE 'low_value'
            END AS segment,
            order_count,
            total_spent,
            avg_order_value,
            last_order_date,
            %s AS processing_date
        FROM staging.customer_metrics;
    """
    hook.run(sql, parameters=(context['ds'],))

    return {
        'input_datasets': input_datasets,
        'output_datasets': output_datasets,
        'transformation': 'customer_segmentation',
    }


process_segments = PythonOperator(
    task_id='process_customer_segments',
    python_callable=process_customer_data,
    dag=dag,
)

# Data quality check with lineage
quality_check = PostgresOperator(
    task_id='data_quality_check',
    postgres_conn_id='postgres_default',
    sql="""
        INSERT INTO data_quality.check_results
        SELECT
            'customer_segments' AS table_name,
            COUNT(*) AS record_count,
            COUNT(CASE WHEN segment IS NULL THEN 1 END) AS null_segments,
            '{{ ds }}' AS check_date
        FROM analytics.customer_segments
        WHERE processing_date = '{{ ds }}';
    """,
    dag=dag,
)

create_staging_table >> process_segments >> quality_check
Configure custom extractors for operators
Create custom lineage extractors for operators that don't have built-in lineage support.
import re
from typing import List, Optional

from airflow.hooks.base import BaseHook
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.postgres.hooks.postgres import PostgresHook
from openlineage.client.facet import (
    DataSourceDatasetFacet,
    LifecycleStateChange,
    LifecycleStateChangeDatasetFacet,
    SchemaDatasetFacet,
    SchemaField,
)
from openlineage.client.run import Dataset


class CustomPostgresExtractor(BaseExtractor):
    """
    Custom extractor for PostgreSQL operators with enhanced lineage.
    Register its full import path in the [openlineage] extractors option.
    """

    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        return ['PostgresOperator', 'PostgresInsertOperator']

    def _execute_extraction(self) -> Optional[OperatorLineage]:
        """
        Extract lineage from the operator's SQL (called by the provider's BaseExtractor.extract())
        """
        sql = getattr(self.operator, 'sql', None)
        if not sql:
            return None
        if isinstance(sql, (list, tuple)):  # the operator also accepts a list of statements
            sql = ';\n'.join(sql)

        namespace = f"postgresql://{self._get_connection_host()}"
        data_source = DataSourceDatasetFacet(name='postgresql', uri=namespace)

        # Create input datasets
        input_datasets = []
        for table in self._parse_input_tables(sql):
            facets = {'dataSource': data_source}
            schema = self._get_table_schema(table)
            if schema:
                facets['schema'] = schema
            input_datasets.append(Dataset(namespace=namespace, name=table, facets=facets))

        # Create output datasets; mark CREATE only when the statement creates that table
        output_datasets = []
        for table in self._parse_output_tables(sql):
            facets = {'dataSource': data_source}
            create_pattern = r'CREATE\s+TABLE(?:\s+IF\s+NOT\s+EXISTS)?\s+' + re.escape(table)
            if re.search(create_pattern, sql, re.IGNORECASE):
                facets['lifecycleStateChange'] = LifecycleStateChangeDatasetFacet(
                    lifecycleStateChange=LifecycleStateChange.CREATE
                )
            schema = self._get_table_schema(table)
            if schema:
                facets['schema'] = schema
            output_datasets.append(Dataset(namespace=namespace, name=table, facets=facets))

        return OperatorLineage(inputs=input_datasets, outputs=output_datasets)

    def _parse_input_tables(self, sql: str) -> List[str]:
        """Tables after FROM/JOIN. Regex-based: CTE names, subqueries and
        functions such as EXTRACT(... FROM ...) can produce false matches."""
        pattern = r'(?:FROM|JOIN)\s+([\w\.]+)'
        return sorted(set(re.findall(pattern, sql, re.IGNORECASE)))

    def _parse_output_tables(self, sql: str) -> List[str]:
        """Tables after INSERT INTO and CREATE TABLE [IF NOT EXISTS]"""
        pattern = r'(?:INSERT\s+INTO|CREATE\s+TABLE(?:\s+IF\s+NOT\s+EXISTS)?)\s+([\w\.]+)'
        return sorted(set(re.findall(pattern, sql, re.IGNORECASE)))

    def _get_conn_id(self) -> str:
        """Connection ID; recent postgres provider versions store it as conn_id"""
        return (getattr(self.operator, 'conn_id', None)
                or getattr(self.operator, 'postgres_conn_id', 'postgres_default'))

    def _get_connection_host(self) -> str:
        """Database host:port from the Airflow connection"""
        try:
            conn = BaseHook.get_connection(self._get_conn_id())
            return f"{conn.host}:{conn.port or 5432}"
        except Exception:
            return "localhost:5432"

    def _get_table_schema(self, table_name: str) -> Optional[SchemaDatasetFacet]:
        """Column names and types from information_schema, or None if unavailable"""
        parts = table_name.split('.')
        table_only = parts[-1]
        schema_name = parts[-2] if len(parts) > 1 else 'public'
        try:
            hook = PostgresHook(postgres_conn_id=self._get_conn_id())
            rows = hook.get_records(
                """
                SELECT column_name, data_type, is_nullable
                FROM information_schema.columns
                WHERE table_schema = %s AND table_name = %s
                ORDER BY ordinal_position
                """,
                parameters=[schema_name, table_only],
            )
        except Exception:
            return None
        if not rows:
            return None
        return SchemaDatasetFacet(fields=[
            SchemaField(
                name=name,
                type=data_type,
                description=f"Column {name} ({'nullable' if is_nullable == 'YES' else 'not null'})",
            )
            for name, data_type, is_nullable in rows
        ])
Set up Marquez server configuration
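Configure the Marquez server. This is the marquez.yml file that MARQUEZ_CONFIG points to inside the container. It sets the API port (3000, which the OpenLineage transport URL uses), the admin port, and the PostgreSQL database where Marquez stores lineage.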
server:
  applicationConnectors:
    - type: http
      port: 3000
      bindHost: 0.0.0.0
  adminConnectors:
    - type: http
      port: 3001
      bindHost: 0.0.0.0

# Marquez reads its database settings from the "db" key
db:
  driverClass: org.postgresql.Driver
  # Must be reachable from inside the container; there, "localhost" is the container itself
  url: jdbc:postgresql://localhost:5432/marquez
  user: marquez
  password: marquez_password
  maxWaitForConnection: 1s
  validationQuery: SELECT 1
  validationQueryTimeout: 3s
  minSize: 8
  maxSize: 32
  checkConnectionWhileIdle: false
  evictionInterval: 10s
  minIdleTime: 1m

logging:
  level: INFO
  loggers:
    marquez: INFO
    org.eclipse.jetty: WARN
  appenders:
    - type: console
      threshold: ALL
      timeZone: UTC
      target: stdout
      logFormat: "%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n"

migrateOnStartup: true
Configure lineage for custom operators
Create configuration for tracking lineage in custom operators and external integrations.
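In the OpenLineage spec, each facet is a JSON object with two required keys: `_producer` (a URI for the code that produced it) and `_schemaURL` (a URI for the facet's JSON schema). Facet-specific fields sit alongside them. The standard-library sketch below shows a custom job facet in that shape. The example.com URIs and the `airflowTask` key are hypothetical placeholders.

```python
import json
from typing import Any, Dict

# Hypothetical URIs: point them at your own producer and published facet schema
PRODUCER = "https://example.com/custom-airflow-lineage"
TASK_FACET_SCHEMA = "https://example.com/schemas/AirflowTaskJobFacet.json"


def custom_facet(producer: str, schema_url: str, **fields: Any) -> Dict[str, Any]:
    """A custom facet: the spec's _producer and _schemaURL keys plus facet-specific fields."""
    facet: Dict[str, Any] = {"_producer": producer, "_schemaURL": schema_url}
    facet.update(fields)
    return facet


job_facets = {
    "airflowTask": custom_facet(
        PRODUCER,
        TASK_FACET_SCHEMA,
        dag_id="data_lineage_example",
        task_id="process_customer_segments",
        tags=["lineage", "example"],
    )
}
print(json.dumps(job_facets, indent=2))
```

The class below builds facets of this shape through the Python client.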
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import attr
from openlineage.client.facet import (
    BaseFacet,
    NominalTimeRunFacet,
    OwnershipJobFacet,
    OwnershipJobFacetOwners,
)
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

# Placeholder URIs: the spec expects the event producer and custom facet schemas to be URIs you control
PRODUCER = "https://example.com/custom-airflow-lineage"


@attr.s
class AirflowTaskJobFacet(BaseFacet):
    """Custom job facet with Airflow identifiers and tags"""
    dag_id: str = attr.ib()
    task_id: str = attr.ib()
    tags: List[str] = attr.ib(factory=list)

    @staticmethod
    def _get_schema() -> str:
        return "https://example.com/schemas/AirflowTaskJobFacet.json"


@attr.s
class TaskTimingRunFacet(BaseFacet):
    """Custom run facet with task start/end times and duration"""
    start_time: Optional[str] = attr.ib(default=None)
    end_time: Optional[str] = attr.ib(default=None)
    duration_seconds: Optional[float] = attr.ib(default=None)

    @staticmethod
    def _get_schema() -> str:
        return "https://example.com/schemas/TaskTimingRunFacet.json"


class CustomLineageConfig:
    """
    Configuration class for custom lineage tracking
    """

    def __init__(self, namespace: str = "airflow"):
        self.namespace = namespace

    def create_job_facets(self, task_id: str, dag_id: str, owner: Optional[str] = None,
                          tags: Optional[List[str]] = None) -> Dict[str, BaseFacet]:
        """
        Create job facets with metadata
        """
        facets: Dict[str, BaseFacet] = {
            'airflowTask': AirflowTaskJobFacet(dag_id=dag_id, task_id=task_id, tags=tags or []),
        }
        if owner:
            facets['ownership'] = OwnershipJobFacet(owners=[OwnershipJobFacetOwners(name=owner)])
        return facets

    def create_run_facets(self, execution_date: datetime,
                          task_instance: Any = None) -> Dict[str, BaseFacet]:
        """
        Create run facets with execution metadata
        """
        facets: Dict[str, BaseFacet] = {
            'nominalTime': NominalTimeRunFacet(nominalStartTime=execution_date.isoformat()),
        }
        if task_instance is not None:
            start, end = task_instance.start_date, task_instance.end_date
            facets['taskTiming'] = TaskTimingRunFacet(
                start_time=start.isoformat() if start else None,
                end_time=end.isoformat() if end else None,
                duration_seconds=(end - start).total_seconds() if start and end else None,
            )
        return facets

    def create_lineage_event(self, job_name: str, run_id: str, run_state: RunState,
                             inputs: Optional[List[Dataset]] = None,
                             outputs: Optional[List[Dataset]] = None,
                             job_facets: Optional[Dict[str, BaseFacet]] = None,
                             run_facets: Optional[Dict[str, BaseFacet]] = None) -> RunEvent:
        """
        Create a complete lineage event; run_id must be a UUID string.
        Emit it with openlineage.client.OpenLineageClient().emit(event).
        """
        return RunEvent(
            eventType=run_state,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(runId=run_id, facets=run_facets or {}),
            job=Job(namespace=self.namespace, name=job_name, facets=job_facets or {}),
            producer=PRODUCER,
            inputs=inputs or [],
            outputs=outputs or [],
        )


# Global configuration instance
lineage_config = CustomLineageConfig(namespace="production_airflow")
Restart Airflow services
Restart Airflow to load the OpenLineage configuration and custom extractors.
sudo systemctl stop airflow-webserver
sudo systemctl stop airflow-scheduler
sudo systemctl start airflow-scheduler
sudo systemctl start airflow-webserver
sudo systemctl status airflow-webserver airflow-scheduler
Configure advanced lineage features
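Column-level lineage travels as a `columnLineage` facet on the output dataset. For each output column it lists `inputFields`. Each input field names the source dataset by `namespace` and `name`, and names the source column as `field`. The standard-library sketch below shows that JSON shape using this guide's customer tables. The helper name is illustrative.

```python
import json
from typing import Dict, List, Tuple

# (dataset namespace, dataset name, column) for one source column
Source = Tuple[str, str, str]


def column_lineage_fields(mapping: Dict[str, List[Source]]) -> Dict[str, dict]:
    """Build the 'fields' object of a columnLineage facet from output column -> source columns."""
    return {
        output_column: {
            "inputFields": [
                {"namespace": namespace, "name": table, "field": column}
                for namespace, table, column in sources
            ]
        }
        for output_column, sources in mapping.items()
    }


PG = "postgresql://localhost:5432"
fields = column_lineage_fields({
    "full_name": [(PG, "raw.customers", "first_name"), (PG, "raw.customers", "last_name")],
    "total_spent": [(PG, "raw.orders", "total_amount")],
})
print(json.dumps({"fields": fields}, indent=2))
```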
Set up column-level lineage
Configure detailed column-level lineage tracking for better data governance and impact analysis.
from typing import Dict, List

from airflow.providers.postgres.hooks.postgres import PostgresHook
from openlineage.client.facet import (
    ColumnLineageDatasetFacet,
    ColumnLineageDatasetFacetFieldsAdditional,
    ColumnLineageDatasetFacetFieldsAdditionalInputFields,
)
from openlineage.client.serde import Serde


def create_column_lineage_facet(
    input_fields: List[Dict[str, str]],
    output_fields: List[Dict[str, str]],
    transformations: Dict[str, List[str]],
) -> ColumnLineageDatasetFacet:
    """
    Create column-level lineage facet
    Args:
        input_fields: List of {"namespace": dataset_namespace, "name": dataset_name, "field": column}
        output_fields: List of {"name": column}
        transformations: Dict mapping each output column to the input columns it derives from
    """
    fields = {}
    for output_field in output_fields:
        field_name = output_field["name"]
        input_fields_lineage = []
        for source_column in transformations.get(field_name, []):
            # Use the first input entry with a matching column name
            match = next((f for f in input_fields if f["field"] == source_column), None)
            if match:
                input_fields_lineage.append(
                    ColumnLineageDatasetFacetFieldsAdditionalInputFields(
                        namespace=match["namespace"], name=match["name"], field=match["field"]
                    )
                )
        if input_fields_lineage:
            fields[field_name] = ColumnLineageDatasetFacetFieldsAdditional(
                inputFields=input_fields_lineage,
                transformationDescription="",
                transformationType="",
            )
    return ColumnLineageDatasetFacet(fields=fields)


# Example usage in DAG
def create_customer_summary_with_column_lineage(**context):
    """
    Example function demonstrating column-level lineage
    """
    # Input columns: namespace is the data source, name the table, field the column
    pg = "postgresql://localhost:5432"
    input_fields = [
        {"namespace": pg, "name": "raw.customers", "field": "customer_id"},
        {"namespace": pg, "name": "raw.customers", "field": "first_name"},
        {"namespace": pg, "name": "raw.customers", "field": "last_name"},
        {"namespace": pg, "name": "raw.orders", "field": "order_id"},
        {"namespace": pg, "name": "raw.orders", "field": "total_amount"},
        {"namespace": pg, "name": "raw.orders", "field": "order_date"},
    ]
    output_fields = [
        {"name": "customer_id"},
        {"name": "full_name"},
        {"name": "total_orders"},
        {"name": "total_spent"},
        {"name": "last_order_date"},
    ]
    # Define transformations (which input columns contribute to each output column)
    transformations = {
        "customer_id": ["customer_id"],
        "full_name": ["first_name", "last_name"],
        "total_orders": ["order_id"],
        "total_spent": ["total_amount"],
        "last_order_date": ["order_date"],
    }
    column_lineage_facet = create_column_lineage_facet(
        input_fields=input_fields,
        output_fields=output_fields,
        transformations=transformations,
    )

    # Execute the SQL transformation. The date filter sits in the JOIN so customers
    # without recent orders are kept; a WHERE filter on o.order_date would drop them.
    hook = PostgresHook(postgres_conn_id='postgres_default')
    sql = """
        INSERT INTO analytics.customer_summary
        SELECT
            c.customer_id,
            CONCAT(c.first_name, ' ', c.last_name) AS full_name,
            COUNT(o.order_id) AS total_orders,
            COALESCE(SUM(o.total_amount), 0) AS total_spent,
            MAX(o.order_date) AS last_order_date
        FROM raw.customers c
        LEFT JOIN raw.orders o
            ON c.customer_id = o.customer_id
            AND o.order_date >= %s
        GROUP BY c.customer_id, c.first_name, c.last_name;
    """
    hook.run(sql, parameters=(context['ds'],))

    # Facet objects are not JSON-serializable for XCom, so return a plain dict.
    # Returning it does not emit lineage by itself.
    return {
        'column_lineage': Serde.to_dict(column_lineage_facet),
        'input_datasets': ['raw.customers', 'raw.orders'],
        'output_datasets': ['analytics.customer_summary'],
    }
Configure data quality lineage
Set up lineage tracking for data quality checks and validation processes.
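Each check in the DAG below records its pass condition as a string such as `'value = 0'`. Passing that string to `eval()` does not work. After substitution it becomes `'0 = 0'`, which is an assignment, so Python raises a `SyntaxError`. `eval()` would also execute arbitrary code taken from the check definitions. If you want assertions to stay configurable strings, a helper can parse the small `value <operator> <number>` grammar instead. The `check_assertion` helper below is hypothetical.

```python
import operator
import re

# Comparison operators accepted in assertion strings such as "value = 0"
_OPS = {
    "=": operator.eq, "==": operator.eq, "!=": operator.ne,
    "<": operator.lt, "<=": operator.le, ">": operator.gt, ">=": operator.ge,
}
# Two-character operators are listed first so "<=" is not read as "<"
_ASSERTION = re.compile(r"^\s*value\s*(==|!=|<=|>=|=|<|>)\s*(-?\d+(?:\.\d+)?)\s*$")


def check_assertion(assertion: str, value: float) -> bool:
    """Evaluate 'value <op> <number>' without eval(); raise ValueError for anything else."""
    match = _ASSERTION.match(assertion)
    if not match:
        raise ValueError(f"Unsupported assertion: {assertion!r}")
    op, threshold = match.groups()
    return _OPS[op](value, float(threshold))


for assertion, value in [("value = 0", 0), ("value = 0", 3), ("value <= 5", 5)]:
    print(assertion, value, check_assertion(assertion, value))
```

The full DAG follows.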
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


def run_data_quality_checks(**context):
    """
    Run data quality checks; results go to XCom as plain JSON-serializable data
    and are not emitted to OpenLineage automatically
    """
    hook = PostgresHook(postgres_conn_id='postgres_default')

    # Define quality checks: each query counts offending rows, so 'expected' is 0
    quality_checks = [
        {
            'name': 'null_customer_id_check',
            'sql': 'SELECT COUNT(*) FROM analytics.customer_summary WHERE customer_id IS NULL',
            'assertion': 'value = 0',
            'expected': 0,
            'description': 'Customer ID should never be null',
        },
        {
            'name': 'negative_total_spent_check',
            'sql': 'SELECT COUNT(*) FROM analytics.customer_summary WHERE total_spent < 0',
            'assertion': 'value = 0',
            'expected': 0,
            'description': 'Total spent should not be negative',
        },
        {
            'name': 'future_order_date_check',
            'sql': 'SELECT COUNT(*) FROM analytics.customer_summary WHERE last_order_date > CURRENT_DATE',
            'assertion': 'value = 0',
            'expected': 0,
            'description': 'Order dates should not be in the future',
        },
    ]

    quality_results = []
    for check in quality_checks:
        value = hook.get_first(check['sql'])[0]
        passed = value == check['expected']
        quality_results.append({
            'check_name': check['name'],
            'result_value': value,
            'passed': passed,
            'assertion': check['assertion'],
            'description': check['description'],
        })
        # Log quality check result
        print(f"Quality check '{check['name']}': {'PASSED' if passed else 'FAILED'} (value: {value})")

    # Summary metrics for the checked table
    quality_metrics = {
        'rowCount': hook.get_first('SELECT COUNT(*) FROM analytics.customer_summary')[0],
        'nullValues': {
            'customer_id': hook.get_first('SELECT COUNT(*) FROM analytics.customer_summary WHERE customer_id IS NULL')[0],
            'full_name': hook.get_first('SELECT COUNT(*) FROM analytics.customer_summary WHERE full_name IS NULL')[0],
        },
        'uniqueValues': {
            'customer_id': hook.get_first('SELECT COUNT(DISTINCT customer_id) FROM analytics.customer_summary')[0],
        },
        'qualityScore': sum(1 for r in quality_results if r['passed']) / len(quality_results),
    }

    return {
        'quality_results': quality_results,
        'quality_metrics': quality_metrics,
        'dataset': 'analytics.customer_summary',
    }


default_args = {
    'owner': 'data-quality-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'data_quality_lineage',
    default_args=default_args,
    description='Data quality checks with lineage tracking',
    schedule=timedelta(hours=6),
    catchup=False,
    tags=['data-quality', 'lineage'],
)

quality_checks = PythonOperator(
    task_id='run_quality_checks',
    python_callable=run_data_quality_checks,
    dag=dag,
)

# Store quality results with lineage
store_results = PostgresOperator(
    task_id='store_quality_results',
    postgres_conn_id='postgres_default',
    sql="""
        INSERT INTO data_quality.check_results (
            table_name,
            check_date,
            total_checks,
            passed_checks,
            quality_score,
            details
        )
        SELECT
            'analytics.customer_summary' AS table_name,
            '{{ ds }}' AS check_date,
            {{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')["quality_results"]|length }} AS total_checks,
            {{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')["quality_results"]|selectattr('passed')|list|length }} AS passed_checks,
            {{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')["quality_metrics"]["qualityScore"] }} AS quality_score,
            '{{ ti.xcom_pull(task_ids='run_quality_checks', key='return_value')|tojson }}' AS details;
    """,
    dag=dag,
)

quality_checks >> store_results
Set up lineage monitoring and alerts
Configure monitoring for lineage collection and create alerts for lineage failures.
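The health check below counts Marquez datasets and jobs by their `updatedAt` timestamp. Marquez returns these as ISO 8601 strings that typically end in `Z`, for example `2024-05-01T10:15:30.123456Z`. `datetime.fromisoformat` only accepts that suffix from Python 3.11 on. Because only the calendar date matters here, the sketch below reads the `YYYY-MM-DD` prefix directly. The helper names and sample records are illustrative.

```python
from datetime import date, timedelta
from typing import Iterable, Mapping


def updated_on(entity: Mapping[str, str], default: str = "2020-01-01") -> date:
    """Calendar date of a Marquez entity's updatedAt value."""
    raw = entity.get("updatedAt") or default
    # The first ten characters are YYYY-MM-DD whatever the fraction or zone suffix
    return date.fromisoformat(raw[:10])


def count_recent(entities: Iterable[Mapping[str, str]], today: date, days: int) -> int:
    """Count entities updated on or after today - days."""
    cutoff = today - timedelta(days=days)
    return sum(1 for e in entities if updated_on(e) >= cutoff)


datasets = [
    {"name": "analytics.customer_summary", "updatedAt": "2024-05-01T10:15:30.123456Z"},
    {"name": "staging.customer_metrics", "updatedAt": "2024-04-20T08:00:00Z"},
    {"name": "raw.orders"},  # no updatedAt: treated as very old
]
print(count_recent(datasets, today=date(2024, 5, 1), days=1))  # prints 1
```

The monitoring DAG follows.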
import json
from datetime import date, datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

MARQUEZ_URL = 'http://localhost:3000'
NAMESPACE = 'production_airflow'


def _updated_on(entity):
    """Calendar date of a Marquez updatedAt value such as '2024-05-01T10:15:30.123456Z'"""
    return date.fromisoformat((entity.get('updatedAt') or '2020-01-01')[:10])


def _list_entities(kind):
    """Fetch 'datasets' or 'jobs' in the namespace (first page of results only)"""
    response = requests.get(f'{MARQUEZ_URL}/api/v1/namespaces/{NAMESPACE}/{kind}', timeout=10)
    response.raise_for_status()
    return response.json().get(kind, [])


def check_lineage_health(**context):
    """
    Check the health of lineage collection and Marquez backend
    """
    health_status = {
        'marquez_api': False,
        'lineage_events': False,
        'recent_datasets': 0,
        'recent_jobs': 0,
        'issues': [],
    }
    today = date.today()

    # Check Marquez API health
    try:
        response = requests.get(f'{MARQUEZ_URL}/api/v1/namespaces', timeout=10)
        if response.status_code == 200:
            health_status['marquez_api'] = True
        else:
            health_status['issues'].append(f"Marquez API returned status {response.status_code}")
    except requests.RequestException as e:
        health_status['issues'].append(f"Cannot connect to Marquez API: {e}")

    # Check recent lineage activity (last 24h) and stale datasets (not updated in 48h)
    try:
        datasets = _list_entities('datasets')
        jobs = _list_entities('jobs')
        since = today - timedelta(days=1)
        health_status['recent_datasets'] = sum(1 for d in datasets if _updated_on(d) >= since)
        health_status['recent_jobs'] = sum(1 for j in jobs if _updated_on(j) >= since)
        if health_status['recent_datasets'] > 0 and health_status['recent_jobs'] > 0:
            health_status['lineage_events'] = True
        else:
            health_status['issues'].append(
                f"Low lineage activity: {health_status['recent_datasets']} datasets, "
                f"{health_status['recent_jobs']} jobs updated in last 24h"
            )
        stale = [d['name'] for d in datasets if _updated_on(d) < today - timedelta(days=2)]
        if stale:
            more = '...' if len(stale) > 5 else ''
            health_status['issues'].append(f"Stale datasets (>48h): {', '.join(stale[:5])}{more}")
    except (requests.RequestException, ValueError, KeyError) as e:
        health_status['issues'].append(f"Error checking lineage events: {e}")

    # Store health check results
    hook = PostgresHook(postgres_conn_id='postgres_default')
    hook.run(
        """
        INSERT INTO monitoring.lineage_health_checks
            (check_date, marquez_api_status, lineage_events_status, recent_datasets, recent_jobs, issues)
        VALUES (%s, %s, %s, %s, %s, %s)
        """,
        parameters=[
            today,
            health_status['marquez_api'],
            health_status['lineage_events'],
            health_status['recent_datasets'],
            health_status['recent_jobs'],
            json.dumps(health_status['issues']),
        ],
    )

    # Alert when Marquez is down or more than two issues were found
    needs_alert = not health_status['marquez_api'] or len(health_status['issues']) > 2
    return {
        'health_status': health_status,
        'needs_alert': needs_alert,
        'summary': (
            f"Marquez: {'OK' if health_status['marquez_api'] else 'DOWN'}, "
            f"Events: {'OK' if health_status['lineage_events'] else 'LOW'}, "
            f"Issues: {len(health_status['issues'])}"
        ),
    }


def generate_lineage_report(**context):
    """
    Generate a summary report of lineage collection
    """
    try:
        datasets = _list_entities('datasets')
        jobs = _list_entities('jobs')
        since = date.today() - timedelta(days=7)
        return {
            'namespace': NAMESPACE,
            'total_datasets': len(datasets),
            'total_jobs': len(jobs),
            'recent_datasets': sum(1 for d in datasets if _updated_on(d) >= since),
            'recent_jobs': sum(1 for j in jobs if _updated_on(j) >= since),
            'top_datasets': sorted(datasets, key=lambda d: d.get('updatedAt') or '', reverse=True)[:10],
        }
    except (requests.RequestException, ValueError) as e:
        return {'error': f"Failed to generate report: {e}"}


default_args = {
    'owner': 'data-ops',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'lineage_monitoring',
    default_args=default_args,
    description='Monitor lineage collection health and generate reports',
    schedule=timedelta(hours=4),
    catchup=False,
    tags=['monitoring', 'lineage'],
)

health_check = PythonOperator(
    task_id='check_lineage_health',
    python_callable=check_lineage_health,
    dag=dag,
)

generate_report = PythonOperator(
    task_id='generate_lineage_report',
    python_callable=generate_lineage_report,
    dag=dag,
)


def _needs_alert(**context):
    """Return False (skipping the email) unless the health check flagged critical issues"""
    result = context['ti'].xcom_pull(task_ids='check_lineage_health')
    return bool(result and result['needs_alert'])


# Only send alert if there are critical issues
alert_condition = ShortCircuitOperator(
    task_id='alert_condition',
    python_callable=_needs_alert,
    dag=dag,
)

# Conditional email alert
send_alert = EmailOperator(
    task_id='send_alert_email',
    to=['data-ops@example.com'],
    subject='Lineage System Alert - {{ ds }}',
    html_content="""
    <h3>Lineage System Alert</h3>
    <p>Date: {{ ds }}<br>
    Status: {{ ti.xcom_pull(task_ids='check_lineage_health')["summary"] }}</p>
    <p>Issues Detected:</p>
    <ul>
    {% for issue in ti.xcom_pull(task_ids='check_lineage_health')["health_status"]["issues"] %}
      <li>{{ issue }}</li>
    {% endfor %}
    </ul>
    <p>Lineage Report:</p>
    <ul>
      <li>Total Datasets: {{ ti.xcom_pull(task_ids='generate_lineage_report')["total_datasets"] }}</li>
      <li>Total Jobs: {{ ti.xcom_pull(task_ids='generate_lineage_report')["total_jobs"] }}</li>
      <li>Recent Datasets (7d): {{ ti.xcom_pull(task_ids='generate_lineage_report')["recent_datasets"] }}</li>
      <li>Recent Jobs (7d): {{ ti.xcom_pull(task_ids='generate_lineage_report')["recent_jobs"] }}</li>
    </ul>
    <p>Check the Marquez UI for detailed lineage visualization.</p>
    """,
    dag=dag,
)

[health_check, generate_report] >> alert_condition >> send_alert
Verify your setup
Test the lineage configuration by running a sample DAG and checking that events appear in Marquez.
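# Check Airflow is running with OpenLineage
sudo systemctl status airflow-scheduler airflow-webserver
# Verify OpenLineage configuration
cat /opt/airflow/openlineage.yml
# Check Marquez is accessible
curl -s http://localhost:3000/api/v1/namespaces | jq .
# Test lineage event submission (Marquez requires runId to be a UUID)
curl -X POST http://localhost:3000/api/v1/lineage \
  -H "Content-Type: application/json" \
  -d '{"eventType": "START", "eventTime": "2024-01-01T12:00:00.000Z", "run": {"runId": "3f6e4b1a-5c2d-4e8f-9a7b-1c2d3e4f5a6b"}, "job": {"namespace": "test", "name": "test-job"}, "inputs": [], "outputs": [], "producer": "https://example.com/test-producer", "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/RunEvent"}'
After a DAG runs, its tasks should appear as jobs in the production_airflow namespace, together with their input and output datasets. You can see them through the API calls above and in the Marquez web UI. The web UI ships as a separate image, marquezproject/marquez-web. Run it alongside the API, point it at http://localhost:3000, and publish it on a port other than 3000, because the API already uses that port.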
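The same smoke test can be run in Python using only the standard library. The sketch below builds OpenLineage run events as plain dicts and posts them to the Marquez endpoint used above. Sending a START and then a COMPLETE for the same runId leaves the test job with a finished run rather than one still shown as running. The producer URI and function names are placeholders.

```python
import json
import urllib.request
import uuid
from datetime import datetime, timezone
from typing import Optional

# Hypothetical producer URI; use one that identifies your own tooling
PRODUCER = "https://example.com/lineage-smoke-test"
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/RunEvent"


def build_run_event(event_type: str, namespace: str, job_name: str,
                    run_id: Optional[str] = None) -> dict:
    """Minimal OpenLineage RunEvent as a dict; generates a UUID runId if none is given."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [],
        "outputs": [],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


def post_event(event: dict, base_url: str = "http://localhost:3000", timeout: float = 10.0) -> int:
    """POST one event to {base_url}/api/v1/lineage and return the HTTP status code."""
    request = urllib.request.Request(
        base_url.rstrip("/") + "/api/v1/lineage",
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


# Usage against a running Marquez:
# run_id = str(uuid.uuid4())
# post_event(build_run_event("START", "test", "test-job", run_id))
# post_event(build_run_event("COMPLETE", "test", "test-job", run_id))
```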
Common issues
| Symptom | Cause | Fix |
|---|---|---|
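| No lineage events in Marquez | OpenLineage config not loaded | Check that config_path in the [openlineage] section of airflow.cfg points to /opt/airflow/openlineage.yml, then restart the scheduler and webserver |
| Marquez UI shows connection error | Marquez API not running or cannot reach its database | Check docker logs marquez and confirm that the JDBC url in marquez.yml is reachable from inside the container; with migrateOnStartup: true, schema migrations run when the server starts |
| Custom extractors not working | Extractor not registered or not importable | List the class's full import path in the [openlineage] extractors option and keep the module on an importable path such as /opt/airflow/plugins/ |
| Column lineage not appearing | SQL parsing failed, or the extractor only reports table-level datasets | Review the custom extractor regex patterns and test them against your SQL; attach a columnLineage facet explicitly where you need column detail |
| Performance issues with lineage | Too many lineage events | Exclude noisy operators with the [openlineage] disabled_for_operators option, or turn on selective_enable and enable lineage only for chosen DAGs or tasks |
| Authentication errors to Marquez | Missing API key | Add an auth block (type: api_key, apiKey: ...) to the transport section of openlineage.yml |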
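To test the extractor's SQL parsing outside Airflow, run its regular expressions against representative statements. This standalone sketch copies the two table-matching patterns used by the custom extractor. The sample SQL strings are illustrative.

```python
import re
from typing import List, Tuple

# The same table-matching regular expressions as the custom extractor
INPUT_PATTERN = re.compile(r"(?:FROM|JOIN)\s+([\w\.]+)", re.IGNORECASE)
OUTPUT_PATTERN = re.compile(
    r"(?:INSERT\s+INTO|CREATE\s+TABLE(?:\s+IF\s+NOT\s+EXISTS)?)\s+([\w\.]+)", re.IGNORECASE
)


def parse_tables(sql: str) -> Tuple[List[str], List[str]]:
    """Return (input tables, output tables) as the extractor's regexes would report them."""
    return sorted(set(INPUT_PATTERN.findall(sql))), sorted(set(OUTPUT_PATTERN.findall(sql)))


cases = {
    "staging": "CREATE TABLE IF NOT EXISTS staging.customer_metrics AS "
               "SELECT customer_id FROM raw.orders",
    "join": "INSERT INTO analytics.customer_summary SELECT c.customer_id "
            "FROM raw.customers c LEFT JOIN raw.orders o ON c.customer_id = o.customer_id",
    "extract": "SELECT EXTRACT(YEAR FROM order_date) FROM raw.orders",
}
for label, sql in cases.items():
    print(label, parse_tables(sql))
```

The third case shows a typical false positive. The `FROM` inside `EXTRACT(YEAR FROM order_date)` is read as a table named `order_date`. Regex-based extraction cannot handle statements like this reliably, whereas a real SQL parser can.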
Next steps
- Configure Airflow monitoring with Prometheus alerts to track lineage system health
- Set up Airflow DAG security policies and data governance with RBAC and audit logging
- Implement comprehensive Airflow DAG testing and validation strategies including lineage validation
- Configure Airflow data quality monitoring with Great Expectations for automated data validation
- Set up Marquez high availability clustering for production lineage backend