Build production-grade Kafka Streams applications for real-time data processing using Java and Scala. Configure stream processing topologies, implement windowing operations, and deploy scalable streaming applications with monitoring.
Prerequisites
- Running Apache Kafka cluster
- Java 11 or later installed
- Maven and SBT build tools
- Sufficient memory (4GB+ recommended)
- Understanding of stream processing concepts
What this solves
Kafka Streams enables you to build real-time stream processing applications that transform, aggregate, and analyze continuous data flows. This tutorial shows you how to implement production-ready Kafka Streams applications in both Java and Scala with proper topology configuration, windowing operations, and monitoring.
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you get the latest versions of all dependencies.
sudo apt update && sudo apt upgrade -y
Install Java Development Kit
Kafka Streams requires Java 11 or later for optimal performance and compatibility.
sudo apt install -y openjdk-21-jdk openjdk-21-jre
export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
echo 'export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64' >> ~/.bashrc
Install Scala build tools
Install SBT (Scala Build Tool) for compiling and managing Scala-based Kafka Streams applications.
# Register the official sbt apt repository, import its signing key, and install.
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
# FIX: apt-key needs the trailing '-' to read the key from stdin; without it the
# piped key is silently discarded and the repository fails signature checks.
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add -
sudo apt update
sudo apt install -y sbt scala
Install Apache Maven
Maven handles Java project dependencies and builds for Kafka Streams applications.
sudo apt install -y maven
Create application directory structure
Set up organized directories for both Java and Scala Kafka Streams applications.
mkdir -p ~/kafka-streams-apps/{java-app,scala-app}
cd ~/kafka-streams-apps
Create Java Kafka Streams application
Initialize Maven project structure
Create a complete Maven project with the proper directory layout for Java development.
cd java-app
mvn archetype:generate -DgroupId=com.example.streams \
-DartifactId=kafka-streams-java \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false
cd kafka-streams-java
Configure Maven dependencies
Add Kafka Streams dependencies and configure Java version in the POM file.
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Java Kafka Streams tutorial application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.streams</groupId>
  <artifactId>kafka-streams-java</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
    <!-- Single place to bump the Kafka streams/clients version. -->
    <kafka.version>3.6.0</kafka.version>
    <!-- FIX: declare the source encoding explicitly so the build is not
         platform-dependent (this also matches the POM generated by the
         automated install script at the end of this tutorial). -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <!-- Simple SLF4J backend so Kafka Streams log output is visible. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>2.0.9</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.16.0</version>
    </dependency>
  </dependencies>
</project>
Implement stream processing topology
Create a Java class that demonstrates stream transformations, aggregations, and windowing operations.
package com.example.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class StreamProcessor {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-processor-java");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 1024 1024L);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
StreamsBuilder builder = new StreamsBuilder();
// Read from input topic
KStream<String, String> source = builder.stream("input-events");
// Transform and filter data
KStream<String, String> processed = source
.filter((key, value) -> value != null && value.length() > 0)
.mapValues(value -> value.toUpperCase())
.selectKey((key, value) -> extractUserId(value));
// Create windowed aggregations
KTable<Windowed<String>, Long> windowedCounts = processed
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
.count();
// Output aggregated results
windowedCounts.toStream()
.map((windowedKey, count) -> {
String key = windowedKey.key() + "@" + windowedKey.window().start();
return KeyValue.pair(key, count.toString());
})
.to("aggregated-counts");
// Branch streams for different processing paths
KStream<String, String>[] branches = processed.branch(
(key, value) -> value.contains("ERROR"),
(key, value) -> value.contains("WARN"),
(key, value) -> true
);
branches[0].to("error-events");
branches[1].to("warning-events");
branches[2].to("info-events");
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
private static String extractUserId(String value) {
// Simple extraction - in production, use proper JSON parsing
int start = value.indexOf("userId:") + 7;
int end = value.indexOf(",", start);
if (end == -1) end = value.length();
return start > 6 ? value.substring(start, end).trim() : "unknown";
}
}
Create Scala Kafka Streams application
Initialize SBT project
Set up a Scala project with SBT build configuration and Kafka Streams dependencies.
cd ~/kafka-streams-apps/scala-app
mkdir -p src/main/scala/com/example/streams
mkdir project
Configure SBT build file
Define project dependencies and Scala version for the streams application.
// sbt build definition for the Scala Kafka Streams tutorial application.
name := "kafka-streams-scala"
version := "1.0"
// Scala 2.13 line; must stay binary-compatible with the %% artifacts below.
scalaVersion := "2.13.12"
libraryDependencies ++= Seq(
// Core Java library plus the Scala DSL wrapper (%% appends the Scala binary
// version suffix that matches scalaVersion above).
"org.apache.kafka" % "kafka-streams" % "3.6.0",
"org.apache.kafka" %% "kafka-streams-scala" % "3.6.0",
"org.apache.kafka" % "kafka-clients" % "3.6.0",
// Simple SLF4J backend so stream logs are visible on stdout.
"org.slf4j" % "slf4j-simple" % "2.0.9",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.16.0",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.0"
)
Configure SBT plugins
Add plugins for better Scala development experience.
addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16")
Implement Scala stream processor
Create a Scala implementation with functional programming patterns and type safety.
package com.example.streams

