Build production-ready Kafka Streams applications using Java and Scala for real-time data processing, including stateless transformations, stateful aggregations, windowing, and stream joins with exactly-once semantics.
Prerequisites
- Java 17 or later installed
- Apache Kafka cluster running
- Basic knowledge of stream processing concepts
- Understanding of JSON data formats
What this solves
Kafka Streams enables real-time data processing directly within your applications without requiring separate stream processing clusters. This tutorial shows you how to build production-grade stream processing applications using both Java and Scala, covering stateless operations, stateful processing with windowing, stream joins, and exactly-once semantics for mission-critical data pipelines.
Step-by-step installation
Install Java development environment
Kafka Streams 3.x runs on Java 8 or later, but this tutorial standardizes on Java 17. Install OpenJDK and verify the installation.
sudo apt update
sudo apt install -y openjdk-17-jdk maven gradle
java -version
mvn -version
Install Scala development environment
Install Scala and SBT for Scala-based Kafka Streams development.
curl -fL https://github.com/coursier/launchers/raw/master/cs-x86_64-pc-linux.gz | gzip -d > cs
chmod +x cs
sudo mv cs /usr/local/bin
cs setup --yes
echo 'export PATH="$PATH:$HOME/.local/share/coursier/bin"' >> ~/.bashrc
source ~/.bashrc
scala -version
sbt --version
Set up Kafka cluster
Install and configure Apache Kafka for stream processing. We'll use the binary distribution for simplicity, picking a 3.6.x release so the broker matches the Kafka Streams 3.6.0 client libraries used later (older releases are hosted on archive.apache.org).
cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
sudo tar -xzf kafka_2.13-3.6.0.tgz
sudo mv kafka_2.13-3.6.0 kafka
sudo chown -R $USER:$USER /opt/kafka
echo 'export KAFKA_HOME=/opt/kafka' >> ~/.bashrc
echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> ~/.bashrc
source ~/.bashrc
Configure and start Kafka services
Start Zookeeper and Kafka broker with production-ready configurations.
cd /opt/kafka
# Start Zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Wait for Zookeeper to start
sleep 5
# Start Kafka broker
bin/kafka-server-start.sh -daemon config/server.properties
# Verify services are running
jps | grep -E '(QuorumPeerMain|Kafka)'
Create test topics
Create input and output topics for stream processing examples with multiple partitions for parallelism.
cd /opt/kafka
# Create input topic for user events
bin/kafka-topics.sh --create --topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# Create output topic for processed events
bin/kafka-topics.sh --create --topic processed-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# Create topic for aggregated results
bin/kafka-topics.sh --create --topic user-aggregates \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# List created topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Build stateless stream processing applications
Create Java Maven project structure
Set up a Maven project for Java-based Kafka Streams applications with proper dependencies.
mkdir -p ~/kafka-streams-java/src/main/java/com/example/streams
cd ~/kafka-streams-java
Create pom.xml with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-streams-java</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <kafka.version>3.6.0</kafka.version>
        <jackson.version>2.15.2</jackson.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.10.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>${kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <mainClass>com.example.streams.UserEventProcessor</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
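The exec-maven-plugin above is enough for running the topology with mvn exec:java. If you also want to launch the packaged JAR directly with java -jar (as the systemd unit later in this guide does), the JAR needs a Main-Class manifest entry and bundled dependencies. One way to get such a fat JAR, assuming you add it to the <plugins> section above, is the Maven Shade plugin:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.5.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.example.streams.UserEventProcessor</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
With this in place, mvn package replaces the thin JAR in target/ with a self-contained one.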
Implement Java stateless stream processor
Create a stream processor that filters and transforms user events with JSON serialization.
package com.example.streams;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class UserEvent {
@JsonProperty("userId")
public String userId;
@JsonProperty("eventType")
public String eventType;
@JsonProperty("timestamp")
public long timestamp;
@JsonProperty("value")
public double value;
@JsonProperty("metadata")
public String metadata;
private static final ObjectMapper objectMapper = new ObjectMapper();
public UserEvent() {}
public UserEvent(String userId, String eventType, long timestamp, double value, String metadata) {
this.userId = userId;
this.eventType = eventType;
this.timestamp = timestamp;
this.value = value;
this.metadata = metadata;
}
public String toJson() throws JsonProcessingException {
return objectMapper.writeValueAsString(this);
}
public static UserEvent fromJson(String json) throws JsonProcessingException {
return objectMapper.readValue(json, UserEvent.class);
}
public boolean isValidEvent() {
return userId != null && eventType != null && value >= 0;
}
public UserEvent enrich() {
return new UserEvent(
this.userId,
this.eventType.toUpperCase(),
this.timestamp,
this.value,
"processed_" + this.metadata
);
}
}
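The topology below keeps values as raw JSON strings and calls fromJson/toJson by hand. If you would rather have UserEvent objects flow through the streams, you can wrap those helpers in a Kafka Serde; the following is a minimal sketch (the UserEventSerde class name is ours, not part of the topology below) built on Serdes.serdeFrom:
package com.example.streams;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
public final class UserEventSerde {
// Builds a Serde<UserEvent> on top of the JSON helpers defined in UserEvent
public static Serde<UserEvent> serde() {
return Serdes.serdeFrom(
(topic, event) -> {
try {
return event == null ? null : event.toJson().getBytes(StandardCharsets.UTF_8);
} catch (Exception e) {
throw new SerializationException("Failed to serialize UserEvent", e);
}
},
(topic, bytes) -> {
try {
return bytes == null ? null : UserEvent.fromJson(new String(bytes, StandardCharsets.UTF_8));
} catch (Exception e) {
throw new SerializationException("Failed to deserialize UserEvent", e);
}
});
}
private UserEventSerde() {}
}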
Create Java stream processing topology
Implement the main stream processing application with filtering, mapping, and branching logic.
package com.example.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class UserEventProcessor {
private static final Logger logger = LoggerFactory.getLogger(UserEventProcessor.class);
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-event-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
StreamsBuilder builder = new StreamsBuilder();
// Read from input topic
KStream<String, String> userEvents = builder.stream("user-events");
// Parse JSON and filter valid events
KStream<String, UserEvent> validEvents = userEvents
.mapValues(value -> {
try {
return UserEvent.fromJson(value);
} catch (Exception e) {
logger.warn("Failed to parse event: {}", value);
return null;
}
})
.filter((key, event) -> event != null && event.isValidEvent())
.peek((key, event) -> logger.info("Processing event for user: {}", event.userId));
// Branch streams based on event type
@SuppressWarnings("unchecked")
KStream<String, UserEvent>[] branches = validEvents.branch(
Named.as("high-value-"),
(key, event) -> event.value > 1000.0,
(key, event) -> event.eventType.equals("PURCHASE"),
(key, event) -> true // catch-all
);
// Process high-value events
branches[0]
.mapValues(event -> event.enrich())
.mapValues(event -> {
try {
return event.toJson();
} catch (Exception e) {
logger.error("Failed to serialize event", e);
return null;
}
})
.filter((key, value) -> value != null)
.to("processed-events");
// Process purchase events
branches[1]
.filter((key, event) -> event.value > 10.0)
.mapValues(event -> event.enrich())
.mapValues(event -> {
try {
return event.toJson();
} catch (Exception e) {
logger.error("Failed to serialize event", e);
return null;
}
})
.filter((key, value) -> value != null)
.to("processed-events");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
logger.info("Shutting down stream processor...");
streams.close();
latch.countDown();
}
});
try {
streams.start();
logger.info("Stream processor started");
latch.await();
} catch (Throwable e) {
logger.error("Stream processing failed", e);
System.exit(1);
}
System.exit(0);
}
}
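Note that KStream.branch used above still works but has been deprecated since Kafka 2.8 in favor of split(). If you want to avoid the deprecation warning, the same three-way routing can be sketched with the split()/Branched API (it additionally needs java.util.Map and org.apache.kafka.streams.kstream.Branched imports; the branch names are illustrative):
// Equivalent branching with the non-deprecated split()/Branched API
Map<String, KStream<String, UserEvent>> branchMap = validEvents
.split(Named.as("events-"))
.branch((key, event) -> event.value > 1000.0, Branched.as("high-value"))
.branch((key, event) -> event.eventType.equals("PURCHASE"), Branched.as("purchase"))
.defaultBranch(Branched.as("other"));
// Map keys are the prefix plus the branch name, e.g. "events-high-value"
KStream<String, UserEvent> highValueEvents = branchMap.get("events-high-value");
KStream<String, UserEvent> purchaseOnlyEvents = branchMap.get("events-purchase");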
Implement stateful stream processing
Create Scala SBT project
Set up a Scala project for advanced stream processing with aggregations and windowing.
mkdir -p ~/kafka-streams-scala/src/main/scala/com/example/streams
cd ~/kafka-streams-scala
Create build.sbt with the following content:
ThisBuild / version := "1.0.0"
ThisBuild / scalaVersion := "2.13.11"
val kafkaVersion = "3.6.0"
val jacksonVersion = "2.15.2"
lazy val root = (project in file("."))
.settings(
name := "kafka-streams-scala",
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion,
"org.apache.kafka" % "kafka-streams" % kafkaVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"ch.qos.logback" % "logback-classic" % "1.4.11",
"org.scalatest" %% "scalatest" % "3.2.17" % Test,
"org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion % Test
),
assembly / mainClass := Some("com.example.streams.UserAggregateProcessor"),
assembly / assemblyJarName := "kafka-streams-scala-assembly.jar",
assembly / assemblyMergeStrategy := {
case "META-INF/services/org.apache.kafka.common.config.ConfigDef" => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case _ => MergeStrategy.first
}
)
The assembly settings above come from the sbt-assembly plugin; declare it in project/plugins.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.3")
Implement Scala stateful processor with windowing
Create an advanced stream processor that performs windowed aggregations and maintains state.
package com.example.streams
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{SessionWindows, TimeWindows, Windowed}
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import org.apache.kafka.streams.scala.serialization.Serdes
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.Properties
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
case class UserEvent(
userId: String,
eventType: String,
timestamp: Long,
value: Double,
metadata: String
)
case class UserAggregate(
userId: String,
eventCount: Long,
totalValue: Double,
avgValue: Double,
maxValue: Double,
minValue: Double,
lastEventTime: Long
)
object UserAggregateProcessor {
private val logger = LoggerFactory.getLogger(this.getClass)
private val objectMapper = new ObjectMapper()
objectMapper.registerModule(DefaultScalaModule)
implicit val consumed: Consumed[String, String] = Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde)
implicit val produced: Produced[String, String] = Produced.`with`(Serdes.stringSerde, Serdes.stringSerde)
def parseUserEvent(json: String): Option[UserEvent] = {
Try {
objectMapper.readValue(json, classOf[UserEvent])
} match {
case Success(event) if event.userId != null && event.eventType != null => Some(event)
case Success(_) =>
logger.warn(s"Invalid event format: $json")
None
case Failure(e) =>
logger.warn(s"Failed to parse event: $json", e)
None
}
}
def serializeAggregate(aggregate: UserAggregate): String = {
objectMapper.writeValueAsString(aggregate)
}
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-aggregate-processor")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3")
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "10240")
val builder = new StreamsBuilder()
val userEvents: KStream[String, String] = builder.stream[String, String]("user-events")
// Parse and key by userId
val validEvents: KStream[String, UserEvent] = userEvents
.flatMapValues(parseUserEvent)
.selectKey((_, event) => event.userId)
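// NOTE: once values are typed as UserEvent, the groupByKey/aggregate and windowed
// operators below resolve their Grouped/Materialized instances from implicit
// Serde[UserEvent] and Serde[UserAggregate] values. Define those serdes (for example
// with Serdes.fromFn and the Jackson mapper above) before compiling; only the
// String serdes are declared explicitly in this listing.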
// 5-minute windows advancing every minute (hopping windows) for per-user aggregation
val windowedAggregates: KTable[Windowed[String], UserAggregate] = validEvents
.groupByKey
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
.aggregate(
// Initializer
UserAggregate("", 0L, 0.0, 0.0, Double.MinValue, Double.MaxValue, 0L)
)(
// Aggregator
(key: String, event: UserEvent, aggregate: UserAggregate) => {
val newCount = aggregate.eventCount + 1
val newTotal = aggregate.totalValue + event.value
val newAvg = newTotal / newCount
val newMax = math.max(aggregate.maxValue, event.value)
val newMin = if (aggregate.minValue == Double.MaxValue) event.value else math.min(aggregate.minValue, event.value)
UserAggregate(
userId = event.userId,
eventCount = newCount,
totalValue = newTotal,
avgValue = newAvg,
maxValue = newMax,
minValue = newMin,
lastEventTime = math.max(aggregate.lastEventTime, event.timestamp)
)
}
)
// Output windowed aggregates
windowedAggregates
.toStream
.filter((_, aggregate) => aggregate.eventCount > 0)
.mapValues(serializeAggregate)
.peek((windowedKey, aggregate) => {
val window = windowedKey.window()
logger.info(s"Aggregate for user ${windowedKey.key()} in window [${window.start()}-${window.end()}]: $aggregate")
})
.selectKey((windowedKey, _) => windowedKey.key())
.to("user-aggregates")
// Session windows for detecting user activity sessions
val sessionAggregates = validEvents
.filter((_, event) => event.eventType == "CLICK" || event.eventType == "VIEW")
.groupByKey
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count()
.toStream
.filter((_, count) => count >= 3) // Sessions with at least 3 events
.mapValues(count => s"""{
| "sessionEventCount": $count,
| "sessionType": "ACTIVE"
|}""".stripMargin)
.selectKey((windowedKey, _) => windowedKey.key())
.to("user-sessions")
val streams = new KafkaStreams(builder.build(), props)
streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler {
override def handle(exception: Throwable): StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse = {
logger.error("Uncaught exception in stream thread", exception)
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
}
})
Runtime.getRuntime.addShutdownHook(new Thread(() => {
logger.info("Shutting down stream processor...")
streams.close(Duration.ofSeconds(10))
}))
try {
streams.start()
logger.info("User aggregate processor started")
Thread.currentThread().join()
} catch {
case e: Exception =>
logger.error("Stream processing failed", e)
System.exit(1)
}
}
}
Create additional required topics
Create topics for session processing and configure retention policies for stateful operations.
cd /opt/kafka
# Create topic for user sessions
bin/kafka-topics.sh --create --topic user-sessions \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=86400000
# Optional: pre-create a changelog topic for a state store (Kafka Streams normally creates these internal topics itself; the exact name depends on your topology)
bin/kafka-topics.sh --create --topic user-aggregate-processor-KSTREAM-AGGREGATE-STATE-STORE-0000000005-changelog \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1 \
--config cleanup.policy=compact
Configure error handling and exactly-once semantics
Implement robust error handling
Create a Java processor with comprehensive error handling and dead letter topic support.
package com.example.streams;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
public class RobustEventProcessor {
private static final Logger logger = LoggerFactory.getLogger(RobustEventProcessor.class);
public static class CustomProductionExceptionHandler implements ProductionExceptionHandler {
@Override
public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
logger.error("Error producing record to topic {}: {}", record.topic(), exception.getMessage());
// Send to dead letter topic instead of failing
return ProductionExceptionHandlerResponse.CONTINUE;
}
@Override
public void configure(Map<String, ?> configs) {}
}
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "robust-event-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // Disable caching for testing
props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class.getName());
// Producer transaction timeout (must stay below the broker's transaction.max.timeout.ms)
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10000);
props.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), 300000);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("user-events");
// Branch for error handling
@SuppressWarnings("unchecked")
KStream<String, String>[] branches = events.branch(
(key, value) -> {
try {
UserEvent.fromJson(value);
return true;
} catch (Exception e) {
return false;
}
},
(key, value) -> true // catch invalid JSON
);
// Process valid events
branches[0]
.mapValues(value -> {
try {
UserEvent event = UserEvent.fromJson(value);
return event.enrich().toJson();
} catch (Exception e) {
logger.error("Error processing valid event: {}", e.getMessage());
return null;
}
})
.filter((key, value) -> value != null)
.to("processed-events");
// Send invalid events to dead letter topic
branches[1]
.mapValues(value -> {
logger.warn("Invalid event sent to DLT: {}", value);
return "{\"error\":\"Invalid JSON format\",\"originalValue\":\"" +
value.replace("\"", "\\\"") + "\"}";
})
.to("dead-letter-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Set exception handlers
streams.setUncaughtExceptionHandler(exception -> {
logger.error("Uncaught exception in stream processing", exception);
if (exception instanceof StreamsException) {
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});
streams.setStateListener((newState, oldState) -> {
logger.info("State transition: {} -> {}", oldState, newState);
if (newState == KafkaStreams.State.ERROR) {
logger.error("Stream processing entered ERROR state");
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Shutting down gracefully...");
streams.close(Duration.ofSeconds(30));
}));
try {
streams.start();
logger.info("Robust event processor started with exactly-once semantics");
// Keep the main thread alive
Thread.currentThread().join();
} catch (Exception e) {
logger.error("Failed to start stream processor", e);
System.exit(1);
}
}
}
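Deserialization failures can be handled just as declaratively: Kafka Streams ships a LogAndContinueExceptionHandler that logs and skips records the configured deserializers cannot read instead of failing the stream thread. A one-line sketch of that configuration, which only becomes relevant once you move from plain String serdes to structured serdes:
// Log and skip records the deserializers cannot decode instead of failing the stream thread
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.class);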
Create dead letter topic
Set up a dead letter topic for handling processing errors and invalid messages.
cd /opt/kafka
# Create dead letter topic
bin/kafka-topics.sh --create --topic dead-letter-topic \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1 \
--config retention.ms=604800000
Implement stream joins and complex event processing
Create stream-table join processor
Implement complex event processing with stream-table joins for real-time enrichment.
package com.example.streams
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{JoinWindows, StreamJoined}
import org.apache.kafka.streams.scala.serialization.Serdes
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.Properties
case class UserProfile(
userId: String,
name: String,
tier: String,
region: String,
registrationDate: Long
)
case class EnrichedEvent(
userId: String,
eventType: String,
value: Double,
timestamp: Long,
userName: String,
userTier: String,
userRegion: String,
enrichmentTime: Long
)
object StreamJoinProcessor {
private val logger = LoggerFactory.getLogger(this.getClass)
implicit val consumed: Consumed[String, String] = Consumed.`with`(Serdes.stringSerde, Serdes.stringSerde)
implicit val produced: Produced[String, String] = Produced.`with`(Serdes.stringSerde, Serdes.stringSerde)
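// NOTE: the typed KStream[String, UserEvent] and KTable[String, UserProfile] operations
// below also need implicit Serde[UserEvent] and Serde[UserProfile] instances in scope so
// the Scala DSL can derive its Grouped, Joined and StreamJoined configurations; they are
// omitted here for brevity (see the serde note in UserAggregateProcessor).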
def parseUserProfile(json: String): Option[UserProfile] = {
// Simplified parsing - in production use proper JSON library
try {
val objectMapper = new com.fasterxml.jackson.databind.ObjectMapper()
objectMapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
Some(objectMapper.readValue(json, classOf[UserProfile]))
} catch {
case e: Exception =>
logger.warn(s"Failed to parse user profile: $json", e)
None
}
}
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-join-processor")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2")
val builder = new StreamsBuilder()
// User events stream
val userEvents: KStream[String, UserEvent] = builder
.stream[String, String]("user-events")
.flatMapValues(UserAggregateProcessor.parseUserEvent)
.selectKey((_, event) => event.userId)
// User profiles table (updated from user-profiles topic)
val userProfiles: KTable[String, UserProfile] = builder
.table[String, String]("user-profiles")
.mapValues(parseUserProfile)
.filter((_, profile) => profile.isDefined)
.mapValues(_.get)
// Stream-table join: enrich events with user profiles
val enrichedEvents: KStream[String, EnrichedEvent] = userEvents
.join(userProfiles)((event, profile) => {
EnrichedEvent(
userId = event.userId,
eventType = event.eventType,
value = event.value,
timestamp = event.timestamp,
userName = profile.name,
userTier = profile.tier,
userRegion = profile.region,
enrichmentTime = System.currentTimeMillis()
)
})
// Stream-stream join: detect related events within time window
val clickEvents = userEvents.filter((_, event) => event.eventType == "CLICK")
val purchaseEvents = userEvents.filter((_, event) => event.eventType == "PURCHASE")
val clickToPurchase: KStream[String, String] = clickEvents
.join(purchaseEvents)(
(clickEvent, purchaseEvent) => {
s"""{
| "userId": "${clickEvent.userId}",
| "clickTime": ${clickEvent.timestamp},
| "purchaseTime": ${purchaseEvent.timestamp},
| "conversionTime": ${purchaseEvent.timestamp - clickEvent.timestamp},
| "purchaseValue": ${purchaseEvent.value}
|}""".stripMargin
},
JoinWindows.of(Duration.ofMinutes(15))
) // resolved via an implicit StreamJoined[String, UserEvent, UserEvent], i.e. a Serde[UserEvent] in scope
// Output enriched events
enrichedEvents
.mapValues { event =>
val objectMapper = new com.fasterxml.jackson.databind.ObjectMapper()
objectMapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
objectMapper.writeValueAsString(event)
}
.to("enriched-events")
// Output conversion events
clickToPurchase
.peek((userId, conversion) => logger.info(s"Conversion detected for user $userId: $conversion"))
.to("conversion-events")
val streams = new KafkaStreams(builder.build(), props)
Runtime.getRuntime.addShutdownHook(new Thread(() => {
streams.close(Duration.ofSeconds(15))
}))
try {
streams.start()
logger.info("Stream join processor started")
Thread.currentThread().join()
} catch {
case e: Exception =>
logger.error("Stream join processing failed", e)
System.exit(1)
}
}
}
Create additional topics for joins
Set up topics required for stream joins and user profile management.
cd /opt/kafka
# Create user profiles topic (compacted for latest values)
bin/kafka-topics.sh --create --topic user-profiles \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1 \
--config cleanup.policy=compact
# Create enriched events topic
bin/kafka-topics.sh --create --topic enriched-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# Create conversion events topic
bin/kafka-topics.sh --create --topic conversion-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
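The stream-table join only emits output for users that already have a profile in the user-profiles table, so seed at least one keyed record before testing. The key must be the userId; the profile below is made-up sample data:
# Produce a keyed user profile (key and value separated by '|')
echo 'user123|{"userId":"user123","name":"Alice","tier":"GOLD","region":"eu-west","registrationDate":1703000000000}' | \
bin/kafka-console-producer.sh --topic user-profiles \
--bootstrap-server localhost:9092 \
--property parse.key=true \
--property "key.separator=|"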
Set up testing framework
Create Java unit tests
Implement comprehensive unit tests using TopologyTestDriver for isolated testing.
package com.example.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.*;
import java.time.Instant;
import java.util.Properties;
class UserEventProcessorTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, String> outputTopic;
@BeforeEach
void setUp() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Simple test topology
builder.stream("user-events")
.filter((key, value) -> {
try {
UserEvent event = UserEvent.fromJson(value);
return event.isValidEvent();
} catch (Exception e) {
return false;
}
})
.mapValues(value -> {
try {
UserEvent event = UserEvent.fromJson(value);
return event.enrich().toJson();
} catch (Exception e) {
return null;
}
})
.filter((key, value) -> value != null)
.to("processed-events");
testDriver = new TopologyTestDriver(builder.build(), props);
inputTopic = testDriver.createInputTopic("user-events",
Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = testDriver.createOutputTopic("processed-events",
Serdes.String().deserializer(), Serdes.String().deserializer());
}
@AfterEach
void tearDown() {
testDriver.close();
}
@Test
void shouldProcessValidEvent() throws Exception {
UserEvent event = new UserEvent("user123", "PURCHASE", System.currentTimeMillis(), 99.99, "test");
inputTopic.pipeInput("user123", event.toJson());
TestRecord<String, String> output = outputTopic.readRecord();
assertNotNull(output);
assertEquals("user123", output.key());
UserEvent processedEvent = UserEvent.fromJson(output.value());
assertEquals("PURCHASE", processedEvent.eventType);
assertEquals("processed_test", processedEvent.metadata);
}
@Test
void shouldFilterInvalidEvent() {
inputTopic.pipeInput("user123", "invalid json");
assertTrue(outputTopic.isEmpty());
}
@Test
void shouldFilterNegativeValues() throws Exception {
UserEvent event = new UserEvent("user123", "PURCHASE", System.currentTimeMillis(), -10.0, "test");
inputTopic.pipeInput("user123", event.toJson());
assertTrue(outputTopic.isEmpty());
}
@Test
void shouldProcessMultipleEvents() throws Exception {
UserEvent event1 = new UserEvent("user1", "CLICK", System.currentTimeMillis(), 0.0, "click1");
UserEvent event2 = new UserEvent("user2", "PURCHASE", System.currentTimeMillis(), 50.0, "purchase1");
inputTopic.pipeInput("user1", event1.toJson());
inputTopic.pipeInput("user2", event2.toJson());
assertEquals(2, outputTopic.getQueueSize());
TestRecord<String, String> output1 = outputTopic.readRecord();
TestRecord<String, String> output2 = outputTopic.readRecord();
assertNotNull(output1);
assertNotNull(output2);
assertTrue(outputTopic.isEmpty());
}
}
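TopologyTestDriver can also control time explicitly, which matters once you test windowed aggregations or punctuation-based logic. A sketch of the pattern, meant to live inside a test method like the ones above (it additionally needs a java.time.Duration import):
// Pipe records with explicit event timestamps, then advance the driver's wall clock
UserEvent timed = new UserEvent("user123", "PURCHASE", 0L, 25.0, "timed");
inputTopic.pipeInput("user123", timed.toJson(), Instant.ofEpochMilli(0L));
inputTopic.pipeInput("user123", timed.toJson(), Instant.ofEpochMilli(60_000L));
testDriver.advanceWallClockTime(Duration.ofMinutes(5));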
Build and run tests
Compile and execute the test suite to verify stream processing logic.
cd ~/kafka-streams-java
# Compile the project
mvn clean compile
# Run tests
mvn test
# Package the application
mvn package
Deploy and monitor in production
Create production configuration
Set up production-ready configuration with monitoring and performance tuning.
# Kafka Streams Production Configuration
application.id=user-event-processor-prod
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# Processing guarantees
processing.guarantee=exactly_once_v2
commit.interval.ms=1000
cache.max.bytes.buffering=10485760
# Thread configuration
num.stream.threads=4
num.standby.replicas=1
# State store configuration
state.dir=/opt/kafka-streams/state
replication.factor=3
# Producer configuration
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true
batch.size=16384
linger.ms=5
# Consumer configuration
auto.offset.reset=earliest
max.poll.records=1000
fetch.min.bytes=1024
fetch.max.wait.ms=500
# Monitoring
metrics.recording.level=INFO
metric.reporters=org.apache.kafka.common.metrics.JmxReporter
# Security (if using SSL)
security.protocol=SSL
ssl.truststore.location=/opt/kafka/config/kafka.client.truststore.jks
ssl.truststore.password=changeme
ssl.keystore.location=/opt/kafka/config/kafka.client.keystore.jks
ssl.keystore.password=changeme
ssl.key.password=changeme
Create systemd service
Deploy the Kafka Streams application as a managed service with automatic restart and logging. Create /etc/systemd/system/kafka-streams.service with the content below; ExecStart runs the packaged JAR with java -jar, which assumes a self-contained (fat) JAR carrying a Main-Class manifest entry (see the Maven Shade note earlier).
[Unit]
Description=Kafka Streams User Event Processor
After=kafka.service
Requires=kafka.service
[Service]
Type=simple
User=kafka
Group=kafka
WorkingDirectory=/opt/kafka-streams
ExecStart=/usr/bin/java -Xmx2g -Xms1g \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-Djava.awt.headless=true \
-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.rmi.port=9999 \
-Djava.rmi.server.hostname=localhost \
-jar kafka-streams-java-1.0.0.jar
ExecReload=/bin/kill -HUP $MAINPID
KillMode=process
Restart=on-failure
RestartSec=42s
TimeoutSec=300
StandardOutput=journal
StandardError=journal
SyslogIdentifier=kafka-streams
[Install]
WantedBy=multi-user.target
Create monitoring setup
Configure JMX monitoring and create deployment directories with proper permissions.
# Create application user and directories
sudo useradd -r -s /bin/false kafka
sudo mkdir -p /opt/kafka-streams/state
sudo mkdir -p /opt/kafka-streams/logs
sudo chown -R kafka:kafka /opt/kafka-streams
sudo chmod 755 /opt/kafka-streams
sudo chmod 750 /opt/kafka-streams/state
# Copy application JAR
sudo cp ~/kafka-streams-java/target/kafka-streams-java-1.0.0.jar /opt/kafka-streams/
sudo chown kafka:kafka /opt/kafka-streams/kafka-streams-java-1.0.0.jar
sudo chmod 644 /opt/kafka-streams/kafka-streams-java-1.0.0.jar
# Enable and start service
sudo systemctl daemon-reload
sudo systemctl enable kafka-streams.service
sudo systemctl start kafka-streams.service
# Check service status
sudo systemctl status kafka-streams.service
sudo journalctl -u kafka-streams.service -f
Verify your setup
Test the complete stream processing pipeline with sample data and monitoring.
# Check if all services are running
sudo systemctl status kafka-streams.service
jps | grep -E '(QuorumPeerMain|Kafka)'
# Send test events to input topic
cd /opt/kafka
echo '{"userId":"user123","eventType":"PURCHASE","timestamp":1703123456789,"value":99.99,"metadata":"test-purchase"}' | \
bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092
echo '{"userId":"user456","eventType":"CLICK","timestamp":1703123456790,"value":0.0,"metadata":"test-click"}' | \
bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092
# Verify processed events
bin/kafka-console-consumer.sh --topic processed-events \
--bootstrap-server localhost:9092 \
--from-beginning \
--max-messages 5
# Check JMX metrics (the systemd unit exposes JMX on port 9999; connect with a JMX client such as jconsole, not over HTTP)
echo "Checking stream processing metrics..."
# Example: jconsole localhost:9999
# Inspect the application's input topics and committed offsets (dry run; nothing is reset)
bin/kafka-streams-application-reset.sh \
--application-id user-event-processor \
--bootstrap-servers localhost:9092 \
--dry-run
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Stream processor won't start | Kafka broker not available | Check Kafka service: sudo systemctl status kafka |
| Exactly-once semantics failing | Producer transaction timeout misconfigured | Tune transaction.timeout.ms via StreamsConfig.producerPrefix() and keep it above commit.interval.ms |
| High memory usage | Large state stores or record cache | Lower cache.max.bytes.buffering and bound RocksDB memory with a rocksdb.config.setter |
| Processing lag increasing | Insufficient parallelism | Increase topic partitions and num.stream.threads |
| Rebalancing frequently | Long processing time | Increase max.poll.interval.ms and optimize processing logic |
| State store corruption | Unclean shutdown | Delete state directory and restart: rm -rf /opt/kafka-streams/state/* |
| JSON parsing errors | Malformed input data | Add validation and use dead letter topic for bad records |
| Join operations missing records | Clock skew or wrong windowing | Synchronize system clocks and adjust join window size |
Next steps
- Set up Kafka Schema Registry with Avro serialization for better data governance
- Monitor Kafka Streams with Prometheus and Grafana for production observability
- Implement Kafka Connect for database integration with CDC patterns
- Set up Kafka Streams interactive queries for real-time state access
- Configure Kafka Streams security with SSL and ACLs for production deployment
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Global variables
KAFKA_VERSION="2.8.2"
SCALA_VERSION="2.13"
KAFKA_HOME="/opt/kafka"
INSTALL_USER="${SUDO_USER:-$(whoami)}"
INSTALL_HOME=$(getent passwd "$INSTALL_USER" | cut -d: -f6)
# Usage message
usage() {
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " -h, --help Show this help message"
echo " -v, --version Kafka version to install (default: $KAFKA_VERSION)"
exit 1
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
-h|--help)
usage
;;
-v|--version)
KAFKA_VERSION="$2"
shift 2
;;
*)
echo -e "${RED}Error: Unknown option $1${NC}"
usage
;;
esac
done
# Cleanup function
cleanup() {
echo -e "${RED}Installation failed. Cleaning up...${NC}"
# Stop services if they were started
pkill -f "kafka.Kafka" || true
pkill -f "QuorumPeerMain" || true
# Remove partial installations
rm -rf /opt/kafka* || true
exit 1
}
trap cleanup ERR
# Check prerequisites
check_prerequisites() {
echo -e "${BLUE}[1/8] Checking prerequisites...${NC}"
if [[ $EUID -ne 0 ]]; then
echo -e "${RED}Error: This script must be run as root or with sudo${NC}"
exit 1
fi
# Detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update"
PKG_INSTALL="apt install -y"
JAVA_PKG="openjdk-17-jdk"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
JAVA_PKG="java-17-openjdk-devel"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
JAVA_PKG="java-17-openjdk-devel"
;;
*)
echo -e "${RED}Error: Unsupported distribution: $ID${NC}"
exit 1
;;
esac
else
echo -e "${RED}Error: Cannot detect Linux distribution${NC}"
exit 1
fi
echo -e "${GREEN}Prerequisites checked. Distribution: $ID${NC}"
}
# Install Java development environment
install_java() {
echo -e "${BLUE}[2/8] Installing Java development environment...${NC}"
$PKG_UPDATE
$PKG_INSTALL $JAVA_PKG maven gradle wget curl gzip tar
# Verify Java installation
java -version
mvn -version
echo -e "${GREEN}Java development environment installed successfully${NC}"
}
# Install Scala development environment
install_scala() {
echo -e "${BLUE}[3/8] Installing Scala development environment...${NC}"
# Install Coursier launcher
curl -fL https://github.com/coursier/launchers/raw/master/cs-x86_64-pc-linux.gz | gzip -d > cs
chmod 755 cs
mv cs /usr/local/bin/
# Setup Scala as the install user
sudo -u "$INSTALL_USER" bash -c "
export HOME='$INSTALL_HOME'
/usr/local/bin/cs setup --yes
echo 'export PATH=\"\$PATH:\$HOME/.local/share/coursier/bin\"' >> '$INSTALL_HOME/.bashrc'
"
# Verify Scala installation
sudo -u "$INSTALL_USER" bash -c "
export HOME='$INSTALL_HOME'
export PATH=\"\$PATH:$INSTALL_HOME/.local/share/coursier/bin\"
scala -version || echo 'Scala will be available after sourcing .bashrc'
"
echo -e "${GREEN}Scala development environment installed successfully${NC}"
}
# Install Kafka cluster
install_kafka() {
echo -e "${BLUE}[4/8] Installing Kafka cluster...${NC}"
cd /opt
wget "https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
tar -xzf "kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
mv "kafka_${SCALA_VERSION}-${KAFKA_VERSION}" kafka
rm "kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
# Set proper ownership
chown -R "$INSTALL_USER:$INSTALL_USER" "$KAFKA_HOME"
chmod -R 755 "$KAFKA_HOME"
# Add Kafka to PATH for the install user
sudo -u "$INSTALL_USER" bash -c "
echo 'export KAFKA_HOME=$KAFKA_HOME' >> '$INSTALL_HOME/.bashrc'
echo 'export PATH=\$PATH:\$KAFKA_HOME/bin' >> '$INSTALL_HOME/.bashrc'
"
echo -e "${GREEN}Kafka installed successfully${NC}"
}
# Configure and start Kafka services
start_kafka_services() {
echo -e "${BLUE}[5/8] Starting Kafka services...${NC}"
cd "$KAFKA_HOME"
# Start Zookeeper as the install user
sudo -u "$INSTALL_USER" bash -c "
cd '$KAFKA_HOME'
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
"
# Wait for Zookeeper to start
sleep 10
# Start Kafka broker as the install user
sudo -u "$INSTALL_USER" bash -c "
cd '$KAFKA_HOME'
bin/kafka-server-start.sh -daemon config/server.properties
"
# Wait for Kafka to start
sleep 15
# Verify services are running
if pgrep -f "QuorumPeerMain" > /dev/null && pgrep -f "kafka.Kafka" > /dev/null; then
echo -e "${GREEN}Kafka services started successfully${NC}"
else
echo -e "${RED}Error: Kafka services failed to start${NC}"
exit 1
fi
}
# Create test topics
create_test_topics() {
echo -e "${BLUE}[6/8] Creating test topics...${NC}"
sudo -u "$INSTALL_USER" bash -c "
cd '$KAFKA_HOME'
# Create input topic for user events
bin/kafka-topics.sh --create --topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# Create output topic for processed events
bin/kafka-topics.sh --create --topic processed-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# Create topic for aggregated results
bin/kafka-topics.sh --create --topic user-aggregates \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
"
echo -e "${GREEN}Test topics created successfully${NC}"
}
# Create Java project structure
create_java_project() {
echo -e "${BLUE}[7/8] Creating Java project structure...${NC}"
sudo -u "$INSTALL_USER" bash -c "
mkdir -p '$INSTALL_HOME/kafka-streams-java/src/main/java/com/example/streams'
cd '$INSTALL_HOME/kafka-streams-java'
cat > pom.xml << 'EOF'
<?xml version=\"1.0\" encoding=\"UTF-8\"?>
<project xmlns=\"http://maven.apache.org/POM/4.0.0\"
xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"
xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd\">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-streams-java</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<kafka.version>3.6.0</kafka.version>
<jackson.version>2.15.2</jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>\${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>\${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
</project>
EOF
"
echo -e "${GREEN}Java project structure created successfully${NC}"
}
# Final verification
verify_installation() {
echo -e "${BLUE}[8/8] Verifying installation...${NC}"
# Check Java
if ! java -version &>/dev/null; then
echo -e "${RED}Error: Java verification failed${NC}"
exit 1
fi
# Check Kafka services
if ! pgrep -f "QuorumPeerMain" > /dev/null || ! pgrep -f "kafka.Kafka" > /dev/null; then
echo -e "${RED}Error: Kafka services are not running${NC}"
exit 1
fi
# List topics to verify Kafka is working
sudo -u "$INSTALL_USER" bash -c "
cd '$KAFKA_HOME'
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
" > /dev/null
echo -e "${GREEN}Installation completed successfully!${NC}"
echo -e "${YELLOW}Next steps:${NC}"
echo "1. Source the bashrc file: source $INSTALL_HOME/.bashrc"
echo "2. Navigate to the Java project: cd $INSTALL_HOME/kafka-streams-java"
echo "3. Build the project: mvn compile"
echo "4. Kafka is running on localhost:9092"
echo "5. Topics created: user-events, processed-events, user-aggregates"
}
# Main execution
main() {
echo -e "${GREEN}Starting Kafka Streams development environment installation...${NC}"
check_prerequisites
install_java
install_scala
install_kafka
start_kafka_services
create_test_topics
create_java_project
verify_installation
echo -e "${GREEN}Kafka Streams development environment is ready!${NC}"
}
main "$@"
Review the script before running. Execute with: bash install.sh