Set up a Java development environment for Kafka Streams and build real-time data processing pipelines for analytics and monitoring workloads.
Prerequisites
- Java 17 or later (the example project is compiled for Java 17)
- Apache Maven
- A running Kafka cluster (the steps below install a single-node broker for development)
- Basic Java programming knowledge
What this solves
Kafka Streams enables you to build real-time stream processing applications that transform, aggregate, and analyze data as it flows through Apache Kafka topics. This tutorial shows you how to set up the complete Kafka Streams development environment and implement processing topologies for analytics and monitoring use cases.
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you get the latest versions of all dependencies.
sudo apt update && sudo apt upgrade -y
Install Java Development Kit
The project in this tutorial is compiled for Java 17. Install OpenJDK 17 together with Maven and the download tools used in later steps.
sudo apt install -y openjdk-17-jdk maven wget curl
Verify Java installation
Check that Java and Maven are properly installed and accessible from the command line.
java -version
mvn -version
Download and install Apache Kafka
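Download the Kafka binary distribution, which provides the broker, ZooKeeper, and the command-line tools. Version 3.5.1 matches the kafka-streams version used in the Maven project below. Apache moves older releases from downloads.apache.org to archive.apache.org, so the archive URL stays valid.
cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz
sudo tar -xzf kafka_2.13-3.5.1.tgz
sudo mv kafka_2.13-3.5.1 kafka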
sudo chown -R $USER:$USER /opt/kafka
Configure environment variables
Add the Kafka variables to ~/.bashrc so they persist across sessions, then reload the file. KAFKA_HEAP_OPTS gives the broker and ZooKeeper a fixed 1 GB heap; adjust it to the memory available on your machine.
cat >> ~/.bashrc << 'EOF'
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
EOF
source ~/.bashrc
Start ZooKeeper and Kafka broker
Start ZooKeeper first, then the Kafka broker. The broker registers with ZooKeeper on startup, so give ZooKeeper a few seconds to come up before starting the broker.
cd $KAFKA_HOME
bin/zookeeper-server-start.sh config/zookeeper.properties &
sleep 10
bin/kafka-server-start.sh config/server.properties &
Create development topics
Create input and output topics for your Kafka Streams applications. These topics will hold the raw data and processed results.
bin/kafka-topics.sh --create --topic user-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic processed-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic analytics-results --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
Set up Kafka Streams development environment
Create Maven project structure
Set up a Maven project with the necessary directory structure for developing Kafka Streams applications.
mkdir -p ~/kafka-streams-project/src/main/java/com/example/streams
cd ~/kafka-streams-project
Configure Maven dependencies
Create a pom.xml file with Kafka Streams dependencies and build configuration for Java 17.
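<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>kafka-streams-analytics</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <kafka.version>3.5.1</kafka.version>
        <slf4j.version>1.7.36</slf4j.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.15.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
            </plugin>
        </plugins>
    </build>
</project>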
Install project dependencies
Download the Maven dependencies and compile the project to confirm that the build works.
mvn clean compile
Implement real-time data processing topologies
Create event data model
Save the following as src/main/java/com/example/streams/UserEvent.java. The class maps the JSON fields of each event to a Java object that your Kafka Streams applications process.
package com.example.streams;
import com.fasterxml.jackson.annotation.JsonProperty;
public class UserEvent {
    @JsonProperty("user_id")
    private String userId;
    @JsonProperty("event_type")
    private String eventType;
    @JsonProperty("timestamp")
    private long timestamp;
    @JsonProperty("value")
    private double value;
    public UserEvent() {}
    public UserEvent(String userId, String eventType, long timestamp, double value) {
        this.userId = userId;
        this.eventType = eventType;
        this.timestamp = timestamp;
        this.value = value;
    }
    // Getters and setters
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
}
Create JSON serialization utilities
Save the following as src/main/java/com/example/streams/JsonSerde.java. It implements a generic Serde that uses Jackson to convert between Java objects and JSON bytes.
package com.example.streams;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
public class JsonSerde<T> implements Serde<T> {
    // Ignore unknown JSON fields so extra or derived properties do not break deserialization
    private final ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    private final Class<T> type;
    public JsonSerde(Class<T> type) {
        this.type = type;
    }
    @Override
    public Serializer<T> serializer() {
        return new JsonSerializer();
    }
    @Override
    public Deserializer<T> deserializer() {
        return new JsonDeserializer();
    }
    // Inner (non-static) classes share the outer ObjectMapper and target type
    private class JsonSerializer implements Serializer<T> {
        @Override
        public byte[] serialize(String topic, T data) {
            if (data == null) {
                return null; // pass null values (tombstones) through unchanged
            }
            try {
                return objectMapper.writeValueAsBytes(data);
            } catch (Exception e) {
                throw new RuntimeException("Error serializing JSON", e);
            }
        }
    }
    private class JsonDeserializer implements Deserializer<T> {
        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try {
                return objectMapper.readValue(data, type);
            } catch (Exception e) {
                throw new RuntimeException("Error deserializing JSON", e);
            }
        }
    }
}
Implement real-time analytics processor
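Save the following as src/main/java/com/example/streams/RealTimeAnalytics.java. This application filters high-value events into processed-events and sums event values per user and event type in 5-minute windows, publishing the totals to analytics-results.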
package com.example.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
public class RealTimeAnalytics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "real-time-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<UserEvent> userEventSerde = new JsonSerde<>(UserEvent.class);
        // Read events from the input topic
        KStream<String, UserEvent> events = builder.stream(
            "user-events",
            Consumed.with(Serdes.String(), userEventSerde)
        );
        // Keep only high-value events
        KStream<String, UserEvent> highValueEvents = events
            .filter((key, event) -> event.getValue() > 100.0);
        // Send filtered events to the processed topic
        highValueEvents.to(
            "processed-events",
            Produced.with(Serdes.String(), userEventSerde)
        );
        // Sum values per user and event type in 5-minute tumbling windows.
        // groupBy changes the key, so Streams repartitions the data through an internal
        // topic; Grouped supplies serdes for it (the default value serde is String).
        KTable<Windowed<String>, Double> aggregatedValues = events
            .groupBy((key, event) -> event.getUserId() + ":" + event.getEventType(),
                     Grouped.with(Serdes.String(), userEventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> 0.0,
                (key, event, aggregate) -> aggregate + event.getValue(),
                Materialized.with(Serdes.String(), Serdes.Double())
            );
        // Convert windowed results to a stream and publish them as JSON strings
        aggregatedValues
            .toStream()
            .map((windowedKey, value) -> KeyValue.pair(
                windowedKey.key(),
                // Locale.ROOT keeps '.' as the decimal separator so the output stays valid JSON
                String.format(Locale.ROOT,
                    "{\"window_start\": %d, \"window_end\": %d, \"total_value\": %.2f}",
                    windowedKey.window().start(),
                    windowedKey.window().end(),
                    value)
            ))
            .to("analytics-results", Produced.with(Serdes.String(), Serdes.String()));
        // Build and start the streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Close the application cleanly on Ctrl+C or SIGTERM
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
        System.out.println("Real-time analytics application started");
    }
}
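To see which records land in the same 5-minute window, the plain-Java sketch below models the windowed sum without any Kafka dependency. It assumes epoch-aligned tumbling windows (window start = timestamp minus timestamp modulo window size), which is how Kafka Streams aligns TimeWindows; the class and method names are illustrative only. Keep in mind that Kafka Streams windows by the record timestamp set by the producer or broker, not by the timestamp field inside the JSON, unless you configure a custom TimestampExtractor.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model (no Kafka dependency) of a 5-minute tumbling-window sum
 * keyed by "userId:eventType". It mimics the semantics of the windowed
 * aggregate in RealTimeAnalytics; it is not Kafka Streams code.
 */
public class TumblingWindowSketch {

    public static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // 5 minutes

    /** Inclusive start of the tumbling window that contains a non-negative timestamp. */
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of that window (start + size). */
    public static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    // Running sums keyed by "userId:eventType@windowStart", like rows of the windowed KTable
    private final Map<String, Double> sums = new TreeMap<>();

    /** Adds one event and returns the updated sum for its key and window. */
    public double add(String userId, String eventType, long timestampMs, double value) {
        String key = userId + ":" + eventType + "@" + windowStart(timestampMs, WINDOW_SIZE_MS);
        return sums.merge(key, value, Double::sum);
    }

    /** Copy of the current per-window sums. */
    public Map<String, Double> snapshot() {
        return new TreeMap<>(sums);
    }

    public static void main(String[] args) {
        TumblingWindowSketch agg = new TumblingWindowSketch();
        agg.add("user1", "click", 1702891200000L, 150.0);
        agg.add("user1", "click", 1702891260000L, 50.0); // 1 minute later: same window
        agg.add("user1", "click", 1702891500000L, 10.0); // 5 minutes later: next window
        agg.snapshot().forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

Running it prints user1:click@1702891200000 -> 200.0 and user1:click@1702891500000 -> 10.0: the first two events share a window, and the third starts a new one exactly at the 5-minute boundary.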
Configure stream processing for analytics and monitoring
Create monitoring topology
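Save the following as src/main/java/com/example/streams/MonitoringProcessor.java. This separate application counts events per event type in 1-minute windows, raises an alert when a count exceeds 1,000, and tracks the average event value in 5-minute windows.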
package com.example.streams;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;
public class MonitoringProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "monitoring-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsBuilder builder = new StreamsBuilder();
        JsonSerde<UserEvent> userEventSerde = new JsonSerde<>(UserEvent.class);
        KStream<String, UserEvent> events = builder.stream(
            "user-events",
            Consumed.with(Serdes.String(), userEventSerde)
        );
        // Count events per event type in 1-minute windows for rate monitoring.
        // Grouped supplies serdes for the repartition topic created by groupBy.
        KTable<Windowed<String>, Long> eventCounts = events
            .groupBy((key, event) -> event.getEventType(),
                     Grouped.with(Serdes.String(), userEventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count();
        // Alert when an event type exceeds 1,000 events in a window
        eventCounts
            .toStream()
            .filter((windowedKey, count) -> count > 1000)
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key(),
                String.format("{\"alert\": \"high_event_rate\", \"event_type\": \"%s\", \"count\": %d, \"timestamp\": %d}",
                    windowedKey.key(),
                    count,
                    windowedKey.window().start())
            ))
            .peek((key, value) -> System.out.println("ALERT: " + value))
            .to("monitoring-alerts", Produced.with(Serdes.String(), Serdes.String()));
        // Average event value across all events in 5-minute windows for trend analysis
        KTable<Windowed<String>, Double> averageValues = events
            .groupBy((key, event) -> "global", Grouped.with(Serdes.String(), userEventSerde))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .aggregate(
                () -> new AverageAccumulator(),
                (key, event, accumulator) -> {
                    accumulator.addValue(event.getValue());
                    return accumulator;
                },
                Materialized.with(Serdes.String(), new JsonSerde<>(AverageAccumulator.class))
            )
            .mapValues(AverageAccumulator::getAverage);
        // Print each updated average; replace foreach with to(...) to publish it to a topic
        averageValues
            .toStream()
            .foreach((windowedKey, average) -> System.out.println(
                "AVERAGE window_start=" + windowedKey.window().start() + " value=" + average));
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
        System.out.println("Monitoring processor started");
    }
    public static class AverageAccumulator {
        private double sum = 0.0;
        private int count = 0;
        public AverageAccumulator() {}
        public void addValue(double value) {
            sum += value;
            count++;
        }
        // Derived value: excluded from JSON so the state store holds only sum and count
        @JsonIgnore
        public double getAverage() {
            return count > 0 ? sum / count : 0.0;
        }
        // Getters and setters used by Jackson to write and read the state store
        public double getSum() { return sum; }
        public void setSum(double sum) { this.sum = sum; }
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }
}
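A windowed count is a KTable, so every new event produces an updated count, and filter(count > 1000) passes each of those updates, not just the first one. The plain-Java sketch below models that behavior for a single window with record caching disabled; with caching enabled (the default), Kafka Streams may coalesce some intermediate updates, so the real number of alerts per window varies. The "first crossing" variant (count == threshold + 1) is a hypothetical alternative shown for comparison. If you want exactly one result per window, Kafka Streams' suppress(Suppressed.untilWindowCloses(...)) is the library feature to evaluate.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Kafka dependency) of the alert path in MonitoringProcessor:
 * a windowed count that is re-emitted after every event, then filtered with
 * count > threshold. It only illustrates the update semantics.
 */
public class AlertRateSketch {

    /**
     * Replays {@code events} records of one event type into a single window and
     * returns the count values that pass the alert filter.
     *
     * @param onlyFirstCrossing if true, use the hypothetical variant
     *                          count == threshold + 1 instead of count > threshold
     */
    public static List<Long> alerts(int events, long threshold, boolean onlyFirstCrossing) {
        List<Long> passed = new ArrayList<>();
        long count = 0;
        for (int i = 0; i < events; i++) {
            count++; // each event updates the window's count and emits the new value
            boolean fires = onlyFirstCrossing ? count == threshold + 1 : count > threshold;
            if (fires) {
                passed.add(count);
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        System.out.println("count > 1000:   " + alerts(1005, 1000, false));
        System.out.println("first crossing: " + alerts(1005, 1000, true));
    }
}
```

With 1,005 events in one window, the original filter emits five alerts ([1001, 1002, 1003, 1004, 1005]), while the first-crossing variant emits one ([1001]).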
Create monitoring alerts topic
Create a dedicated topic for monitoring alerts that will be consumed by alerting systems.
cd $KAFKA_HOME
bin/kafka-topics.sh --create --topic monitoring-alerts --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Build and run the applications
Compile your Kafka Streams applications and run them to start processing real-time data.
cd ~/kafka-streams-project
mvn clean package
Run the real-time analytics application
java -cp target/classes:$(mvn dependency:build-classpath -Dmdep.outputFile=/dev/stdout -q) com.example.streams.RealTimeAnalytics &
Run the monitoring processor
java -cp target/classes:$(mvn dependency:build-classpath -Dmdep.outputFile=/dev/stdout -q) com.example.streams.MonitoringProcessor &
Configure application properties
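The applications above hard-code their settings. For production, keep the settings in a properties file (for example, streams.properties in the project root) and load it at startup. The values below are a starting point for performance and reliability; review each one against your cluster.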
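# Kafka Streams Configuration
# Use a distinct application.id for each application (e.g. real-time-analytics, monitoring-processor)
application.id=kafka-streams-analytics
bootstrap.servers=localhost:9092
# Performance tuning
num.stream.threads=4
processing.guarantee=exactly_once_v2
# Replication of internal topics: needs at least 3 brokers; use 1 on the single-broker dev setup
replication.factor=3
# State store configuration
# /tmp may be cleared on reboot; use a persistent directory in production
state.dir=/tmp/kafka-streams
commit.interval.ms=1000
# Consumer configuration
auto.offset.reset=earliest
session.timeout.ms=30000
heartbeat.interval.ms=10000
# Producer configuration
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true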
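Because the applications build their Properties in code, a file like the one above has no effect until something loads it. The sketch below shows one way to do that with java.util.Properties only. The class name and the replication-factor check are hypothetical helpers, not part of Kafka; the check simply fails fast when replication.factor exceeds the number of brokers you pass in.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/**
 * Hypothetical helper that loads Kafka Streams settings from a properties file
 * instead of hard-coding them. Uses only java.util.Properties.
 */
public class StreamsConfigLoader {

    /** Parses key=value lines; lines starting with '#' are treated as comments. */
    public static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads and parses a properties file from disk. */
    public static Properties load(Path path) {
        try (Reader reader = Files.newBufferedReader(path)) {
            return load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical pre-flight check: replication.factor must not exceed the broker count. */
    public static void checkReplicationFactor(Properties props, int brokerCount) {
        String value = props.getProperty("replication.factor");
        if (value == null) {
            return; // not set: Kafka Streams uses its own default
        }
        int replicationFactor = Integer.parseInt(value.trim());
        if (replicationFactor > brokerCount) {
            throw new IllegalArgumentException("replication.factor=" + replicationFactor
                    + " exceeds the " + brokerCount + " available broker(s)");
        }
    }

    public static void main(String[] args) {
        Properties props = args.length > 0
                ? load(Path.of(args[0]))
                : load(new StringReader("application.id=real-time-analytics\nreplication.factor=1\n"));
        checkReplicationFactor(props, 1); // single-broker development cluster
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In an application you could replace the hard-coded props.put(...) calls with Properties props = StreamsConfigLoader.load(Path.of("streams.properties")); and pass the result to new KafkaStreams(builder.build(), props). The file name here is only an example.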
Verify your setup
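Test your Kafka Streams applications by producing sample events and reading the output topics. Run each consumer in its own terminal. Windowed results in analytics-results can take up to 30 seconds to appear, because Kafka Streams caches aggregation updates and flushes them on each commit (commit.interval.ms defaults to 30000 ms).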
cd $KAFKA_HOME
# Produce sample events
echo '{"user_id": "user1", "event_type": "click", "timestamp": 1702891200000, "value": 150.0}' | bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092
echo '{"user_id": "user2", "event_type": "purchase", "timestamp": 1702891260000, "value": 250.0}' | bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092
Monitor processed events
bin/kafka-console-consumer.sh --topic processed-events --bootstrap-server localhost:9092 --from-beginning
Monitor analytics results
bin/kafka-console-consumer.sh --topic analytics-results --bootstrap-server localhost:9092 --from-beginning
Check application status
jps -l | grep com.example.streams
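Two hand-typed events exercise the filter but not the monitoring alert, which needs more than 1,000 events of one type within a minute. The sketch below is a hypothetical load generator that prints JSON events using the field names from UserEvent's @JsonProperty annotations; the user IDs, value range, and fixed random seed are arbitrary choices for testing.

```java
import java.util.Locale;
import java.util.Random;

/**
 * Hypothetical test-data generator: prints synthetic JSON events, one per line,
 * matching UserEvent's JSON field names. Pipe the output into kafka-console-producer.sh.
 */
public class GenerateEvents {

    private static final String[] EVENT_TYPES = {"click", "purchase"};

    /** Formats one event; Locale.ROOT keeps '.' as the decimal separator. */
    public static String eventJson(String userId, String eventType, long timestampMs, double value) {
        return String.format(Locale.ROOT,
                "{\"user_id\": \"%s\", \"event_type\": \"%s\", \"timestamp\": %d, \"value\": %.2f}",
                userId, eventType, timestampMs, value);
    }

    public static void main(String[] args) {
        int count = args.length > 0 ? Integer.parseInt(args[0]) : 10;
        Random random = new Random(42); // fixed seed: same sequence on every run
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < count; i++) {
            String userId = "user" + (1 + random.nextInt(5));
            String eventType = EVENT_TYPES[random.nextInt(EVENT_TYPES.length)];
            // Uniform in [0, 300): about two thirds pass the > 100.0 filter
            double value = random.nextDouble() * 300.0;
            out.append(eventJson(userId, eventType, System.currentTimeMillis(), value)).append('\n');
        }
        System.out.print(out);
    }
}
```

With Java 11 or later you can run the file directly, for example `java GenerateEvents.java 2500 | $KAFKA_HOME/bin/kafka-console-producer.sh --topic user-events --bootstrap-server localhost:9092`. If the batch is produced within one minute, each event type will likely exceed 1,000 events in a single 1-minute window and trigger an alert.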
Common issues
| Symptom | Cause | Fix |
|---|---|---|
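| Application fails to start | Kafka broker not running | Start ZooKeeper, then Kafka: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties, then bin/kafka-server-start.sh -daemon config/server.properties |
| JSON deserialization errors | Message format does not match UserEvent | Check that the event structure matches the UserEvent fields; to skip bad records instead of stopping, set default.deserialization.exception.handler to org.apache.kafka.streams.errors.LogAndContinueExceptionHandler |
| No output in result topics | No input yet, or results still buffered | Verify the topics exist, produce test messages to the input topic, and wait for the next commit (commit.interval.ms, 30 seconds by default) |
| High memory usage | Large state stores or too small a heap | Raise the application heap with java -Xmx2G (KAFKA_HEAP_OPTS only affects the broker and Kafka scripts) and reduce window sizes |
| Rebalancing issues | Consumer group instability | Increase session.timeout.ms and keep heartbeat.interval.ms at or below one third of it; raise max.poll.interval.ms if processing is slow |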
Next steps
- Configure Kafka Schema Registry with Avro serialization for better data evolution management
- Configure Grafana dashboards for TimescaleDB analytics to visualize your streaming data
- Set up Kubernetes monitoring for containerized Kafka Streams deployments
- Configure exactly-once processing semantics for mission-critical applications
- Monitor Kafka Streams state stores for performance optimization
Automated install script
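This script automates the installation steps above: packages, Kafka, systemd services for ZooKeeper and Kafka, the development topics, and the Maven project skeleton. You still add the Java source files and the monitoring-alerts topic yourself.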
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
readonly RED='\033[0;31m'
readonly GREEN='\033[0;32m'
readonly YELLOW='\033[1;33m'
readonly BLUE='\033[0;34m'
readonly NC='\033[0m' # No Color
# Configuration
readonly KAFKA_VERSION="3.5.1"
readonly SCALA_VERSION="2.13"
readonly KAFKA_USER="kafka"
readonly KAFKA_HOME="/opt/kafka"
readonly PROJECT_DIR="$HOME/kafka-streams-project"
# Cleanup function for error handling
cleanup() {
local exit_code=$?
if [ $exit_code -ne 0 ]; then
echo -e "${RED}[ERROR] Installation failed. Cleaning up...${NC}"
sudo systemctl stop kafka 2>/dev/null || true
sudo systemctl stop zookeeper 2>/dev/null || true
sudo rm -rf /opt/kafka* 2>/dev/null || true
fi
exit $exit_code
}
trap cleanup ERR
show_usage() {
echo "Usage: $0 [OPTIONS]"
echo "Options:"
echo " -h, --help Show this help message"
echo " --heap-size Set Kafka heap size (default: 1G)"
echo ""
echo "Example: $0 --heap-size 2G"
exit 1
}
log_step() {
echo -e "${BLUE}[$1] $2${NC}"
}
log_success() {
echo -e "${GREEN}[SUCCESS] $1${NC}"
}
log_warning() {
echo -e "${YELLOW}[WARNING] $1${NC}"
}
log_error() {
echo -e "${RED}[ERROR] $1${NC}"
exit 1
}
# Parse command line arguments
HEAP_SIZE="1G"
while [[ $# -gt 0 ]]; do
case $1 in
-h|--help)
show_usage
;;
--heap-size)
HEAP_SIZE="$2"
shift 2
;;
*)
echo "Unknown option: $1"
show_usage
;;
esac
done
# Check prerequisites
if [[ $EUID -eq 0 ]]; then
log_error "This script should not be run as root"
fi
if ! command -v sudo >/dev/null 2>&1; then
log_error "sudo is required but not installed"
fi
# Detect distribution and set package manager
log_step "1/12" "Detecting Linux distribution..."
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_UPDATE="apt update && apt upgrade -y"
PKG_INSTALL="apt install -y"
JAVA_PKG="openjdk-17-jdk"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_UPDATE="dnf update -y"
PKG_INSTALL="dnf install -y"
JAVA_PKG="java-17-openjdk-devel"
;;
amzn)
PKG_MGR="yum"
PKG_UPDATE="yum update -y"
PKG_INSTALL="yum install -y"
JAVA_PKG="java-17-openjdk-devel"
;;
*)
log_error "Unsupported distribution: $ID"
;;
esac
else
log_error "Cannot detect Linux distribution"
fi
log_success "Detected $PRETTY_NAME with $PKG_MGR package manager"
# Update system packages
log_step "2/12" "Updating system packages..."
# Run through sh -c so the '&&' in the apt command is interpreted by a shell
sudo sh -c "$PKG_UPDATE"
# Install Java and dependencies
log_step "3/12" "Installing Java 17 and dependencies..."
sudo $PKG_INSTALL $JAVA_PKG maven wget curl tar
# Verify Java installation
log_step "4/12" "Verifying Java installation..."
if ! java -version 2>&1 | grep -q "17\|11"; then
log_error "Java 11+ installation failed or not found"
fi
if ! mvn -version >/dev/null 2>&1; then
log_error "Maven installation failed"
fi
log_success "Java and Maven installed successfully"
# Create kafka user
log_step "5/12" "Creating Kafka user..."
if ! id "$KAFKA_USER" >/dev/null 2>&1; then
sudo useradd -r -s /bin/false -d /opt/kafka $KAFKA_USER
fi
# Download and install Kafka
log_step "6/12" "Downloading and installing Apache Kafka..."
cd /tmp
# Older releases are only kept on archive.apache.org, so download from there
wget -q "https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
sudo tar -xzf "kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz" -C /opt/
sudo mv "/opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION}" "$KAFKA_HOME"
sudo chown -R $KAFKA_USER:$KAFKA_USER "$KAFKA_HOME"
sudo chmod -R 755 "$KAFKA_HOME"
rm "kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
# Configure environment variables
log_step "7/12" "Configuring environment variables..."
cat >> ~/.bashrc << EOF
export KAFKA_HOME=$KAFKA_HOME
export PATH=\$PATH:\$KAFKA_HOME/bin
export KAFKA_HEAP_OPTS="-Xmx${HEAP_SIZE} -Xms${HEAP_SIZE}"
EOF
# Apply environment variables to current session
export KAFKA_HOME=$KAFKA_HOME
export PATH=$PATH:$KAFKA_HOME/bin
export KAFKA_HEAP_OPTS="-Xmx${HEAP_SIZE} -Xms${HEAP_SIZE}"
# Create systemd services
log_step "8/12" "Creating systemd services..."
# ZooKeeper service
sudo tee /etc/systemd/system/zookeeper.service > /dev/null << EOF
[Unit]
Description=Apache ZooKeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=forking
User=$KAFKA_USER
Group=$KAFKA_USER
Environment=KAFKA_HEAP_OPTS="$KAFKA_HEAP_OPTS"
ExecStart=$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
ExecStop=$KAFKA_HOME/bin/zookeeper-server-stop.sh
Restart=on-abnormal
TimeoutSec=40
KillMode=process
[Install]
WantedBy=multi-user.target
EOF
# Kafka service
sudo tee /etc/systemd/system/kafka.service > /dev/null << EOF
[Unit]
Description=Apache Kafka server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=forking
User=$KAFKA_USER
Group=$KAFKA_USER
Environment=KAFKA_HEAP_OPTS="$KAFKA_HEAP_OPTS"
ExecStart=$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
ExecStop=$KAFKA_HOME/bin/kafka-server-stop.sh
Restart=on-abnormal
TimeoutSec=60
KillMode=process
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
# Start services
log_step "9/12" "Starting ZooKeeper and Kafka services..."
sudo systemctl enable zookeeper kafka
sudo systemctl start zookeeper
sleep 10
sudo systemctl start kafka
sleep 5
# Verify services are running
if ! sudo systemctl is-active --quiet zookeeper; then
log_error "ZooKeeper failed to start"
fi
if ! sudo systemctl is-active --quiet kafka; then
log_error "Kafka failed to start"
fi
# Create development topics
log_step "10/12" "Creating development topics..."
topics=("user-events" "processed-events" "analytics-results")
for topic in "${topics[@]}"; do
sudo -u $KAFKA_USER $KAFKA_HOME/bin/kafka-topics.sh \
--create --topic "$topic" \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1 \
--if-not-exists
done
# Set up Kafka Streams project
log_step "11/12" "Setting up Kafka Streams development environment..."
mkdir -p "$PROJECT_DIR/src/main/java/com/example/streams"
cd "$PROJECT_DIR"
# Create pom.xml
cat > pom.xml << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-streams-analytics</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<kafka.version>3.5.1</kafka.version>
<slf4j.version>1.7.36</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
</plugin>
</plugins>
</build>
</project>
EOF
mvn clean compile
# Final verification
log_step "12/12" "Performing final verification..."
# Check services
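# Check each unit separately: with several units, is-active succeeds if any one is active
for svc in zookeeper kafka; do
if ! sudo systemctl is-active --quiet "$svc"; then
log_error "Service $svc is not running properly"
fi
done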
# Check topics
topic_count=$(sudo -u $KAFKA_USER $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | wc -l)
if [ "$topic_count" -lt 3 ]; then
log_error "Not all topics were created successfully"
fi
# Check project compilation
if [ ! -d "$PROJECT_DIR/target" ]; then
log_error "Maven project compilation failed"
fi
log_success "Kafka Streams installation and setup completed successfully!"
echo ""
echo "=== Installation Summary ==="
echo "- Kafka Home: $KAFKA_HOME"
echo "- Project Directory: $PROJECT_DIR"
echo "- Java Version: $(java -version 2>&1 | head -n1)"
echo "- Heap Size: $HEAP_SIZE"
echo ""
echo "Services:"
echo "- ZooKeeper: $(sudo systemctl is-active zookeeper)"
echo "- Kafka: $(sudo systemctl is-active kafka)"
echo ""
echo "Topics created: ${topics[*]}"
echo ""
echo "To use Kafka commands, run: source ~/.bashrc"
echo "Project ready for Kafka Streams development!"
Save the script as install.sh, review it before running, then execute it with: bash install.sh