Optimize Apache Spark 3.5 SQL performance with the Catalyst optimizer: query tuning, adaptive query execution, and production-grade configuration for high-throughput analytics workloads.
Prerequisites
- 8GB RAM minimum
- 4 CPU cores
- 20GB free disk space
- Java 11 or later
What this solves
Spark SQL performance optimization is critical for production analytics workloads processing large datasets. The Catalyst optimizer in Apache Spark 3.5 provides rule-based optimization, cost-based optimization (CBO), and adaptive query execution (AQE) to automatically improve query performance. This tutorial implements advanced Catalyst tuning techniques, memory optimization, and benchmarking strategies for enterprise-scale SQL analytics.
Step-by-step installation
Install Java and system dependencies
Spark 3.5 runs on Java 8, 11, or 17. This guide installs OpenJDK 11 together with Python 3 and common build libraries.
sudo apt update
sudo apt install -y openjdk-11-jdk wget curl python3 python3-pip
sudo apt install -y build-essential libssl-dev zlib1g-dev
Download and install Apache Spark 3.5
Download Spark 3.5 with Hadoop support for distributed file system access and cluster deployment.
cd /opt
sudo wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
sudo tar -xzf spark-3.5.1-bin-hadoop3.tgz
sudo mv spark-3.5.1-bin-hadoop3 spark
sudo chown -R $USER:$USER /opt/spark
sudo ln -s /opt/spark/bin/spark-shell /usr/local/bin/
sudo ln -s /opt/spark/bin/spark-sql /usr/local/bin/
sudo ln -s /opt/spark/bin/pyspark /usr/local/bin/
Configure Spark environment variables
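Set JAVA_HOME, SPARK_HOME, the PySpark module path, and PATH so that every session uses the same installation. Put these lines in a shell profile script such as /etc/profile.d/spark.sh rather than /etc/environment, which is not a shell script and does not expand references like $SPARK_HOME.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export SPARK_HOME=/opt/spark
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source /etc/profile.d/spark.sh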
Configure Spark defaults with Catalyst optimizations
Enable Catalyst optimizer features including adaptive query execution, cost-based optimization, and advanced memory management.
# Catalyst Optimizer Configuration
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled true
spark.sql.adaptive.advisoryPartitionSizeInBytes 64MB
# Cost-Based Optimization
spark.sql.cbo.enabled true
spark.sql.cbo.joinReorder.enabled true
spark.sql.cbo.planStats.enabled true
spark.sql.cbo.starSchemaDetection true
# Memory and Performance Tuning
spark.executor.memory 4g
spark.executor.cores 2
spark.executor.instances 4
spark.driver.memory 2g
spark.driver.cores 2
spark.sql.shuffle.partitions 200
# Catalyst Query Optimization
# spark.sql.optimizer.excludedRules is left unset; set it only to disable specific optimizer rules
spark.sql.optimizer.maxIterations 100
spark.sql.codegen.wholeStage true
spark.sql.codegen.splitConsumeFuncByOperator true
# Advanced Catalyst Features
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.2
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0
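A malformed line in spark-defaults.conf is easy to miss, so a quick check before starting the cluster can help. The sketch below is a hypothetical helper (`find_suspect_lines` is not part of Spark). It assumes the file uses whitespace-separated `key value` pairs with `#` comments, and it flags any non-comment line that lacks a value or whose key does not start with `spark.`.

```python
def find_suspect_lines(conf_text):
    """Return (line_number, line) pairs that do not look like 'spark.key value'."""
    suspects = []
    for number, raw in enumerate(conf_text.splitlines(), start=1):
        line = raw.strip()
        # Blank lines and comments are fine.
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        # A valid entry has a key beginning with 'spark.' and a non-empty value.
        if len(parts) != 2 or not parts[0].startswith("spark."):
            suspects.append((number, line))
    return suspects


# Illustrative input: one heading missing its '#', one key without a value.
sample = """# Catalyst Optimizer Configuration
spark.sql.adaptive.enabled true
Cost-Based Optimization
spark.sql.optimizer.excludedRules
spark.sql.cbo.enabled true
"""

for number, line in find_suspect_lines(sample):
    print(f"line {number}: {line}")
```

To check a real file, pass the result of `open("/opt/spark/conf/spark-defaults.conf").read()` instead of `sample`.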
Configure advanced memory management
Optimize JVM garbage collection and Spark memory allocation for large-scale analytics workloads.
#!/usr/bin/env bash
# Java Options for Catalyst Optimization
export SPARK_DRIVER_OPTS="-XX:+UseG1GC -XX:+UseStringDeduplication -XX:+OptimizeStringConcat"
export SPARK_EXECUTOR_OPTS="-XX:+UseG1GC -XX:+UseStringDeduplication -XX:MaxGCPauseMillis=200"
# Catalyst Memory Configuration
export SPARK_DRIVER_MEMORY="2g"
export SPARK_EXECUTOR_MEMORY="4g"
export SPARK_EXECUTOR_CORES="2"
# Advanced JVM Tuning
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/opt/spark/recovery"
# Python Configuration for PySpark
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
sudo chmod +x /opt/spark/conf/spark-env.sh
Configure Catalyst query compilation settings
Enable whole-stage code generation and advanced compilation optimizations for maximum SQL performance.
# Code Generation Optimization
spark.sql.codegen.aggregate.map.twolevel.enabled true
spark.sql.codegen.aggregate.splitAggregateFunc.enabled true
spark.sql.codegen.comments true
spark.sql.codegen.factoryMode CODEGEN_ONLY
spark.sql.codegen.hugeMethodLimit 65535
spark.sql.codegen.maxFields 100
spark.sql.codegen.methodSplitThreshold 1024
spark.sql.codegen.splitConsumeFuncByOperator true
spark.sql.codegen.useIdInClassName true
spark.sql.codegen.wholeStage true
# Catalyst Rule-Based Optimization
spark.sql.optimizer.collapseProjectAlwaysInline false
spark.sql.optimizer.constraintPropagation.enabled true
spark.sql.optimizer.dynamicPartitionPruning.enabled true
spark.sql.optimizer.dynamicPartitionPruning.useStats true
# spark.sql.optimizer.excludedRules is left unset; set it only to disable specific optimizer rules
spark.sql.optimizer.inSetConversionThreshold 10
spark.sql.cbo.joinReorder.dp.threshold 12
spark.sql.cbo.joinReorder.enabled true
spark.sql.optimizer.maxIterations 100
spark.sql.optimizer.nestedSchemaPruning.enabled true
spark.sql.planChangeLog.level WARN
spark.sql.optimizer.replaceExceptWithFilter true
# Advanced Query Planning
spark.sql.adaptive.enabled true
spark.sql.adaptive.forceOptimizeSkewedJoin false
spark.sql.adaptive.logLevel INFO
# Spark 3.x names for the older post-shuffle partition settings
spark.sql.adaptive.coalescePartitions.initialPartitionNum 500
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB
spark.sql.adaptive.advisoryPartitionSizeInBytes 64MB
Install and configure performance monitoring
Set up Spark metrics collection and JMX monitoring for real-time performance tracking and Catalyst optimizer analysis.
# Spark Metrics Configuration
*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=30
*.sink.console.unit=seconds
# JMX Metrics Export
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# Driver Metrics
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.dag.class=org.apache.spark.sql.execution.ui.SQLMetricsSource
# Executor Metrics
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Create Catalyst optimization test database
Generate test datasets to benchmark Catalyst optimizer performance improvements across different query patterns.
mkdir -p /opt/spark/test-data
cd /opt/spark/test-data
# Create test data generation script
cat > generate_test_data.py << 'EOF'
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import random
spark = SparkSession.builder \
.appName("CatalystTestDataGenerator") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.cbo.enabled", "true") \
.getOrCreate()
# Generate sales data
schema = StructType([
    StructField("sale_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("product_id", IntegerType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("unit_price", DoubleType(), False),
    # Nullable: rows are created with None here and the column is filled in below
    StructField("sale_date", DateType(), True),
    StructField("region", StringType(), False)
])
# Create large dataset for optimization testing
data = [(i,
random.randint(1, 10000),
random.randint(1, 1000),
random.randint(1, 100),
round(random.uniform(10, 1000), 2),
None,
random.choice(["North", "South", "East", "West", "Central"])
) for i in range(1000000)]
df = spark.createDataFrame(data, schema)
df = df.withColumn("sale_date",
F.date_add(F.lit("2023-01-01"),
F.expr("CAST(rand() * 365 AS INT)")))
df.write.mode("overwrite").parquet("/opt/spark/test-data/sales")
# Generate customer data
customers = [(i, f"Customer_{i}", random.choice(["Premium", "Standard", "Basic"]))
for i in range(1, 10001)]
cust_df = spark.createDataFrame(customers, ["customer_id", "customer_name", "tier"])
cust_df.write.mode("overwrite").parquet("/opt/spark/test-data/customers")
print("Test data generated successfully")
spark.stop()
EOF
python3 generate_test_data.py
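Benchmark runs are easier to compare when the input data is reproducible. The sketch below builds rows in the same column order as the sales schema using a seeded generator. `make_sales_rows` and its `seed` parameter are illustrative names, not part of the script above, and the date is computed in Python rather than with Spark's `rand()`.

```python
import random
from datetime import date, timedelta

REGIONS = ["North", "South", "East", "West", "Central"]


def make_sales_rows(count, seed=42):
    """Build `count` sales tuples in sales-schema column order, reproducibly."""
    rng = random.Random(seed)  # private RNG, so the global random state is untouched
    start = date(2023, 1, 1)
    rows = []
    for sale_id in range(count):
        rows.append((
            sale_id,
            rng.randint(1, 10000),                        # customer_id
            rng.randint(1, 1000),                         # product_id
            rng.randint(1, 100),                          # quantity
            round(rng.uniform(10, 1000), 2),              # unit_price
            start + timedelta(days=rng.randrange(365)),   # sale_date within 2023
            rng.choice(REGIONS),                          # region
        ))
    return rows


rows = make_sales_rows(5)
print(len(rows), rows[0][5].year)
```

Because `sale_date` is already a `datetime.date`, tuples like these can be passed to `spark.createDataFrame(rows, schema)` without a placeholder value.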
Start Spark cluster with optimized configuration
Launch Spark master and worker nodes with Catalyst optimizer settings for distributed query processing.
# Start Spark master
$SPARK_HOME/sbin/start-master.sh
# Start Spark worker (adjust memory based on your system)
$SPARK_HOME/sbin/start-worker.sh spark://localhost:7077
# Verify cluster status
curl -s http://localhost:8080 | grep -o "Workers.*[0-9]"
curl -s http://localhost:4040 || echo "Spark UI will be available when applications run"
Implement advanced Catalyst optimizer techniques
Enable cost-based optimization with statistics
Collect table-level and column-level statistics so the cost-based optimizer can estimate cardinalities when it orders joins and chooses join strategies.
# Launch Spark SQL shell with optimization flags
spark-sql --master spark://localhost:7077 \
--conf spark.sql.cbo.enabled=true \
--conf spark.sql.cbo.joinReorder.enabled=true \
--conf spark.sql.statistics.size.autoUpdate.enabled=true
-- Create optimized tables with statistics
CREATE TABLE sales
USING PARQUET
LOCATION '/opt/spark/test-data/sales';
CREATE TABLE customers
USING PARQUET
LOCATION '/opt/spark/test-data/customers';
-- Analyze tables for cost-based optimization
ANALYZE TABLE sales COMPUTE STATISTICS;
ANALYZE TABLE customers COMPUTE STATISTICS;
-- Collect column-level statistics for better optimization
ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, product_id, sale_date;
ANALYZE TABLE customers COMPUTE STATISTICS FOR COLUMNS customer_id, tier;
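To see why column statistics matter, consider how a cost-based optimizer can estimate the output size of an equi-join from row counts and distinct-value counts. The sketch below uses the textbook formula rows(L) * rows(R) / max(ndv(L), ndv(R)). Spark's own estimator is more detailed, so treat the function name and the numbers as illustrative assumptions.

```python
def estimate_inner_join_rows(left_rows, right_rows, left_ndv, right_ndv):
    """Textbook equi-join cardinality: |L| * |R| / max(ndv_L, ndv_R)."""
    if left_rows == 0 or right_rows == 0:
        return 0
    return left_rows * right_rows // max(left_ndv, right_ndv, 1)


# Sizes taken from the generated test data: 1,000,000 sales rows joined to
# 10,000 customers, assuming about 10,000 distinct customer_id values per side.
sales_rows, customers_rows = 1_000_000, 10_000
estimate = estimate_inner_join_rows(sales_rows, customers_rows, 10_000, 10_000)
print(estimate)

# If a filter keeps about one third of the customers, the estimated
# join output shrinks proportionally.
premium = estimate_inner_join_rows(
    sales_rows, customers_rows // 3, 10_000, 10_000 // 3
)
print(premium)
```

Without distinct-value counts the optimizer has to fall back on cruder size-based guesses, which is why the `FOR COLUMNS` variant of `ANALYZE TABLE` is worth running on join keys.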
Configure adaptive query execution
Enable dynamic partition coalescing, skew join optimization, and runtime query plan adjustments based on actual data statistics.
-- Enable all adaptive query execution features
SET spark.sql.adaptive.enabled = true;
SET spark.sql.adaptive.coalescePartitions.enabled = true;
SET spark.sql.adaptive.skewJoin.enabled = true;
SET spark.sql.adaptive.localShuffleReader.enabled = true;
SET spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled = true;
-- Configure adaptive thresholds
SET spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB;
SET spark.sql.adaptive.coalescePartitions.minPartitionSize = 1MB;
SET spark.sql.adaptive.coalescePartitions.initialPartitionNum = 200;
-- advisoryPartitionSizeInBytes above replaces the deprecated spark.sql.adaptive.shuffle.targetPostShuffleInputSize
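Partition coalescing can be pictured with a simplified model: walk the shuffle partitions in order and merge neighbours until the next one would push the group past the advisory size. This is a sketch of the idea, not Spark's exact algorithm, and `coalesce_partitions` is a hypothetical name.

```python
def coalesce_partitions(sizes_mb, advisory_mb=64):
    """Greedily merge adjacent partitions while the group stays within advisory_mb."""
    groups, current, current_size = [], [], 0
    for index, size in enumerate(sizes_mb):
        # Start a new group when adding this partition would exceed the target.
        if current and current_size + size > advisory_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Ten shuffle partitions (sizes in MB) collapse into fewer reduce tasks.
sizes = [10, 20, 30, 5, 60, 70, 1, 1, 1, 40]
print(coalesce_partitions(sizes))
```

Each inner list holds the indexes of the original partitions that one task would read. A partition already larger than the advisory size stays in a group of its own.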
Implement advanced join optimization
Configure broadcast joins, bucketing, and join reordering for complex analytical queries with multiple table joins.
-- Configure broadcast join thresholds
SET spark.sql.autoBroadcastJoinThreshold = 10MB;
SET spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin = 0.2;
-- Enable join reordering
SET spark.sql.cbo.joinReorder.enabled = true;
SET spark.sql.cbo.joinReorder.dp.threshold = 12;
SET spark.sql.cbo.starSchemaDetection = true;
-- Test complex join optimization
EXPLAIN EXTENDED
SELECT c.customer_name, c.tier,
SUM(s.quantity * s.unit_price) as total_sales,
COUNT(*) as order_count
FROM sales s
JOIN customers c ON s.customer_id = c.customer_id
WHERE s.sale_date >= '2023-06-01'
AND c.tier = 'Premium'
GROUP BY c.customer_name, c.tier
ORDER BY total_sales DESC
LIMIT 100;
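The broadcast threshold acts as a size cutoff: when the estimated size of one join side is at or below spark.sql.autoBroadcastJoinThreshold, that side can be shipped to every executor instead of shuffling both inputs. The following is a minimal sketch of that decision with hypothetical names. It ignores join hints and join types.

```python
def pick_join_strategy(left_bytes, right_bytes, broadcast_threshold=10 * 1024 * 1024):
    """Return which side to broadcast, or fall back to a shuffle-based join."""
    # A negative threshold disables automatic broadcast joins.
    if broadcast_threshold < 0:
        return "shuffle join"
    smaller_side, smaller_bytes = min(
        (("left", left_bytes), ("right", right_bytes)), key=lambda side: side[1]
    )
    if smaller_bytes <= broadcast_threshold:
        return f"broadcast {smaller_side}"
    return "shuffle join"


MB = 1024 * 1024
print(pick_join_strategy(40 * MB, 1 * MB))    # small right side fits under 10MB
print(pick_join_strategy(40 * MB, 200 * MB))  # both sides exceed the threshold
print(pick_join_strategy(40 * MB, 1 * MB, broadcast_threshold=-1))
```

In the EXPLAIN output, a broadcast decision shows up as a BroadcastHashJoin node, while the fallback typically appears as SortMergeJoin.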
Configure dynamic partition pruning
Enable runtime partition elimination and predicate pushdown optimization for partitioned datasets.
-- Enable dynamic partition pruning
SET spark.sql.optimizer.dynamicPartitionPruning.enabled = true;
SET spark.sql.optimizer.dynamicPartitionPruning.useStats = true;
SET spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio = 0.5;
-- Create partitioned table for testing
CREATE TABLE sales_partitioned
USING PARQUET
PARTITIONED BY (region)
AS SELECT * FROM sales;
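-- Test partition pruning (this literal filter on region is pruned statically at planning time;
-- dynamic partition pruning applies when the filter values come from a join)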
EXPLAIN EXTENDED
SELECT region, AVG(unit_price) as avg_price
FROM sales_partitioned
WHERE region IN ('North', 'South')
AND sale_date >= '2023-09-01'
GROUP BY region;
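Partition pruning works because a table partitioned by region is stored as one directory per value, so a filter on region lets the reader skip whole directories. The sketch below imitates that selection over Hive-style directory names. The paths and the helper `prune_partitions` are illustrative, not Spark internals.

```python
def prune_partitions(partition_dirs, column, allowed_values):
    """Keep only directories whose 'column=value' segment matches allowed_values."""
    kept = []
    for path in partition_dirs:
        # Hive-style layouts encode the partition value in the directory name.
        segments = dict(
            segment.split("=", 1) for segment in path.split("/") if "=" in segment
        )
        if segments.get(column) in allowed_values:
            kept.append(path)
    return kept


dirs = [
    "sales_partitioned/region=Central",
    "sales_partitioned/region=East",
    "sales_partitioned/region=North",
    "sales_partitioned/region=South",
    "sales_partitioned/region=West",
]
print(prune_partitions(dirs, "region", {"North", "South"}))
```

In the physical plan, the equivalent information appears in the PartitionFilters entry of the file scan node.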
Monitor and benchmark performance
Set up query performance monitoring
Configure Spark metrics collection and query execution monitoring to track Catalyst optimizer effectiveness.
# Create performance monitoring script
cat > /opt/spark/monitor_performance.py << 'EOF'
from pyspark.sql import SparkSession
import time
import json
spark = SparkSession.builder \
.appName("CatalystPerformanceMonitor") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.cbo.enabled", "true") \
.config("spark.eventLog.enabled", "true") \
.config("spark.eventLog.dir", "/opt/spark/logs") \
.getOrCreate()
# Load test tables
sales_df = spark.read.parquet("/opt/spark/test-data/sales")
customers_df = spark.read.parquet("/opt/spark/test-data/customers")
sales_df.createOrReplaceTempView("sales")
customers_df.createOrReplaceTempView("customers")
# Benchmark query with timing
def benchmark_query(sql_query, description):
    print(f"\n=== {description} ===")
    start_time = time.time()
    result = spark.sql(sql_query)
    result.show(10)
    execution_time = time.time() - start_time
    print(f"Execution time: {execution_time:.2f} seconds")
    # Show execution plan
    print("\nExecution Plan:")
    result.explain(True)
    return execution_time
# Test queries for optimization analysis
queries = [
("""SELECT region, COUNT(*) as sales_count, AVG(unit_price) as avg_price
FROM sales
WHERE sale_date >= '2023-06-01'
GROUP BY region
ORDER BY sales_count DESC""", "Regional Sales Analysis"),
("""SELECT c.tier, SUM(s.quantity * s.unit_price) as total_revenue
FROM sales s
JOIN customers c ON s.customer_id = c.customer_id
WHERE s.sale_date >= '2023-01-01'
GROUP BY c.tier
ORDER BY total_revenue DESC""", "Revenue by Customer Tier"),
("""WITH monthly_sales AS (
SELECT DATE_TRUNC('month', sale_date) as month,
SUM(quantity * unit_price) as monthly_revenue
FROM sales
GROUP BY DATE_TRUNC('month', sale_date)
)
SELECT month, monthly_revenue,
LAG(monthly_revenue) OVER (ORDER BY month) as prev_month,
(monthly_revenue - LAG(monthly_revenue) OVER (ORDER BY month)) /
LAG(monthly_revenue) OVER (ORDER BY month) * 100 as growth_rate
FROM monthly_sales
ORDER BY month""", "Monthly Revenue Trend Analysis")
]
# Run benchmark tests
for query, description in queries:
    benchmark_query(query, description)
spark.stop()
EOF
# Run performance monitoring
python3 /opt/spark/monitor_performance.py
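A single timed run mixes JVM warm-up and caching effects into the measurement, so repeated runs summarized by a median are usually more informative. The following is a minimal sketch, assuming the workload is wrapped in a zero-argument callable (`time_runs` and its parameters are illustrative names).

```python
import statistics
import time


def time_runs(action, runs=5, warmups=1):
    """Call `action` repeatedly and summarize wall-clock timings in seconds."""
    for _ in range(warmups):
        action()  # discard warm-up runs (JIT compilation, cold caches)
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return {
        "runs": runs,
        "median": statistics.median(timings),
        "min": min(timings),
        "max": max(timings),
    }


# Stand-in workload; with Spark this could be
# lambda: spark.sql(query).collect()
summary = time_runs(lambda: sum(range(100_000)), runs=5)
print(f"median {summary['median']:.4f}s over {summary['runs']} runs")
```

Timing an action that consumes the full result, such as `collect()` or a write, measures the whole query, whereas `show(10)` may stop after producing the first rows.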
Analyze Catalyst optimizer statistics
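Examine query execution plans and optimizer decisions to validate performance improvements and identify bottlenecks. The script queries temporary views over the Parquet files, so statistics gathered with ANALYZE TABLE on catalog tables are not available to it. With only file-size estimates to work from, the plans with and without CBO may look identical.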
# Create Catalyst analysis script
cat > /opt/spark/analyze_catalyst.py << 'EOF'
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("CatalystOptimizationAnalysis") \
.getOrCreate()
# Enable detailed query planning logs
spark.sparkContext.setLogLevel("INFO")
# Load data
spark.read.parquet("/opt/spark/test-data/sales").createOrReplaceTempView("sales")
spark.read.parquet("/opt/spark/test-data/customers").createOrReplaceTempView("customers")
# Test optimization with/without features
print("=== Testing Catalyst Optimizations ===")
# Test 1: Without CBO
spark.conf.set("spark.sql.cbo.enabled", "false")
print("\nWithout Cost-Based Optimization:")
test_query = """
SELECT c.tier, COUNT(*) as customer_count, AVG(s.unit_price) as avg_price
FROM sales s
JOIN customers c ON s.customer_id = c.customer_id
GROUP BY c.tier
"""
result = spark.sql(test_query)
result.explain()
# Test 2: With CBO
spark.conf.set("spark.sql.cbo.enabled", "true")
print("\nWith Cost-Based Optimization:")
result = spark.sql(test_query)
result.explain()
# Test 3: Adaptive Query Execution analysis
print("\n=== Adaptive Query Execution Analysis ===")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.logLevel", "DEBUG")
complex_query = """
SELECT s.region, c.tier,
COUNT(*) as transaction_count,
SUM(s.quantity * s.unit_price) as total_revenue,
AVG(s.unit_price) as avg_unit_price
FROM sales s
JOIN customers c ON s.customer_id = c.customer_id
WHERE s.sale_date >= '2023-06-01'
AND s.quantity > 5
GROUP BY s.region, c.tier
HAVING COUNT(*) > 100
ORDER BY total_revenue DESC
"""
print("Complex Query with AQE:")
result = spark.sql(complex_query)
result.show()
result.explain("extended")
print("\n=== Query Statistics ===")
# Show table statistics
spark.sql("DESCRIBE EXTENDED sales").show(100, False)
spark.sql("SHOW TBLPROPERTIES sales").show()
spark.stop()
EOF
python3 /opt/spark/analyze_catalyst.py
Configure production optimization
Set up cluster resource management
Configure dynamic resource allocation and executor scaling for production workloads with varying query complexity.
# Dynamic Resource Allocation
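spark.dynamicAllocation.enabled true
# Without an external shuffle service, dynamic allocation needs shuffle tracking
spark.dynamicAllocation.shuffleTracking.enabled true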
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.initialExecutors 3
spark.dynamicAllocation.executorIdleTimeout 60s
spark.dynamicAllocation.cachedExecutorIdleTimeout 300s
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
# Memory Management
spark.memory.fraction 0.8
spark.memory.storageFraction 0.5
spark.driver.maxResultSize 2g
spark.network.timeout 800s
spark.sql.execution.arrow.pyspark.enabled true
# Shuffle Optimization
spark.shuffle.compress true
spark.shuffle.spill.compress true
spark.io.compression.codec zstd
spark.sql.adaptive.advisoryPartitionSizeInBytes 64MB
# Catalyst Advanced Configuration
spark.sql.optimizer.maxIterations 100
spark.sql.cbo.joinReorder.dp.threshold 12
spark.sql.optimizer.constraintPropagation.enabled true
spark.sql.optimizer.nestedSchemaPruning.enabled true
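Spark's unified memory region is sized as a fraction (spark.memory.fraction, default 0.6) of the JVM heap left after a fixed 300MB reservation, and a share of that region (spark.memory.storageFraction, default 0.5) is protected from eviction for cached data. The sketch below works through that arithmetic for a 4GB executor heap. The function name is illustrative.

```python
RESERVED_MB = 300  # fixed reservation described in Spark's memory tuning guide


def unified_memory_mb(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Return (unified, storage_protected) memory in MB for one executor heap."""
    usable = max(heap_mb - RESERVED_MB, 0)
    unified = usable * memory_fraction
    storage = unified * storage_fraction
    return unified, storage


for fraction in (0.6, 0.8):
    unified, storage = unified_memory_mb(4096, memory_fraction=fraction)
    print(f"fraction={fraction}: unified={unified:.0f}MB, storage-protected={storage:.0f}MB")
```

Raising the fraction gives execution and storage more room but leaves less heap for user data structures and internal metadata, so larger values are worth validating against GC behaviour.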
Create automated performance tuning script
Implement automatic query plan analysis and configuration adjustment based on workload patterns and performance metrics.
#!/usr/bin/env python3
from pyspark.sql import SparkSession
import psutil

def get_system_resources():
    """Get system memory and CPU info for optimization"""
    memory_gb = psutil.virtual_memory().total // (1024**3)
    # cpu_count(logical=False) can return None, so fall back to logical cores
    cpu_cores = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1
    return memory_gb, cpu_cores

def optimize_spark_config():
    """Generate a starting Spark configuration based on system resources"""
    memory_gb, cpu_cores = get_system_resources()
    # Calculate executor configuration; max(1, ...) avoids zero values on small hosts
    executor_memory = max(1, min(memory_gb // 4, 8))  # Max 8GB per executor
    executor_cores = max(1, min(cpu_cores // 2, 4))  # Max 4 cores per executor
    num_executors = max(1, cpu_cores // executor_cores)
    # Calculate shuffle partitions
    shuffle_partitions = num_executors * executor_cores * 4
    config = {
        "spark.executor.memory": f"{executor_memory}g",
        "spark.executor.cores": str(executor_cores),
        "spark.executor.instances": str(num_executors),
        "spark.sql.shuffle.partitions": str(shuffle_partitions),
        "spark.driver.memory": f"{max(1, min(memory_gb // 4, 4))}g",
        "spark.sql.adaptive.coalescePartitions.minPartitionSize": "1MB",
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": f"{64 * 1024 * 1024}",
    }
    return config

def create_tuning_session():
    """Create optimized Spark session"""
    config = optimize_spark_config()
    builder = SparkSession.builder.appName("AutoTunedSpark")
    # Apply optimized configuration
    for key, value in config.items():
        builder = builder.config(key, value)
    # Enable all Catalyst optimizations
    builder = builder.config("spark.sql.adaptive.enabled", "true")
    builder = builder.config("spark.sql.cbo.enabled", "true")
    builder = builder.config("spark.sql.codegen.wholeStage", "true")
    return builder.getOrCreate()

if __name__ == "__main__":
    print("=== Auto-tuning Spark Configuration ===")
    memory_gb, cpu_cores = get_system_resources()
    print(f"System: {memory_gb}GB RAM, {cpu_cores} CPU cores")
    config = optimize_spark_config()
    print("\nOptimized Configuration:")
    for key, value in config.items():
        print(f"{key} = {value}")
    # Test the configuration
    spark = create_tuning_session()
    print("\n=== Configuration Applied Successfully ===")
    print(f"Spark Version: {spark.version}")
    print(f"Executor Memory: {spark.conf.get('spark.executor.memory')}")
    print(f"Executor Cores: {spark.conf.get('spark.executor.cores')}")
    print(f"Shuffle Partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")
    spark.stop()
chmod +x /opt/spark/auto_tune.py
python3 /opt/spark/auto_tune.py
Verify your setup
# Check Spark installation
spark-submit --version
# Verify cluster is running
curl -s http://localhost:8080 | grep -i "spark master"
jps | grep -E "(Master|Worker)"
# Test Catalyst optimizer
spark-sql --master spark://localhost:7077 -e "SET spark.sql.adaptive.enabled"
spark-sql --master spark://localhost:7077 -e "SET spark.sql.cbo.enabled"
# Check performance monitoring
ls -la /opt/spark/logs/
ls -la /opt/spark/test-data/
# Verify test data (Spark SQL needs backticks around a file path; double quotes denote a string literal)
spark-sql --master spark://localhost:7077 -e 'SELECT COUNT(*) FROM parquet.`/opt/spark/test-data/sales`'
# Test query execution
time spark-sql --master spark://localhost:7077 -e 'SELECT region, COUNT(*) FROM parquet.`/opt/spark/test-data/sales` GROUP BY region'
# Check system resources
free -h
nproc
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| OutOfMemoryError in driver | Insufficient driver memory for large datasets | Increase spark.driver.memory and spark.driver.maxResultSize |
| Slow join performance | Missing table statistics for CBO | Run ANALYZE TABLE table_name COMPUTE STATISTICS |
| Many small partitions after shuffle | Default partition count too high | Enable spark.sql.adaptive.coalescePartitions.enabled=true |
| Spark SQL queries fail | Catalyst optimizer rule conflicts | Check spark.sql.optimizer.excludedRules configuration |
| Poor code generation performance | Whole-stage codegen disabled | Enable spark.sql.codegen.wholeStage=true |
| Executor crashes with large datasets | Insufficient executor memory | Increase spark.executor.memory or reduce spark.executor.cores |
| Skewed partition processing | Data skew in join keys | Enable spark.sql.adaptive.skewJoin.enabled=true |
| Statistics collection fails | Insufficient permissions on data | Check file permissions and Hadoop configuration |
Next steps
- Configure Spark Kubernetes Operator with MinIO for cloud-native analytics
- Implement Apache Spark 3.5 cluster with YARN and HDFS for distributed computing
- Set up Spark 3.5 Delta Lake with MinIO for ACID transactions and big data analytics
- Optimize Spark SQL query performance with advanced partitioning strategies
- Implement Spark streaming with Kafka and MinIO for real-time analytics
- Monitor Apache Spark cluster performance with Prometheus and Grafana dashboards
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# Configuration
SPARK_VERSION="3.5.1"
SPARK_HOME="/opt/spark"
JAVA_VERSION="11"
# Usage function
usage() {
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " -h, --help Show this help message"
echo " -v, --version Spark version to install (default: ${SPARK_VERSION})"
exit 1
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
-h|--help)
usage
;;
-v|--version)
SPARK_VERSION="$2"
shift 2
;;
*)
echo -e "${RED}Unknown option: $1${NC}"
usage
;;
esac
done
# Cleanup on failure
cleanup() {
echo -e "${RED}Installation failed. Cleaning up...${NC}"
sudo rm -rf /opt/spark-* 2>/dev/null || true
sudo rm -f /usr/local/bin/spark-* /usr/local/bin/pyspark 2>/dev/null || true
}
trap cleanup ERR
# Check if running as root or with sudo
if [[ $EUID -eq 0 ]]; then
echo -e "${YELLOW}Running as root. This is acceptable for system installation.${NC}"
SUDO_CMD=""
else
if ! command -v sudo &> /dev/null; then
echo -e "${RED}This script requires sudo privileges. Please install sudo or run as root.${NC}"
exit 1
fi
SUDO_CMD="sudo"
fi
echo -e "${GREEN}[1/8] Detecting distribution and package manager...${NC}"
# Auto-detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update"
JAVA_PKG="openjdk-${JAVA_VERSION}-jdk"
JAVA_HOME="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk-amd64"
BUILD_TOOLS="build-essential libssl-dev zlib1g-dev"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
JAVA_PKG="java-${JAVA_VERSION}-openjdk-devel"
JAVA_HOME="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk"
BUILD_TOOLS="gcc gcc-c++ make openssl-devel zlib-devel"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
JAVA_PKG="java-${JAVA_VERSION}-openjdk-devel"
JAVA_HOME="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk"
BUILD_TOOLS="gcc gcc-c++ make openssl-devel zlib-devel"
;;
*)
echo -e "${RED}Unsupported distribution: $ID${NC}"
exit 1
;;
esac
echo -e "${GREEN}Detected: $PRETTY_NAME using $PKG_MGR${NC}"
else
echo -e "${RED}Cannot detect distribution. /etc/os-release not found.${NC}"
exit 1
fi
echo -e "${GREEN}[2/8] Installing system dependencies and Java...${NC}"
# Update package manager and install dependencies
$SUDO_CMD $PKG_UPDATE
$SUDO_CMD $PKG_INSTALL $JAVA_PKG wget curl python3 python3-pip $BUILD_TOOLS
# Verify Java installation
if ! java -version 2>&1 | grep -q "openjdk version"; then
echo -e "${RED}Java installation failed${NC}"
exit 1
fi
echo -e "${GREEN}[3/8] Downloading Apache Spark ${SPARK_VERSION}...${NC}"
# Download Spark
SPARK_ARCHIVE="spark-${SPARK_VERSION}-bin-hadoop3.tgz"
SPARK_URL="https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/${SPARK_ARCHIVE}"
cd /tmp
wget -q "$SPARK_URL" || {
echo -e "${RED}Failed to download Spark from $SPARK_URL${NC}"
exit 1
}
echo -e "${GREEN}[4/8] Installing Spark to ${SPARK_HOME}...${NC}"
# Extract and install Spark
$SUDO_CMD tar -xzf "$SPARK_ARCHIVE" -C /opt/
$SUDO_CMD mv "/opt/spark-${SPARK_VERSION}-bin-hadoop3" "$SPARK_HOME"
$SUDO_CMD chown -R "$USER:$USER" "$SPARK_HOME"
# Create symbolic links
$SUDO_CMD ln -sf "$SPARK_HOME/bin/spark-shell" /usr/local/bin/
$SUDO_CMD ln -sf "$SPARK_HOME/bin/spark-sql" /usr/local/bin/
$SUDO_CMD ln -sf "$SPARK_HOME/bin/pyspark" /usr/local/bin/
$SUDO_CMD ln -sf "$SPARK_HOME/bin/spark-submit" /usr/local/bin/
echo -e "${GREEN}[5/8] Configuring environment variables...${NC}"
# Create environment configuration
cat << EOF | $SUDO_CMD tee /etc/profile.d/spark.sh > /dev/null
export JAVA_HOME=$JAVA_HOME
export SPARK_HOME=$SPARK_HOME
export PYTHONPATH=\$SPARK_HOME/python:\$(echo \$SPARK_HOME/python/lib/py4j-*-src.zip)
export PATH=\$PATH:\$SPARK_HOME/bin:\$SPARK_HOME/sbin
EOF
$SUDO_CMD chmod 644 /etc/profile.d/spark.sh
echo -e "${GREEN}[6/8] Configuring Spark defaults with Catalyst optimizations...${NC}"
# Create Spark configuration directory and files
$SUDO_CMD mkdir -p "$SPARK_HOME/conf"
# Spark defaults configuration
cat << 'EOF' | $SUDO_CMD tee "$SPARK_HOME/conf/spark-defaults.conf" > /dev/null
# Catalyst Optimizer Configuration
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.skewJoin.enabled true
spark.sql.adaptive.localShuffleReader.enabled true
spark.sql.adaptive.advisoryPartitionSizeInBytes 64MB
# Cost-Based Optimization
spark.sql.cbo.enabled true
spark.sql.cbo.joinReorder.enabled true
spark.sql.cbo.planStats.enabled true
spark.sql.cbo.starSchemaDetection true
# Memory and Performance Tuning
spark.executor.memory 4g
spark.executor.cores 2
spark.executor.instances 4
spark.driver.memory 2g
spark.driver.cores 2
spark.sql.shuffle.partitions 200
# Catalyst Query Optimization
spark.sql.optimizer.maxIterations 100
spark.sql.codegen.wholeStage true
spark.sql.codegen.splitConsumeFuncByOperator true
# Advanced Catalyst Features
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin 0.2
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0
EOF
echo -e "${GREEN}[7/8] Configuring advanced memory management...${NC}"
# Spark environment configuration
cat << 'EOF' | $SUDO_CMD tee "$SPARK_HOME/conf/spark-env.sh" > /dev/null
#!/usr/bin/env bash
# Java Options for Catalyst Optimization
export SPARK_DRIVER_OPTS="-XX:+UseG1GC -XX:+UseStringDeduplication -XX:+OptimizeStringConcat"
export SPARK_EXECUTOR_OPTS="-XX:+UseG1GC -XX:+UseStringDeduplication -XX:MaxGCPauseMillis=200"
# Catalyst Memory Configuration
export SPARK_DRIVER_MEMORY="2g"
export SPARK_EXECUTOR_MEMORY="4g"
export SPARK_EXECUTOR_CORES="2"
# Advanced JVM Tuning
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/opt/spark/recovery"
EOF
$SUDO_CMD chmod 755 "$SPARK_HOME/conf/spark-env.sh"
$SUDO_CMD chown -R "$USER:$USER" "$SPARK_HOME/conf"
# Create recovery directory
$SUDO_CMD mkdir -p /opt/spark/recovery
$SUDO_CMD chown -R "$USER:$USER" /opt/spark/recovery
$SUDO_CMD chmod 755 /opt/spark/recovery
echo -e "${GREEN}[8/8] Performing verification checks...${NC}"
# Source the environment
source /etc/profile.d/spark.sh
# Verify installations
if ! command -v spark-shell &> /dev/null; then
echo -e "${RED}Spark shell not found in PATH${NC}"
exit 1
fi
if ! command -v pyspark &> /dev/null; then
echo -e "${RED}PySpark not found in PATH${NC}"
exit 1
fi
# Test Spark installation
echo -e "${YELLOW}Testing Spark installation...${NC}"
if ! spark-shell --version 2>&1 | grep -q "version ${SPARK_VERSION}"; then
echo -e "${RED}Spark version verification failed${NC}"
exit 1
fi
# Cleanup temporary files
rm -f /tmp/"$SPARK_ARCHIVE"
echo -e "${GREEN}✅ Apache Spark ${SPARK_VERSION} with Catalyst optimizer installed successfully!${NC}"
echo -e "${GREEN}Configuration files:${NC}"
echo -e " - Environment: /etc/profile.d/spark.sh"
echo -e " - Spark config: $SPARK_HOME/conf/spark-defaults.conf"
echo -e " - Spark environment: $SPARK_HOME/conf/spark-env.sh"
echo ""
echo -e "${YELLOW}To use Spark, either restart your shell or run:${NC}"
echo -e " source /etc/profile.d/spark.sh"
echo ""
echo -e "${YELLOW}Available commands:${NC}"
echo -e " - spark-shell (Scala REPL)"
echo -e " - pyspark (Python REPL)"
echo -e " - spark-sql (SQL shell)"
echo -e " - spark-submit (Submit applications)"
Review the script before running. Execute with: bash install.sh