import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
import org.apache.kafka.streams.kstream.{TimeWindows, Windowed}
import java.time.Duration
import java.util.Properties
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
 * Scala Kafka Streams topology mirroring the Java version: uppercases and
 * re-keys "input-events" by user id, emits 5-minute hopping-window counts to
 * "aggregated-counts", and branches records to error/warning/info topics.
 */
object StreamProcessor extends App {

  import org.apache.kafka.streams.scala.ImplicitConversions._
  import org.apache.kafka.streams.scala.serialization.Serdes._

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-processor-scala")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    // FIX: the multiplication operators were missing ("10 1024 1024L" does not compile).
    p.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10 * 1024 * 1024L): java.lang.Long)
    p.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000: java.lang.Integer)
    p
  }

  val builder = new StreamsBuilder()

  // FIX: the source definition was garbled ("builder.streamString, String") —
  // the type parameters and the topic name were lost.
  val sourceStream: KStream[String, String] = builder.stream[String, String]("input-events")

  // Drop null/empty payloads, normalize case, and key each record by user id.
  val processedStream: KStream[String, String] = sourceStream
    .filter((_, value) => Option(value).exists(_.nonEmpty))
    .mapValues(_.toUpperCase)
    .selectKey((_, value) => extractUserId(value))

  // 5-minute windows hopping every minute, counted per user key.
  val windowedCounts: KTable[Windowed[String], Long] = processedStream
    .groupByKey
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)))
    .count()

  // Flatten windowed keys to "user@windowStartMillis" for the output topic.
  windowedCounts.toStream
    .map { case (windowedKey, count) =>
      val key = s"${windowedKey.key()}@${windowedKey.window().start()}"
      (key, count.toString)
    }
    .to("aggregated-counts")

  // Branch by severity; the final catch-all predicate keeps every remaining record.
  val branches = processedStream.branch(
    (_, value) => value.contains("ERROR"),
    (_, value) => value.contains("WARN"),
    (_, _) => true
  )
  branches(0).to("error-events")
  branches(1).to("warning-events")
  branches(2).to("info-events")

  // Create and run streams
  val topology: Topology = builder.build()
  val streams = new KafkaStreams(topology, props)

  // Graceful shutdown: give in-flight tasks up to 30 seconds to finish.
  sys.addShutdownHook {
    streams.close(Duration.ofSeconds(30))
  }

  Try {
    streams.start()
    // Block the main thread so the App keeps running until shutdown.
    Thread.currentThread().join()
  } match {
    case Success(_) => println("Stream processing completed successfully")
    case Failure(exception) =>
      println(s"Stream processing failed: ${exception.getMessage}")
      sys.exit(1)
  }

  /** Extracts the word following "userId:"; returns "unknown" when absent. */
  private def extractUserId(value: String): String = {
    val pattern = "userId:(\\w+)".r
    pattern.findFirstMatchIn(value)
      .map(_.group(1))
      .getOrElse("unknown")
  }
}
Configure application properties
Create streams configuration file
Define reusable configuration properties for different environments.
# Kafka Streams Application Configuration
application.id=production-streams-app
bootstrap.servers=localhost:9092
# Performance tuning
num.stream.threads=4
cache.max.bytes.buffering=16777216
commit.interval.ms=1000
default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
# Security (uncomment for SSL)
# security.protocol=SSL
# ssl.truststore.location=/path/to/kafka.client.truststore.jks
# ssl.truststore.password=truststore-password
# ssl.keystore.location=/path/to/kafka.client.keystore.jks
# ssl.keystore.password=keystore-password
# Monitoring
metric.reporters=org.apache.kafka.common.metrics.JmxReporter
auto.include.jmx.reporter=true
Create logging configuration
Set up structured logging for monitoring and debugging stream processing applications.
# Root logger
log4j.rootLogger=INFO, stdout, file
# Console appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# File appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/kafka-streams/streams.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Kafka Streams specific logging
log4j.logger.org.apache.kafka.streams=INFO
log4j.logger.org.apache.kafka.clients=WARN
Build and deploy applications
Compile Java application
Build the Java Kafka Streams application with Maven.
cd ~/kafka-streams-apps/java-app/kafka-streams-java
mvn clean compile
mvn package
Compile Scala application
Build the Scala application using SBT with optimization flags.
cd ~/kafka-streams-apps/scala-app
sbt clean compile
sbt assembly
Create systemd service files
Set up systemd services for production deployment with proper resource management.
[Unit]
Description=Kafka Streams Java Application
After=network.target kafka.service
Requires=kafka.service
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/usr/bin/java -Xms1g -Xmx2g -XX:+UseG1GC \
-Djava.awt.headless=true \
-Dlog4j.configuration=file:/opt/kafka-streams/log4j.properties \
-cp /opt/kafka-streams/java/target/kafka-streams-java-1.0-SNAPSHOT.jar \
com.example.streams.StreamProcessor
WorkingDirectory=/opt/kafka-streams
Restart=on-failure
RestartSec=30
TimeoutStartSec=60
TimeoutStopSec=30
# Resource limits
LimitNOFILE=65536
LimitNPROC=32768
[Install]
WantedBy=multi-user.target
Create Scala service configuration
Configure systemd service for the Scala streams application.
[Unit]
Description=Kafka Streams Scala Application
After=network.target kafka.service
Requires=kafka.service
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/usr/bin/java -Xms1g -Xmx2g -XX:+UseG1GC \
-Djava.awt.headless=true \
-Dlog4j.configuration=file:/opt/kafka-streams/log4j.properties \
-jar /opt/kafka-streams/scala/target/scala-2.13/kafka-streams-scala-assembly-1.0.jar
WorkingDirectory=/opt/kafka-streams
Restart=on-failure
RestartSec=30
TimeoutStartSec=60
TimeoutStopSec=30
# Resource limits
LimitNOFILE=65536
LimitNPROC=32768
[Install]
WantedBy=multi-user.target
Deploy applications to production directory
Copy built applications and configuration files to the production directory structure.
sudo mkdir -p /opt/kafka-streams/{java,scala}
sudo mkdir -p /var/log/kafka-streams
sudo cp ~/kafka-streams-apps/java-app/kafka-streams-java/target/kafka-streams-java-1.0-SNAPSHOT.jar /opt/kafka-streams/java/
sudo cp ~/kafka-streams-apps/scala-app/target/scala-2.13/*.jar /opt/kafka-streams/scala/
sudo cp streams.properties /opt/kafka-streams/
sudo cp log4j.properties /opt/kafka-streams/
sudo chown -R kafka:kafka /opt/kafka-streams /var/log/kafka-streams
sudo chmod 755 /opt/kafka-streams
sudo chmod 644 /opt/kafka-streams/*.properties
Monitor and manage stream applications
Enable and start services
Start the Kafka Streams applications and enable them for automatic startup.
sudo systemctl daemon-reload
sudo systemctl enable kafka-streams-java kafka-streams-scala
sudo systemctl start kafka-streams-java
sudo systemctl start kafka-streams-scala
Configure JMX monitoring
Enable JMX metrics collection for monitoring stream processing performance.
#!/bin/bash
# JMX Monitoring Script for Kafka Streams
echo "Kafka Streams Metrics Collection"
echo "================================"
# Stream processing metrics
curl -s http://localhost:8080/jolokia/read/kafka.streams:type=stream-metrics,client-id=* | jq .
# Consumer lag metrics
curl -s http://localhost:8080/jolokia/read/kafka.consumer:type=consumer-fetch-manager-metrics,client-id=* | jq .
# Producer metrics
curl -s http://localhost:8080/jolokia/read/kafka.producer:type=producer-metrics,client-id=* | jq .
sudo chmod +x /opt/kafka-streams/jmx-monitoring.sh
Verify your setup
Test that your Kafka Streams applications are running correctly and processing data as expected.
# Check service status
sudo systemctl status kafka-streams-java
sudo systemctl status kafka-streams-scala
# Check application logs
sudo tail -f /var/log/kafka-streams/streams.log
# Test with sample data
echo 'userId:user123,event:login,timestamp:2024-01-15' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input-events
# Verify output topics
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic aggregated-counts --from-beginning
# Check stream application metrics
/opt/kafka-streams/jmx-monitoring.sh
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Application fails to start | Kafka not running or wrong bootstrap servers | Verify Kafka is running: sudo systemctl status kafka |
| High memory usage | Improper JVM heap sizing or caching configuration | Adjust -Xmx and cache.max.bytes.buffering values |
| Slow processing | Insufficient stream threads or poor parallelism | Increase num.stream.threads and topic partitions |
| Consumer lag increasing | Processing bottleneck or downstream system issues | Monitor processing time and scale horizontally |
| Compilation errors in Scala | Version mismatch between Scala and Kafka Streams | Ensure compatible versions in build.sbt |
| Windowing not working | Incorrect time semantics or missing timestamps | Check message timestamps and window configuration |
Next steps
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
# Automated setup of a Kafka Streams development environment (Java + Scala).
# Exits on any error, on unset variables, and on failures inside pipelines.
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Script variables
SCRIPT_NAME="$(basename "$0")"
# Pinned tool versions; bump these together with the tutorial text above.
KAFKA_VERSION="3.6.0"
SBT_VERSION="1.9.7"
JAVA_VERSION="21"
# Where the generated Java/Scala projects go (overridable via --app-dir).
APP_DIR="$HOME/kafka-streams-apps"
# Usage message
# Prints CLI help and exits non-zero; also reached on unknown options.
usage() {
echo "Usage: $SCRIPT_NAME [--app-dir DIR]"
echo " --app-dir DIR Directory to create applications (default: ~/kafka-streams-apps)"
exit 1
}
# Parse arguments
while [[ $# -gt 0 ]]; do
case $1 in
--app-dir)
# Override the default application directory with the user-supplied path.
APP_DIR="$2"
shift 2
;;
-h|--help)
usage
;;
*)
# Any unrecognized flag is fatal: report it in red and show usage.
echo -e "${RED}Unknown option: $1${NC}"
usage
;;
esac
done
# Cleanup function for error handling.
# Record whether APP_DIR already existed before this run so the ERR trap never
# deletes a directory (and any user data in it) that predates the installer.
APP_DIR_PREEXISTED="false"
if [[ -d "$APP_DIR" ]]; then
APP_DIR_PREEXISTED="true"
fi
cleanup() {
echo -e "${RED}Installation failed. Cleaning up...${NC}"
# FIX: only remove the directory if this script created it; the original
# unconditionally ran `rm -rf "$APP_DIR"` and could destroy pre-existing work.
if [[ "$APP_DIR_PREEXISTED" == "false" && -d "$APP_DIR" ]]; then
rm -rf "$APP_DIR"
fi
}
trap cleanup ERR
# Logging helpers: print a colored "[LEVEL] message" line to stdout.
_log() {
# $1 = ANSI color code, $2 = level label, $3 = message text
echo -e "${1}[${2}]${NC} ${3}"
}
log_info()    { _log "$BLUE" "INFO" "$1"; }
log_success() { _log "$GREEN" "SUCCESS" "$1"; }
log_warning() { _log "$YELLOW" "WARNING" "$1"; }
log_error()   { _log "$RED" "ERROR" "$1"; }
# Check if running as root
check_root() {
# Refuse to run as root: generated files would be root-owned and mistakes
# (e.g. the cleanup rm -rf) would be far more damaging.
if [[ $EUID -eq 0 ]]; then
log_error "This script should not be run as root for security reasons"
exit 1
fi
# But cached/passwordless sudo must be available for the package installs below.
if ! sudo -n true 2>/dev/null; then
log_error "This script requires sudo privileges"
exit 1
fi
}
# Detect OS distribution
detect_distro() {
# Reads /etc/os-release and sets the globals used by the install steps:
# PKG_MGR, PKG_UPDATE, PKG_INSTALL, and JAVA_HOME_PATH.
if [[ ! -f /etc/os-release ]]; then
log_error "Cannot detect OS distribution"
exit 1
fi
# Sources ID and PRETTY_NAME into the current shell.
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update && apt upgrade -y"
PKG_INSTALL="apt install -y"
# Debian-family OpenJDK packages install under an -amd64 suffixed path.
JAVA_HOME_PATH="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk-amd64"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
JAVA_HOME_PATH="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
JAVA_HOME_PATH="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk"
;;
*)
log_error "Unsupported distribution: $ID"
exit 1
;;
esac
log_success "Detected OS: $PRETTY_NAME"
}
# Update system packages
update_system() {
echo "[1/8] Updating system packages..."
# PKG_UPDATE (set by detect_distro) may contain '&&', so run it in a root shell.
sudo bash -c "$PKG_UPDATE"
log_success "System packages updated"
}
# Install Java Development Kit
install_java() {
echo "[2/8] Installing Java Development Kit..."
# Package names differ between the Debian and RHEL families.
if [[ "$PKG_MGR" == "apt" ]]; then
sudo $PKG_INSTALL openjdk-${JAVA_VERSION}-jdk openjdk-${JAVA_VERSION}-jre
else
sudo $PKG_INSTALL java-${JAVA_VERSION}-openjdk java-${JAVA_VERSION}-openjdk-devel
fi
# Set JAVA_HOME
export JAVA_HOME="$JAVA_HOME_PATH"
# Persist JAVA_HOME only once, so repeated runs do not duplicate the line.
if ! grep -q "export JAVA_HOME=$JAVA_HOME_PATH" ~/.bashrc; then
echo "export JAVA_HOME=$JAVA_HOME_PATH" >> ~/.bashrc
fi
log_success "Java $JAVA_VERSION installed and configured"
}
# Install Scala and SBT
install_scala() {
echo "[3/8] Installing Scala build tools..."
if [[ "$PKG_MGR" == "apt" ]]; then
# Add SBT repository
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list > /dev/null
# NOTE(review): apt-key is deprecated on current Debian/Ubuntu releases;
# consider installing the key under /etc/apt/trusted.gpg.d/ instead.
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add - 2>/dev/null
sudo apt update
sudo $PKG_INSTALL sbt scala
else
# Install SBT manually for RHEL-based systems
sudo $PKG_INSTALL curl
# The release tarball unpacks to /opt/sbt; symlink its launcher onto PATH.
curl -L "https://github.com/sbt/sbt/releases/download/v${SBT_VERSION}/sbt-${SBT_VERSION}.tgz" | sudo tar -xz -C /opt/
sudo ln -sf /opt/sbt/bin/sbt /usr/local/bin/sbt
# Try to install scala from package manager
if ! sudo $PKG_INSTALL scala 2>/dev/null; then
log_warning "Scala not available in package manager, SBT will download it when needed"
fi
fi
log_success "Scala build tools installed"
}
# Install Apache Maven
install_maven() {
echo "[4/8] Installing Apache Maven..."
# Maven is available under the same package name on all supported distros.
sudo $PKG_INSTALL maven
log_success "Maven installed"
}
# Create application directory structure
create_directories() {
echo "[5/8] Creating application directory structure..."
# Brace expansion creates both project roots in one call; -p makes reruns safe.
mkdir -p "$APP_DIR"/{java-app,scala-app}
log_success "Directory structure created at $APP_DIR"
}
# Create Java Kafka Streams application
create_java_app() {
echo "[6/8] Creating Java Kafka Streams application..."
cd "$APP_DIR/java-app"
# Generate the standard Maven project skeleton quietly (-q).
mvn archetype:generate \
-DgroupId=com.example.streams \
-DartifactId=kafka-streams-java \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false \
-q
cd kafka-streams-java
# Create POM file
# Quoted 'EOF' heredoc: no shell expansion, so ${kafka.version} reaches Maven intact.
cat > pom.xml << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.streams</groupId>
<artifactId>kafka-streams-java</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<kafka.version>3.6.0</kafka.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
</project>
EOF
log_success "Java application structure created"
}
# Create Scala Kafka Streams application
create_scala_app() {
echo "[7/8] Creating Scala Kafka Streams application..."
cd "$APP_DIR/scala-app"
mkdir -p src/main/scala/com/example/streams
# Create build.sbt
# Unquoted EOF heredoc on purpose: $KAFKA_VERSION expands into the build file.
cat > build.sbt << EOF
name := "kafka-streams-scala"
version := "1.0"
scalaVersion := "2.13.12"
libraryDependencies ++= Seq(
"org.apache.kafka" %% "kafka-streams-scala" % "$KAFKA_VERSION",
"org.apache.kafka" % "kafka-clients" % "$KAFKA_VERSION",
"org.slf4j" % "slf4j-simple" % "2.0.9"
)
EOF
# Create project/build.properties
# Pins the sbt launcher version so builds are reproducible across machines.
mkdir -p project
echo "sbt.version=$SBT_VERSION" > project/build.properties
log_success "Scala application structure created"
}
# Verify installation
verify_installation() {
echo "[8/8] Verifying installation..."
# Check Java
# `java -version` writes to stderr, hence the 2>&1 before grepping.
if java -version 2>&1 | grep -q "openjdk version \"$JAVA_VERSION"; then
log_success "Java $JAVA_VERSION verified"
else
log_error "Java verification failed"
exit 1
fi
# Check Maven
if mvn -version >/dev/null 2>&1; then
log_success "Maven verified"
else
log_error "Maven verification failed"
exit 1
fi
# Check SBT
# Non-fatal: the sbt launcher may still bootstrap itself on first real use.
if sbt --version >/dev/null 2>&1; then
log_success "SBT verified"
else
log_warning "SBT verification failed, but may work when first used"
fi
# Check directory structure
if [[ -d "$APP_DIR/java-app/kafka-streams-java" && -d "$APP_DIR/scala-app" ]]; then
log_success "Application directories verified"
else
log_error "Directory structure verification failed"
exit 1
fi
}
# Main execution
# Orchestrates the 8 setup steps in dependency order, then prints a summary.
main() {
log_info "Starting Kafka Streams development environment setup..."
check_root
detect_distro
update_system
install_java
install_scala
install_maven
create_directories
create_java_app
create_scala_app
verify_installation
echo
log_success "Kafka Streams development environment setup completed!"
echo -e "${GREEN}Application directory: $APP_DIR${NC}"
echo -e "${GREEN}Java app: $APP_DIR/java-app/kafka-streams-java${NC}"
echo -e "${GREEN}Scala app: $APP_DIR/scala-app${NC}"
echo
echo -e "${YELLOW}Note: Run 'source ~/.bashrc' to apply JAVA_HOME changes${NC}"
}
main "$@"
Review the script before running. Execute with: bash install.sh