Implement Kafka Streams applications for real-time data processing with Java and Scala

Advanced · 45 min · Apr 04, 2026
Ubuntu 24.04 · Debian 12 · AlmaLinux 9 · Rocky Linux 9

Build production-grade Kafka Streams applications for real-time data processing using Java and Scala. Configure stream processing topologies, implement windowing operations, and deploy scalable streaming applications with monitoring.

Prerequisites

  • Running Apache Kafka cluster
  • Java 11 or later installed
  • Maven and SBT build tools
  • Sufficient memory (4GB+ recommended)
  • Understanding of stream processing concepts

What this solves

Kafka Streams enables you to build real-time stream processing applications that transform, aggregate, and analyze continuous data flows. This tutorial shows you how to implement production-ready Kafka Streams applications in both Java and Scala with proper topology configuration, windowing operations, and monitoring.

Step-by-step installation

Update system packages

Start by updating your package manager to ensure you get the latest versions of all dependencies.

# Debian/Ubuntu
sudo apt update && sudo apt upgrade -y

# RHEL-family (AlmaLinux/Rocky)
sudo dnf update -y

Install Java Development Kit

Kafka Streams requires Java 11 or later; this guide installs OpenJDK 21.

# Debian/Ubuntu
sudo apt install -y openjdk-21-jdk openjdk-21-jre
export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
echo 'export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64' >> ~/.bashrc

# RHEL-family (AlmaLinux/Rocky)
sudo dnf install -y java-21-openjdk java-21-openjdk-devel
export JAVA_HOME=/usr/lib/jvm/java-21-openjdk
echo 'export JAVA_HOME=/usr/lib/jvm/java-21-openjdk' >> ~/.bashrc
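Confirm the JDK is active in the current shell before continuing:

source ~/.bashrc
java -version   # should report openjdk 21.x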

Install Scala build tools

Install SBT (Scala Build Tool) for compiling and managing Scala-based Kafka Streams applications.

echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt update
sudo apt install -y sbt scala
sudo dnf install -y dnf-plugins-core
sudo dnf copr enable -y bintray/rpm
curl -L https://github.com/sbt/sbt/releases/download/v1.9.7/sbt-1.9.7.tgz | sudo tar -xz -C /opt/
sudo ln -s /opt/sbt/bin/sbt /usr/local/bin/sbt
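Verify both tools resolve on the PATH (the first sbt invocation downloads its launcher dependencies, so it may take a minute):

sbt --version        # sbt runner/launcher version
scala -version       # only where the scala package was installed (apt path)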

Install Apache Maven

Maven handles Java project dependencies and builds for Kafka Streams applications.

# Debian/Ubuntu
sudo apt install -y maven

# RHEL-family (AlmaLinux/Rocky)
sudo dnf install -y maven

Create application directory structure

Set up organized directories for both Java and Scala Kafka Streams applications.

mkdir -p ~/kafka-streams-apps/{java-app,scala-app}
cd ~/kafka-streams-apps

Create Java Kafka Streams application

Initialize Maven project structure

Create a complete Maven project with the proper directory layout for Java development.

cd java-app
mvn archetype:generate -DgroupId=com.example.streams \
  -DartifactId=kafka-streams-java \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
cd kafka-streams-java

Configure Maven dependencies

Replace the generated pom.xml with the following, which adds the Kafka Streams dependencies and sets the Java version.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.streams</groupId>
  <artifactId>kafka-streams-java</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
    <kafka.version>3.6.0</kafka.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <!-- reload4j binding so the log4j.properties file created later is honored
           (slf4j-simple would ignore it) -->
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-reload4j</artifactId>
      <version>2.0.9</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.16.0</version>
    </dependency>
  </dependencies>
</project>
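The quickstart archetype builds a thin JAR that does not bundle its dependencies, so the `java -cp ... StreamProcessor` invocation in the systemd unit later would fail at runtime. One way to produce a self-contained JAR is the Maven Shade plugin; a minimal sketch to add inside <project>, after <dependencies> (the plugin version is an assumption, pick a current release):

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
              <transformers>
                <!-- set Main-Class so the shaded JAR is also runnable with java -jar -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.example.streams.StreamProcessor</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>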

Implement stream processing topology

Create src/main/java/com/example/streams/StreamProcessor.java (replacing the generated App.java) to demonstrate stream transformations, aggregations, and windowing operations.

package com.example.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class StreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-processor-java");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read from input topic
        KStream<String, String> source = builder.stream("input-events");
        
        // Transform and filter data: key by user id first, then normalize the payload
        // (extracting the key after toUpperCase would miss the lowercase "userId:" marker)
        KStream<String, String> processed = source
            .filter((key, value) -> value != null && value.length() > 0)
            .selectKey((key, value) -> extractUserId(value))
            .mapValues(value -> value.toUpperCase());
        
        // Create hopping-window aggregations (5-minute windows advancing every minute);
        // ofSizeWithNoGrace replaces the deprecated TimeWindows.of()
        KTable<Windowed<String>, Long> windowedCounts = processed
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
            .count();
        
        // Output aggregated results
        windowedCounts.toStream()
            .map((windowedKey, count) -> {
                String key = windowedKey.key() + "@" + windowedKey.window().start();
                return KeyValue.pair(key, count.toString());
            })
            .to("aggregated-counts");
        
        // Branch streams for different processing paths
        // (branch() is deprecated in favor of split()/Branched in recent releases,
        // but still works against the 3.6 client)
        KStream<String, String>[] branches = processed.branch(
            (key, value) -> value.contains("ERROR"),
            (key, value) -> value.contains("WARN"),
            (key, value) -> true
        );
        
        branches[0].to("error-events");
        branches[1].to("warning-events");
        branches[2].to("info-events");
        
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, props);
        
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
    
    private static String extractUserId(String value) {
        // Simple extraction - in production, use proper JSON parsing
        int start = value.indexOf("userId:") + 7;
        int end = value.indexOf(",", start);
        if (end == -1) end = value.length();
        return start > 6 ? value.substring(start, end).trim() : "unknown";
    }
}
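To exercise the topology without a running broker, Kafka ships a TopologyTestDriver in the org.apache.kafka:kafka-streams-test-utils artifact (add it to the POM with the same ${kafka.version}). A minimal sketch, assuming topology construction is factored out of main() into a hypothetical static StreamProcessor.buildTopology() helper:

package com.example.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class TopologySmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-smoke-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // buildTopology() is a hypothetical refactor of the topology code in main()
        try (TopologyTestDriver driver =
                 new TopologyTestDriver(StreamProcessor.buildTopology(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input-events", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> info = driver.createOutputTopic(
                "info-events", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("k1", "userId:user123,event:login");

            // No ERROR/WARN marker, so the record lands on the info branch,
            // re-keyed by user id and uppercased:
            System.out.println(info.readKeyValue()); // KeyValue(user123, USERID:USER123,EVENT:LOGIN)
        }
    }
}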

Create Scala Kafka Streams application

Initialize SBT project

Set up a Scala project with SBT build configuration and Kafka Streams dependencies.

cd ~/kafka-streams-apps/scala-app
mkdir -p src/main/scala/com/example/streams
mkdir project

Configure SBT build file

Define the project dependencies and Scala version for the streams application. Save the following as build.sbt in the scala-app root.

name := "kafka-streams-scala"
version := "1.0"
scalaVersion := "2.13.12"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-streams" % "3.6.0",
  "org.apache.kafka" %% "kafka-streams-scala" % "3.6.0",
  "org.apache.kafka" % "kafka-clients" % "3.6.0",
  // reload4j binding so the log4j.properties file created later is honored
  "org.slf4j" % "slf4j-reload4j" % "2.0.9",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.16.0",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.0"
)

Configure SBT plugins

Add build plugins, saved as project/plugins.sbt. Alongside the native packager, sbt-assembly is required for the `sbt assembly` packaging step used later (the plugin version shown is a recent release).

addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16")

Implement Scala stream processor

Create src/main/scala/com/example/streams/StreamProcessor.scala, a Scala implementation with functional programming patterns and type safety.

package com.example.streams

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
import org.apache.kafka.streams.kstream.{TimeWindows, Windowed}

import java.time.Duration
import java.util.Properties
import scala.util.{Failure, Success, Try}

object StreamProcessor extends App {
  import org.apache.kafka.streams.scala.ImplicitConversions._
  import org.apache.kafka.streams.scala.serialization.Serdes._
  
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-processor-scala")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L: java.lang.Long)
    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000: java.lang.Integer)
    p
  }
  
  val builder = new StreamsBuilder()
  
  // Define the topology
  val sourceStream: KStream[String, String] = builder.stream[String, String]("input-events")
  
  // Data transformation pipeline: key by user id first, then normalize the payload
  // (extracting the key after toUpperCase would miss the lowercase "userId:" marker)
  val processedStream: KStream[String, String] = sourceStream
    .filter((_, value) => Option(value).exists(_.nonEmpty))
    .selectKey((_, value) => extractUserId(value))
    .mapValues(_.toUpperCase)
  
  // Hopping-window aggregations (5-minute windows advancing every minute);
  // ofSizeWithNoGrace replaces the deprecated TimeWindows.of()
  val windowedCounts: KTable[Windowed[String], Long] = processedStream
    .groupByKey
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
    .count()
  
  // Transform windowed results
  windowedCounts.toStream
    .map { case (windowedKey, count) =>
      val key = s"${windowedKey.key()}@${windowedKey.window().start()}"
      (key, count.toString)
    }
    .to("aggregated-counts")
  
  // Branch the stream by content
  // (branch() is deprecated in favor of split() in recent releases, but still works here)
  val branches = processedStream.branch(
    (_, value) => value.contains("ERROR"),
    (_, value) => value.contains("WARN"),
    (_, _) => true
  )
  
  branches(0).to("error-events")
  branches(1).to("warning-events")
  branches(2).to("info-events")
  
  // Create and run streams
  val topology: Topology = builder.build()
  val streams = new KafkaStreams(topology, props)
  
  // Graceful shutdown
  sys.addShutdownHook {
    streams.close(Duration.ofSeconds(30))
  }
  
  Try {
    streams.start()
    // Keep the application running
    Thread.currentThread().join()
  } match {
    case Success(_) => println("Stream processing completed successfully")
    case Failure(exception) =>
      println(s"Stream processing failed: ${exception.getMessage}")
      sys.exit(1)
  }
  
  private def extractUserId(value: String): String = {
    val pattern = "userId:(\\w+)".r
    pattern.findFirstMatchIn(value)
      .map(_.group(1))
      .getOrElse("unknown")
  }
}
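For a quick local check before packaging, you can run the application straight from sbt against a broker on localhost:9092:

cd ~/kafka-streams-apps/scala-app
sbt run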

Configure application properties

Create streams configuration file

Define reusable configuration properties for different environments. Save the following as streams.properties (it is copied to /opt/kafka-streams during deployment).

# Kafka Streams Application Configuration
application.id=production-streams-app
bootstrap.servers=localhost:9092

# Performance tuning
num.stream.threads=4
cache.max.bytes.buffering=16777216
commit.interval.ms=1000
default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler

# Security (uncomment for SSL)
# security.protocol=SSL
# ssl.truststore.location=/path/to/kafka.client.truststore.jks
# ssl.truststore.password=truststore-password
# ssl.keystore.location=/path/to/kafka.client.keystore.jks
# ssl.keystore.password=keystore-password

# Monitoring
metric.reporters=org.apache.kafka.common.metrics.JmxReporter
auto.include.jmx.reporter=true
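To make the application honor this file instead of its hard-coded settings, load it at startup. A minimal sketch (the class name is illustrative; the path matches the /opt/kafka-streams deployment layout used below):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public final class StreamsProps {
    // Load streams.properties from disk; callers can still override
    // individual keys afterwards (e.g. application.id per environment).
    public static Properties load(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        return props;
    }
}

In main(), the inline Properties block can then be replaced with StreamsProps.load("/opt/kafka-streams/streams.properties").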

Create logging configuration

Set up structured logging for monitoring and debugging stream processing applications. Save the following as log4j.properties; it is loaded via the -Dlog4j.configuration flag in the systemd units below.

# Root logger
log4j.rootLogger=INFO, stdout, file

# Console appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# File appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka-streams/streams.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Kafka Streams specific logging
log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.clients=WARN

Build and deploy applications

Compile Java application

Build the Java Kafka Streams application with Maven.

cd ~/kafka-streams-apps/java-app/kafka-streams-java
mvn clean compile
mvn package
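With the Shade plugin added earlier, mvn package emits a self-contained JAR that you can smoke-test directly (assumes Kafka is reachable on localhost:9092):

java -jar target/kafka-streams-java-1.0-SNAPSHOT.jar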

Compile Scala application

Build the Scala application into a single self-contained assembly JAR using SBT.

cd ~/kafka-streams-apps/scala-app
sbt clean compile
sbt assembly

Create systemd service files

Set up systemd services for production deployment with proper resource management. Save the following as /etc/systemd/system/kafka-streams-java.service.

[Unit]
Description=Kafka Streams Java Application
After=network.target kafka.service
Requires=kafka.service

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/usr/bin/java -Xms1g -Xmx2g -XX:+UseG1GC \
  -Djava.awt.headless=true \
  -Dlog4j.configuration=file:/opt/kafka-streams/log4j.properties \
  -cp /opt/kafka-streams/java/target/kafka-streams-java-1.0-SNAPSHOT.jar \
  com.example.streams.StreamProcessor
WorkingDirectory=/opt/kafka-streams
Restart=on-failure
RestartSec=30
TimeoutStartSec=60
TimeoutStopSec=30

# Resource limits
LimitNOFILE=65536
LimitNPROC=32768

[Install]
WantedBy=multi-user.target

Create Scala service configuration

Configure the systemd service for the Scala streams application. Save it as /etc/systemd/system/kafka-streams-scala.service.

[Unit]
Description=Kafka Streams Scala Application
After=network.target kafka.service
Requires=kafka.service

[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/usr/bin/java -Xms1g -Xmx2g -XX:+UseG1GC \
  -Djava.awt.headless=true \
  -Dlog4j.configuration=file:/opt/kafka-streams/log4j.properties \
  -jar /opt/kafka-streams/scala/target/scala-2.13/kafka-streams-scala-assembly-1.0.jar
WorkingDirectory=/opt/kafka-streams
Restart=on-failure
RestartSec=30
TimeoutStartSec=60
TimeoutStopSec=30

# Resource limits
LimitNOFILE=65536
LimitNPROC=32768

[Install]
WantedBy=multi-user.target

Deploy applications to production directory

Copy built applications and configuration files to the production directory structure.

sudo mkdir -p /opt/kafka-streams/{java,scala}
sudo mkdir -p /var/log/kafka-streams
sudo cp ~/kafka-streams-apps/java-app/kafka-streams-java/target/kafka-streams-java-1.0-SNAPSHOT.jar /opt/kafka-streams/java/
sudo cp ~/kafka-streams-apps/scala-app/target/scala-2.13/*.jar /opt/kafka-streams/scala/
sudo cp streams.properties /opt/kafka-streams/
sudo cp log4j.properties /opt/kafka-streams/
sudo chown -R kafka:kafka /opt/kafka-streams /var/log/kafka-streams
sudo chmod 755 /opt/kafka-streams
sudo chmod 644 /opt/kafka-streams/*.properties

Monitor and manage stream applications

Enable and start services

Start the Kafka Streams applications and enable them for automatic startup.

sudo systemctl daemon-reload
sudo systemctl enable kafka-streams-java kafka-streams-scala
sudo systemctl start kafka-streams-java
sudo systemctl start kafka-streams-scala

Configure JMX monitoring

Enable JMX metrics collection for monitoring stream processing performance.

#!/bin/bash
# JMX monitoring script for Kafka Streams.
# Assumes a Jolokia HTTP agent is exposing JMX on port 8080 (see the note below).

echo "Kafka Streams Metrics Collection"
echo "================================"

# Stream processing metrics
curl -s http://localhost:8080/jolokia/read/kafka.streams:type=stream-metrics,client-id=* | jq .

# Consumer lag metrics
curl -s http://localhost:8080/jolokia/read/kafka.consumer:type=consumer-fetch-manager-metrics,client-id=* | jq .

# Producer metrics
curl -s http://localhost:8080/jolokia/read/kafka.producer:type=producer-metrics,client-id=* | jq .

Save the script as /opt/kafka-streams/jmx-monitoring.sh and make it executable:

sudo chmod +x /opt/kafka-streams/jmx-monitoring.sh
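These curl endpoints assume a Jolokia HTTP agent bridging JMX to port 8080; bare JMX is not reachable via curl. One way to attach it (the jar path and version are assumptions; download the Jolokia JVM agent first) is to add a -javaagent flag to the java command in each unit's ExecStart line:

# hypothetical agent location and version
-javaagent:/opt/jolokia/jolokia-jvm-1.7.2-agent.jar=port=8080,host=localhost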

Verify your setup

Test that your Kafka Streams applications are running correctly and processing data as expected.

# Check service status
sudo systemctl status kafka-streams-java
sudo systemctl status kafka-streams-scala

# Check application logs
sudo tail -f /var/log/kafka-streams/streams.log

# Test with sample data
echo 'userId:user123,event:login,timestamp:2024-01-15' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input-events

# Verify output topics
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic aggregated-counts --from-beginning

# Check stream application metrics
/opt/kafka-streams/jmx-monitoring.sh

Common issues

Symptom                     | Cause                                              | Fix
Application fails to start  | Kafka not running or wrong bootstrap servers       | Verify Kafka is running: sudo systemctl status kafka
High memory usage           | Improper JVM heap sizing or caching configuration  | Adjust -Xmx and cache.max.bytes.buffering values
Slow processing             | Insufficient stream threads or poor parallelism    | Increase num.stream.threads and topic partitions
Consumer lag increasing     | Processing bottleneck or downstream system issues  | Monitor processing time and scale horizontally
Compilation errors in Scala | Version mismatch between Scala and Kafka Streams   | Ensure compatible versions in build.sbt
Windowing not working       | Incorrect time semantics or missing timestamps     | Check message timestamps and window configuration


#kafka-streams #real-time-processing #java #scala #stream-processing
