Implement Kafka Streams processing applications with Java and Scala for real-time data analytics

Intermediate · 45 min · Apr 19, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Build production-ready Kafka Streams applications using Java and Scala for real-time data processing, including stateless transformations, stateful aggregations, windowing, and stream joins with exactly-once semantics.

Prerequisites

  • Java 17 or later installed
  • Apache Kafka cluster running
  • Basic knowledge of stream processing concepts
  • Understanding of JSON data formats

What this solves

Kafka Streams enables real-time data processing directly within your applications without requiring separate stream processing clusters. This tutorial shows you how to build production-grade stream processing applications using both Java and Scala, covering stateless operations, stateful processing with windowing, stream joins, and exactly-once semantics for mission-critical data pipelines.
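To make that concrete, here is a minimal sketch of a Kafka Streams application embedded in an ordinary Java process: it reads one topic, applies a single stateless transformation, and writes another. The topic names and broker address are placeholders.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class QuickstartStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "quickstart");          // consumer group / state prefix
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")      // read
               .mapValues(value -> value.toUpperCase())    // stateless transformation
               .to("output-topic");                        // write

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}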

Step-by-step installation

Install Java development environment

Kafka Streams 3.x still runs on Java 8, but Java 8 support is deprecated; this guide targets Java 17. Install OpenJDK 17 together with Maven and Gradle, then verify the installation.

Ubuntu 24.04 / Debian 12

sudo apt update
sudo apt install -y openjdk-17-jdk maven gradle
java -version
mvn -version

AlmaLinux 9 / Rocky Linux 9

sudo dnf update -y
sudo dnf install -y java-17-openjdk-devel maven gradle
java -version
mvn -version

Install Scala development environment

Install Scala and SBT for Scala-based Kafka Streams development.

curl -fL https://github.com/coursier/launchers/raw/master/cs-x86_64-pc-linux.gz | gzip -d > cs
chmod +x cs
sudo mv cs /usr/local/bin
cs setup --yes
echo 'export PATH="$PATH:$HOME/.local/share/coursier/bin"' >> ~/.bashrc
source ~/.bashrc
scala -version
sbt --version

Set up Kafka cluster

Install and configure Apache Kafka for stream processing. We'll use the binary distribution for simplicity.

cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
sudo tar -xzf kafka_2.13-3.6.0.tgz
sudo mv kafka_2.13-3.6.0 kafka
sudo chown -R $USER:$USER /opt/kafka
echo 'export KAFKA_HOME=/opt/kafka' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
source ~/.bashrc

Configure and start Kafka services

Start ZooKeeper and the Kafka broker using the default configurations shipped with the distribution.

cd /opt/kafka

Start Zookeeper

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

Wait for Zookeeper to start

sleep 5

Start Kafka broker

bin/kafka-server-start.sh -daemon config/server.properties

Verify services are running

jps | grep -E '(QuorumPeerMain|Kafka)'

Create test topics

Create input and output topics for stream processing examples with multiple partitions for parallelism.

cd /opt/kafka

Create input topic for user events

bin/kafka-topics.sh --create --topic user-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

Create output topic for processed events

bin/kafka-topics.sh --create --topic processed-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

Create topic for aggregated results

bin/kafka-topics.sh --create --topic user-aggregates \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

List created topics

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
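If you prefer to create the topics from code rather than the CLI, a sketch using the Kafka AdminClient (same topic names and broker address as above) looks like this:

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Map<String, Object> config = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(config)) {
            admin.createTopics(List.of(
                new NewTopic("user-events", 3, (short) 1),        // 3 partitions, replication factor 1
                new NewTopic("processed-events", 3, (short) 1),
                new NewTopic("user-aggregates", 3, (short) 1)
            )).all().get();   // block until the broker confirms creation
        }
    }
}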

Build stateless stream processing applications

Create Java Maven project structure

Set up a Maven project for Java-based Kafka Streams applications with proper dependencies.

mkdir -p ~/kafka-streams-java/src/main/java/com/example/streams
cd ~/kafka-streams-java


Create pom.xml in the project root:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-streams-java</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <kafka.version>3.6.0</kafka.version>
        <jackson.version>2.15.2</jackson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.10.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>${kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <mainClass>com.example.streams.UserEventProcessor</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Implement Java stateless stream processor

First create the UserEvent model class with JSON serialization, validation, and enrichment helpers; the processor topology in the next step filters and transforms these events.

package com.example.streams;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UserEvent {
    @JsonProperty("userId")
    public String userId;
    
    @JsonProperty("eventType")
    public String eventType;
    
    @JsonProperty("timestamp")
    public long timestamp;
    
    @JsonProperty("value")
    public double value;
    
    @JsonProperty("metadata")
    public String metadata;
    
    private static final ObjectMapper objectMapper = new ObjectMapper();
    
    public UserEvent() {}
    
    public UserEvent(String userId, String eventType, long timestamp, double value, String metadata) {
        this.userId = userId;
        this.eventType = eventType;
        this.timestamp = timestamp;
        this.value = value;
        this.metadata = metadata;
    }
    
    public String toJson() throws JsonProcessingException {
        return objectMapper.writeValueAsString(this);
    }
    
    public static UserEvent fromJson(String json) throws JsonProcessingException {
        return objectMapper.readValue(json, UserEvent.class);
    }
    
    public boolean isValidEvent() {
        return userId != null && eventType != null && value >= 0;
    }
    
    public UserEvent enrich() {
        return new UserEvent(
            this.userId,
            this.eventType.toUpperCase(),
            this.timestamp,
            this.value,
            "processed_" + this.metadata
        );
    }
}

Create Java stream processing topology

Implement the main stream processing application with filtering, mapping, and branching logic.

package com.example.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class UserEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(UserEventProcessor.class);
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Read from input topic
        KStream<String, String> userEvents = builder.stream("user-events");
        
        // Parse JSON and filter valid events
        KStream<String, UserEvent> validEvents = userEvents
            .mapValues(value -> {
                try {
                    return UserEvent.fromJson(value);
                } catch (Exception e) {
                    logger.warn("Failed to parse event: {}", value);
                    return null;
                }
            })
            .filter((key, event) -> event != null && event.isValidEvent())
            .peek((key, event) -> logger.info("Processing event for user: {}", event.userId));
        
        // Branch streams based on event type
        @SuppressWarnings("unchecked")
        KStream<String, UserEvent>[] branches = validEvents.branch(
            Named.as("high-value-"),
            (key, event) -> event.value > 1000.0,
            (key, event) -> event.eventType.equals("PURCHASE"),
            (key, event) -> true // catch-all
        );
        
        // Process high-value events
        branches[0]
            .mapValues(event -> event.enrich())
            .mapValues(event -> {
                try {
                    return event.toJson();
                } catch (Exception e) {
                    logger.error("Failed to serialize event", e);
                    return null;
                }
            })
            .filter((key, value) -> value != null)
            .to("processed-events");
        
        // Process purchase events
        branches[1]
            .filter((key, event) -> event.value > 10.0)
            .mapValues(event -> event.enrich())
            .mapValues(event -> {
                try {
                    return event.toJson();
                } catch (Exception e) {
                    logger.error("Failed to serialize event", e);
                    return null;
                }
            })
            .filter((key, value) -> value != null)
            .to("processed-events");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        CountDownLatch latch = new CountDownLatch(1);
        
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                logger.info("Shutting down stream processor...");
                streams.close();
                latch.countDown();
            }
        });
        
        try {
            streams.start();
            logger.info("Stream processor started");
            latch.await();
        } catch (Throwable e) {
            logger.error("Stream processing failed", e);
            System.exit(1);
        }
        
        System.exit(0);
    }
}
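Note that branch() is deprecated in recent Kafka Streams releases in favor of split(). If you want to avoid the deprecated API, the branching step above can be rewritten roughly like this (the branch names are illustrative):

// Requires: import java.util.Map; import org.apache.kafka.streams.kstream.Branched;
Map<String, KStream<String, UserEvent>> branches = validEvents
    .split(Named.as("events-"))
    .branch((key, event) -> event.value > 1000.0, Branched.as("high-value"))
    .branch((key, event) -> event.eventType.equals("PURCHASE"), Branched.as("purchase"))
    .defaultBranch(Branched.as("other"));

// The map keys are the split() prefix plus the branch name.
KStream<String, UserEvent> highValueEvents = branches.get("events-high-value");
KStream<String, UserEvent> purchaseEvents  = branches.get("events-purchase");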

Implement stateful stream processing

Create Scala SBT project

Set up a Scala project for advanced stream processing with aggregations and windowing.

mkdir -p ~/kafka-streams-scala/src/main/scala/com/example/streams
cd ~/kafka-streams-scala
Create build.sbt:

ThisBuild / version := "1.0.0"
ThisBuild / scalaVersion := "2.13.11"

val kafkaVersion = "3.6.0"
val jacksonVersion = "2.15.2"

lazy val root = (project in file("."))
  .settings(
    name := "kafka-streams-scala",
    libraryDependencies ++= Seq(
      "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion,
      "org.apache.kafka" % "kafka-streams" % kafkaVersion,
      "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
      "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
      "ch.qos.logback" % "logback-classic" % "1.4.11",
      "org.scalatest" %% "scalatest" % "3.2.17" % Test,
      "org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion % Test
    ),
    assembly / mainClass := Some("com.example.streams.UserAggregateProcessor"),
    assembly / assemblyJarName := "kafka-streams-scala-assembly.jar",
    assembly / assemblyMergeStrategy := {
      case "META-INF/services/org.apache.kafka.common.config.ConfigDef" => MergeStrategy.concat
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case _ => MergeStrategy.first
    }
  )

Add the sbt-assembly plugin in project/plugins.sbt:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.3")

Implement Scala stateful processor with windowing

Create an advanced stream processor that performs windowed aggregations and maintains state.

package com.example.streams

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import org.apache.kafka.streams.kstream.{SessionWindows, TimeWindows, Windowed}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.slf4j.LoggerFactory

import java.time.Duration
import java.util.Properties
import scala.util.{Failure, Success, Try}

case class UserEvent(
  userId: String,
  eventType: String,
  timestamp: Long,
  value: Double,
  metadata: String
)

case class UserAggregate(
  userId: String,
  eventCount: Long,
  totalValue: Double,
  avgValue: Double,
  maxValue: Double,
  minValue: Double,
  lastEventTime: Long
)

object UserAggregateProcessor {
  private val logger = LoggerFactory.getLogger(this.getClass)
  
  private val objectMapper = new ObjectMapper()
  objectMapper.registerModule(DefaultScalaModule)
  
  // String key/value serdes are provided implicitly by the Serdes._ import above.
  // NOTE: the groupByKey and aggregate calls below additionally need implicit
  // Serde[UserEvent] and Serde[UserAggregate] instances in scope (for example,
  // Jackson-backed serdes built with org.apache.kafka.streams.scala.serialization.Serdes.fromFn).
  
  def parseUserEvent(json: String): Option[UserEvent] = {
    Try {
      objectMapper.readValue(json, classOf[UserEvent])
    } match {
      case Success(event) if event.userId != null && event.eventType != null => Some(event)
      case Success(_) => 
        logger.warn(s"Invalid event format: $json")
        None
      case Failure(e) => 
        logger.warn(s"Failed to parse event: $json", e)
        None
    }
  }
  
  def serializeAggregate(aggregate: UserAggregate): String = {
    objectMapper.writeValueAsString(aggregate)
  }
  
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-aggregate-processor")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3")
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10240")
    
    val builder = new StreamsBuilder()
    
    val userEvents: KStream[String, String] = builder.stream[String, String]("user-events")
    
    // Parse and key by userId
    val validEvents: KStream[String, UserEvent] = userEvents
      .flatMapValues(parseUserEvent)
      .selectKey((_, event) => event.userId)
    
    // Create 5-minute hopping windows (advancing every minute) for aggregation
    val windowedAggregates: KTable[Windowed[String], UserAggregate] = validEvents
      .groupByKey
      .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
      .aggregate(
        // Initializer
        () => UserAggregate("", 0L, 0.0, 0.0, Double.MinValue, Double.MaxValue, 0L)
      )(
        // Aggregator
        (key: String, event: UserEvent, aggregate: UserAggregate) => {
          val newCount = aggregate.eventCount + 1
          val newTotal = aggregate.totalValue + event.value
          val newAvg = newTotal / newCount
          val newMax = math.max(aggregate.maxValue, event.value)
          val newMin = if (aggregate.minValue == Double.MaxValue) event.value else math.min(aggregate.minValue, event.value)
          
          UserAggregate(
            userId = event.userId,
            eventCount = newCount,
            totalValue = newTotal,
            avgValue = newAvg,
            maxValue = newMax,
            minValue = newMin,
            lastEventTime = math.max(aggregate.lastEventTime, event.timestamp)
          )
        }
      )
    
    // Output windowed aggregates
    windowedAggregates
      .toStream
      .filter((_, aggregate) => aggregate.eventCount > 0)
      .mapValues(serializeAggregate)
      .peek((windowedKey, aggregate) => {
        val window = windowedKey.window()
        logger.info(s"Aggregate for user ${windowedKey.key()} in window [${window.start()}-${window.end()}]: $aggregate")
      })
      .selectKey((windowedKey, _) => windowedKey.key())
      .to("user-aggregates")
    
    // Session windows for detecting user activity sessions
    val sessionAggregates = validEvents
      .filter((_, event) => event.eventType == "CLICK" || event.eventType == "VIEW")
      .groupByKey
      .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
      .count()
      .toStream
      .filter((_, count) => count >= 3) // Sessions with at least 3 events
      .mapValues(count => s"""{
        |  "sessionEventCount": $count,
        |  "sessionType": "ACTIVE"
        |}""".stripMargin)
      .selectKey((windowedKey, _) => windowedKey.key())
      .to("user-sessions")
    
    val streams = new KafkaStreams(builder.build(), props)
    
    streams.setUncaughtExceptionHandler { (exception: Throwable) =>
      logger.error("Uncaught exception in stream thread", exception)
      StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
    }
    
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      logger.info("Shutting down stream processor...")
      streams.close(Duration.ofSeconds(10))
    }))
    
    try {
      streams.start()
      logger.info("User aggregate processor started")
      Thread.currentThread().join()
    } catch {
      case e: Exception => 
        logger.error("Stream processing failed", e)
        System.exit(1)
    }
  }
}
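For readers following along in Java, the same windowing idea can be sketched with the Java DSL. This simplified version only counts events per user in 5-minute hopping windows; it assumes record keys are already user IDs and that the default String serdes are configured, so no custom serde is needed.

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedCountTopology {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("user-events");

        // 5-minute windows advancing every minute (hopping windows)
        KTable<Windowed<String>, Long> counts = events
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
            .count();

        counts.toStream()
              .map((windowedKey, count) -> KeyValue.pair(
                  windowedKey.key(),
                  "{\"eventCount\":" + count + ",\"windowStart\":" + windowedKey.window().start() + "}"))
              .to("user-aggregates");

        return builder;
    }
}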

Create additional required topics

Create topics for session processing and configure retention policies for stateful operations.

cd /opt/kafka

Create topic for user sessions

bin/kafka-topics.sh --create --topic user-sessions \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1 \
  --config retention.ms=86400000

Pre-create a changelog topic for the aggregation state store (optional: Kafka Streams creates its internal changelog topics automatically, so only pre-create them if your cluster restricts automatic topic creation, and match the store name your topology actually generates)

bin/kafka-topics.sh --create --topic user-aggregate-processor-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1 \
  --config cleanup.policy=compact

Configure error handling and exactly-once semantics

Implement robust error handling

Create a Java processor with comprehensive error handling and dead letter topic support.

package com.example.streams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class RobustEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(RobustEventProcessor.class);
    
    public static class CustomProductionExceptionHandler implements ProductionExceptionHandler {
        @Override
        public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
            logger.error("Error producing record to topic {}: {}", record.topic(), exception.getMessage());
            // Send to dead letter topic instead of failing
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        
        @Override
        public void configure(Map<String, ?> configs) {}
    }
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "robust-event-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // Disable caching for testing
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, 
                 CustomProductionExceptionHandler.class.getName());
        
        // Transaction timeout should be less than max.poll.interval.ms
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 300000);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        KStream<String, String> events = builder.stream("user-events");
        
        // Branch for error handling
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches = events.branch(
            (key, value) -> {
                try {
                    UserEvent.fromJson(value);
                    return true;
                } catch (Exception e) {
                    return false;
                }
            },
            (key, value) -> true // catch invalid JSON
        );
        
        // Process valid events
        branches[0]
            .mapValues(value -> {
                try {
                    UserEvent event = UserEvent.fromJson(value);
                    return event.enrich().toJson();
                } catch (Exception e) {
                    logger.error("Error processing valid event: {}", e.getMessage());
                    return null;
                }
            })
            .filter((key, value) -> value != null)
            .to("processed-events");
        
        // Send invalid events to dead letter topic
        branches[1]
            .mapValues(value -> {
                logger.warn("Invalid event sent to DLT: {}", value);
                return "{\"error\":\"Invalid JSON format\",\"originalValue\":\"" + 
                       value.replace("\"", "\\\"") + "\"}";
            })
            .to("dead-letter-topic");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        // Set exception handlers
        streams.setUncaughtExceptionHandler(exception -> {
            logger.error("Uncaught exception in stream processing", exception);
            if (exception instanceof StreamsException) {
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            }
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        
        streams.setStateListener((newState, oldState) -> {
            logger.info("State transition: {} -> {}", oldState, newState);
            if (newState == KafkaStreams.State.ERROR) {
                logger.error("Stream processing entered ERROR state");
            }
        });
        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Shutting down gracefully...");
            streams.close(Duration.ofSeconds(30));
        }));
        
        try {
            streams.start();
            logger.info("Robust event processor started with exactly-once semantics");
            
            // Keep the main thread alive
            Thread.currentThread().join();
        } catch (Exception e) {
            logger.error("Failed to start stream processor", e);
            System.exit(1);
        }
    }
}
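A related knob worth setting in the same Properties block is the deserialization exception handler, which decides what happens when a record cannot even be deserialized before your topology sees it. Kafka Streams ships a log-and-skip implementation; the lines below would be added to the Properties built in main() (requires importing org.apache.kafka.streams.errors.LogAndContinueExceptionHandler).

// LogAndContinueExceptionHandler logs and skips records that cannot be deserialized,
// instead of stopping the application (the default handler is log-and-fail).
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class.getName());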

Create dead letter topic

Set up a dead letter topic for handling processing errors and invalid messages.

cd /opt/kafka

Create dead letter topic

bin/kafka-topics.sh --create --topic dead-letter-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1 \
  --config retention.ms=604800000

Implement stream joins and complex event processing

Create stream-table join processor

Implement complex event processing with stream-table joins for real-time enrichment.

package com.example.streams

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.JoinWindows
import org.slf4j.LoggerFactory

import java.time.Duration
import java.util.Properties

case class UserProfile(
  userId: String,
  name: String,
  tier: String,
  region: String,
  registrationDate: Long
)

case class EnrichedEvent(
  userId: String,
  eventType: String,
  value: Double,
  timestamp: Long,
  userName: String,
  userTier: String,
  userRegion: String,
  enrichmentTime: Long
)

object StreamJoinProcessor {
  private val logger = LoggerFactory.getLogger(this.getClass)
  
  // String serdes are provided implicitly by the Serdes._ import above.
  // NOTE: the joins below also need implicit Serde[UserEvent] and Serde[UserProfile]
  // instances in scope (for example, Jackson-backed serdes) before this compiles.
  
  def parseUserProfile(json: String): Option[UserProfile] = {
    // Note: creates a new ObjectMapper per call for brevity; reuse a shared, thread-safe instance in production
    try {
      val objectMapper = new com.fasterxml.jackson.databind.ObjectMapper()
      objectMapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
      Some(objectMapper.readValue(json, classOf[UserProfile]))
    } catch {
      case e: Exception =>
        logger.warn(s"Failed to parse user profile: $json", e)
        None
    }
  }
  
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-join-processor")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2")
    
    val builder = new StreamsBuilder()
    
    // User events stream
    val userEvents: KStream[String, UserEvent] = builder
      .stream[String, String]("user-events")
      .flatMapValues(UserAggregateProcessor.parseUserEvent)
      .selectKey((_, event) => event.userId)
    
    // User profiles table (updated from user-profiles topic)
    val userProfiles: KTable[String, UserProfile] = builder
      .table[String, String]("user-profiles")
      .mapValues(parseUserProfile)
      .filter((_, profile) => profile.isDefined)
      .mapValues(_.get)
    
    // Stream-table join: enrich events with user profiles
    val enrichedEvents: KStream[String, EnrichedEvent] = userEvents
      .join(userProfiles)((event, profile) => {
        EnrichedEvent(
          userId = event.userId,
          eventType = event.eventType,
          value = event.value,
          timestamp = event.timestamp,
          userName = profile.name,
          userTier = profile.tier,
          userRegion = profile.region,
          enrichmentTime = System.currentTimeMillis()
        )
      })
    
    // Stream-stream join: detect related events within time window
    val clickEvents = userEvents.filter((_, event) => event.eventType == "CLICK")
    val purchaseEvents = userEvents.filter((_, event) => event.eventType == "PURCHASE")
    
    val clickToPurchase: KStream[String, String] = clickEvents
      .join(purchaseEvents)(
        (clickEvent, purchaseEvent) => {
          s"""{
            |  "userId": "${clickEvent.userId}",
            |  "clickTime": ${clickEvent.timestamp},
            |  "purchaseTime": ${purchaseEvent.timestamp},
            |  "conversionTime": ${purchaseEvent.timestamp - clickEvent.timestamp},
            |  "purchaseValue": ${purchaseEvent.value}
            |}""".stripMargin
        },
        JoinWindows.of(Duration.ofMinutes(15))
      )
    
    // Output enriched events
    enrichedEvents
      .mapValues { event =>
        val objectMapper = new com.fasterxml.jackson.databind.ObjectMapper()
        objectMapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
        objectMapper.writeValueAsString(event)
      }
      .to("enriched-events")
    
    // Output conversion events
    clickToPurchase
      .peek((userId, conversion) => logger.info(s"Conversion detected for user $userId: $conversion"))
      .to("conversion-events")
    
    val streams = new KafkaStreams(builder.build(), props)
    
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(Duration.ofSeconds(15))
    }))
    
    try {
      streams.start()
      logger.info("Stream join processor started")
      Thread.currentThread().join()
    } catch {
      case e: Exception =>
        logger.error("Stream join processing failed", e)
        System.exit(1)
    }
  }
}

Create additional topics for joins

Set up topics required for stream joins and user profile management.

cd /opt/kafka

Create user profiles topic (compacted for latest values)

bin/kafka-topics.sh --create --topic user-profiles \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1 \
  --config cleanup.policy=compact

Create enriched events topic

bin/kafka-topics.sh --create --topic enriched-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

Create conversion events topic

bin/kafka-topics.sh --create --topic conversion-events \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1
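The stream-table join only produces output for users that already have a profile in the user-profiles topic, so seed it with at least one record keyed by userId. A sketch using the plain Kafka producer (the profile JSON matches the UserProfile fields above; the values are sample data):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SeedUserProfiles {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        String profile = "{\"userId\":\"user123\",\"name\":\"Alice\",\"tier\":\"GOLD\","
                       + "\"region\":\"EU\",\"registrationDate\":1703000000000}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key by userId so the record lands on the same partition as that user's events
            producer.send(new ProducerRecord<>("user-profiles", "user123", profile)).get();
        }
    }
}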

Set up testing framework

Create Java unit tests

Implement comprehensive unit tests using TopologyTestDriver for isolated testing.

package com.example.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;

import java.time.Instant;
import java.util.Properties;

class UserEventProcessorTest {
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    
    @BeforeEach
    void setUp() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Simple test topology
        builder.stream("user-events")
            .filter((key, value) -> {
                try {
                    UserEvent event = UserEvent.fromJson(value);
                    return event.isValidEvent();
                } catch (Exception e) {
                    return false;
                }
            })
            .mapValues(value -> {
                try {
                    UserEvent event = UserEvent.fromJson(value);
                    return event.enrich().toJson();
                } catch (Exception e) {
                    return null;
                }
            })
            .filter((key, value) -> value != null)
            .to("processed-events");
        
        testDriver = new TopologyTestDriver(builder.build(), props);
        inputTopic = testDriver.createInputTopic("user-events", 
            Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = testDriver.createOutputTopic("processed-events", 
            Serdes.String().deserializer(), Serdes.String().deserializer());
    }
    
    @AfterEach
    void tearDown() {
        testDriver.close();
    }
    
    @Test
    void shouldProcessValidEvent() throws Exception {
        UserEvent event = new UserEvent("user123", "PURCHASE", System.currentTimeMillis(), 99.99, "test");
        
        inputTopic.pipeInput("user123", event.toJson());
        
        TestRecord<String, String> output = outputTopic.readRecord();
        assertNotNull(output);
        assertEquals("user123", output.key());
        
        UserEvent processedEvent = UserEvent.fromJson(output.value());
        assertEquals("PURCHASE", processedEvent.eventType);
        assertEquals("processed_test", processedEvent.metadata);
    }
    
    @Test
    void shouldFilterInvalidEvent() {
        inputTopic.pipeInput("user123", "invalid json");
        assertTrue(outputTopic.isEmpty());
    }
    
    @Test
    void shouldFilterNegativeValues() throws Exception {
        UserEvent event = new UserEvent("user123", "PURCHASE", System.currentTimeMillis(), -10.0, "test");
        
        inputTopic.pipeInput("user123", event.toJson());
        assertTrue(outputTopic.isEmpty());
    }
    
    @Test
    void shouldProcessMultipleEvents() throws Exception {
        UserEvent event1 = new UserEvent("user1", "CLICK", System.currentTimeMillis(), 0.0, "click1");
        UserEvent event2 = new UserEvent("user2", "PURCHASE", System.currentTimeMillis(), 50.0, "purchase1");
        
        inputTopic.pipeInput("user1", event1.toJson());
        inputTopic.pipeInput("user2", event2.toJson());
        
        assertEquals(2, outputTopic.getQueueSize());
        
        TestRecord<String, String> output1 = outputTopic.readRecord();
        TestRecord<String, String> output2 = outputTopic.readRecord();
        
        assertNotNull(output1);
        assertNotNull(output2);
        assertTrue(outputTopic.isEmpty());
    }
}

Build and run tests

Compile and execute the test suite to verify stream processing logic.

cd ~/kafka-streams-java

Compile the project

mvn clean compile

Run tests

mvn test

Package the application

mvn package

Deploy and monitor in production

Create production configuration

Set up production-ready configuration with monitoring and performance tuning.

# Kafka Streams Production Configuration
application.id=user-event-processor-prod
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

Processing guarantees

processing.guarantee=exactly_once_v2
commit.interval.ms=1000
cache.max.bytes.buffering=10485760

Thread configuration

num.stream.threads=4
num.standby.replicas=1

State store configuration

state.dir=/opt/kafka-streams/state
replication.factor=3

Producer configuration

acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true
batch.size=16384
linger.ms=5

Consumer configuration

auto.offset.reset=earliest
max.poll.records=1000
fetch.min.bytes=1024
fetch.max.wait.ms=500

Monitoring

metrics.recording.level=INFO
metric.reporters=org.apache.kafka.common.metrics.JmxReporter

Security (if using SSL)

security.protocol=SSL
ssl.truststore.location=/opt/kafka/config/kafka.client.truststore.jks
ssl.truststore.password=changeme
ssl.keystore.location=/opt/kafka/config/kafka.client.keystore.jks
ssl.keystore.password=changeme
ssl.key.password=changeme
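The application can load this file at startup instead of hard-coding configuration. A minimal launcher sketch (the file path is an assumption chosen to match the deployment layout below, and the one-line topology is only a stand-in for the real one):

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

public class ConfiguredLauncher {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Path.of("/opt/kafka-streams/streams.properties"))) {
            props.load(in);   // application.id, bootstrap.servers, tuning, security, ...
        }

        StreamsBuilder builder = new StreamsBuilder();
        // Stand-in topology; replace with the real one from UserEventProcessor
        builder.<String, String>stream("user-events").to("processed-events");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}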

Create systemd service

Deploy the Kafka Streams application as a managed service with automatic restart and logging. Note that running it with java -jar requires packaging a runnable fat jar (for example with the Maven Shade plugin) so the manifest main class and dependencies are included.

[Unit]
Description=Kafka Streams User Event Processor
After=kafka.service
Requires=kafka.service

[Service]
Type=simple
User=kafka
Group=kafka
WorkingDirectory=/opt/kafka-streams
ExecStart=/usr/bin/java -Xmx2g -Xms1g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=20 \
  -XX:InitiatingHeapOccupancyPercent=35 \
  -XX:+ExplicitGCInvokesConcurrent \
  -Djava.awt.headless=true \
  -Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false \
  -Dcom.sun.management.jmxremote.local.only=false \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.rmi.port=9999 \
  -Djava.rmi.server.hostname=localhost \
  -jar kafka-streams-java-1.0.0.jar
ExecReload=/bin/kill -HUP $MAINPID
KillMode=process
Restart=on-failure
RestartSec=42s
TimeoutSec=300
StandardOutput=journal
StandardError=journal
SyslogIdentifier=kafka-streams

[Install]
WantedBy=multi-user.target

Create monitoring setup

Configure JMX monitoring and create deployment directories with proper permissions.

# Create application user and directories
sudo useradd -r -s /bin/false kafka
sudo mkdir -p /opt/kafka-streams/state
sudo mkdir -p /opt/kafka-streams/logs
sudo chown -R kafka:kafka /opt/kafka-streams
sudo chmod 755 /opt/kafka-streams
sudo chmod 750 /opt/kafka-streams/state

Copy application JAR

sudo cp ~/kafka-streams-java/target/kafka-streams-java-1.0.0.jar /opt/kafka-streams/
sudo chown kafka:kafka /opt/kafka-streams/kafka-streams-java-1.0.0.jar
sudo chmod 644 /opt/kafka-streams/kafka-streams-java-1.0.0.jar

Enable and start service

sudo systemctl daemon-reload
sudo systemctl enable kafka-streams.service
sudo systemctl start kafka-streams.service

Check service status

sudo systemctl status kafka-streams.service
sudo journalctl -u kafka-streams.service -f

Verify your setup

Test the complete stream processing pipeline with sample data and monitoring.

# Check if all services are running
sudo systemctl status kafka-streams.service
jps | grep -E '(QuorumPeerMain|Kafka)'

Send test events to input topic

cd /opt/kafka

echo '{"userId":"user123","eventType":"PURCHASE","timestamp":1703123456789,"value":99.99,"metadata":"test-purchase"}' | \
  bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092

echo '{"userId":"user456","eventType":"CLICK","timestamp":1703123456790,"value":0.0,"metadata":"test-click"}' | \
  bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092

Verify processed events

bin/kafka-console-consumer.sh --topic processed-events \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --max-messages 5

Check JMX metrics

echo "Checking stream processing metrics..." curl -s http://localhost:9999/mbean?objectname=kafka.streams:type=stream-metrics,client-id=user-event-processor

Inspect the application's committed offsets and intermediate topics with the Streams reset tool (dry run)

bin/kafka-streams-application-reset.sh \
  --application-id user-event-processor \
  --bootstrap-servers localhost:9092 \
  --dry-run

Common issues

Symptom | Cause | Fix
Stream processor won't start | Kafka broker not available | Check the Kafka service: sudo systemctl status kafka
Exactly-once semantics failing | Transaction timeout too high | Set producer.transaction.timeout.ms=10000 in the Streams configuration
High memory usage | Large state stores or cache | Tune cache.max.bytes.buffering and add more stream threads
Processing lag increasing | Insufficient parallelism | Increase topic partitions and num.stream.threads
Frequent rebalancing | Long processing time | Increase max.poll.interval.ms and optimize processing logic
State store corruption | Unclean shutdown | Delete the state directory and restart: rm -rf /opt/kafka-streams/state/*
JSON parsing errors | Malformed input data | Add validation and route bad records to a dead letter topic
Join operations missing records | Clock skew or wrong windowing | Synchronize system clocks and adjust the join window size

Next steps

Running this in production?

Want this handled for you? Setting this up once is straightforward. Keeping it patched, monitored, backed up and tuned across environments is the harder part. See how we run infrastructure like this for European SaaS and e-commerce teams.

