Configure Kafka Streams state stores with RocksDB optimization for high-performance streaming applications. Learn custom state store configurations, RocksDB tuning parameters, and monitoring techniques for production-grade stream processing.
Prerequisites
- Java 11 or higher installed
- Apache Kafka cluster running
- Maven build tool
- Minimum 4GB RAM available
What this solves
Kafka Streams applications rely on state stores for maintaining intermediate data during stream processing operations like aggregations, joins, and windowing. The default RocksDB state store configuration often becomes a performance bottleneck under high-throughput workloads. This tutorial shows you how to configure custom state stores, optimize RocksDB parameters, and monitor performance metrics to achieve production-grade streaming performance with lower latency and higher throughput.
Prerequisites and system setup
Update system packages
Start by updating your package manager to ensure you get the latest versions of all dependencies.
sudo apt update && sudo apt upgrade -y
Install Java runtime environment
Kafka Streams requires Java 11 or later. Install OpenJDK which provides excellent performance for streaming applications.
sudo apt install -y openjdk-17-jdk openjdk-17-jre
Verify the Java installation:
java -version
Download and install Apache Kafka
Download the Kafka 2.8.2 distribution built for Scala 2.13.
# Download and unpack Kafka into /opt/kafka, owned by the current user.
cd /opt
# Releases that are no longer current are only served from archive.apache.org
sudo wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
sudo tar -xzf kafka_2.13-2.8.2.tgz
sudo mv kafka_2.13-2.8.2 kafka
sudo chown -R "$USER:$USER" /opt/kafka
Add Kafka binaries to your PATH:
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source ~/.bashrc
Start Kafka cluster
Start ZooKeeper and Kafka broker services for local development and testing.
cd /opt/kafka
# Start ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Start Kafka broker
bin/kafka-server-start.sh -daemon config/server.properties
Verify the cluster is running:
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
Configure custom state store topology
Create Maven project structure
Set up a Maven project for your Kafka Streams application with RocksDB dependencies.
mkdir -p kafka-streams-optimization/src/main/java/com/example
cd kafka-streams-optimization
Create the Maven POM file with required dependencies:
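<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>kafka-streams-optimization</artifactId>
  <version>1.0.0</version>
  <properties>
    <maven.compiler.source>17</maven.compiler.source>
    <maven.compiler.target>17</maven.compiler.target>
    <kafka.version>3.6.0</kafka.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.rocksdb</groupId>
      <artifactId>rocksdbjni</artifactId>
      <version>8.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.36</version>
    </dependency>
  </dependencies>
</project>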
Implement custom RocksDB state store configuration
Create a custom state store configuration class that optimizes RocksDB parameters for high-performance streaming.
package com.example;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*;
import java.util.Map;
public class OptimizedRocksDBConfig implements RocksDBConfigSetter {
@Override
public void setConfig(String storeName, Options options, Map configs) {
// Optimize block cache for better read performance
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCacheSize(64 1024 1024); // 64MB block cache
tableConfig.setBlockSize(16 * 1024); // 16KB block size
tableConfig.setCacheIndexAndFilterBlocks(true);
tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
// Configure bloom filter for faster lookups
tableConfig.setFilterPolicy(new BloomFilter(10, false));
options.setTableFormatConfig(tableConfig);
// Optimize write performance
options.setWriteBufferSize(128 1024 1024); // 128MB write buffer
options.setMaxWriteBufferNumber(4);
options.setMinWriteBufferNumberToMerge(2);
// Configure compaction for better performance
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
options.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
options.setLevelCompactionDynamicLevelBytes(true);
// Optimize for SSD storage
options.setAllowConcurrentMemtableWrite(true);
options.setEnableWriteThreadAdaptiveYield(true);
options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
// Configure background threads
options.setMaxBackgroundCompactions(4);
options.setMaxBackgroundFlushes(2);
// Memory optimization
options.setDbWriteBufferSize(256 1024 1024); // 256MB total write buffer
}
@Override
public void close(String storeName, Options options) {
// Cleanup resources if needed
}
} Create Kafka Streams application with optimized state stores
Implement a streaming application that uses the custom RocksDB configuration for state stores.
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
import java.util.Properties;
public class StreamsApplication {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Configure RocksDB optimization
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, OptimizedRocksDBConfig.class);
// Optimize processing threads
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Runtime.getRuntime().availableProcessors());
// Configure commit interval for performance
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
// Enable exactly-once semantics
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder builder = new StreamsBuilder();
// Example: Word count with optimized state store
KStream source = builder.stream("input-topic");
source.flatMapValues(value -> java.util.Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
.count(Materialized.>as(
Stores.persistentWindowStore("word-count-store",
Duration.ofMinutes(10),
Duration.ofMinutes(5),
false))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream()
.map((windowedKey, count) ->
new KeyValue<>(windowedKey.key() + "@" + windowedKey.window().start(), count))
.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();
System.out.println("Kafka Streams application started with optimized RocksDB configuration");
}
} Create topics and compile application
Create the required Kafka topics and compile your streaming application.
# Create input and output topics
kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
# Compile the application
mvn clean compile exec:java -Dexec.mainClass="com.example.StreamsApplication"
Advanced RocksDB tuning parameters
Configure memory-based optimizations
Create an advanced RocksDB configuration that optimizes memory usage patterns for different workload types.
package com.example;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*;
import java.util.Map;
public class AdvancedRocksDBConfig implements RocksDBConfigSetter {
private static final long BLOCK_CACHE_SIZE = 128 1024 1024L; // 128MB
private static final long WRITE_BUFFER_SIZE = 64 1024 1024L; // 64MB
private static final int MAX_WRITE_BUFFER_NUMBER = 6;
@Override
public void setConfig(String storeName, Options options, Map configs) {
// Configure shared block cache across all state stores
Cache blockCache = new LRUCache(BLOCK_CACHE_SIZE, 8);
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockCache(blockCache);
tableConfig.setBlockSize(32 * 1024); // 32KB blocks for better compression
tableConfig.setCacheIndexAndFilterBlocks(true);
tableConfig.setPinTopLevelIndexAndFilter(true);
// Optimize bloom filter for point lookups
tableConfig.setFilterPolicy(new BloomFilter(10, false));
tableConfig.setWholeKeyFiltering(true);
options.setTableFormatConfig(tableConfig);
// Memory table optimizations
options.setWriteBufferSize(WRITE_BUFFER_SIZE);
options.setMaxWriteBufferNumber(MAX_WRITE_BUFFER_NUMBER);
options.setMinWriteBufferNumberToMerge(3);
// Configure total memory usage across all state stores
options.setDbWriteBufferSize(512 1024 1024L); // 512MB total
// Compaction optimizations
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
options.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
options.setCompressionPerLevel(java.util.Arrays.asList(
CompressionType.NO_COMPRESSION,
CompressionType.NO_COMPRESSION,
CompressionType.LZ4_COMPRESSION,
CompressionType.LZ4_COMPRESSION,
CompressionType.ZSTD_COMPRESSION,
CompressionType.ZSTD_COMPRESSION,
CompressionType.ZSTD_COMPRESSION
));
// Level-based compaction tuning
options.setLevelCompactionDynamicLevelBytes(true);
options.setMaxBytesForLevelBase(256 1024 1024L); // 256MB
options.setMaxBytesForLevelMultiplier(8);
// I/O optimizations
options.setAllowMmapReads(true);
options.setAllowMmapWrites(false); // Better for streaming workloads
options.setUseDirectReads(true);
options.setUseDirectIoForFlushAndCompaction(true);
// Parallelism settings
int cpuCores = Runtime.getRuntime().availableProcessors();
options.setIncreaseParallelism(cpuCores);
options.setMaxBackgroundCompactions(Math.max(2, cpuCores / 2));
options.setMaxBackgroundFlushes(Math.max(1, cpuCores / 4));
// WAL optimizations
options.setWalTtlSeconds(0);
options.setWalSizeLimitMB(0);
options.setMaxTotalWalSize(128 1024 1024L); // 128MB
// Statistics and monitoring
options.setStatistics(new Statistics());
options.setStatsDumpPeriodSec(60);
System.out.println("Applied advanced RocksDB configuration for store: " + storeName);
}
@Override
public void close(String storeName, Options options) {
System.out.println("Closing RocksDB configuration for store: " + storeName);
}
} Implement workload-specific state store configurations
Create different state store configurations optimized for specific streaming patterns like aggregations versus joins.
package com.example;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*;
import java.util.Map;
public class WorkloadSpecificConfig implements RocksDBConfigSetter {
@Override
public void setConfig(String storeName, Options options, Map configs) {
// Determine optimization strategy based on store name/type
if (storeName.contains("aggregate") || storeName.contains("count")) {
configureForAggregations(options);
} else if (storeName.contains("join")) {
configureForJoins(options);
} else if (storeName.contains("window")) {
configureForWindowing(options);
} else {
configureDefault(options);
}
}
private void configureForAggregations(Options options) {
// Optimize for frequent updates to existing keys
options.setWriteBufferSize(32 1024 1024); // Smaller write buffer
options.setMaxWriteBufferNumber(8);
options.setMinWriteBufferNumberToMerge(4);
// Faster compaction for high update rates
options.setLevel0FileNumCompactionTrigger(2);
options.setLevel0SlowdownWritesTrigger(8);
options.setLevel0StopWritesTrigger(12);
// Better bloom filters for point lookups
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setFilterPolicy(new BloomFilter(15, false));
options.setTableFormatConfig(tableConfig);
System.out.println("Configured RocksDB for aggregation workload");
}
private void configureForJoins(Options options) {
// Optimize for range scans and sequential access
options.setWriteBufferSize(128 1024 1024); // Larger write buffer
options.setMaxWriteBufferNumber(4);
// Optimize for range queries
options.setAdviseRandomOnOpen(false);
options.setCompactionStyle(CompactionStyle.LEVEL);
BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
tableConfig.setBlockSize(64 * 1024); // Larger blocks for range scans
tableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
options.setTableFormatConfig(tableConfig);
System.out.println("Configured RocksDB for join workload");
}
private void configureForWindowing(Options options) {
// Optimize for TTL and time-based cleanup
options.setWriteBufferSize(64 1024 1024);
options.setMaxWriteBufferNumber(6);
// Efficient compaction for time-series data
options.setCompressionType(CompressionType.ZSTD_COMPRESSION);
options.setLevelCompactionDynamicLevelBytes(true);
// Optimize for sequential writes
options.setAllowConcurrentMemtableWrite(true);
options.setEnableWriteThreadAdaptiveYield(true);
System.out.println("Configured RocksDB for windowing workload");
}
private void configureDefault(Options options) {
// Balanced configuration for mixed workloads
options.setWriteBufferSize(64 1024 1024);
options.setMaxWriteBufferNumber(4);
options.setCompressionType(CompressionType.LZ4_COMPRESSION);
System.out.println("Applied default RocksDB configuration");
}
@Override
public void close(String storeName, Options options) {
// Cleanup if needed
}
} Monitor RocksDB performance metrics
Create RocksDB metrics collection
Implement a metrics collector that exposes RocksDB statistics for monitoring and alerting.
package com.example;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Periodically prints JVM memory usage and per-store changelog lag for a
 * {@link KafkaStreams} instance. Collection runs every 30 seconds on a single
 * daemon thread until {@link #shutdown()} is called.
 */
public class RocksDBMetricsCollector {
    private final KafkaStreams streams;
    private final ScheduledExecutorService scheduler;

    /**
     * @param streams the streams instance whose state and store lags are reported
     */
    public RocksDBMetricsCollector(KafkaStreams streams) {
        this.streams = streams;
        // Daemon thread: a forgotten shutdown() must not keep the JVM alive.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(task -> {
            Thread worker = new Thread(task, "rocksdb-metrics-collector");
            worker.setDaemon(true);
            return worker;
        });
    }

    /** Schedules metric collection immediately and then every 30 seconds. */
    public void startMetricsCollection() {
        scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 30, TimeUnit.SECONDS);
        System.out.println("Started RocksDB metrics collection");
    }

    /**
     * Prints JVM memory figures and, when the streams instance is RUNNING, the
     * stream metrics. Exceptions are caught and logged so that one failed run
     * does not cancel the periodic schedule.
     */
    private void collectMetrics() {
        try {
            // Collect JVM memory metrics
            Runtime runtime = Runtime.getRuntime();
            long totalMemory = runtime.totalMemory();
            long freeMemory = runtime.freeMemory();
            long usedMemory = totalMemory - freeMemory;
            System.out.println("=== RocksDB Performance Metrics ===");
            System.out.println("JVM Memory - Total: " + (totalMemory / 1024 / 1024) + "MB, " +
                "Used: " + (usedMemory / 1024 / 1024) + "MB, " +
                "Free: " + (freeMemory / 1024 / 1024) + "MB");
            // Collect Kafka Streams metrics
            if (streams.state() == KafkaStreams.State.RUNNING) {
                collectStreamMetrics();
            }
        } catch (Exception e) {
            System.err.println("Error collecting metrics: " + e.getMessage());
        }
    }

    /** Prints the streams state and the offset lag of every local store partition. */
    private void collectStreamMetrics() {
        // These would be actual RocksDB statistics in a real implementation
        // For demonstration, we'll show the structure
        System.out.println("Stream State: " + streams.state());
        // allLocalStorePartitionLags() is keyed by store name, then by partition number
        streams.allLocalStorePartitionLags().forEach((storeName, partitionLags) ->
            partitionLags.forEach((partition, lagInfo) ->
                System.out.println("Store " + storeName +
                    " Partition " + partition +
                    " Lag: " + lagInfo.offsetLag())));
        // In a real implementation, you would:
        // 1. Access RocksDB statistics through JNI
        // 2. Export to monitoring systems like Prometheus
        // 3. Set up alerts for critical thresholds
        logPerformanceMetrics();
    }

    /** Prints the checklist of RocksDB metrics worth monitoring in production. */
    private void logPerformanceMetrics() {
        // Example metrics that should be monitored in production
        System.out.println("Key Metrics to Monitor:");
        System.out.println("- Compaction time and frequency");
        System.out.println("- Write amplification factor");
        System.out.println("- Block cache hit ratio");
        System.out.println("- Write stall time");
        System.out.println("- SST file sizes and levels");
        System.out.println("- Memory table flush frequency");
        System.out.println("==============================");
    }

    /**
     * Stops collection, waiting up to 10 seconds for a running collection to
     * finish before forcing termination.
     */
    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            // Preserve the interrupt for whoever called shutdown()
            Thread.currentThread().interrupt();
        }
    }
}
Create production monitoring application
Build a comprehensive monitoring application that tracks RocksDB performance and exposes metrics for external monitoring systems.
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class ProductionStreamsApp {
public static void main(String[] args) {
Properties props = createStreamProperties();
StreamsBuilder builder = new StreamsBuilder();
// Build topology with multiple state stores
buildOptimizedTopology(builder);
KafkaStreams streams = new KafkaStreams(builder.build(), props);
RocksDBMetricsCollector metricsCollector = new RocksDBMetricsCollector(streams);
CountDownLatch latch = new CountDownLatch(1);
// Exception handler
streams.setUncaughtExceptionHandler((thread, exception) -> {
System.err.println("Uncaught exception in thread " + thread.getName() + ": " + exception.getMessage());
exception.printStackTrace();
latch.countDown();
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});
// Graceful shutdown
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down streams application...");
streams.close(Duration.ofSeconds(10));
metricsCollector.shutdown();
latch.countDown();
}));
try {
streams.start();
metricsCollector.startMetricsCollection();
System.out.println("Production Kafka Streams application started with advanced monitoring");
latch.await();
} catch (Exception e) {
System.err.println("Error starting application: " + e.getMessage());
System.exit(1);
}
}
private static Properties createStreamProperties() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "production-optimized-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Use workload-specific RocksDB config
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, WorkloadSpecificConfig.class);
// Production optimizations
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Runtime.getRuntime().availableProcessors());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 1024 1024L); // 64MB
// State directory optimization
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-optimized");
// Consumer and producer optimizations
props.put("consumer.fetch.min.bytes", 50000);
props.put("consumer.fetch.max.wait.ms", 100);
props.put("producer.linger.ms", 5);
props.put("producer.batch.size", 32768);
return props;
}
private static void buildOptimizedTopology(StreamsBuilder builder) {
KStream input = builder.stream("input-topic");
// Aggregation store - optimized for frequent updates
input.groupByKey()
.aggregate(
() -> 0L,
(key, value, aggregate) -> aggregate + value.length(),
Materialized.>as(
Stores.persistentKeyValueStore("aggregate-store"))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream()
.to("aggregated-output");
// Windowed aggregation - optimized for time-series data
input.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count(Materialized.>as(
Stores.persistentWindowStore("window-count-store",
Duration.ofHours(1),
Duration.ofMinutes(5),
false))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream()
.map((windowedKey, count) ->
new KeyValue<>(windowedKey.key(), count))
.to("windowed-output");
}
} Compile and run the monitoring application
Build and execute the production-ready streaming application with comprehensive monitoring.
# Create additional topics for the production application
kafka-topics.sh --create --topic aggregated-output --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
kafka-topics.sh --create --topic windowed-output --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
# Compile and run the production application
mvn clean compile exec:java -Dexec.mainClass="com.example.ProductionStreamsApp"
Performance testing and benchmarking
Create load testing script
Generate test data to validate your RocksDB optimizations under realistic load conditions.
#!/bin/bash
# Performance testing script for Kafka Streams with RocksDB optimization.
# Produces keyed test messages to input-topic while a background loop prints
# load average, memory and disk I/O every 10 seconds. Stop with Ctrl+C.
echo "Starting Kafka Streams load test..."

# Function to produce test messages
#   $1 - target topic
#   $2 - number of messages
produce_messages() {
    local topic=$1
    local num_messages=$2
    echo "Producing $num_messages messages to $topic"
    # All records are streamed into ONE producer process; starting
    # kafka-console-producer.sh per message would launch a JVM every time.
    for i in $(seq 1 "$num_messages"); do
        echo "key-$((i % 1000)):test-message-$i-$(date +%s%N)"
        if [ $((i % 1000)) -eq 0 ]; then
            # stdout feeds the producer, so progress goes to stderr
            echo "Produced $i messages" >&2
            sleep 0.1
        fi
    done | kafka-console-producer.sh \
        --bootstrap-server localhost:9092 \
        --topic "$topic" \
        --property "parse.key=true" \
        --property "key.separator=:"
}

# Background monitoring loop
monitor_performance() {
    while true; do
        echo "=== Performance Check $(date) ==="
        # Monitor CPU load (1/5/15-minute averages); /proc/loadavg has a fixed
        # layout, unlike the header line of top
        awk '{print "load average:", $1, $2, $3}' /proc/loadavg
        # Monitor memory usage
        free -h | grep "Mem:"
        # Monitor disk I/O (iostat ships with sysstat and may not be installed)
        if command -v iostat >/dev/null 2>&1; then
            iostat -x 1 1 | tail -n +4
        fi
        echo "=============================="
        sleep 10
    done
}

# Cleanup function
cleanup() {
    echo "Stopping performance monitoring..."
    kill "$MONITOR_PID" 2>/dev/null
    exit 0
}

# Start monitoring in background
monitor_performance &
MONITOR_PID=$!
# Install the handler before the load starts so Ctrl+C during production
# also stops the monitor
trap cleanup SIGINT

# Run load test
produce_messages "input-topic" 10000
echo "Load test completed. Monitoring continues..."
echo "Press Ctrl+C to stop monitoring"
wait
Make the script executable and run the load test:
chmod +x load_test.sh
./load_test.sh
Verify your setup
Test that your optimized Kafka Streams application is running correctly and processing messages efficiently.
# Check if topics exist
kafka-topics.sh --list --bootstrap-server localhost:9092
# Monitor consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group production-optimized-streams
# Check state directory for RocksDB files
ls -la /tmp/kafka-streams-optimized/
# Monitor log files for performance metrics
tail -f /tmp/kafka-streams-optimized/production-optimized-streams-*/logs/*.log
# Test message consumption
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic aggregated-output --from-beginning --max-messages 10
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| High memory usage | Block cache or write buffers too large | Reduce setBlockCacheSize() and setDbWriteBufferSize() |
| Slow aggregation performance | Write buffer too small | Increase setWriteBufferSize() and setMaxWriteBufferNumber() |
| Write stalls occurring | Compaction can't keep up | Increase setMaxBackgroundCompactions() and tune L0 trigger levels |
| High disk I/O latency | Compression overhead | Use LZ4_COMPRESSION instead of GZIP or disable for L0/L1 |
| Poor range query performance | Small block size | Increase setBlockSize() to 64KB for range-heavy workloads |
| Application won't start | RocksDB native library missing | Ensure rocksdbjni dependency version matches your OS architecture |
| State store corruption | Unclean shutdown | Delete state directory and restart: rm -rf /tmp/kafka-streams-optimized |
| Out of memory errors | Total memory limit exceeded | Reduce setDbWriteBufferSize() and implement memory monitoring |
For testing and development scenarios, refer to our guide on setting up Kafka Streams testing framework with TopologyTestDriver for comprehensive unit testing approaches.
Next steps
- Implement advanced Kafka Streams applications for real-time processing
- Configure Kafka monitoring with Prometheus and Grafana dashboards
- Set up Kafka Schema Registry with Avro serialization for data processing
- Implement Kafka Streams exactly-once semantics for mission-critical applications
- Configure Kafka Streams multi-datacenter replication for disaster recovery
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
#
# Installs a single-node Apache Kafka + ZooKeeper as systemd services and
# scaffolds the kafka-streams-optimization Maven project in the Kafka user's
# home directory. Must be run as root.
set -euo pipefail

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

# Configuration
KAFKA_VERSION="2.8.2"
SCALA_VERSION="2.13"
INSTALL_DIR="/opt"
KAFKA_USER="kafka"
PROJECT_NAME="kafka-streams-optimization"
# archive.apache.org keeps every release; downloads.apache.org only current ones
KAFKA_MIRROR="https://archive.apache.org/dist/kafka"

# Track what this run created so a failure never removes pre-existing state
CREATED_USER=false
CREATED_KAFKA_DIR=false

print_status() {
    echo -e "${GREEN}[INFO]${NC} $1"
}

print_warning() {
    echo -e "${YELLOW}[WARN]${NC} $1"
}

print_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# Prints help and exits with the given status (default 1, i.e. usage error).
usage() {
    echo "Usage: $0 [OPTIONS]"
    echo "Options:"
    echo " -u, --user USER Kafka user (default: kafka)"
    echo " -d, --dir DIR Install directory (default: /opt)"
    echo " -h, --help Show this help message"
    exit "${1:-1}"
}

# ERR handler: stops the services and removes only what this run created.
cleanup_on_error() {
    print_error "Installation failed. Cleaning up..."
    if systemctl is-active --quiet kafka 2>/dev/null; then
        systemctl stop kafka || true
    fi
    if systemctl is-active --quiet zookeeper 2>/dev/null; then
        systemctl stop zookeeper || true
    fi
    if [ "$CREATED_KAFKA_DIR" = true ] && [ -d "${INSTALL_DIR}/kafka" ]; then
        rm -rf "${INSTALL_DIR}/kafka"
    fi
    if [ "$CREATED_USER" = true ] && id "$KAFKA_USER" &>/dev/null; then
        userdel -r "$KAFKA_USER" 2>/dev/null || true
    fi
}
trap cleanup_on_error ERR

# Refreshes package metadata and upgrades installed packages.
# Implemented as a function: a command string containing "&&" cannot be run
# through plain variable expansion.
pkg_update() {
    if [ "$PKG_MGR" = "apt-get" ]; then
        apt-get update
        DEBIAN_FRONTEND=noninteractive apt-get upgrade -y
    else
        "$PKG_MGR" update -y
    fi
}

# Installs the packages given as arguments.
pkg_install() {
    if [ "$PKG_MGR" = "apt-get" ]; then
        DEBIAN_FRONTEND=noninteractive apt-get install -y "$@"
    else
        "$PKG_MGR" install -y "$@"
    fi
}

# Parse command line arguments
while [[ $# -gt 0 ]]; do
    case $1 in
        -u|--user)
            if [[ $# -lt 2 ]]; then
                print_error "Option $1 requires a value"
                usage
            fi
            KAFKA_USER="$2"
            shift 2
            ;;
        -d|--dir)
            if [[ $# -lt 2 ]]; then
                print_error "Option $1 requires a value"
                usage
            fi
            INSTALL_DIR="$2"
            shift 2
            ;;
        -h|--help)
            usage 0
            ;;
        *)
            echo "Unknown option: $1"
            usage
            ;;
    esac
done

# Check if running as root
if [ "$EUID" -ne 0 ]; then
    print_error "This script must be run as root or with sudo"
    exit 1
fi

# Auto-detect distribution
if [ -f /etc/os-release ]; then
    . /etc/os-release
    case "$ID" in
        ubuntu|debian)
            PKG_MGR="apt-get"
            JAVA_PKGS=(openjdk-17-jdk)
            ;;
        almalinux|rocky|centos|rhel|ol|fedora)
            PKG_MGR="dnf"
            JAVA_PKGS=(java-17-openjdk java-17-openjdk-devel)
            ;;
        amzn)
            PKG_MGR="yum"
            # Amazon Linux ships Java 17 as Corretto, not as java-17-openjdk
            JAVA_PKGS=(java-17-amazon-corretto java-17-amazon-corretto-devel)
            ;;
        *)
            print_error "Unsupported distribution: $ID"
            exit 1
            ;;
    esac
else
    print_error "Cannot detect distribution. /etc/os-release not found."
    exit 1
fi
print_status "Detected distribution: $ID"

echo "[1/9] Updating system packages..."
pkg_update

echo "[2/9] Installing Java runtime environment..."
pkg_install "${JAVA_PKGS[@]}" wget tar
# Verify Java installation
java -version
# Resolve JAVA_HOME from the installed binary; the JVM directory name differs
# between distributions (e.g. a "-amd64" suffix on Debian/Ubuntu).
JAVA_HOME_DIR="$(dirname "$(dirname "$(readlink -f "$(command -v java)")")")"

echo "[3/9] Creating Kafka user..."
if ! id "$KAFKA_USER" &>/dev/null; then
    useradd -r -m -s /bin/bash "$KAFKA_USER"
    CREATED_USER=true
fi
KAFKA_USER_HOME="$(getent passwd "$KAFKA_USER" | cut -d: -f6)"

echo "[4/9] Downloading and installing Apache Kafka..."
mkdir -p "$INSTALL_DIR"
cd "$INSTALL_DIR"
KAFKA_TGZ="kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
if [ ! -f "$KAFKA_TGZ" ]; then
    wget "${KAFKA_MIRROR}/${KAFKA_VERSION}/${KAFKA_TGZ}"
fi
if [ -d "kafka" ]; then
    rm -rf kafka
fi
tar -xzf "$KAFKA_TGZ"
mv "kafka_${SCALA_VERSION}-${KAFKA_VERSION}" kafka
CREATED_KAFKA_DIR=true
chown -R "$KAFKA_USER:$KAFKA_USER" "${INSTALL_DIR}/kafka"

echo "[5/9] Configuring environment variables..."
cat > /etc/profile.d/kafka.sh << EOF
export KAFKA_HOME=${INSTALL_DIR}/kafka
export PATH=\$PATH:\$KAFKA_HOME/bin
EOF
chmod 644 /etc/profile.d/kafka.sh
source /etc/profile.d/kafka.sh

echo "[6/9] Creating systemd service files..."
# ZooKeeper service
cat > /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=Apache ZooKeeper
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=forking
User=$KAFKA_USER
Group=$KAFKA_USER
Environment=JAVA_HOME=$JAVA_HOME_DIR
ExecStart=${INSTALL_DIR}/kafka/bin/zookeeper-server-start.sh -daemon ${INSTALL_DIR}/kafka/config/zookeeper.properties
ExecStop=${INSTALL_DIR}/kafka/bin/zookeeper-server-stop.sh
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF

# Kafka service
cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka
Documentation=http://kafka.apache.org
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=forking
User=$KAFKA_USER
Group=$KAFKA_USER
Environment=JAVA_HOME=$JAVA_HOME_DIR
ExecStart=${INSTALL_DIR}/kafka/bin/kafka-server-start.sh -daemon ${INSTALL_DIR}/kafka/config/server.properties
ExecStop=${INSTALL_DIR}/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable zookeeper kafka

echo "[7/9] Starting Kafka cluster..."
systemctl start zookeeper
sleep 5
systemctl start kafka
sleep 10

echo "[8/9] Creating Maven project structure..."
PROJECT_DIR="$KAFKA_USER_HOME/$PROJECT_NAME"
# Files are written as root and handed to the Kafka user by the chown below;
# "sudo -u USER cat > file" would still perform the redirection as root.
mkdir -p "$PROJECT_DIR/src/main/java/com/example"

# Create POM file
cat > "$PROJECT_DIR/pom.xml" << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-streams-optimization</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<kafka.version>3.6.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>8.8.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
</dependencies>
</project>
EOF

# Create RocksDB configuration class
cat > "$PROJECT_DIR/src/main/java/com/example/OptimizedRocksDBConfig.java" << 'EOF'
package com.example;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*;
import java.util.Map;
public class OptimizedRocksDBConfig implements RocksDBConfigSetter {
    // Native objects created in setConfig; released in close()
    private Cache blockCache;
    private Filter bloomFilter;

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Optimize block cache for better read performance
        blockCache = new LRUCache(64 * 1024 * 1024L); // 64MB block cache
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(blockCache);
        tableConfig.setBlockSize(16 * 1024); // 16KB block size
        tableConfig.setCacheIndexAndFilterBlocks(true);
        // Configure bloom filter for faster lookups
        bloomFilter = new BloomFilter(10, false);
        tableConfig.setFilterPolicy(bloomFilter);
        options.setTableFormatConfig(tableConfig);
        // Optimize write performance
        options.setWriteBufferSize(128 * 1024 * 1024L); // 128MB write buffer
        options.setMaxWriteBufferNumber(4);
        options.setMinWriteBufferNumberToMerge(2);
        // Configure compaction for better performance
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
        options.setLevelCompactionDynamicLevelBytes(true);
        options.setMaxBackgroundCompactions(4);
        options.setMaxBackgroundFlushes(2);
    }

    // Required by RocksDBConfigSetter in Kafka 3.x; frees the native objects
    @Override
    public void close(String storeName, Options options) {
        if (bloomFilter != null) {
            bloomFilter.close();
        }
        if (blockCache != null) {
            blockCache.close();
        }
    }
}
EOF
chown -R "$KAFKA_USER:$KAFKA_USER" "$PROJECT_DIR"

echo "[9/9] Verifying installation..."
if systemctl is-active --quiet kafka && systemctl is-active --quiet zookeeper; then
    print_status "ZooKeeper and Kafka services are running"
    # Test Kafka connectivity
    if runuser -u "$KAFKA_USER" -- "${INSTALL_DIR}/kafka/bin/kafka-broker-api-versions.sh" --bootstrap-server localhost:9092 >/dev/null 2>&1; then
        print_status "Kafka cluster is responding"
    else
        print_warning "Kafka cluster might not be fully ready yet"
    fi
    print_status "✅ Installation completed successfully!"
    echo
    echo "Kafka installation details:"
    echo "- Install directory: ${INSTALL_DIR}/kafka"
    echo "- Kafka user: $KAFKA_USER"
    echo "- Project directory: $PROJECT_DIR"
    echo "- Bootstrap server: localhost:9092"
    echo
    echo "Services can be managed with:"
    echo "- systemctl start/stop/restart zookeeper"
    echo "- systemctl start/stop/restart kafka"
else
    print_error "Services are not running properly"
    exit 1
fi
Review the script before running. Execute with: bash install.sh