Configure Kafka Streams state stores and RocksDB optimization for high-performance streaming applications

Advanced 45 min May 16, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Configure Kafka Streams state stores with RocksDB optimization for high-performance streaming applications. Learn custom state store configurations, RocksDB tuning parameters, and monitoring techniques for production-grade stream processing.

Prerequisites

  • Java 17 or higher installed (the Maven build in this tutorial targets Java 17)
  • Apache Kafka cluster running
  • Maven build tool
  • Minimum 4GB RAM available

What this solves

Kafka Streams applications rely on state stores for maintaining intermediate data during stream processing operations like aggregations, joins, and windowing. The default RocksDB state store configuration often becomes a performance bottleneck under high-throughput workloads. This tutorial shows you how to configure custom state stores, optimize RocksDB parameters, and monitor performance metrics to achieve production-grade streaming performance with lower latency and higher throughput.

Prerequisites and system setup

Update system packages

Start by updating your package manager to ensure you get the latest versions of all dependencies.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y
# AlmaLinux / Rocky Linux
sudo dnf update -y

Install Java runtime environment

This tutorial uses OpenJDK 17, the Java version the Maven build targets later on.

# Ubuntu / Debian
sudo apt install -y openjdk-17-jdk openjdk-17-jre
# AlmaLinux / Rocky Linux
sudo dnf install -y java-17-openjdk java-17-openjdk-devel

Verify the Java installation:

java -version

Download and install Apache Kafka

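Download a Kafka distribution built for Scala 2.13. This tutorial runs a 2.8.2 broker, which still ships with ZooKeeper. Older releases such as this one are served from the Apache archive rather than the main download site.

cd /opt
sudo wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz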
sudo tar -xzf kafka_2.13-2.8.2.tgz
sudo mv kafka_2.13-2.8.2 kafka
sudo chown -R $USER:$USER /opt/kafka

Add Kafka binaries to your PATH:

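# Persist the variables in ~/.bashrc, then reload it in the current shell
echo 'export KAFKA_HOME=/opt/kafka' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
source ~/.bashrc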

Start Kafka cluster

Start ZooKeeper and Kafka broker services for local development and testing.

cd /opt/kafka

# Start ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# Start Kafka broker
bin/kafka-server-start.sh -daemon config/server.properties

Verify the cluster is running:

bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

Configure custom state store topology

Create Maven project structure

Set up a Maven project for your Kafka Streams application with RocksDB dependencies.

mkdir -p kafka-streams-optimization/src/main/java/com/example
cd kafka-streams-optimization

Create the Maven POM file with required dependencies:



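<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-streams-optimization</artifactId>
    <version>1.0.0</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <kafka.version>3.6.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.rocksdb</groupId>
            <artifactId>rocksdbjni</artifactId>
            <version>8.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>
</project>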

Implement custom RocksDB state store configuration

Create a custom state store configuration class that optimizes RocksDB parameters for high-performance streaming.

package com.example;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*;
import java.util.Map;

public class OptimizedRocksDBConfig implements RocksDBConfigSetter {
    
    // Kafka Streams creates one setter instance per store. RocksDB objects created
    // here hold native memory, so keep a reference and release it in close().
    private Filter bloomFilter;
    
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Optimize block cache for better read performance
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(64 * 1024 * 1024L); // 64MB block cache
        tableConfig.setBlockSize(16 * 1024); // 16KB block size
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setPinL0FilterAndIndexBlocksInCache(true);
        
        // Configure bloom filter for faster lookups
        bloomFilter = new BloomFilter(10, false);
        tableConfig.setFilterPolicy(bloomFilter);
        options.setTableFormatConfig(tableConfig);
        
        // Optimize write performance
        options.setWriteBufferSize(128 * 1024 * 1024L); // 128MB write buffer
        options.setMaxWriteBufferNumber(4);
        options.setMinWriteBufferNumberToMerge(2);
        
        // Configure compression and compaction
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
        options.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
        options.setLevelCompactionDynamicLevelBytes(true);
        
        // Write-path concurrency
        options.setAllowConcurrentMemtableWrite(true);
        options.setEnableWriteThreadAdaptiveYield(true);
        options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
        
        // Configure background threads
        options.setMaxBackgroundCompactions(4);
        options.setMaxBackgroundFlushes(2);
        
        // Memory optimization: memtable limit for this RocksDB instance (not for the whole application)
        options.setDbWriteBufferSize(256 * 1024 * 1024L); // 256MB
    }
    
    @Override
    public void close(String storeName, Options options) {
        // Release the native Bloom filter created in setConfig()
        if (bloomFilter != null) {
            bloomFilter.close();
        }
    }
}
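
The values above apply to every RocksDB instance the application opens, so the total off-heap footprint grows with the number of stores and partitions. The sketch below is a rough estimate only: it assumes each instance uses its full block cache, and it treats the setDbWriteBufferSize() value as a cap on memtable memory. The class name and the instance count of 4 are hypothetical.

```java
/** Rough estimate of RocksDB off-heap memory for one application instance. */
class RocksDbMemoryEstimate {

    static final long MB = 1024L * 1024L;

    /** Memtable memory of one RocksDB instance, capped by dbWriteBufferSize when it is set (> 0). */
    static long memtableBytes(long writeBufferSize, int maxWriteBufferNumber, long dbWriteBufferSize) {
        long uncapped = writeBufferSize * maxWriteBufferNumber;
        return dbWriteBufferSize > 0 ? Math.min(uncapped, dbWriteBufferSize) : uncapped;
    }

    /** Block cache plus memtables, multiplied by the number of RocksDB instances. */
    static long totalBytes(int rocksDbInstances, long blockCacheSize, long writeBufferSize,
                           int maxWriteBufferNumber, long dbWriteBufferSize) {
        long perInstance = blockCacheSize
                + memtableBytes(writeBufferSize, maxWriteBufferNumber, dbWriteBufferSize);
        return rocksDbInstances * perInstance;
    }

    public static void main(String[] args) {
        // Sizes taken from OptimizedRocksDBConfig; 4 instances is an assumed count
        long total = totalBytes(4, 64 * MB, 128 * MB, 4, 256 * MB);
        System.out.println("Estimated RocksDB off-heap memory: " + (total / MB) + " MB");
    }
}
```

With these numbers the memtable cap (256MB) is lower than four full 128MB write buffers, so the cap dominates the estimate. Compare the result against the RAM left over after the JVM heap before raising any of the sizes.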

Create Kafka Streams application with optimized state stores

Implement a streaming application that uses the custom RocksDB configuration for state stores.

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Properties;

public class StreamsApplication {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimized-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        // Configure RocksDB optimization
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, OptimizedRocksDBConfig.class);
        
        // Optimize processing threads
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Runtime.getRuntime().availableProcessors());
        
        // Configure commit interval for performance
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
        
        // Enable exactly-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Example: Word count with optimized state store
        KStream<String, String> source = builder.stream("input-topic");
        
        source.flatMapValues(value -> java.util.Arrays.asList(value.toLowerCase().split("\\W+")))
              .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
              // 5-minute windows advancing every minute; with no grace period,
              // the 10-minute store retention below covers every open window
              .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
              .count(Materialized.<String, Long>as(
                  Stores.persistentWindowStore("word-count-store", 
                                             Duration.ofMinutes(10),  // retention period
                                             Duration.ofMinutes(5),   // window size
                                             false))                  // do not retain duplicates
                     .withKeySerde(Serdes.String())
                     .withValueSerde(Serdes.Long()))
              .toStream()
              .map((windowedKey, count) -> 
                  new KeyValue<>(windowedKey.key() + "@" + windowedKey.window().start(), count))
              .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
        System.out.println("Kafka Streams application started with optimized RocksDB configuration");
    }
}
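
The word count uses 5-minute windows that advance every minute, so each record is counted in up to five overlapping windows and the window store takes several writes per input record. A minimal sketch of that window assignment is shown here. The class and method names are hypothetical, and the arithmetic is a plain-Java illustration, not code taken from the Kafka Streams library.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists the start times of all hopping windows that contain a record timestamp. */
class HoppingWindowStarts {

    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window that still contains the timestamp, aligned to the advance interval
        long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5 * 60_000L;    // 5-minute window
        long advance = 60_000L;     // 1-minute advance
        long timestamp = 7 * 60_000L;
        System.out.println("Window starts (ms): " + windowStartsFor(timestamp, size, advance));
    }
}
```

This write fan-out is one reason windowed stores benefit from larger write buffers than plain key-value stores.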

Create topics and compile application

Create the required Kafka topics and compile your streaming application.

# Create input and output topics
kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1

# Compile and run the application

mvn clean compile exec:java -Dexec.mainClass="com.example.StreamsApplication"

Advanced RocksDB tuning parameters

Configure memory-based optimizations

Create an advanced RocksDB configuration that optimizes memory usage patterns for different workload types.

package com.example;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*;
import java.util.Map;

public class AdvancedRocksDBConfig implements RocksDBConfigSetter {
    
    private static final long BLOCK_CACHE_SIZE = 128 * 1024 * 1024L; // 128MB
    private static final long WRITE_BUFFER_SIZE = 64 * 1024 * 1024L; // 64MB
    private static final int MAX_WRITE_BUFFER_NUMBER = 6;
    
    // Static so that every state store in this JVM shares one block cache.
    // It is deliberately not closed in close(), because other stores still use it.
    private static final Cache BLOCK_CACHE = new LRUCache(BLOCK_CACHE_SIZE, 8);
    
    // Per-store native objects, released in close()
    private Filter bloomFilter;
    private Statistics statistics;
    
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(BLOCK_CACHE);
        tableConfig.setBlockSize(32 * 1024); // 32KB blocks for better compression
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setPinTopLevelIndexAndFilter(true);
        
        // Optimize bloom filter for point lookups
        bloomFilter = new BloomFilter(10, false);
        tableConfig.setFilterPolicy(bloomFilter);
        tableConfig.setWholeKeyFiltering(true);
        
        options.setTableFormatConfig(tableConfig);
        
        // Memory table optimizations
        options.setWriteBufferSize(WRITE_BUFFER_SIZE);
        options.setMaxWriteBufferNumber(MAX_WRITE_BUFFER_NUMBER);
        options.setMinWriteBufferNumberToMerge(3);
        
        // Memtable limit for this RocksDB instance (applies per store, not across all stores)
        options.setDbWriteBufferSize(512 * 1024 * 1024L); // 512MB
        
        // Compaction optimizations
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
        options.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
        options.setCompressionPerLevel(java.util.Arrays.asList(
            CompressionType.NO_COMPRESSION,
            CompressionType.NO_COMPRESSION,
            CompressionType.LZ4_COMPRESSION,
            CompressionType.LZ4_COMPRESSION,
            CompressionType.ZSTD_COMPRESSION,
            CompressionType.ZSTD_COMPRESSION,
            CompressionType.ZSTD_COMPRESSION
        ));
        
        // Level-based compaction tuning
        options.setLevelCompactionDynamicLevelBytes(true);
        options.setMaxBytesForLevelBase(256 * 1024 * 1024L); // 256MB
        options.setMaxBytesForLevelMultiplier(8);
        
        // I/O optimizations
        // RocksDB rejects mmap reads combined with direct reads, so mmap stays off.
        // Direct I/O also needs a filesystem that supports it.
        options.setAllowMmapReads(false);
        options.setAllowMmapWrites(false);
        options.setUseDirectReads(true);
        options.setUseDirectIoForFlushAndCompaction(true);
        
        // Parallelism settings
        int cpuCores = Runtime.getRuntime().availableProcessors();
        options.setIncreaseParallelism(cpuCores);
        options.setMaxBackgroundCompactions(Math.max(2, cpuCores / 2));
        options.setMaxBackgroundFlushes(Math.max(1, cpuCores / 4));
        
        // WAL settings (Kafka Streams writes with the WAL disabled, so these have little effect)
        options.setWalTtlSeconds(0);
        options.setWalSizeLimitMB(0);
        options.setMaxTotalWalSize(128 * 1024 * 1024L); // 128MB
        
        // Statistics and monitoring
        statistics = new Statistics();
        options.setStatistics(statistics);
        options.setStatsDumpPeriodSec(60);
        
        System.out.println("Applied advanced RocksDB configuration for store: " + storeName);
    }
    
    @Override
    public void close(String storeName, Options options) {
        // Release the per-store native objects created in setConfig()
        if (bloomFilter != null) {
            bloomFilter.close();
        }
        if (statistics != null) {
            statistics.close();
        }
        System.out.println("Closing RocksDB configuration for store: " + storeName);
    }
}
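
The parallelism settings in this class scale with the core count, with a floor so that small machines still get at least two compaction threads and one flush thread. As a quick worked example of the two Math.max expressions (the class name is hypothetical):

```java
/** Evaluates the background-thread formulas from AdvancedRocksDBConfig for several core counts. */
class BackgroundThreadPlan {

    /** Half the cores, but never fewer than 2 compaction threads. */
    static int compactionThreads(int cpuCores) {
        return Math.max(2, cpuCores / 2);
    }

    /** A quarter of the cores, but never fewer than 1 flush thread. */
    static int flushThreads(int cpuCores) {
        return Math.max(1, cpuCores / 4);
    }

    public static void main(String[] args) {
        for (int cores : new int[] {2, 4, 8, 16}) {
            System.out.println(cores + " cores -> " + compactionThreads(cores)
                    + " compaction threads, " + flushThreads(cores) + " flush threads");
        }
    }
}
```

On hosts that also run many stream threads, these background threads compete for the same cores, so treat the formulas as a starting point to measure against.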

Implement workload-specific state store configurations

Create different state store configurations optimized for specific streaming patterns like aggregations versus joins.

package com.example;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.*;
import java.util.Map;

public class WorkloadSpecificConfig implements RocksDBConfigSetter {
    
    // Created only for aggregation stores; released in close()
    private Filter bloomFilter;
    
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        
        // Determine optimization strategy based on store name/type.
        // "window" is checked first so that a name such as "window-count-store"
        // is not captured by the "count" check.
        if (storeName.contains("window")) {
            configureForWindowing(options);
        } else if (storeName.contains("aggregate") || storeName.contains("count")) {
            configureForAggregations(options);
        } else if (storeName.contains("join")) {
            configureForJoins(options);
        } else {
            configureDefault(options);
        }
    }
    
    private void configureForAggregations(Options options) {
        // Optimize for frequent updates to existing keys
        options.setWriteBufferSize(32 * 1024 * 1024L); // Smaller write buffer
        options.setMaxWriteBufferNumber(8);
        options.setMinWriteBufferNumberToMerge(4);
        
        // Faster compaction for high update rates
        options.setLevel0FileNumCompactionTrigger(2);
        options.setLevel0SlowdownWritesTrigger(8);
        options.setLevel0StopWritesTrigger(12);
        
        // Better bloom filters for point lookups
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        bloomFilter = new BloomFilter(15, false);
        tableConfig.setFilterPolicy(bloomFilter);
        options.setTableFormatConfig(tableConfig);
        
        System.out.println("Configured RocksDB for aggregation workload");
    }
    
    private void configureForJoins(Options options) {
        // Optimize for range scans and sequential access
        options.setWriteBufferSize(128 * 1024 * 1024L); // Larger write buffer
        options.setMaxWriteBufferNumber(4);
        
        // Optimize for range queries
        options.setAdviseRandomOnOpen(false);
        options.setCompactionStyle(CompactionStyle.LEVEL);
        
        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockSize(64 * 1024); // Larger blocks for range scans
        tableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
        options.setTableFormatConfig(tableConfig);
        
        System.out.println("Configured RocksDB for join workload");
    }
    
    private void configureForWindowing(Options options) {
        // Optimize for time-based cleanup of expired windows
        options.setWriteBufferSize(64 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(6);
        
        // Efficient compaction for time-series data
        options.setCompressionType(CompressionType.ZSTD_COMPRESSION);
        options.setLevelCompactionDynamicLevelBytes(true);
        
        // Optimize for sequential writes
        options.setAllowConcurrentMemtableWrite(true);
        options.setEnableWriteThreadAdaptiveYield(true);
        
        System.out.println("Configured RocksDB for windowing workload");
    }
    
    private void configureDefault(Options options) {
        // Balanced configuration for mixed workloads
        options.setWriteBufferSize(64 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(4);
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
        
        System.out.println("Applied default RocksDB configuration");
    }
    
    @Override
    public void close(String storeName, Options options) {
        // Release the native Bloom filter if this store created one
        if (bloomFilter != null) {
            bloomFilter.close();
        }
    }
}
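
Because the profile is chosen by substring match, a store name such as window-count-store matches more than one branch, and the first matching check wins. The sketch below isolates that decision in plain Java so it can be unit tested without RocksDB. The enum and class names are hypothetical, and it assumes the windowing profile should take precedence.

```java
/** Maps a state store name to a tuning profile; the order of checks decides ties. */
class StoreWorkloadClassifier {

    enum Workload { WINDOWING, AGGREGATION, JOIN, DEFAULT }

    static Workload classify(String storeName) {
        // Most specific check first: "window-count-store" contains both "window" and "count"
        if (storeName.contains("window")) {
            return Workload.WINDOWING;
        }
        if (storeName.contains("aggregate") || storeName.contains("count")) {
            return Workload.AGGREGATION;
        }
        if (storeName.contains("join")) {
            return Workload.JOIN;
        }
        return Workload.DEFAULT;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"aggregate-store", "window-count-store", "orders-join", "misc"}) {
            System.out.println(name + " -> " + classify(name));
        }
    }
}
```

Name-based matching is fragile, so a naming convention for stores (for example a fixed suffix per workload) makes the selection easier to reason about.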

Monitor RocksDB performance metrics

Create RocksDB metrics collection

Implement a metrics collector that exposes RocksDB statistics for monitoring and alerting.

package com.example;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;
import org.rocksdb.TickerType;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RocksDBMetricsCollector {
    
    private final KafkaStreams streams;
    private final ScheduledExecutorService scheduler;
    private final Map<String, Statistics> storeStatistics;
    
    public RocksDBMetricsCollector(KafkaStreams streams) {
        this.streams = streams;
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.storeStatistics = new HashMap<>();
    }
    
    public void startMetricsCollection() {
        scheduler.scheduleAtFixedRate(this::collectMetrics, 0, 30, TimeUnit.SECONDS);
        System.out.println("Started RocksDB metrics collection");
    }
    
    private void collectMetrics() {
        try {
            // Collect JVM memory metrics
            Runtime runtime = Runtime.getRuntime();
            long totalMemory = runtime.totalMemory();
            long freeMemory = runtime.freeMemory();
            long usedMemory = totalMemory - freeMemory;
            
            System.out.println("=== RocksDB Performance Metrics ===");
            System.out.println("JVM Memory - Total: " + (totalMemory / 1024 / 1024) + "MB, " +
                             "Used: " + (usedMemory / 1024 / 1024) + "MB, " +
                             "Free: " + (freeMemory / 1024 / 1024) + "MB");
            
            // Collect Kafka Streams metrics
            if (streams.state() == KafkaStreams.State.RUNNING) {
                collectStreamMetrics();
            }
            
        } catch (Exception e) {
            System.err.println("Error collecting metrics: " + e.getMessage());
        }
    }
    
    private void collectStreamMetrics() {
        // These would be actual RocksDB statistics in a real implementation
        // For demonstration, we'll show the structure
        
        System.out.println("Stream State: " + streams.state());
        
        // Report the changelog lag of every partition of every local state store
        streams.allLocalStorePartitionLags().forEach((storeName, partitionLags) ->
            partitionLags.forEach((partition, lagInfo) ->
                System.out.println("Store " + storeName + 
                                 " Partition " + partition + 
                                 " Lag: " + lagInfo.offsetLag())));
        
        // In a real implementation, you would:
        // 1. Access RocksDB statistics through JNI
        // 2. Export to monitoring systems like Prometheus
        // 3. Set up alerts for critical thresholds
        
        logPerformanceMetrics();
    }
    
    private void logPerformanceMetrics() {
        // Example metrics that should be monitored in production
        System.out.println("Key Metrics to Monitor:");
        System.out.println("- Compaction time and frequency");
        System.out.println("- Write amplification factor");
        System.out.println("- Block cache hit ratio");
        System.out.println("- Write stall time");
        System.out.println("- SST file sizes and levels");
        System.out.println("- Memory table flush frequency");
        System.out.println("==============================");
    }
    
    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
        }
    }
}
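
Two of the key metrics named in logPerformanceMetrics(), block cache hit ratio and write amplification, are ratios derived from raw counters. A minimal sketch of the arithmetic follows, assuming the counter values have already been read (for example from RocksDB statistics tickers such as BLOCK_CACHE_HIT and BLOCK_CACHE_MISS). The class name and the sample numbers are hypothetical.

```java
/** Derives ratio metrics from raw RocksDB counter values. */
class RocksDbDerivedMetrics {

    /** Fraction of block lookups served from the block cache; 0.0 when there were no lookups. */
    static double blockCacheHitRatio(long hits, long misses) {
        long lookups = hits + misses;
        return lookups == 0 ? 0.0 : (double) hits / lookups;
    }

    /** Bytes written to disk by flushes and compactions per byte written by the application. */
    static double writeAmplification(long flushBytes, long compactionBytes, long userBytes) {
        return userBytes == 0 ? 0.0 : (double) (flushBytes + compactionBytes) / userBytes;
    }

    public static void main(String[] args) {
        // Sample counter values, made up for illustration
        System.out.println("Block cache hit ratio: " + blockCacheHitRatio(900, 100));
        System.out.println("Write amplification: " + writeAmplification(100, 300, 100));
    }
}
```

A falling hit ratio suggests the block cache is too small for the working set, while rising write amplification points at compaction pressure.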

Create production monitoring application

Build a comprehensive monitoring application that tracks RocksDB performance and exposes metrics for external monitoring systems.

package com.example;

import org.apache.kafka.common.serialization.Serdes;
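import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;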
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ProductionStreamsApp {
    
    public static void main(String[] args) {
        Properties props = createStreamProperties();
        StreamsBuilder builder = new StreamsBuilder();
        
        // Build topology with multiple state stores
        buildOptimizedTopology(builder);
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        RocksDBMetricsCollector metricsCollector = new RocksDBMetricsCollector(streams);
        
        CountDownLatch latch = new CountDownLatch(1);
        
        // Exception handler (the Streams handler takes only the exception and returns a response)
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("Uncaught exception in stream thread: " + exception.getMessage());
            exception.printStackTrace();
            latch.countDown();
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        });
        
        // Graceful shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down streams application...");
            streams.close(Duration.ofSeconds(10));
            metricsCollector.shutdown();
            latch.countDown();
        }));
        
        try {
            streams.start();
            metricsCollector.startMetricsCollection();
            System.out.println("Production Kafka Streams application started with advanced monitoring");
            latch.await();
        } catch (Exception e) {
            System.err.println("Error starting application: " + e.getMessage());
            System.exit(1);
        }
    }
    
    private static Properties createStreamProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "production-optimized-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        // Use workload-specific RocksDB config
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, WorkloadSpecificConfig.class);
        
        // Production optimizations
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, Runtime.getRuntime().availableProcessors());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L); // 64MB
        
        // State directory optimization
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-optimized");
        
        // Consumer and producer optimizations
        props.put("consumer.fetch.min.bytes", 50000);
        props.put("consumer.fetch.max.wait.ms", 100);
        props.put("producer.linger.ms", 5);
        props.put("producer.batch.size", 32768);
        
        return props;
    }
    
    private static void buildOptimizedTopology(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic");
        
        // Aggregation store - optimized for frequent updates
        input.groupByKey()
             .aggregate(
                 () -> 0L,
                 (key, value, aggregate) -> aggregate + value.length(),
                 Materialized.<String, Long>as(
                     Stores.persistentKeyValueStore("aggregate-store"))
                     .withKeySerde(Serdes.String())
                     .withValueSerde(Serdes.Long()))
             .toStream()
             // Values are Long here, so override the default String value serde
             .to("aggregated-output", Produced.with(Serdes.String(), Serdes.Long()));
        
        // Windowed aggregation - optimized for time-series data  
        input.groupByKey()
             .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
             .count(Materialized.<String, Long>as(
                 Stores.persistentWindowStore("window-count-store",
                                            Duration.ofHours(1),     // retention period
                                            Duration.ofMinutes(5),   // window size
                                            false))                  // do not retain duplicates
                     .withKeySerde(Serdes.String())
                     .withValueSerde(Serdes.Long()))
             .toStream()
             .map((windowedKey, count) -> 
                 new KeyValue<>(windowedKey.key(), count))
             .to("windowed-output", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
}
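
A persistent window store is split into time-based segments, each backed by its own RocksDB instance, so the config setter runs once per segment and not once per store. The sketch below estimates the segment length for the two retention periods used in this tutorial. It assumes the default segment interval of max(retention / 2, 60 seconds) applied by Stores.persistentWindowStore in Kafka Streams 3.x; verify this against the version you run. The class and method names are hypothetical.

```java
/** Estimates the segment interval of a persistent window store from its retention period. */
class WindowStoreSegments {

    /** Assumed default: half the retention period, with a floor of one minute. */
    static long segmentIntervalMs(long retentionMs) {
        return Math.max(retentionMs / 2, 60_000L);
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60_000L;  // retention of word-count-store
        long oneHour = 60 * 60_000L;     // retention of window-count-store
        System.out.println("word-count-store segment interval (ms): " + segmentIntervalMs(tenMinutes));
        System.out.println("window-count-store segment interval (ms): " + segmentIntervalMs(oneHour));
    }
}
```

Since every live segment gets its own memtables, per-store memory settings multiply for window stores, which is worth factoring into any memory budget.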

Compile and run the monitoring application

Build and execute the production-ready streaming application with comprehensive monitoring.

# Create additional topics for the production application
kafka-topics.sh --create --topic aggregated-output --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
kafka-topics.sh --create --topic windowed-output --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1

# Compile and run the production application

mvn clean compile exec:java -Dexec.mainClass="com.example.ProductionStreamsApp"

Performance testing and benchmarking

Create load testing script

Generate test data to validate your RocksDB optimizations under load. Save the following script as load_test.sh:

#!/bin/bash

# Performance testing script for Kafka Streams with RocksDB optimization

echo "Starting Kafka Streams load test..."

# Function to produce test messages
produce_messages() {
    local topic=$1
    local num_messages=$2
    echo "Producing $num_messages messages to $topic"
    for i in $(seq 1 $num_messages); do
        key="key-$(($i % 1000))"
        value="test-message-$i-$(date +%s%N)"
        echo "$key:$value" | kafka-console-producer.sh \
            --bootstrap-server localhost:9092 \
            --topic $topic \
            --property "parse.key=true" \
            --property "key.separator=:"
        if [ $((i % 1000)) -eq 0 ]; then
            echo "Produced $i messages"
            sleep 0.1
        fi
    done
}

# Start background monitoring
monitor_performance() {
    while true; do
        echo "=== Performance Check $(date) ==="
        # Monitor CPU usage
        top -bn1 | grep "load average" | awk '{print $10 $11 $12}'
        # Monitor memory usage
        free -h | grep "Mem:"
        # Monitor disk I/O
        iostat -x 1 1 | tail -n +4
        echo "=============================="
        sleep 10
    done
}

# Start monitoring in background
monitor_performance &
MONITOR_PID=$!

# Run load test
produce_messages "input-topic" 10000
echo "Load test completed. Monitoring continues..."
echo "Press Ctrl+C to stop monitoring"

# Cleanup function
cleanup() {
    echo "Stopping performance monitoring..."
    kill $MONITOR_PID 2>/dev/null
    exit 0
}
trap cleanup SIGINT
wait

Make the script executable and run the load test:

chmod +x load_test.sh
./load_test.sh
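
The script derives each key from the message number modulo 1000, so the 10,000 messages land on 1,000 distinct keys. A small sketch that reproduces this key distribution in plain Java is shown here (the class and method names are hypothetical). It makes it easy to see how many updates each key in the aggregate store receives before you change the message count or key space.

```java
import java.util.HashMap;
import java.util.Map;

/** Reproduces the key distribution generated by the load test script. */
class LoadTestKeyDistribution {

    /** Counts messages per key for keys of the form key-(i % keySpace), with i from 1 to numMessages. */
    static Map<String, Integer> messagesPerKey(int numMessages, int keySpace) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 1; i <= numMessages; i++) {
            counts.merge("key-" + (i % keySpace), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = messagesPerKey(10_000, 1_000);
        System.out.println("Distinct keys: " + counts.size());
        System.out.println("Updates for key-0: " + counts.get("key-0"));
    }
}
```

Every key is updated the same number of times, which exercises the update path of the aggregation store but does not model skewed, hot-key traffic.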

Verify your setup

Test that your optimized Kafka Streams application is running correctly and processing messages efficiently.

# Check if topics exist
kafka-topics.sh --list --bootstrap-server localhost:9092

# Monitor consumer lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group production-optimized-streams

# Check state directory for RocksDB files
ls -la /tmp/kafka-streams-optimized/

# Monitor log files for performance metrics
tail -f /tmp/kafka-streams-optimized/production-optimized-streams-/logs/.log

# Test message consumption (the aggregated values are serialized as Long)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic aggregated-output --from-beginning --max-messages 10 --value-deserializer org.apache.kafka.common.serialization.LongDeserializer

Note: Monitor the metrics output from your application console to track RocksDB performance improvements. Look for reduced latency, higher throughput, and stable memory usage patterns.

Common issues

Symptom | Cause | Fix
High memory usage | Block cache too large | Reduce setBlockCacheSize() and cap memtable memory with setDbWriteBufferSize()
Slow aggregation performance | Write buffer too small | Increase setWriteBufferSize() and setMaxWriteBufferNumber()
Write stalls occurring | Compaction can't keep up | Increase setMaxBackgroundCompactions() and tune L0 trigger levels
High disk I/O latency | Compression overhead | Use LZ4_COMPRESSION instead of ZLIB_COMPRESSION, or disable compression for L0/L1
Poor range query performance | Small block size | Increase setBlockSize() to 64KB for range-heavy workloads
Application won't start | RocksDB native library missing | Ensure the rocksdbjni JAR ships a native library for your OS and CPU architecture
State store corruption | Unclean shutdown | Delete the state directory and restart so the state is rebuilt from the changelog topics: rm -rf /tmp/kafka-streams-optimized
Out of memory errors | Total memory limit exceeded | Reduce setDbWriteBufferSize() and implement memory monitoring
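
The L0 trigger levels mentioned for write stalls form three thresholds on the number of level-0 files: compaction starts at the first, writes are slowed at the second, and writes stop at the third. A minimal sketch of that escalation follows, using the values 2, 8, and 12 from the aggregation profile. The class and enum names are hypothetical, and the sketch only models the thresholds, not RocksDB's actual scheduler.

```java
/** Classifies the write state of a RocksDB instance from its level-0 file count. */
class Level0WriteState {

    enum State { NORMAL, COMPACTING, SLOWDOWN, STOPPED }

    static State classify(int level0Files, int compactionTrigger, int slowdownTrigger, int stopTrigger) {
        // Check the most severe threshold first
        if (level0Files >= stopTrigger) {
            return State.STOPPED;
        }
        if (level0Files >= slowdownTrigger) {
            return State.SLOWDOWN;
        }
        if (level0Files >= compactionTrigger) {
            return State.COMPACTING;
        }
        return State.NORMAL;
    }

    public static void main(String[] args) {
        // Thresholds from configureForAggregations: trigger 2, slowdown 8, stop 12
        for (int files : new int[] {1, 2, 8, 12}) {
            System.out.println(files + " L0 files -> " + classify(files, 2, 8, 12));
        }
    }
}
```

The gap between the compaction trigger and the slowdown trigger is the headroom compaction has to catch up, so widening it trades read performance for fewer stalls.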

For testing and development scenarios, refer to our guide on setting up Kafka Streams testing framework with TopologyTestDriver for comprehensive unit testing approaches.
