Set up Spark Streaming with Kafka and Delta Lake for real-time analytics

Advanced 45 min Jun 04, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Configure Apache Spark 3.5 with Kafka integration and Delta Lake support for building production-grade real-time analytics pipelines with ACID transactions and streaming capabilities.

Prerequisites

  • Root or sudo access
  • Minimum 8GB RAM
  • At least 4 CPU cores
  • 20GB free disk space
  • Java 11 or higher

What this solves

Real-time analytics requires processing streaming data as it arrives while maintaining data consistency and reliability. Apache Spark Streaming with Kafka and Delta Lake provides a complete solution for ingesting, processing, and storing streaming data with ACID transactions, schema evolution, and time travel capabilities.

This tutorial sets up a production-ready streaming analytics pipeline that can handle high-throughput data ingestion, real-time processing, and reliable storage with automatic checkpoint recovery and exactly-once processing semantics.

Step-by-step installation

Update system packages

Start by updating your package manager and installing the utilities used later to download packages and manage repositories.

# Ubuntu/Debian
sudo apt update && sudo apt upgrade -y
sudo apt install -y wget curl gnupg2 software-properties-common ca-certificates lsb-release

# AlmaLinux/Rocky Linux
sudo dnf update -y
sudo dnf install -y wget curl gnupg2 ca-certificates which

Install Java Development Kit

Spark 3.5 runs on Java 8, 11 or 17. This tutorial installs OpenJDK 11.

# Ubuntu
sudo apt install -y openjdk-11-jdk
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' | sudo tee -a /etc/environment
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
# Debian 12 does not package OpenJDK 11; install openjdk-17-jdk instead
# and use JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64

# AlmaLinux/Rocky Linux
sudo dnf install -y java-11-openjdk-devel
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk' | sudo tee -a /etc/environment
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk

Install Apache Spark 3.5

Download and install Apache Spark 3.5 with pre-built Hadoop binaries for distributed file system support.

cd /opt
sudo wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
sudo tar -xzf spark-3.5.0-bin-hadoop3.tgz
sudo mv spark-3.5.0-bin-hadoop3 spark
sudo chown -R $(whoami):$(whoami) /opt/spark
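# /etc/environment does not expand variables such as $PATH, so put the exports in a profile script
echo 'export SPARK_HOME=/opt/spark' | sudo tee /etc/profile.d/spark.sh
echo 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' | sudo tee -a /etc/profile.d/spark.sh
source /etc/profile.d/spark.sh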
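Apache publishes a SHA-512 digest next to each release tarball (a `.sha512` file in the same archive directory), and checking it catches truncated or corrupted downloads. The following is a minimal stdlib sketch. The file name `verify_sha512.py` and both helper names are hypothetical, and you paste the expected digest yourself rather than having the script parse the `.sha512` file.

```python
import hashlib
import sys


def sha512_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-512 digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha512()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_sha512(path: str, expected: str) -> bool:
    """Compare a file's digest with an expected hex string.

    Whitespace and letter case in `expected` are ignored, because published
    digests are sometimes split into groups or upper-cased.
    """
    normalized = "".join(expected.split()).lower()
    return sha512_of(path) == normalized


if __name__ == "__main__":
    if len(sys.argv) >= 3:
        # Usage: python3 verify_sha512.py /opt/spark-3.5.0-bin-hadoop3.tgz <expected digest>
        tarball, expected_digest = sys.argv[1], "".join(sys.argv[2:])
        ok = verify_sha512(tarball, expected_digest)
        print("checksum OK" if ok else "checksum MISMATCH")
        sys.exit(0 if ok else 1)
    else:
        print("usage: verify_sha512.py <file> <expected sha512 hex digest>")
```

Accepting the digest as several arguments lets you paste a grouped digest without quoting it.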

Download Delta Lake and Kafka dependencies

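Download the JAR files for Delta Lake and Kafka streaming support. Spark 3.5 needs Delta Lake 3.x, which is published as delta-spark (the delta-core 2.4.x artifacts target Spark 3.4). The Kafka connector also needs kafka-clients and commons-pool2 on the classpath.

cd /opt/spark/jars
# Delta Lake 3.1.0, built for Spark 3.5
sudo wget https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.1.0/delta-spark_2.12-3.1.0.jar
sudo wget https://repo1.maven.org/maven2/io/delta/delta-storage/3.1.0/delta-storage-3.1.0.jar
# Kafka connector plus the client and connection-pool libraries it loads at runtime
sudo wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.0/spark-sql-kafka-0-10_2.12-3.5.0.jar
sudo wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.1/kafka-clients-3.4.1.jar
sudo wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.5.0/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
sudo wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar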
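A missing JAR only surfaces when a query starts, so a quick pre-flight check can save a restart. The following is a minimal stdlib sketch that reports which expected artifacts have no matching file in the jars directory. The prefix list is an assumption based on Spark 3.5's needs: Delta 3.x ships as `delta-spark`, and `spark-sql-kafka-0-10` loads `commons-pool2` at runtime. Edit the list if you pin different artifacts.

```python
from pathlib import Path

# File-name prefixes expected in $SPARK_HOME/jars (assumed set for Spark 3.5 + Delta 3.x + Kafka)
REQUIRED_JAR_PREFIXES = [
    "delta-spark_2.12-",
    "delta-storage-",
    "spark-sql-kafka-0-10_2.12-",
    "spark-token-provider-kafka-0-10_2.12-",
    "kafka-clients-",
    "commons-pool2-",
]


def missing_jars(jar_dir: str, prefixes=REQUIRED_JAR_PREFIXES) -> list:
    """Return the prefixes that have no matching *.jar file in jar_dir."""
    names = [p.name for p in Path(jar_dir).glob("*.jar")]
    return [prefix for prefix in prefixes if not any(n.startswith(prefix) for n in names)]


if __name__ == "__main__":
    jar_dir = "/opt/spark/jars"
    if Path(jar_dir).is_dir():
        missing = missing_jars(jar_dir)
        print("All required JARs present" if not missing else f"Missing: {missing}")
    else:
        print(f"{jar_dir} not found")
```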

Configure Spark for Delta Lake

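Create /opt/spark/conf/spark-defaults.conf with the following settings to enable the Delta Lake extensions and tune streaming. Setting spark.databricks.delta.retentionDurationCheck.enabled=false disables the safety check that stops VACUUM from deleting files newer than the default 7-day retention. Leave the check enabled in production unless you understand the impact on concurrent readers and time travel.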

spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.streaming.checkpointLocation=/opt/spark/checkpoints
spark.sql.streaming.schemaInference=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.streaming.forceDeleteTempCheckpointLocation=true
spark.databricks.delta.retentionDurationCheck.enabled=false
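A typo in a key name here fails silently, because Spark ignores unknown settings. The following is a stdlib sketch that parses the file the way simple Java properties files are read, where key and value are separated by `=`, `:` or whitespace and `#` or `!` lines are comments. It then reports missing Delta settings. It is simplified: backslash continuations and escapes are not handled, and `parse_properties` and `check_delta_settings` are hypothetical helper names.

```python
import re
from pathlib import Path

# Delta settings this tutorial relies on; values must match exactly
REQUIRED = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def parse_properties(text: str) -> dict:
    """Parse key/value lines; the key ends at the first '=', ':' or whitespace."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        parts = re.split(r"\s*[=:]\s*|\s+", line, maxsplit=1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


def check_delta_settings(props: dict) -> list:
    """Return warnings; an empty list means the Delta settings look right."""
    problems = []
    for key, expected in REQUIRED.items():
        actual = props.get(key)
        if actual is None:
            problems.append(f"missing {key}")
        elif actual != expected:
            problems.append(f"{key} is {actual!r}, expected {expected!r}")
    if props.get("spark.databricks.delta.retentionDurationCheck.enabled") == "false":
        problems.append("VACUUM retention safety check is disabled")
    return problems


if __name__ == "__main__":
    conf = Path("/opt/spark/conf/spark-defaults.conf")
    if conf.is_file():
        for line in check_delta_settings(parse_properties(conf.read_text())) or ["OK"]:
            print(line)
```

With the file exactly as shown above, the only warning should be the disabled retention check.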

Install and configure Apache Kafka

Set up Kafka for streaming data ingestion with proper configuration for Spark integration.

cd /opt
sudo wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
sudo tar -xzf kafka_2.13-2.8.2.tgz
sudo mv kafka_2.13-2.8.2 kafka
sudo chown -R $(whoami):$(whoami) /opt/kafka
echo 'export KAFKA_HOME=/opt/kafka' | sudo tee -a /etc/environment
source /etc/environment

Configure Kafka server properties

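Edit /opt/kafka/config/server.properties with the following settings for a single-broker streaming workload. The internal-topic replication factors of 1 are only appropriate because there is one broker.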

broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/opt/kafka/kafka-logs
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
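On a single broker, the internal-topic replication factors must not exceed the number of brokers. Otherwise the broker cannot create its internal `__consumer_offsets` topic, and group coordination fails. The following is a small stdlib check, assuming the `key=value` form used in server.properties. `broker_count` is a parameter you supply, and the fallback values are Kafka's documented defaults for unset keys (3, 3 and 2).

```python
def parse_kafka_properties(text: str) -> dict:
    """Parse key=value lines from a Kafka server.properties file, skipping comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props


REPLICATION_KEYS = ("offsets.topic.replication.factor", "transaction.state.log.replication.factor")


def replication_problems(props: dict, broker_count: int = 1) -> list:
    """Flag internal-topic settings that a cluster of broker_count brokers cannot satisfy."""
    problems = []
    for key in REPLICATION_KEYS:
        factor = int(props.get(key, "3"))  # Kafka defaults both keys to 3 when unset
        if factor > broker_count:
            problems.append(f"{key}={factor} exceeds {broker_count} broker(s)")
    min_isr = int(props.get("transaction.state.log.min.isr", "2"))
    txn_factor = int(props.get("transaction.state.log.replication.factor", "3"))
    if min_isr > txn_factor:
        problems.append(f"transaction.state.log.min.isr={min_isr} exceeds replication factor {txn_factor}")
    return problems
```

For example: `replication_problems(parse_kafka_properties(open("/opt/kafka/config/server.properties").read()))` should return an empty list for the settings above.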

Create checkpoint and data directories

Set up directories for Spark streaming checkpoints and Delta Lake storage with proper permissions.

sudo mkdir -p /opt/spark/checkpoints
sudo mkdir -p /opt/spark/delta-lake
sudo mkdir -p /opt/kafka/kafka-logs
sudo chown -R $(whoami):$(whoami) /opt/spark/checkpoints /opt/spark/delta-lake
sudo chown -R $(whoami):$(whoami) /opt/kafka/kafka-logs
chmod 755 /opt/spark/checkpoints /opt/spark/delta-lake /opt/kafka/kafka-logs
Never use chmod 777. It gives every user on the system full access to your files. The 755 permissions provide read and execute access for all users while restricting write access to the owner.

Start Zookeeper and Kafka services

Launch the required services for Kafka message streaming.

# Start ZooKeeper in the background
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

# Wait for ZooKeeper to start
sleep 5

# Start the Kafka broker in the background
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

# Verify the services are running (expect QuorumPeerMain for ZooKeeper and Kafka for the broker)
jps
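`jps` only shows that the JVMs are up, not that they accept connections. The following is a minimal stdlib sketch that opens a TCP connection to ZooKeeper's client port (2181 in the bundled zookeeper.properties) and to the broker listener configured above (9092). It checks reachability only, not broker health, and `port_open` is a hypothetical helper name.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unresolvable host names
        return False


if __name__ == "__main__":
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        state = "reachable" if port_open("localhost", port) else "NOT reachable"
        print(f"{name} on localhost:{port}: {state}")
```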

Create Kafka topic for streaming

Create a topic with three partitions. By default, Spark's Kafka source reads each partition in its own task, so this allows up to three parallel readers.

/opt/kafka/bin/kafka-topics.sh --create \
  --topic analytics-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

# Verify topic creation
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Create streaming analytics application

Save the following as /opt/spark/streaming_analytics.py. It reads JSON events from Kafka, computes hourly aggregations per event type with Spark Structured Streaming, and writes the results to a Delta Lake table.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType
from delta import configure_spark_with_delta_pip

# Create a Spark session with Delta Lake enabled; extra_packages adds the Kafka connector
builder = (
    SparkSession.builder.appName("KafkaDeltaStreaming")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"]
).getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Define schema for incoming JSON data
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", DoubleType(), True),
])

# Read streaming data from Kafka (the payload arrives in the binary "value" column)
raw_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "analytics-events")
    .option("startingOffsets", "latest")
    .load()
)

# Parse JSON messages into typed columns
parsed_df = raw_df.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")

# Bucket events by hour and calculate per-event-type aggregations
hourly_df = (
    parsed_df.withColumn("hour", F.date_trunc("hour", F.col("timestamp")))
    .groupBy("hour", "event_type")
    .agg(
        F.count("*").alias("event_count"),
        F.avg("value").alias("avg_value"),
        F.max("value").alias("max_value"),
        F.min("value").alias("min_value"),
    )
)

# Write to Delta Lake: "complete" mode rewrites the aggregate table on every micro-batch,
# and the checkpoint lets the query resume where it stopped after a restart
query = (
    hourly_df.writeStream.format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "/opt/spark/checkpoints/analytics-checkpoint")
    .start("/opt/spark/delta-lake/analytics_aggregations")
)
print("Streaming job started. Press Ctrl+C to stop.")
query.awaitTermination()
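To see what each row of the output table represents, here is a plain-Python equivalent of the `date_trunc("hour", ...)` plus `groupBy("hour", "event_type")` aggregation. It can be handy for checking the Delta table against a handful of known events. It only illustrates the semantics and is not how Spark executes the query. `hourly_aggregates` is a hypothetical helper, and it assumes ISO-8601 timestamp strings like the ones the producer sends.

```python
from collections import defaultdict
from datetime import datetime


def hourly_aggregates(events):
    """Group events by (hour, event_type) and compute count/avg/max/min of 'value'.

    Events without a timestamp land under hour None, like Spark's null grouping key.
    """
    groups = defaultdict(list)
    for event in events:
        ts = event.get("timestamp")
        hour = datetime.fromisoformat(ts).replace(minute=0, second=0, microsecond=0) if ts else None
        groups[(hour, event.get("event_type"))].append(event.get("value"))

    result = {}
    for key, values in groups.items():
        present = [v for v in values if v is not None]  # avg/max/min ignore nulls in Spark
        result[key] = {
            "event_count": len(values),  # count("*") counts rows, including null values
            "avg_value": sum(present) / len(present) if present else None,
            "max_value": max(present) if present else None,
            "min_value": min(present) if present else None,
        }
    return result
```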

Install Python dependencies

Install the Python packages for the application and the producer in a virtual environment. delta-spark 3.1.0 is the release that pairs with PySpark 3.5.0.

# Ubuntu/Debian
sudo apt install -y python3-pip python3-venv
python3 -m venv /opt/spark/venv
source /opt/spark/venv/bin/activate
pip install pyspark==3.5.0 delta-spark==3.1.0 kafka-python

# AlmaLinux/Rocky Linux (the venv module ships with python3)
sudo dnf install -y python3-pip
python3 -m venv /opt/spark/venv
source /opt/spark/venv/bin/activate
pip install pyspark==3.5.0 delta-spark==3.1.0 kafka-python

Create data producer script

Save the following as /opt/spark/data_producer.py. It sends one random sample event per second to the analytics-events topic.

import json
import random
import time
from datetime import datetime

from kafka import KafkaProducer

# Create a Kafka producer that serializes each event dict as UTF-8 JSON
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

USER_IDS = [f"user_{i}" for i in range(1, 101)]
EVENT_TYPES = ["click", "view", "purchase", "signup"]


def generate_event():
    """Generate a sample analytics event matching the streaming job's schema."""
    return {
        "user_id": random.choice(USER_IDS),
        "event_type": random.choice(EVENT_TYPES),
        # Millisecond precision matches Spark's default JSON timestamp pattern
        "timestamp": datetime.now().isoformat(timespec="milliseconds"),
        "value": round(random.uniform(1.0, 100.0), 2),
    }


if __name__ == "__main__":
    print("Starting data producer. Press Ctrl+C to stop.")
    try:
        while True:
            event = generate_event()
            producer.send("analytics-events", event)
            print(f"Sent: {event}")
            time.sleep(1)  # Send one event per second
    except KeyboardInterrupt:
        print("Stopping producer...")
    finally:
        producer.close()  # Waits for buffered messages to be sent
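If you adapt the producer to real data, malformed messages do not fail the Spark job. In its default permissive mode, `from_json` produces null fields instead. The following is a stdlib sketch that checks a raw message against the four-field schema before it is sent. It uses Python's `datetime.fromisoformat` as a stand-in for Spark's timestamp parser, so it is stricter in some cases, and `schema_problems` is a hypothetical helper name.

```python
import json
from datetime import datetime

# Expected JSON fields and Python types, mirroring the StructType in streaming_analytics.py
EXPECTED_FIELDS = {"user_id": str, "event_type": str, "timestamp": str, "value": (int, float)}


def schema_problems(payload: bytes) -> list:
    """Return reasons this message would likely leave nulls in the parsed columns."""
    try:
        event = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        return [f"not valid UTF-8 JSON: {exc}"]
    if not isinstance(event, dict):
        return ["top-level JSON value is not an object"]

    problems = []
    for field, expected_type in EXPECTED_FIELDS.items():
        if field not in event:
            problems.append(f"missing field {field!r}")
        elif isinstance(event[field], bool) or not isinstance(event[field], expected_type):
            problems.append(f"{field!r} has type {type(event[field]).__name__}")
    if isinstance(event.get("timestamp"), str):
        try:
            datetime.fromisoformat(event["timestamp"])
        except ValueError:
            problems.append(f"timestamp {event['timestamp']!r} is not ISO-8601")
    return problems
```

For example, call `schema_problems(json.dumps(generate_event()).encode("utf-8"))` in the producer and skip or log events that return a non-empty list.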

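Create a pipeline monitoring script

Save the following as /opt/spark/monitor_pipeline.py. It reports whether the Kafka broker is running, whether the Delta table has been created, and how many files the streaming checkpoint holds.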

#!/usr/bin/env python3
import subprocess
import time
import json
from pathlib import Path

def check_kafka_status():
    """Check if a Kafka broker JVM is running (jps lists it as 'Kafka')"""
    try:
        result = subprocess.run(['jps'], capture_output=True, text=True)
        return 'Kafka' in result.stdout
    except OSError:
        # jps is not on PATH or could not be executed
        return False

def check_delta_table():
    """Check that the Delta table exists (every Delta table has a _delta_log directory)"""
    delta_path = Path('/opt/spark/delta-lake/analytics_aggregations')
    return (delta_path / '_delta_log').is_dir()

def get_checkpoint_status():
    """Get streaming checkpoint information"""
    checkpoint_path = Path('/opt/spark/checkpoints/analytics-checkpoint')
    if checkpoint_path.exists():
        return {'exists': True, 'files': len(list(checkpoint_path.rglob('*')))}
    return {'exists': False, 'files': 0}

if __name__ == "__main__":
    status = {
        'timestamp': time.time(),
        'kafka_running': check_kafka_status(),
        'delta_table_exists': check_delta_table(),
        'checkpoint_status': get_checkpoint_status()
    }
    
    print(json.dumps(status, indent=2))
chmod +x /opt/spark/monitor_pipeline.py
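A file count says little about progress. A more telling check compares the last micro-batch that was planned with the last one that completed. The following sketch assumes Structured Streaming's checkpoint layout, where each batch writes a file named by its batch id into `offsets/` when it is planned and into `commits/` when it finishes, alongside hidden `.crc` checksum files. The helper names are hypothetical, and the functions could be merged into `get_checkpoint_status`.

```python
from pathlib import Path


def latest_batch_id(log_dir: Path):
    """Highest numeric file name in a checkpoint log directory, or None if there is none."""
    # Hidden .crc files and temporary files are skipped because their names are not all digits
    ids = [int(p.name) for p in log_dir.iterdir() if p.name.isdigit()] if log_dir.is_dir() else []
    return max(ids) if ids else None


def batch_progress(checkpoint: str) -> dict:
    """Compare the last planned batch (offsets/) with the last completed one (commits/)."""
    root = Path(checkpoint)
    planned = latest_batch_id(root / "offsets")
    committed = latest_batch_id(root / "commits")
    return {
        "last_planned_batch": planned,
        "last_committed_batch": committed,
        # A planned but uncommitted batch is either running now or failed part-way
        "batch_in_flight": planned is not None and planned != committed,
    }
```

For example: `print(batch_progress('/opt/spark/checkpoints/analytics-checkpoint'))`. If `batch_in_flight` stays true across several runs while the query is stopped, the last batch failed and will be retried on restart.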

Configure production monitoring

Set up Spark metrics configuration

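Add the following to /opt/spark/conf/metrics.properties to expose driver metrics in Prometheus format through the Spark UI, at http://localhost:4040/metrics/prometheus for the driver. Structured Streaming query metrics such as input rate, processing rate and latency are not registered through this file. To publish them, set spark.sql.streaming.metricsEnabled=true in spark-defaults.conf.

*.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheus.path=/metrics/prometheus
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource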

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
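Once the servlet is enabled, you can scrape the driver metrics without a Prometheus server for a quick look. The following is a stdlib sketch that fetches and parses the text format. The default URL assumes the driver UI on port 4040 and the path /metrics/prometheus, so change it to match `*.sink.prometheus.path` in your metrics.properties. Metric names depend on the application and configuration, so the parser keeps them as opaque strings. Both function names are hypothetical.

```python
import urllib.request


def parse_prometheus_text(text: str) -> dict:
    """Parse sample lines of the Prometheus text format into {series: value}.

    Simplified: comment lines are skipped, optional timestamps are ignored, and the
    {label} block is kept as part of the series name instead of being parsed.
    """
    samples = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if "}" in line:
            # Split after the label block so spaces inside label values are preserved
            cut = line.rindex("}") + 1
            series, fields = line[:cut], line[cut:].split()
        else:
            series, *fields = line.split()
        if fields:
            samples[series] = float(fields[0])  # float() also accepts NaN and +Inf
    return samples


def fetch_driver_metrics(url: str = "http://localhost:4040/metrics/prometheus",
                         timeout: float = 5.0) -> dict:
    """Fetch and parse driver metrics; returns {} if the Spark UI is unreachable."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return parse_prometheus_text(resp.read().decode("utf-8"))
    except OSError:
        # URLError, refused connections and timeouts are all OSError subclasses
        return {}
```

For example, `{k: v for k, v in fetch_driver_metrics().items() if "heap" in k}` shows the JVM heap gauges while the streaming job is running.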

Create performance tuning script

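Save the following as /opt/spark/tune_performance.py. It suggests Spark settings based on the machine's memory and CPU count, not on the workload itself. It needs psutil, so run pip install psutil inside the virtual environment first.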

#!/usr/bin/env python3
import psutil
import math

def calculate_spark_config():
    """Calculate optimal Spark configuration based on system resources"""
    # Get system information
    total_memory = psutil.virtual_memory().total
    cpu_cores = psutil.cpu_count() or 1  # cpu_count() may return None
    
    # Calculate optimal settings
    memory_gb = total_memory // (1024**3)
    executor_memory = max(1, math.floor(memory_gb * 0.7))  # 70% of total memory
    executor_cores = max(1, min(5, cpu_cores - 1))  # Leave one core for the OS, but keep at least one
    
    config = {
        'spark.executor.memory': f'{executor_memory}g',
        'spark.executor.cores': str(executor_cores),
        'spark.sql.shuffle.partitions': str(cpu_cores * 2),
        'spark.default.parallelism': str(cpu_cores * 2),
        'spark.sql.streaming.maxBatchesToRetainInMemory': '5',
        'spark.sql.streaming.minBatchesToRetain': '10'
    }
    
    return config

if __name__ == "__main__":
    config = calculate_spark_config()
    print("# Recommended Spark configuration:")
    for key, value in config.items():
        print(f"{key}={value}")
chmod +x /opt/spark/tune_performance.py
python3 /opt/spark/tune_performance.py

Verify your setup

# Check Java installation
java -version

# Verify Spark installation
/opt/spark/bin/spark-submit --version

# Check Kafka services
jps

# Test Delta Lake integration
source /opt/spark/venv/bin/activate
python3 -c "from delta import configure_spark_with_delta_pip; print('Delta Lake integration OK')"

# Monitor pipeline status
python3 /opt/spark/monitor_pipeline.py

# Check checkpoint directory
ls -la /opt/spark/checkpoints/
Note: The verification commands should show Java 11, Spark 3.5.0, running Kafka and ZooKeeper processes, and a successful Delta Lake import.

Test the streaming pipeline

# Terminal 1: Start the streaming application
source /opt/spark/venv/bin/activate
cd /opt/spark
python3 streaming_analytics.py

# Terminal 2: Start the data producer
source /opt/spark/venv/bin/activate
cd /opt/spark
python3 data_producer.py

# Terminal 3: Monitor the Delta Lake output
watch -n 5 'ls -la /opt/spark/delta-lake/analytics_aggregations/'
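The directory listing shows Parquet files accumulating. In complete mode, however, each commit logically removes the previous output and adds a new one, so most of those files are no longer part of the table. The following is a stdlib sketch that replays the JSON transaction log in `_delta_log` to report the current version and the live data files. It is simplified: it reads only the numbered `.json` commit files. That is enough for a young table, but once Delta's log cleanup deletes old commits, a complete reader must start from the Parquet checkpoint. `summarize_delta_log` is a hypothetical helper name.

```python
import json
from pathlib import Path


def summarize_delta_log(table_path: str) -> dict:
    """Replay <table>/_delta_log/*.json commits and report the version and live files."""
    log_dir = Path(table_path) / "_delta_log"
    # Commit files are named by their zero-padded version number; sort them numerically
    commits = sorted(
        (p for p in log_dir.glob("*.json") if p.stem.isdigit()),
        key=lambda p: int(p.stem),
    )
    live_files = set()
    for commit in commits:
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)  # one action per line, e.g. {"add": {...}}
            if "add" in action:
                live_files.add(action["add"]["path"])
            elif "remove" in action:
                live_files.discard(action["remove"]["path"])
    return {
        "version": int(commits[-1].stem) if commits else None,
        "commits_read": len(commits),
        "live_files": sorted(live_files),
    }
```

For example: `print(summarize_delta_log('/opt/spark/delta-lake/analytics_aggregations'))`. The version should increase by one for each micro-batch that produced output.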

Query Delta Lake data directly

source /opt/spark/venv/bin/activate
pyspark --packages io.delta:delta-spark_2.12:3.1.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

In PySpark shell:

df = spark.read.format("delta").load("/opt/spark/delta-lake/analytics_aggregations")

df.show()

Common issues

  • Kafka connection timeout. Cause: Kafka is not running or the port is wrong. Fix: check the processes with jps, then restart the broker with /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  • Delta Lake JAR not found. Cause: missing Delta dependencies. Fix: verify the JARs in /opt/spark/jars/ and restart the Spark application.
  • Checkpoint permission denied. Cause: wrong directory ownership. Fix: sudo chown -R $(whoami):$(whoami) /opt/spark/checkpoints
  • Out of memory errors. Cause: insufficient executor memory. Fix: increase spark.executor.memory in spark-defaults.conf or run the tuning script.
  • Streaming job produces no output or null columns. Cause: schema mismatch or missing topic. Fix: check that the Kafka topic exists and that the producer's JSON matches the schema.
  • Micro-batches fall behind. Cause: too many shuffle partitions for a small dataset. Fix: reduce spark.sql.shuffle.partitions. For this stateful aggregation the value is fixed once the checkpoint exists, so use a new checkpoint location after changing it.

Performance optimization tips

  • Choose a trigger interval (for example 1-10 seconds, set with .trigger(processingTime="10 seconds")) based on latency requirements
  • Set the Kafka source option maxOffsetsPerTrigger to cap how many records each micro-batch reads
  • Enable adaptive query execution for dynamic optimization
  • Use Delta Lake OPTIMIZE command for compaction
  • Monitor Spark UI at http://localhost:4040 during streaming
  • Consider Spark SQL performance optimization for complex analytics queries
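The trigger interval and maxOffsetsPerTrigger interact. Kafka's source splits the cap across partitions, so with one capped batch per interval, throughput cannot exceed cap / interval. The following is a small arithmetic sketch for sizing the cap against a backlog. The helper names are hypothetical, and it assumes every batch reads the full cap and finishes within its interval. Confirm that in the Spark UI before relying on the numbers.

```python
import math


def max_throughput(max_offsets_per_trigger: int, trigger_interval_s: float) -> float:
    """Upper bound on records/second when each micro-batch is capped and runs on schedule."""
    return max_offsets_per_trigger / trigger_interval_s


def batches_to_catch_up(backlog: int, max_offsets_per_trigger: int,
                        arrival_rate: float, trigger_interval_s: float) -> float:
    """Micro-batches needed to clear a backlog while new records keep arriving.

    Returns math.inf if arrivals per interval match or exceed the per-batch cap.
    """
    net_per_batch = max_offsets_per_trigger - arrival_rate * trigger_interval_s
    if net_per_batch <= 0:
        return math.inf
    return math.ceil(backlog / net_per_batch)
```

For example, a cap of 10,000 with a 10-second trigger allows at most 1,000 records per second. A backlog of 50,000 records with 500 new records per second shrinks by 5,000 per batch, so it clears in 10 batches (about 100 seconds).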
