Configure Kafka Streams for real-time data processing and analytics

Intermediate 45 min Apr 24, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up a Java development environment for Kafka Streams and build real-time data processing pipelines for analytics and monitoring workloads.

Prerequisites

  • Java 17 (the Maven build in this tutorial targets Java 17)
  • Apache Maven
  • A Kafka cluster, or the single local broker set up in this tutorial
  • Basic Java programming knowledge

What this solves

Kafka Streams enables you to build real-time stream processing applications that transform, aggregate, and analyze data as it flows through Apache Kafka topics. This tutorial shows you how to set up the complete Kafka Streams development environment and implement processing topologies for analytics and monitoring use cases.

Step-by-step installation

Update system packages

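Start by refreshing the package index and upgrading installed packages. Run the first command on Ubuntu or Debian and the second on AlmaLinux or Rocky Linux.

# Ubuntu / Debian
sudo apt update && sudo apt upgrade -y
# AlmaLinux / Rocky Linux
sudo dnf update -y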

Install Java Development Kit

The Maven project in this tutorial targets Java 17. Install the OpenJDK 17 development kit along with Maven and the download tools used later.

# Ubuntu / Debian
sudo apt install -y openjdk-17-jdk maven wget curl
# AlmaLinux / Rocky Linux
sudo dnf install -y java-17-openjdk-devel maven wget curl

Verify Java installation

Check that Java and Maven are properly installed and accessible from the command line.

java -version
mvn -version

Download and install Apache Kafka

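Download the Kafka binary distribution that matches the kafka-streams version used in the Maven project (3.5.1). It provides the broker, ZooKeeper and the command-line tools used in the following steps. Older releases are served from archive.apache.org, not downloads.apache.org.

cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz
sudo tar -xzf kafka_2.13-3.5.1.tgz
sudo mv kafka_2.13-3.5.1 kafka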
sudo chown -R $USER:$USER /opt/kafka

Configure environment variables

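Add the environment variables to ~/.bashrc so that the Kafka scripts are on your PATH in every new shell. This also sets the JVM heap for processes started by the Kafka scripts to 1 GB. Exports typed directly into a shell only last for that session.

echo 'export KAFKA_HOME=/opt/kafka' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
echo 'export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"' >> ~/.bashrc
source ~/.bashrc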

Start ZooKeeper and Kafka broker

Start the ZooKeeper service first, then the Kafka broker. These provide the foundation for Kafka Streams applications.

cd $KAFKA_HOME
bin/zookeeper-server-start.sh config/zookeeper.properties &
sleep 10
bin/kafka-server-start.sh config/server.properties &

Create development topics

Create input and output topics for your Kafka Streams applications. These topics will hold the raw data and processed results.

bin/kafka-topics.sh --create --topic user-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic processed-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic analytics-results --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
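The partition count sets how far each application can scale, because Kafka Streams creates roughly one task per input partition. A record with a key always lands in the same partition. For the same reason, a groupBy that changes the key makes Kafka Streams repartition the data.

The sketch below shows that mapping. It is modeled on the murmur2 hash Kafka's producer applies to keyed records (partition = positive hash modulo partition count). PartitionSketch is a hypothetical illustration, not a Kafka class. Records without a key, such as the console-produced samples later in this tutorial, are spread across partitions instead.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch modeled on Kafka's murmur2-based partitioning for keyed records:
// partition = (murmur2(keyBytes) & 0x7fffffff) % numPartitions
class PartitionSketch {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;

        // Mix the input four bytes at a time (little-endian)
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes (intentional switch fall-through)
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // String keys are serialized as UTF-8, as Kafka's StringSerializer does by default
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user1:click", "user2:purchase"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The same key always maps to the same partition, so every record for user1:click reaches the same task and its aggregate state.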

Set up Kafka Streams development environment

Create Maven project structure

Set up a Maven project with the necessary directory structure for developing Kafka Streams applications.

mkdir -p ~/kafka-streams-project/src/main/java/com/example/streams
cd ~/kafka-streams-project

Configure Maven dependencies

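Create a pom.xml file with Kafka Streams dependencies and build configuration for Java 17.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-streams-analytics</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <kafka.version>3.5.1</kafka.version>
        <slf4j.version>1.7.36</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
            </plugin>
        </plugins>
    </build>
</project>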
Install project dependencies

Resolve the project's Maven dependencies (downloaded to ~/.m2 on the first run) and confirm that the project builds.

mvn clean compile

Implement real-time data processing topologies

Create event data model

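Define a Java class to represent the events processed by your Kafka Streams applications, and save it as src/main/java/com/example/streams/UserEvent.java. The @JsonProperty annotations map the snake_case JSON field names to the Java fields.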

package com.example.streams;

import com.fasterxml.jackson.annotation.JsonProperty;

public class UserEvent {
    @JsonProperty("user_id")
    private String userId;
    
    @JsonProperty("event_type")
    private String eventType;
    
    @JsonProperty("timestamp")
    private long timestamp;
    
    @JsonProperty("value")
    private double value;
    
    public UserEvent() {}
    
    public UserEvent(String userId, String eventType, long timestamp, double value) {
        this.userId = userId;
        this.eventType = eventType;
        this.timestamp = timestamp;
        this.value = value;
    }
    
    // Getters and setters
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    
    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
}

Create JSON serialization utilities

Implement a generic Serde that converts between Java objects and JSON with Jackson, and save it as src/main/java/com/example/streams/JsonSerde.java.

package com.example.streams;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Generic JSON Serde backed by Jackson
public class JsonSerde<T> implements Serde<T> {
    // Ignore JSON fields that have no matching Java property instead of failing the record
    private final ObjectMapper objectMapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    private final Class<T> type;
    
    public JsonSerde(Class<T> type) {
        this.type = type;
    }
    
    @Override
    public Serializer<T> serializer() {
        return new JsonSerializer();
    }
    
    @Override
    public Deserializer<T> deserializer() {
        return new JsonDeserializer();
    }
    
    // Inner (non-static) classes share the outer T, objectMapper and type
    private class JsonSerializer implements Serializer<T> {
        @Override
        public byte[] serialize(String topic, T data) {
            if (data == null) {
                return null; // pass tombstones through unchanged
            }
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new SerializationException("Error serializing JSON", e);
            }
        }
    }
    
    private class JsonDeserializer implements Deserializer<T> {
        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                return objectMapper.readValue(data, type);
            } catch (Exception e) {
                throw new SerializationException("Error deserializing JSON", e);
            }
        }
    }
}
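Whatever encoding a serde uses, Kafka Streams relies on two properties:

- Deserializing a serialized value returns the original.
- null maps to null, so tombstone records pass through.

The stand-alone sketch below illustrates that contract with a fixed-width Long encoding. It uses the 8-byte big-endian layout that Kafka's built-in LongSerializer writes. SerdeContractSketch is hypothetical and does not implement Kafka's Serde interface.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the serde contract, independent of Kafka's interfaces:
// deserialize(serialize(x)) == x, and null <-> null for tombstones.
class SerdeContractSketch {

    // Encodes a Long as 8 big-endian bytes
    static byte[] serialize(Long value) {
        if (value == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes 8 big-endian bytes back into a Long, rejecting malformed input
    static Long deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize(42L)));
        System.out.println(deserialize(serialize(null)));
    }
}
```

A JSON serde follows the same rules. Only the encoding in the middle differs.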

Implement real-time analytics processor

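Create the main Kafka Streams application in src/main/java/com/example/streams/RealTimeAnalytics.java. It processes events as they arrive. It filters high-value events and aggregates event values per user and event type in 5-minute windows.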

package com.example.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Locale;
import java.util.Properties;

public class RealTimeAnalytics {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "real-time-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<UserEvent> userEventSerde = new JsonSerde<>(UserEvent.class);
        
        // Read events from input topic
        KStream<String, UserEvent> events = builder.stream(
            "user-events",
            Consumed.with(Serdes.String(), userEventSerde)
        );
        
        // Filter high-value events
        KStream<String, UserEvent> highValueEvents = events
            .filter((key, event) -> event.getValue() > 100.0);
        
        // Send filtered events to processed topic
        highValueEvents.to(
            "processed-events",
            Produced.with(Serdes.String(), userEventSerde)
        );
        
        // Aggregate all events by user and event type in 5-minute tumbling windows.
        // groupBy changes the key, so Kafka Streams writes to a repartition topic;
        // Grouped supplies the serdes for it (the default value serde is String).
        // No grace period: records arriving after their window closes are dropped.
        KTable<Windowed<String>, Double> aggregatedValues = events
            .groupBy((key, event) -> event.getUserId() + ":" + event.getEventType(),
                     Grouped.with(Serdes.String(), userEventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> 0.0,
                (key, event, aggregate) -> aggregate + event.getValue(),
                Materialized.with(Serdes.String(), Serdes.Double())
            );
        
        // Convert windowed results to stream and send to analytics topic
        aggregatedValues
            .toStream()
            .map((windowedKey, value) -> KeyValue.pair(
                windowedKey.key(),
                // Locale.ROOT keeps '.' as the decimal separator so the output stays valid JSON
                String.format(Locale.ROOT,
                    "{\"window_start\": %d, \"window_end\": %d, \"total_value\": %.2f}",
                    windowedKey.window().start(),
                    windowedKey.window().end(),
                    value)
            ))
            .to("analytics-results", Produced.with(Serdes.String(), Serdes.String()));
        
        // Build and start the streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Add shutdown hook for graceful termination
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
        System.out.println("Real-time analytics application started");
    }
}
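The plain-Java simulation below reproduces the windowed aggregation for three events, to show what it computes. A tumbling 5-minute window starts at the timestamp rounded down to a multiple of the window size. Values are summed per userId:eventType key and window.

WindowedSumSketch is a hypothetical helper that ignores grace periods, caching and late data. The aggregation reads events, not highValueEvents, so values at or below 100 still count toward the totals.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of the RealTimeAnalytics aggregation:
// tumbling 5-minute windows keyed by "userId:eventType", summing event values.
class WindowedSumSketch {

    record Event(String userId, String eventType, long timestamp, double value) {}

    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    // Tumbling windows are aligned to the epoch: [start, start + size)
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    // Returns "key@windowStart" -> summed value, sorted for stable output
    static Map<String, Double> sumByKeyAndWindow(List<Event> events) {
        Map<String, Double> totals = new TreeMap<>();
        for (Event e : events) {
            String key = e.userId() + ":" + e.eventType();
            long start = windowStart(e.timestamp(), WINDOW_SIZE_MS);
            totals.merge(key + "@" + start, e.value(), Double::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("user1", "click", 1702891200000L, 150.0),
            new Event("user1", "click", 1702891260000L, 50.0),
            new Event("user2", "purchase", 1702891260000L, 250.0));
        // prints {user1:click@1702891200000=200.0, user2:purchase@1702891200000=250.0}
        System.out.println(sumByKeyAndWindow(events));
    }
}
```

In the running topology, the window is chosen from each record's timestamp, not from the timestamp field inside the JSON. By default the producer sets that record timestamp when it sends the record. Windowing on the JSON field would require a custom TimestampExtractor.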

Configure stream processing for analytics and monitoring

Create monitoring topology

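Implement a second Kafka Streams application in src/main/java/com/example/streams/MonitoringProcessor.java. It counts events per type, alerts on unusually high event rates and tracks average event values.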

package com.example.streams;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;

public class MonitoringProcessor {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "monitoring-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<UserEvent> userEventSerde = new JsonSerde<>(UserEvent.class);
        
        KStream<String, UserEvent> events = builder.stream(
            "user-events",
            Consumed.with(Serdes.String(), userEventSerde)
        );
        
        // Count events per event type in 1-minute tumbling windows for rate monitoring.
        // groupBy re-keys the stream, so Grouped supplies the serdes for the repartition topic.
        KTable<Windowed<String>, Long> eventCounts = events
            .groupBy((key, event) -> event.getEventType(),
                     Grouped.with(Serdes.String(), userEventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count();
        
        // Detect anomalies: more than 1000 events of one type within a window
        eventCounts
            .toStream()
            .filter((windowedKey, count) -> count > 1000)
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key(),
                String.format("{\"alert\": \"high_event_rate\", \"event_type\": \"%s\", \"count\": %d, \"timestamp\": %d}",
                    windowedKey.key(),
                    count,
                    windowedKey.window().start())
            ))
            .peek((key, value) -> System.out.println("ALERT: " + value))
            .to("monitoring-alerts", Produced.with(Serdes.String(), Serdes.String()));
        
        // Calculate average event values in 5-minute windows for trend analysis
        KTable<Windowed<String>, Double> averageValues = events
            .groupBy((key, event) -> "global",
                     Grouped.with(Serdes.String(), userEventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> new AverageAccumulator(),
                (key, event, accumulator) -> {
                    accumulator.addValue(event.getValue());
                    return accumulator;
                },
                Materialized.with(Serdes.String(), new JsonSerde<>(AverageAccumulator.class))
            )
            .mapValues(AverageAccumulator::getAverage);
        
        // Log the averages; replace foreach with .to(...) to publish them to a topic
        averageValues
            .toStream()
            .foreach((windowedKey, average) -> System.out.println(
                "AVERAGE window_start=" + windowedKey.window().start() + " value=" + average));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        
        streams.start();
        System.out.println("Monitoring processor started");
    }
    
    public static class AverageAccumulator {
        private double sum = 0.0;
        private int count = 0;
        
        public AverageAccumulator() {}
        
        public void addValue(double value) {
            sum += value;
            count++;
        }
        
        // Derived value: @JsonIgnore keeps it out of the JSON stored in the state store,
        // so deserialization does not fail on an "average" field without a setter
        @JsonIgnore
        public double getAverage() {
            return count > 0 ? sum / count : 0.0;
        }
        
        // Getters and setters for JSON serialization
        public double getSum() { return sum; }
        public void setSum(double sum) { this.sum = sum; }
        
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }
}
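The AverageAccumulator stores a sum and a count and divides only when the average is read. This lets each record update the state exactly. It also keeps results composable: to roll several 5-minute windows up into a longer trend, add their sums and counts. Averaging the per-window averages gives the wrong answer when windows hold different numbers of events.

The plain-Java sketch below demonstrates the difference. AverageSketch is hypothetical and independent of Kafka.

```java
// Hypothetical illustration of (sum, count) accumulation versus averaging averages.
class AverageSketch {

    static final class Acc {
        double sum;
        long count;

        // Incremental update for one record
        Acc add(double value) {
            sum += value;
            count++;
            return this;
        }

        // Combine two partial results by adding sums and counts
        Acc merge(Acc other) {
            Acc out = new Acc();
            out.sum = this.sum + other.sum;
            out.count = this.count + other.count;
            return out;
        }

        double average() {
            return count > 0 ? sum / count : 0.0;
        }
    }

    public static void main(String[] args) {
        Acc windowA = new Acc().add(100).add(200).add(300); // 3 events, average 200.0
        Acc windowB = new Acc().add(1000);                   // 1 event, average 1000.0
        System.out.println(windowA.merge(windowB).average());              // 400.0 (correct)
        System.out.println((windowA.average() + windowB.average()) / 2);   // 600.0 (skewed)
    }
}
```

The combined average weights every event equally. The average of averages lets the single large event in windowB count as much as the three events in windowA.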

Create monitoring alerts topic

Create a dedicated topic for monitoring alerts that will be consumed by alerting systems.

cd $KAFKA_HOME
bin/kafka-topics.sh --create --topic monitoring-alerts --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Build and run the applications

Compile your Kafka Streams applications and run them to start processing real-time data.

cd ~/kafka-streams-project
mvn clean package

Run the real-time analytics application

java -cp target/classes:$(mvn dependency:build-classpath -Dmdep.outputFile=/dev/stdout -q) com.example.streams.RealTimeAnalytics &

Run the monitoring processor

java -cp target/classes:$(mvn dependency:build-classpath -Dmdep.outputFile=/dev/stdout -q) com.example.streams.MonitoringProcessor &

Configure application properties

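Create a configuration file with production-oriented settings for performance and reliability, for example streams.properties in the project directory. The two applications above hardcode their settings in Java. The file therefore only takes effect if you load it into the Properties object passed to KafkaStreams. The replication and exactly-once settings assume a cluster of at least three brokers, not the single local broker from this tutorial.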

# Kafka Streams Configuration
application.id=kafka-streams-analytics
bootstrap.servers=localhost:9092

# Performance tuning
num.stream.threads=4
# exactly_once_v2 needs brokers 2.5+ and, with default broker settings, at least 3 brokers
processing.guarantee=exactly_once_v2
# Replication factor for internal topics; requires at least 3 brokers
replication.factor=3

# State store configuration
# Persistent, writable directory; /tmp may be cleared on reboot
state.dir=/var/lib/kafka-streams
commit.interval.ms=1000

# Consumer configuration
auto.offset.reset=earliest
session.timeout.ms=30000
heartbeat.interval.ms=10000

# Producer configuration
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true
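Both applications hardcode their settings. One way to use a file like this is to load it with java.util.Properties and pass the result to the KafkaStreams constructor. The sketch below is a minimal loader built on stated assumptions. The streams.properties default path and the BOOTSTRAP_SERVERS environment override are hypothetical conventions, not something Kafka reads on its own.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical loader: read Streams settings from a properties file, optionally
// overriding bootstrap.servers from an environment variable.
class StreamsConfigLoader {

    static Properties load(Reader reader, String bootstrapOverride) {
        Properties props = new Properties();
        try {
            props.load(reader); // handles key=value lines and # comments
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read Streams configuration", e);
        }
        // Point the same file at a different cluster without editing it
        if (bootstrapOverride != null && !bootstrapOverride.isBlank()) {
            props.setProperty("bootstrap.servers", bootstrapOverride);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Path path = Path.of(args.length > 0 ? args[0] : "streams.properties");
        try (Reader reader = Files.newBufferedReader(path)) {
            Properties props = load(reader, System.getenv("BOOTSTRAP_SERVERS"));
            // In the applications, pass props to new KafkaStreams(builder.build(), props)
            props.forEach((key, value) -> System.out.println(key + "=" + value));
        }
    }
}
```

Keeping configuration in a file lets the same build run against development and production clusters.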

Verify your setup

Test your Kafka Streams applications by producing sample events and reading the output topics. The commands below call the bin/ scripts, so run them from $KAFKA_HOME.

cd $KAFKA_HOME
# Produce sample events
echo '{"user_id": "user1", "event_type": "click", "timestamp": 1702891200000, "value": 150.0}' | bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092
echo '{"user_id": "user2", "event_type": "purchase", "timestamp": 1702891260000, "value": 250.0}' | bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092
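Two sample events are not enough to trigger the monitoring alert. That alert needs more than 1000 events of one type in the same one-minute window. The hypothetical generator below prints JSON lines in the UserEvent format, ready to pipe into the console producer. SampleEventGenerator, its default of 1500 events and the value range are illustrative assumptions.

```java
import java.util.Locale;
import java.util.Random;

// Hypothetical test-data generator: prints one UserEvent-style JSON object per line.
class SampleEventGenerator {

    static String eventJson(String userId, String eventType, long timestamp, double value) {
        // Locale.ROOT keeps '.' as the decimal separator regardless of the system locale
        return String.format(Locale.ROOT,
            "{\"user_id\": \"%s\", \"event_type\": \"%s\", \"timestamp\": %d, \"value\": %.2f}",
            userId, eventType, timestamp, value);
    }

    public static void main(String[] args) {
        int count = args.length > 0 ? Integer.parseInt(args[0]) : 1500;
        Random random = new Random(42); // fixed seed for repeatable runs
        long now = System.currentTimeMillis();
        for (int i = 0; i < count; i++) {
            String userId = "user" + (1 + random.nextInt(10));
            double value = random.nextDouble() * 300.0; // values between 0 and 300
            System.out.println(eventJson(userId, "click", now + i, value));
        }
    }
}
```

Assume the file is saved as SampleEventGenerator.java (a hypothetical name) in $KAFKA_HOME. Java 11 or later can then run it directly from source: java SampleEventGenerator.java 1500 | bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092.

Windows follow the record timestamps assigned when the records are sent. A burst that straddles a minute boundary is therefore split across two windows and may not cross the threshold.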

Monitor processed events

bin/kafka-console-consumer.sh --topic processed-events --bootstrap-server localhost:9092 --from-beginning

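Monitor analytics results

Windowed totals pass through a record cache and are forwarded when the application commits. They can therefore take up to commit.interval.ms (30 seconds by default with at-least-once processing) to appear.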

bin/kafka-console-consumer.sh --topic analytics-results --bootstrap-server localhost:9092 --from-beginning

Check application status

jps -l | grep com.example.streams

Common issues

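Symptom | Cause | Fix
Application fails to start | Kafka broker not running | Start ZooKeeper, then Kafka: bin/zookeeper-server-start.sh config/zookeeper.properties, then bin/kafka-server-start.sh config/server.properties
JSON deserialization errors | Message format mismatch | Check that each event uses the UserEvent JSON fields (user_id, event_type, timestamp, value) and is valid JSON
No output in result topics | Topics missing, no input yet, or results still cached | Verify the topics exist, produce test messages to user-events, and wait for the next commit (commit.interval.ms)
High memory usage | Large state stores | Give the application JVM more heap, for example java -Xmx2g -cp ... (KAFKA_HEAP_OPTS only affects the Kafka scripts); RocksDB state stores also use memory outside the heap, and shorter windows keep state smaller
Rebalancing issues | Consumer group instability | Increase session.timeout.ms, keep heartbeat.interval.ms at no more than one third of it, and raise max.poll.interval.ms if processing is slow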
