Configure Apache Spark 3.5 with Kafka integration and Delta Lake support for building production-grade real-time analytics pipelines with ACID transactions and streaming capabilities.
Prerequisites
- Root or sudo access
- Minimum 8GB RAM
- At least 4 CPU cores
- 20GB free disk space
- Java 11 (Spark 3.5 also supports Java 8 and 17)
What this solves
Real-time analytics requires processing streaming data as it arrives while keeping the stored results consistent and reliable. Spark Structured Streaming, Kafka, and Delta Lake together cover ingestion, processing, and storage, with ACID transactions, schema evolution, and time travel on the stored data.
This tutorial sets up a production-ready streaming analytics pipeline that can handle high-throughput data ingestion, real-time processing, and reliable storage with automatic checkpoint recovery and exactly-once processing semantics.
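Exactly-once output depends on three things together: replayable Kafka offsets, the checkpoint, and a sink that recognizes a batch it has already written. The plain-Python sketch below models only the last of these. It uses a hypothetical IdempotentSink class that acknowledges batch IDs at or below the last committed one without re-applying them. It is a simplified illustration, not Delta Lake's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentSink:
    """Hypothetical sink that records the last batch ID it committed."""
    last_batch_id: int = -1
    rows: list = field(default_factory=list)

    def write_batch(self, batch_id, batch_rows):
        """Apply a micro-batch once; return False if it was already committed."""
        if batch_id <= self.last_batch_id:
            return False  # replayed batch: acknowledge without writing again
        self.rows.extend(batch_rows)
        # Data and batch ID are committed together, which makes a replay detectable
        self.last_batch_id = batch_id
        return True


sink = IdempotentSink()
sink.write_batch(0, ["a", "b"])
sink.write_batch(1, ["c"])
# Crash after the sink committed batch 1 but before the checkpoint recorded it:
# on restart the engine replays batch 1 from the same Kafka offsets.
replayed = sink.write_batch(1, ["c"])
print(replayed, sink.rows)  # False ['a', 'b', 'c']
```

Without the batch-ID check, the replay would append "c" a second time. That duplicate is what at-least-once delivery looks like.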
Step-by-step installation
Update system packages
Start by updating your package manager and installing the download and certificate utilities used in later steps.
sudo apt update && sudo apt upgrade -y
sudo apt install -y wget curl gnupg2 software-properties-common ca-certificates lsb-release
Install Java Development Kit
Spark 3.5 runs on Java 8, 11, or 17. Install OpenJDK 11 and set JAVA_HOME.
sudo apt install -y openjdk-11-jdk
echo 'export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64' | sudo tee -a /etc/environment
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Install Apache Spark 3.5
Download and install Apache Spark 3.5 with pre-built Hadoop binaries for distributed file system support.
cd /opt
sudo wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
sudo tar -xzf spark-3.5.0-bin-hadoop3.tgz
sudo mv spark-3.5.0-bin-hadoop3 spark
sudo chown -R $(whoami):$(whoami) /opt/spark
# /etc/environment does not expand $PATH, so use a profile.d script instead
echo 'export SPARK_HOME=/opt/spark' | sudo tee /etc/profile.d/spark.sh
echo 'export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin' | sudo tee -a /etc/profile.d/spark.sh
source /etc/profile.d/spark.sh
Download Delta Lake and Kafka dependencies
Download the JAR files for Delta Lake and Kafka streaming support. Spark 3.5 needs Delta Lake 3.x (Delta Lake 2.4 is built for Spark 3.4). Starting with Delta Lake 3.0, the main artifact is named delta-spark instead of delta-core.
cd /opt/spark/jars
sudo wget https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.2.0/delta-spark_2.12-3.2.0.jar
sudo wget https://repo1.maven.org/maven2/io/delta/delta-storage/3.2.0/delta-storage-3.2.0.jar
sudo wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.0/spark-sql-kafka-0-10_2.12-3.5.0.jar
sudo wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.0/kafka-clients-3.5.0.jar
sudo wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.5.0/spark-token-provider-kafka-0-10_2.12-3.5.0.jar
# The Kafka connector also depends on Apache Commons Pool 2
sudo wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar
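A Delta jar built for a different Spark minor version typically fails at startup with class-loading errors. The sketch below scans a list of jar names and flags mismatches. It is a hypothetical helper, and its lookup table holds only a subset of the Delta Lake compatibility table (3.0 through 3.2 for Spark 3.5, 2.4 for Spark 3.4).

```python
import re

# Delta Lake release line -> Spark minor version it is built for
# (subset of the Delta Lake compatibility table)
DELTA_TO_SPARK = {"3.2": "3.5", "3.1": "3.5", "3.0": "3.5", "2.4": "3.4"}

# Matches delta-spark (3.x) and delta-core (2.x and earlier) jar names
DELTA_JAR = re.compile(r"^delta-(?:spark|core)_2\.1[23]-(\d+)\.(\d+)\.\d+\.jar$")


def check_delta_jars(jar_names, spark_version):
    """Return human-readable problems for Delta jars that don't match Spark."""
    spark_minor = ".".join(spark_version.split(".")[:2])
    problems = []
    for name in jar_names:
        match = DELTA_JAR.match(name)
        if not match:
            continue  # not a Delta core/spark jar
        delta_line = f"{match.group(1)}.{match.group(2)}"
        expected = DELTA_TO_SPARK.get(delta_line)
        if expected is None:
            problems.append(f"{name}: Delta {delta_line} not in lookup table")
        elif expected != spark_minor:
            problems.append(f"{name}: built for Spark {expected}, found Spark {spark_minor}")
    return problems


sample = ["delta-core_2.12-2.4.0.jar", "delta-storage-2.4.0.jar", "spark-core_2.12-3.5.0.jar"]
print(check_delta_jars(sample, "3.5.0"))
# ['delta-core_2.12-2.4.0.jar: built for Spark 3.4, found Spark 3.5']
```

On the server, pass os.listdir("/opt/spark/jars") instead of the sample list.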
Configure Spark for Delta Lake
Create /opt/spark/conf/spark-defaults.conf with the following settings to enable the Delta Lake extensions and tune streaming behavior.
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.streaming.checkpointLocation=/opt/spark/checkpoints
spark.sql.streaming.schemaInference=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.streaming.forceDeleteTempCheckpointLocation=true
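# Disables Delta's safety check that blocks VACUUM with less than 7 days of retention; remove this line in production
spark.databricks.delta.retentionDurationCheck.enabled=false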
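Spark reads spark-defaults.conf as a Java properties file and does not flag a misspelled spark.* key, so a typo in the Delta settings goes unnoticed until Delta SQL fails. The sketch below is a hypothetical checker. It is a simplified parser that ignores properties-file escapes and line continuations, and it confirms that the two Delta settings from the listing above are present.

```python
import re
from pathlib import Path

# The two settings from the listing above that enable Delta Lake
REQUIRED_DELTA_SETTINGS = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

# Key and value are separated by '=', ':' or whitespace, as in Java properties files
_SEPARATOR = re.compile(r"\s*[=:]\s*|\s+")


def parse_spark_defaults(text):
    """Parse spark-defaults.conf text into a dict (no escapes or line continuations)."""
    conf = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or comment
        parts = _SEPARATOR.split(line, maxsplit=1)
        conf[parts[0]] = parts[1] if len(parts) > 1 else ""
    return conf


def missing_delta_settings(conf):
    """Names of required Delta settings that are absent or have a different value."""
    return [key for key, value in REQUIRED_DELTA_SETTINGS.items() if conf.get(key) != value]


if __name__ == "__main__":
    path = Path("/opt/spark/conf/spark-defaults.conf")
    if path.exists():
        print("missing:", missing_delta_settings(parse_spark_defaults(path.read_text())) or "none")
    else:
        print(f"{path} not found")
```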
Install and configure Apache Kafka
Set up Kafka for streaming data ingestion with proper configuration for Spark integration.
cd /opt
sudo wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
sudo tar -xzf kafka_2.13-2.8.2.tgz
sudo mv kafka_2.13-2.8.2 kafka
sudo chown -R $(whoami):$(whoami) /opt/kafka
echo 'export KAFKA_HOME=/opt/kafka' | sudo tee -a /etc/environment
source /etc/environment
Configure Kafka server properties
Replace the contents of /opt/kafka/config/server.properties with these settings for streaming workloads, including a 7-day (168-hour) retention period.
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/opt/kafka/kafka-logs
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
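log.retention.hours=168 keeps every message for seven days, so disk usage grows with the event rate. The back-of-the-envelope sketch below estimates it. estimate_retained_gib is a hypothetical helper, and the event sizes in the examples are assumptions, not measurements.

```python
def estimate_retained_gib(events_per_sec, avg_event_bytes,
                          retention_hours=168, replication_factor=1):
    """Rough disk usage, in GiB, of a topic under time-based retention.

    Assumes a steady event rate and ignores compression, per-record overhead,
    index files, and segment granularity (Kafka deletes whole segments, so
    actual usage can be somewhat higher).
    """
    retained_bytes = events_per_sec * avg_event_bytes * retention_hours * 3600
    return retained_bytes * replication_factor / 1024**3


# The sample producer: about 1 event/s of roughly 120 bytes (assumed size)
print(round(estimate_retained_gib(1, 120), 2))     # 0.07
# A hypothetical production load: 5,000 events/s of 500 bytes
print(round(estimate_retained_gib(5000, 500), 2))  # 1408.16
```

At the second rate, seven days of retention far exceeds the 20GB disk from the prerequisites. For such loads, lower log.retention.hours or cap each partition with log.retention.bytes.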
Create checkpoint and data directories
Set up directories for Spark streaming checkpoints and Delta Lake storage with proper permissions.
sudo mkdir -p /opt/spark/checkpoints
sudo mkdir -p /opt/spark/delta-lake
sudo mkdir -p /opt/kafka/kafka-logs
sudo chown -R $(whoami):$(whoami) /opt/spark/checkpoints /opt/spark/delta-lake
sudo chown -R $(whoami):$(whoami) /opt/kafka/kafka-logs
chmod 755 /opt/spark/checkpoints /opt/spark/delta-lake /opt/kafka/kafka-logs
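A permission problem here usually surfaces later as a checkpoint or Delta write failure, so it can be worth checking up front. The sketch below is a hypothetical helper built on os.access. It checks that the current user can create files in the three directories created above.

```python
import os

# Directories created above for checkpoints, Delta tables, and Kafka logs
PIPELINE_DIRS = ["/opt/spark/checkpoints", "/opt/spark/delta-lake", "/opt/kafka/kafka-logs"]


def unwritable(paths):
    """Return the paths that are missing or that the current user cannot write into."""
    # Creating files needs both write and execute (search) permission on the directory
    return [p for p in paths if not (os.path.isdir(p) and os.access(p, os.W_OK | os.X_OK))]


if __name__ == "__main__":
    problems = unwritable(PIPELINE_DIRS)
    print("all pipeline directories are writable" if not problems else f"fix ownership of: {problems}")
```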
Start Zookeeper and Kafka services
Launch the required services for Kafka message streaming.
# Start Zookeeper in background
/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
# Wait for Zookeeper to start
sleep 5
# Start Kafka server in background
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# Verify services are running
jps
Create Kafka topic for streaming
Set up a Kafka topic with appropriate partitioning for parallel processing.
/opt/kafka/bin/kafka-topics.sh --create \
--topic analytics-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# Verify topic creation
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Create streaming analytics application
Save the following Python application as /opt/spark/streaming_analytics.py. It reads events from Kafka, aggregates them with Spark Structured Streaming, and writes the results to Delta Lake.
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, current_timestamp, date_trunc, from_json
from pyspark.sql.functions import max as spark_max, min as spark_min
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType
from delta import configure_spark_with_delta_pip

# Create Spark session with Delta Lake configuration
builder = (
    SparkSession.builder.appName("KafkaDeltaStreaming")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# configure_spark_with_delta_pip adds the Delta jar matching the installed delta-spark
# package; extra_packages also fetches the Kafka connector and its dependencies
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"]
).getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Define schema for incoming JSON data
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", DoubleType(), True),
])

# Read streaming data from Kafka
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "analytics-events")
    .option("startingOffsets", "latest")
    .load()
)

# Parse JSON messages; records that don't match the schema produce nulls
parsed_df = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Calculate hourly aggregations per event type, then stamp each result row
processed_df = (
    parsed_df
    .withColumn("hour", date_trunc("hour", col("timestamp")))
    .groupBy("hour", "event_type")
    .agg(
        count("*").alias("event_count"),
        avg("value").alias("avg_value"),
        spark_max("value").alias("max_value"),
        spark_min("value").alias("min_value"),
    )
    .withColumn("processing_time", current_timestamp())
)

# Write to Delta Lake with checkpointing. "complete" mode rewrites the whole
# table on every micro-batch and keeps every group in state
query = (
    processed_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "/opt/spark/checkpoints/analytics-checkpoint")
    .start("/opt/spark/delta-lake/analytics_aggregations")
)

print("Streaming job started. Press Ctrl+C to stop.")
query.awaitTermination()
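To see what the complete-mode table holds, the plain-Python sketch below models the same hourly groupBy without Spark. hourly_aggregates and the sample events are hypothetical. The model assumes every event has a non-null value, whereas Spark's avg, max, and min skip nulls while count("*") still counts the row.

```python
from collections import defaultdict
from datetime import datetime


def hourly_aggregates(events):
    """Group events by (hour, event_type) and compute count/avg/max/min."""
    groups = defaultdict(list)
    for event in events:
        ts = datetime.fromisoformat(event["timestamp"])
        # Equivalent of date_trunc("hour", timestamp)
        hour = ts.replace(minute=0, second=0, microsecond=0)
        groups[(hour, event["event_type"])].append(event["value"])
    return {
        key: {
            "event_count": len(values),
            "avg_value": sum(values) / len(values),
            "max_value": max(values),
            "min_value": min(values),
        }
        for key, values in groups.items()
    }


sample_events = [
    {"event_type": "click", "timestamp": "2024-05-01T10:05:00", "value": 10.0},
    {"event_type": "click", "timestamp": "2024-05-01T10:55:00", "value": 30.0},
    {"event_type": "click", "timestamp": "2024-05-01T11:01:00", "value": 5.0},
]
result = hourly_aggregates(sample_events)
for (hour, event_type), stats in sorted(result.items()):
    print(hour.isoformat(), event_type, stats)
```

A new event changes only its own (hour, event_type) row. Complete mode still rewrites every row on each micro-batch, which is why long-running jobs usually move to append mode with a watermark.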
Install Python dependencies
Install required Python packages for Delta Lake and PySpark integration.
sudo apt install -y python3-pip python3-venv
python3 -m venv /opt/spark/venv
source /opt/spark/venv/bin/activate
pip install pyspark==3.5.0 delta-spark==3.2.0 kafka-python
Create data producer script
Save this test data producer as /opt/spark/data_producer.py. It sends one sample event per second to the analytics-events topic.
import json
import random
import time
from datetime import datetime
from kafka import KafkaProducer

# Create Kafka producer that serializes each event as UTF-8 JSON
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)


def generate_event():
    """Generate sample analytics event"""
    user_ids = [f"user_{i}" for i in range(1, 101)]
    event_types = ["click", "view", "purchase", "signup"]
    return {
        "user_id": random.choice(user_ids),
        "event_type": random.choice(event_types),
        "timestamp": datetime.now().isoformat(),
        "value": round(random.uniform(1.0, 100.0), 2)
    }


if __name__ == "__main__":
    print("Starting data producer. Press Ctrl+C to stop.")
    try:
        while True:
            event = generate_event()
            producer.send('analytics-events', event)
            print(f"Sent: {event}")
            time.sleep(1)  # Send one event per second
    except KeyboardInterrupt:
        print("Stopping producer...")
    finally:
        producer.close()
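from_json returns nulls instead of failing when a message doesn't match the schema, so it can help to validate producer output before it reaches Kafka. The sketch below is a hypothetical helper, not part of kafka-python or Spark. It mirrors the StructType in the streaming application with plain Python type checks.

```python
import json
from datetime import datetime

# Field names and Python types mirroring the StructType in streaming_analytics.py
EXPECTED_FIELDS = {
    "user_id": str,
    "event_type": str,
    "timestamp": str,  # ISO-8601 text that from_json converts to TimestampType
    "value": (int, float),
}


def schema_errors(raw_message):
    """List reasons a Kafka message body would not fit the streaming schema."""
    try:
        event = json.loads(raw_message.decode("utf-8"))
    except ValueError as exc:  # covers UnicodeDecodeError and JSONDecodeError
        return [f"not valid JSON: {exc}"]
    if not isinstance(event, dict):
        return ["top-level JSON value is not an object"]
    errors = []
    for name, expected_type in EXPECTED_FIELDS.items():
        if name not in event:
            errors.append(f"missing field {name}")
        elif not isinstance(event[name], expected_type):
            errors.append(f"{name} has type {type(event[name]).__name__}")
    if isinstance(event.get("timestamp"), str):
        try:
            datetime.fromisoformat(event["timestamp"])
        except ValueError:
            errors.append("timestamp is not ISO-8601")
    return errors


good = json.dumps({
    "user_id": "user_1",
    "event_type": "click",
    "timestamp": datetime.now().isoformat(),
    "value": 12.5,
}).encode("utf-8")
print(schema_errors(good))  # []
print(schema_errors(b'{"user_id": 7}'))
```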
Create a pipeline monitoring script
Save this script as /opt/spark/monitor_pipeline.py to check the health of your streaming pipeline.
#!/usr/bin/env python3
import json
import subprocess
import time
from pathlib import Path


def check_kafka_status():
    """Check if a Kafka broker JVM is running (jps ships with the JDK)"""
    try:
        result = subprocess.run(['jps'], capture_output=True, text=True, timeout=10)
        return 'Kafka' in result.stdout
    except (OSError, subprocess.SubprocessError):
        # jps missing, not executable, or timed out
        return False


def check_delta_table():
    """Check that the Delta table exists and has a transaction log"""
    delta_log = Path('/opt/spark/delta-lake/analytics_aggregations/_delta_log')
    return delta_log.is_dir() and any(delta_log.iterdir())


def get_checkpoint_status():
    """Get streaming checkpoint information"""
    checkpoint_path = Path('/opt/spark/checkpoints/analytics-checkpoint')
    if checkpoint_path.exists():
        return {'exists': True, 'files': len(list(checkpoint_path.rglob('*')))}
    return {'exists': False, 'files': 0}


if __name__ == "__main__":
    status = {
        'timestamp': time.time(),
        'kafka_running': check_kafka_status(),
        'delta_table_exists': check_delta_table(),
        'checkpoint_status': get_checkpoint_status()
    }
    print(json.dumps(status, indent=2))
chmod +x /opt/spark/monitor_pipeline.py
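Counting checkpoint files says little about progress. Spark writes offsets/<batch id> before running a micro-batch and commits/<batch id> after the batch finishes, so comparing the two directories shows whether a batch is in progress or was interrupted. The sketch below uses hypothetical helpers, latest_batch_id and checkpoint_progress. It reads only file names and skips any non-numeric files, such as hidden .crc checksums.

```python
from pathlib import Path


def latest_batch_id(log_dir):
    """Highest numeric file name in a checkpoint log directory, or None."""
    if not log_dir.is_dir():
        return None
    # Batch files are named 0, 1, 2, ...; skip anything else (e.g. .crc checksums)
    ids = [int(p.name) for p in log_dir.iterdir() if p.is_file() and p.name.isdigit()]
    return max(ids, default=None)


def checkpoint_progress(checkpoint):
    """Compare planned (offsets/) and finished (commits/) micro-batches."""
    checkpoint = Path(checkpoint)
    planned = latest_batch_id(checkpoint / "offsets")
    committed = latest_batch_id(checkpoint / "commits")
    return {
        "planned_batch": planned,
        "committed_batch": committed,
        # True while a batch runs, or if the query stopped mid-batch
        "batch_in_flight": planned is not None and (committed is None or planned > committed),
    }


if __name__ == "__main__":
    print(checkpoint_progress("/opt/spark/checkpoints/analytics-checkpoint"))
```

A gap that persists across several runs of the monitor points to a stuck or crashed query rather than a normal in-progress batch.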
Configure production monitoring
Set up Spark metrics configuration
Create /opt/spark/conf/metrics.properties so Spark exposes driver and executor metrics in Prometheus format. Structured Streaming query metrics are reported only when spark.sql.streaming.metricsEnabled=true is also set, for example in spark-defaults.conf.
*.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheus.path=/metrics/prometheus
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
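The PrometheusServlet sink serves metrics as Prometheus text on the driver's Spark UI port (4040). A Prometheus server is the usual consumer, but a quick script can also read it. The sketch below is a simplified, hypothetical parser and fetcher; the prometheus_client package ships a complete parser. The metrics URL is an assumption: its path must match the *.sink.prometheus.path value in your metrics.properties.

```python
from urllib.request import urlopen


def parse_prometheus_text(text):
    """Map 'name{labels}' to its float value for each sample line.

    Simplified: skips '#' comment lines and assumes label values contain no
    spaces.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Sample lines are "<name>{<labels>} <value> [<timestamp>]"
        parts = line.split()
        samples[parts[0]] = float(parts[1])
    return samples


def fetch_metrics(url):
    """Download metrics text, e.g. from the driver's Spark UI on port 4040."""
    with urlopen(url, timeout=5) as response:
        return response.read().decode("utf-8")


# Illustrative input; real metric names depend on your application and sources
sample = """# TYPE metrics_example_jvm_heap_used gauge
metrics_example_jvm_heap_used{type="gauges"} 123456789
"""
print(parse_prometheus_text(sample))
```

With a streaming job running, call parse_prometheus_text(fetch_metrics("http://localhost:4040/metrics/prometheus")), adjusting the path to your configuration.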
Create performance tuning script
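Save this script as /opt/spark/tune_performance.py. It suggests starting-point Spark settings from the host's CPU and memory, not from workload characteristics. The executor settings apply to standalone or cluster deployments. When the application runs in local mode (the default for python3 streaming_analytics.py), spark.driver.memory is the setting that limits memory.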
#!/usr/bin/env python3
import math

import psutil


def calculate_spark_config():
    """Suggest starting-point Spark settings from this host's resources"""
    # Get system information; cpu_count() returns None if it can't be determined
    total_memory = psutil.virtual_memory().total
    cpu_cores = psutil.cpu_count() or 1
    # Calculate settings (integer division rounds memory down to whole GiB)
    memory_gb = total_memory // (1024**3)
    executor_memory = max(1, math.floor(memory_gb * 0.7))  # 70% of total memory
    executor_cores = max(1, min(5, cpu_cores - 1))  # Leave one core for the OS
    config = {
        'spark.executor.memory': f'{executor_memory}g',
        'spark.executor.cores': str(executor_cores),
        'spark.sql.shuffle.partitions': str(cpu_cores * 2),
        'spark.default.parallelism': str(cpu_cores * 2),
        'spark.sql.streaming.maxBatchesToRetainInMemory': '5',
        'spark.sql.streaming.minBatchesToRetain': '10'
    }
    return config


if __name__ == "__main__":
    config = calculate_spark_config()
    print("# Recommended Spark configuration:")
    for key, value in config.items():
        print(f"{key}={value}")
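chmod +x /opt/spark/tune_performance.py
# psutil is not in the standard library; install it into the virtualenv first
source /opt/spark/venv/bin/activate
pip install psutil
python3 /opt/spark/tune_performance.py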
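As a worked example, this sketch applies the memory, core, and partition formulas from tune_performance.py to the minimum host in the prerequisites (8 GB RAM, 4 cores). It yields 5g of executor memory, 3 executor cores, and 8 shuffle partitions. recommend is a hypothetical refactor that takes the host values as arguments, clamped to at least one core, so the arithmetic can be checked without psutil. psutil reports usable memory, so an "8 GB" VM may come out as 7 after integer division and therefore get 4g.

```python
import math


def recommend(memory_gb, cpu_cores):
    """Apply tune_performance.py's memory, core, and partition formulas to given values."""
    return {
        "spark.executor.memory": f"{max(1, math.floor(memory_gb * 0.7))}g",
        "spark.executor.cores": str(max(1, min(5, cpu_cores - 1))),
        "spark.sql.shuffle.partitions": str(cpu_cores * 2),
    }


# Minimum host from the prerequisites: 8 GB RAM, 4 CPU cores
print(recommend(8, 4))
# A VM whose usable memory rounds down to 7 GiB in the script's integer division
print(recommend(7, 4))
```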
Verify your setup
# Check Java installation
java -version
# Verify Spark installation
/opt/spark/bin/spark-submit --version
# Check Kafka services
jps
# Test Delta Lake integration
source /opt/spark/venv/bin/activate
python3 -c "from delta import configure_spark_with_delta_pip; print('Delta Lake integration OK')"
# Monitor pipeline status
python3 /opt/spark/monitor_pipeline.py
# Check checkpoint directory
ls -la /opt/spark/checkpoints/
Test the streaming pipeline
# Terminal 1: Start the streaming application
source /opt/spark/venv/bin/activate
cd /opt/spark
python3 streaming_analytics.py
# Terminal 2: Start the data producer
source /opt/spark/venv/bin/activate
cd /opt/spark
python3 data_producer.py
# Terminal 3: Monitor the Delta Lake output
watch -n 5 'ls -la /opt/spark/delta-lake/analytics_aggregations/'
# Query Delta Lake data directly
source /opt/spark/venv/bin/activate
pyspark --packages io.delta:delta-spark_2.12:3.2.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
# In the PySpark shell:
df = spark.read.format("delta").load("/opt/spark/delta-lake/analytics_aggregations")
df.show()
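In complete mode every micro-batch overwrites the table, so each batch creates a new Delta version. The latest version is the highest-numbered JSON commit file in the table's _delta_log directory. The sketch below is a hypothetical helper that reads that number from file names. The commented line shows how the PySpark shell could then read an earlier snapshot with the versionAsOf time-travel option.

```python
from pathlib import Path


def latest_delta_version(table_path):
    """Return the newest committed version in a Delta table's _delta_log, or None."""
    log_dir = Path(table_path) / "_delta_log"
    if not log_dir.is_dir():
        return None
    # Commit files are zero-padded version numbers, e.g. 00000000000000000003.json
    versions = [int(p.stem) for p in log_dir.glob("*.json") if p.stem.isdigit()]
    return max(versions, default=None)


if __name__ == "__main__":
    table = "/opt/spark/delta-lake/analytics_aggregations"
    version = latest_delta_version(table)
    print(f"latest version: {version}")
    # In the PySpark shell, an earlier snapshot can then be read with time travel:
    # spark.read.format("delta").option("versionAsOf", version - 1).load(table).show()
```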
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Kafka connection timeout | Kafka not started or wrong port | jps to check processes, restart with /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties |
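| Delta Lake JAR not found or Delta class errors at startup | Missing Delta dependencies, or a Delta version built for a different Spark release | Verify the JARs in /opt/spark/jars/ (Spark 3.5 needs Delta Lake 3.x) and restart the Spark application |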
| Checkpoint permission denied | Wrong directory ownership | sudo chown -R $(whoami):$(whoami) /opt/spark/checkpoints |
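| Out of memory errors | Insufficient executor memory (driver memory in local mode) | Increase spark.executor.memory, or spark.driver.memory in local mode, in spark-defaults.conf, or run the tuning script |
| Delta table stays empty or fills with nulls | Topic not found, or JSON that doesn't match the schema (from_json returns nulls instead of failing) | Check that the Kafka topic exists and compare the producer's JSON fields and types with the schema |
| Micro-batches are slow even for small data | Default of 200 shuffle partitions for the aggregation | Reduce spark.sql.shuffle.partitions before the first run; a stateful query keeps the value stored in its checkpoint |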
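For the Kafka connection-timeout row, a quick TCP probe shows whether anything is listening on the expected ports. The sketch below uses only Python's socket module. port_open is a hypothetical helper, and the port list (ZooKeeper 2181, Kafka 9092, Spark UI 4040) comes from this tutorial's configuration. An open port does not rule out a listener mismatch: Kafka clients reconnect to the address in advertised.listeners, so that address must also be reachable.

```python
import socket


def port_open(host, port, timeout=2.0):
    """True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


if __name__ == "__main__":
    # Ports used in this tutorial's configuration
    for name, port in [("zookeeper", 2181), ("kafka", 9092), ("spark-ui", 4040)]:
        state = "open" if port_open("localhost", port) else "closed"
        print(f"{name:<10} {port:>5} {state}")
```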
Performance optimization tips
- Choose a trigger interval (for example, 1-10 seconds) that matches your latency requirements
- Configure maxOffsetsPerTrigger to control the processing rate
- Enable adaptive query execution for batch queries on the Delta tables; Spark turns it off for streaming micro-batches
- Use Delta Lake OPTIMIZE command for compaction
- Monitor Spark UI at http://localhost:4040 during streaming
- Consider Spark SQL performance optimization for complex analytics queries
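maxOffsetsPerTrigger is a Kafka source option, set with .option("maxOffsetsPerTrigger", ...) on readStream, that caps the total offsets read per micro-batch across all partitions. As a rough model, Spark divides that cap among partitions in proportion to their backlog. The hypothetical split_offset_budget function below illustrates the idea; Spark's internal rounding may differ in detail.

```python
import math


def split_offset_budget(lag_by_partition, max_offsets_per_trigger):
    """Divide a per-trigger offset budget across partitions by backlog (illustrative)."""
    total = sum(lag_by_partition.values())
    if total <= max_offsets_per_trigger:
        return dict(lag_by_partition)  # the whole backlog fits in one batch
    budget = {}
    for partition, lag in lag_by_partition.items():
        share = max_offsets_per_trigger * lag / total
        # Round small shares up so a lightly loaded partition still makes progress
        take = math.ceil(share) if share < 1 else math.floor(share)
        budget[partition] = min(lag, take)
    return budget


# Three partitions with uneven backlog and a cap of 1,000 offsets per trigger
print(split_offset_budget({0: 9000, 1: 900, 2: 100}, 1000))  # {0: 900, 1: 90, 2: 10}
```

A heavily lagging partition gets most of the budget, so the backlog drains without any single micro-batch growing beyond the cap.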
Next steps
- Set up Kafka Schema Registry with Avro for schema evolution
- Configure Prometheus monitoring for Spark streaming metrics
- Implement backpressure and rate limiting for production workloads
- Set up Delta Lake table optimization and maintenance procedures
- Configure exactly-once processing semantics for critical data pipelines
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Spark Streaming with Kafka and Delta Lake Installation Script
# Production-ready setup for real-time analytics pipeline
# Colors for output
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly BLUE='\033[0;34m'
readonly NC='\033[0m' # No Color
# Configuration
readonly SPARK_VERSION="3.5.0"
readonly DELTA_VERSION="3.2.0"  # Delta Lake 3.x is the release line built for Spark 3.5
readonly KAFKA_VERSION="2.8.2"
readonly INSTALL_DIR="/opt"
readonly SPARK_USER="spark"
# Function to print colored output
print_status() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Cleanup function for rollback
cleanup() {
print_error "Installation failed. Cleaning up..."
sudo rm -rf /opt/spark /opt/kafka 2>/dev/null || true
sudo userdel -r "$SPARK_USER" 2>/dev/null || true
exit 1
}
trap cleanup ERR
# Usage message
usage() {
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " --kafka-ip IP Kafka server IP (default: localhost)"
echo " --help Show this help message"
exit 1
}
# Parse arguments
KAFKA_IP="localhost"
while [[ $# -gt 0 ]]; do
case $1 in
--kafka-ip)
KAFKA_IP="$2"
shift 2
;;
--help)
usage
;;
*)
print_error "Unknown option: $1"
usage
;;
esac
done
# Check prerequisites
if [[ $EUID -eq 0 ]]; then
print_error "This script should not be run as root. Use a user with sudo privileges."
exit 1
fi
if ! command -v sudo >/dev/null; then
print_error "sudo is required but not installed."
exit 1
fi
# Auto-detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update -y"
PKG_INSTALL="apt install -y"
JAVA_HOME_PATH="/usr/lib/jvm/java-11-openjdk-amd64"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
JAVA_HOME_PATH="/usr/lib/jvm/java-11-openjdk"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
JAVA_HOME_PATH="/usr/lib/jvm/java-11-openjdk"
;;
*)
print_error "Unsupported distribution: $ID"
exit 1
;;
esac
else
print_error "Cannot detect distribution. /etc/os-release not found."
exit 1
fi
print_status "Detected distribution: $PRETTY_NAME"
print_status "Using package manager: $PKG_MGR"
# Step 1: Update system packages
print_status "[1/8] Updating system packages..."
sudo $PKG_UPDATE
if [[ "$PKG_MGR" == "apt" ]]; then
sudo $PKG_INSTALL wget curl gnupg2 software-properties-common ca-certificates lsb-release openjdk-11-jdk
else
sudo $PKG_INSTALL wget curl gnupg2 ca-certificates which java-11-openjdk-devel
fi
print_success "System packages updated"
# Step 2: Configure Java environment
print_status "[2/8] Configuring Java environment..."
if ! java -version &>/dev/null; then
print_error "Java installation failed"
exit 1
fi
echo "export JAVA_HOME=$JAVA_HOME_PATH" | sudo tee -a /etc/environment >/dev/null
export JAVA_HOME="$JAVA_HOME_PATH"
print_success "Java configured: $(java -version 2>&1 | head -n1)"
# Step 3: Create spark user
print_status "[3/8] Creating spark user..."
if ! id "$SPARK_USER" &>/dev/null; then
sudo useradd -r -m -s /bin/bash "$SPARK_USER"
fi
print_success "Spark user created"
# Step 4: Install Apache Spark
print_status "[4/8] Installing Apache Spark $SPARK_VERSION..."
cd /tmp
wget -q "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz"
sudo tar -xzf "spark-${SPARK_VERSION}-bin-hadoop3.tgz" -C "$INSTALL_DIR"
sudo mv "${INSTALL_DIR}/spark-${SPARK_VERSION}-bin-hadoop3" "${INSTALL_DIR}/spark"
sudo chown -R ${SPARK_USER}:${SPARK_USER} "${INSTALL_DIR}/spark"
sudo chmod -R 755 "${INSTALL_DIR}/spark"
# Configure Spark environment
echo "export SPARK_HOME=${INSTALL_DIR}/spark" | sudo tee -a /etc/environment >/dev/null
# /etc/environment is not shell-expanded, so PATH goes into a profile.d script
echo "export PATH=\$PATH:${INSTALL_DIR}/spark/bin:${INSTALL_DIR}/spark/sbin" | sudo tee /etc/profile.d/spark.sh >/dev/null
export SPARK_HOME="${INSTALL_DIR}/spark"
export PATH="$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin"
print_success "Apache Spark installed"
# Step 5: Download Delta Lake and Kafka dependencies
print_status "[5/8] Downloading Delta Lake and Kafka dependencies..."
cd "${INSTALL_DIR}/spark/jars"
# From Delta Lake 3.0 the main artifact is delta-spark (formerly delta-core)
sudo -u "$SPARK_USER" wget -q "https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/${DELTA_VERSION}/delta-spark_2.12-${DELTA_VERSION}.jar"
sudo -u "$SPARK_USER" wget -q "https://repo1.maven.org/maven2/io/delta/delta-storage/${DELTA_VERSION}/delta-storage-${DELTA_VERSION}.jar"
sudo -u "$SPARK_USER" wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/${SPARK_VERSION}/spark-sql-kafka-0-10_2.12-${SPARK_VERSION}.jar"
sudo -u "$SPARK_USER" wget -q "https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.0/kafka-clients-3.5.0.jar"
sudo -u "$SPARK_USER" wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/${SPARK_VERSION}/spark-token-provider-kafka-0-10_2.12-${SPARK_VERSION}.jar"
# The Kafka connector also depends on Apache Commons Pool 2
sudo -u "$SPARK_USER" wget -q "https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar"
print_success "Dependencies downloaded"
# Step 6: Configure Spark for Delta Lake
print_status "[6/8] Configuring Spark for Delta Lake..."
sudo mkdir -p "${INSTALL_DIR}/spark/conf" "${INSTALL_DIR}/spark/checkpoints"
sudo chown -R ${SPARK_USER}:${SPARK_USER} "${INSTALL_DIR}/spark/checkpoints"
sudo chmod 755 "${INSTALL_DIR}/spark/checkpoints"
sudo tee "${INSTALL_DIR}/spark/conf/spark-defaults.conf" >/dev/null <<EOF
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.streaming.checkpointLocation=${INSTALL_DIR}/spark/checkpoints
spark.sql.streaming.schemaInference=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.streaming.forceDeleteTempCheckpointLocation=true
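# Disables Delta's safety check that blocks VACUUM with less than 7 days of retention; remove this line in production
spark.databricks.delta.retentionDurationCheck.enabled=false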
EOF
sudo chown ${SPARK_USER}:${SPARK_USER} "${INSTALL_DIR}/spark/conf/spark-defaults.conf"
sudo chmod 644 "${INSTALL_DIR}/spark/conf/spark-defaults.conf"
print_success "Spark configured for Delta Lake"
# Step 7: Install and configure Apache Kafka
print_status "[7/8] Installing Apache Kafka $KAFKA_VERSION..."
cd /tmp
wget -q "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.13-${KAFKA_VERSION}.tgz"
sudo tar -xzf "kafka_2.13-${KAFKA_VERSION}.tgz" -C "$INSTALL_DIR"
sudo mv "${INSTALL_DIR}/kafka_2.13-${KAFKA_VERSION}" "${INSTALL_DIR}/kafka"
sudo chown -R ${SPARK_USER}:${SPARK_USER} "${INSTALL_DIR}/kafka"
sudo chmod -R 755 "${INSTALL_DIR}/kafka"
# Configure Kafka
sudo mkdir -p /var/kafka-logs
sudo chown ${SPARK_USER}:${SPARK_USER} /var/kafka-logs
sudo chmod 755 /var/kafka-logs
sudo tee "${INSTALL_DIR}/kafka/config/server.properties" >/dev/null <<EOF
broker.id=0
listeners=PLAINTEXT://${KAFKA_IP}:9092
advertised.listeners=PLAINTEXT://${KAFKA_IP}:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
sudo chown ${SPARK_USER}:${SPARK_USER} "${INSTALL_DIR}/kafka/config/server.properties"
echo "export KAFKA_HOME=${INSTALL_DIR}/kafka" | sudo tee -a /etc/environment >/dev/null
print_success "Apache Kafka installed and configured"
# Step 8: Create systemd services
print_status "[8/8] Creating systemd services..."
# Zookeeper service
sudo tee /etc/systemd/system/zookeeper.service >/dev/null <<EOF
[Unit]
Description=Apache Zookeeper
After=network.target
[Service]
Type=forking
User=${SPARK_USER}
ExecStart=${INSTALL_DIR}/kafka/bin/zookeeper-server-start.sh -daemon ${INSTALL_DIR}/kafka/config/zookeeper.properties
ExecStop=${INSTALL_DIR}/kafka/bin/zookeeper-server-stop.sh
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
# Kafka service
sudo tee /etc/systemd/system/kafka.service >/dev/null <<EOF
[Unit]
Description=Apache Kafka
After=network.target zookeeper.service
Requires=zookeeper.service
[Service]
Type=forking
User=${SPARK_USER}
Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"
ExecStart=${INSTALL_DIR}/kafka/bin/kafka-server-start.sh -daemon ${INSTALL_DIR}/kafka/config/server.properties
ExecStop=${INSTALL_DIR}/kafka/bin/kafka-server-stop.sh
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable zookeeper kafka
print_success "Systemd services created"
# Verification
print_status "Verifying installation..."
# Check Java
if ! command -v java >/dev/null; then
print_error "Java verification failed"
exit 1
fi
# Check Spark
if [[ ! -f "${INSTALL_DIR}/spark/bin/spark-submit" ]]; then
print_error "Spark installation verification failed"
exit 1
fi
# Check Kafka
if [[ ! -f "${INSTALL_DIR}/kafka/bin/kafka-server-start.sh" ]]; then
print_error "Kafka installation verification failed"
exit 1
fi
print_success "All components verified successfully!"
print_status "Installation Summary:"
echo " - Apache Spark $SPARK_VERSION installed in ${INSTALL_DIR}/spark"
echo " - Apache Kafka $KAFKA_VERSION installed in ${INSTALL_DIR}/kafka"
echo " - Delta Lake $DELTA_VERSION dependencies added"
echo " - Services: zookeeper.service, kafka.service"
echo " - User: $SPARK_USER"
echo ""
print_status "To start services:"
echo " sudo systemctl start zookeeper"
echo " sudo systemctl start kafka"
echo ""
print_status "To use Spark with Delta Lake:"
echo " sudo -u $SPARK_USER ${INSTALL_DIR}/spark/bin/spark-shell"
Review the script before running. Execute with: bash install.sh