Configure Kafka cluster and Java applications for exactly-once processing semantics with transaction state management, idempotent producers, and EOS isolation levels for reliable stream processing.
Prerequisites
- Running Kafka cluster version 2.8+
- Java 11 or higher
- Maven build tool
- Sufficient disk space for transaction logs
What this solves
Kafka Streams exactly-once semantics (EOS) ensures that each record is processed exactly once, even during failures or restarts. This prevents duplicate processing and maintains data consistency in stream processing applications, crucial for financial transactions, inventory management, and other mission-critical workloads where duplicate processing could cause incorrect results or data corruption.
Prerequisites
You need a running Kafka cluster with at least version 2.8.0, Java 11 or higher, and Maven for building the application. Your Kafka cluster should have sufficient brokers for replication and transaction log management.
Install Java and Maven
Install the required development tools for building Kafka Streams applications.
sudo apt update
sudo apt install -y openjdk-11-jdk maven
Configure Kafka cluster for exactly-once support
Enable transaction support in Kafka brokers
Configure your Kafka brokers to support transactions and exactly-once semantics by updating the server configuration.
# Enable transaction coordinator
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.num.partitions=50
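# Idempotence (enable.idempotence=true) is a producer client setting, not a broker property;
# Kafka Streams turns it on automatically when exactly-once processing is enabled.
# Configure transaction timeout
# Upper bound the broker accepts for a producer's transaction.timeout.ms
transaction.max.timeout.ms=900000
# transaction.timeout.ms=60000 is a producer client setting; set it in the application, not in server.properties
# Configure log segment settings for transaction state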
log.segment.bytes=1073741824
log.retention.hours=168
Restart Kafka brokers
Restart your Kafka brokers to apply the transaction configuration changes.
sudo systemctl restart kafka
sudo systemctl status kafka
Create topics with proper replication
Create input and output topics with sufficient replication for exactly-once processing.
/opt/kafka/bin/kafka-topics.sh --create \
--topic input-topic \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3
/opt/kafka/bin/kafka-topics.sh --create \
--topic output-topic \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3
Implement Kafka Streams application with EOS
Create Maven project structure
Set up a new Maven project for the Kafka Streams application with exactly-once semantics.
mkdir -p kafka-streams-eos/src/main/java/com/example
cd kafka-streams-eos
Configure Maven dependencies
Create the pom.xml file with Kafka Streams dependencies and Java 11 configuration.
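<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>kafka-streams-eos</artifactId>
  <version>1.0.0</version>
  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <kafka.version>3.6.0</kafka.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>2.0.9</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.11.0</version>
      </plugin>
    </plugins>
  </build>
</project>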
Implement exactly-once Kafka Streams application
Create the main application class with exactly-once processing semantics and transaction configuration.
package com.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class ExactlyOnceStreamsApp {
public static void main(String[] args) {
Properties props = new Properties();
// Basic Kafka Streams configuration
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Enable exactly-once semantics
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Transaction and idempotent producer settings
props.put(StreamsConfig.PRODUCER_PREFIX + "enable.idempotence", "true");
props.put(StreamsConfig.PRODUCER_PREFIX + "acks", "all");
props.put(StreamsConfig.PRODUCER_PREFIX + "retries", "10");
props.put(StreamsConfig.PRODUCER_PREFIX + "max.in.flight.requests.per.connection", "1");
// Consumer configuration for exactly-once
props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// State store and changelog configuration
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-eos");
// Transaction timeout configuration
props.put(StreamsConfig.TRANSACTION_TIMEOUT_CONFIG, 30000);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
StreamsBuilder builder = new StreamsBuilder();
// Define the processing topology
KStream inputStream = builder.stream("input-topic");
// Transform and process messages with exactly-once semantics
KStream processedStream = inputStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> "PROCESSED-" + value.toUpperCase() + "-" + System.currentTimeMillis())
.peek((key, value) -> System.out.println("Processing: " + key + " -> " + value));
// Output to result topic with exactly-once guarantee
processedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
System.out.println("Kafka Streams application started with exactly-once semantics");
latch.await();
} catch (Throwable e) {
System.err.println("Error starting streams: " + e.getMessage());
System.exit(1);
}
System.exit(0);
}
}
Build the application
Compile and package the Kafka Streams application using Maven.
mvn clean compile package
Configure transaction state and isolation levels
Create advanced configuration class
Implement a configuration utility for managing exactly-once settings and transaction parameters.
package com.example;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
/**
 * Builds and validates the {@link Properties} used by the exactly-once example applications.
 */
public class ExactlyOnceConfig {
    /**
     * Creates a Kafka Streams configuration with {@code exactly_once_v2} enabled.
     *
     * @param applicationId    value for {@code application.id}; also used to derive the
     *                         client id and the state directory
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @return a new, mutable {@link Properties} instance
     */
    public static Properties getStreamsConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Application identity
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.CLIENT_ID_CONFIG, applicationId + "-client");
        // Serialization
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Exactly-once semantics configuration
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Producer configuration for exactly-once
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, "all");
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        // Consumer configuration for read_committed isolation
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Transaction and commit configuration.
        // transaction.timeout.ms belongs to the producer; StreamsConfig has no constant for it,
        // so it is set through the producer prefix like the other producer settings above.
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30000);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // State store configuration
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/" + applicationId);
        // Performance tuning
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
        return props;
    }

    /**
     * Checks that the given configuration requests {@code exactly_once_v2} and a
     * {@code read_committed} consumer isolation level.
     *
     * <p>Both values are read with {@link Properties#getProperty(String)}, so they must have
     * been stored as {@code String}s under the same keys this class writes.
     *
     * @param props configuration to check
     * @throws IllegalArgumentException if either setting is missing or has another value
     */
    public static void validateConfiguration(Properties props) {
        String processingGuarantee = props.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
        if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
            throw new IllegalArgumentException("Processing guarantee must be exactly_once_v2");
        }
        String isolationLevel = props.getProperty(
            StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ISOLATION_LEVEL_CONFIG
        );
        if (!"read_committed".equals(isolationLevel)) {
            throw new IllegalArgumentException("Isolation level must be read_committed for EOS");
        }
        System.out.println("Exactly-once configuration validated successfully");
    }
}
Create stateful processing example
Implement a more complex example with state stores to demonstrate exactly-once semantics with stateful operations.
package com.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class StatefulExactlyOnceApp {
private static final String STORE_NAME = "word-count-store";
public static void main(String[] args) {
Properties props = ExactlyOnceConfig.getStreamsConfig(
"stateful-exactly-once-app",
"localhost:9092"
);
ExactlyOnceConfig.validateConfiguration(props);
StreamsBuilder builder = new StreamsBuilder();
// Add state store to the topology
StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(STORE_NAME),
Serdes.String(),
Serdes.Long()
).withCachingEnabled();
builder.addStateStore(storeBuilder);
// Create input stream
KStream inputStream = builder.stream("input-topic");
// Process with stateful transformations
KStream wordCountStream = inputStream
.flatMapValues(value -> {
if (value == null) return java.util.Collections.emptyList();
return java.util.Arrays.asList(value.toLowerCase().split("\\W+"));
})
.transform(() -> new WordCountTransformer(), STORE_NAME);
// Output results
wordCountStream.to("word-count-output", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
latch.countDown();
}));
try {
streams.start();
System.out.println("Stateful exactly-once application started");
latch.await();
} catch (Exception e) {
System.err.println("Error: " + e.getMessage());
e.printStackTrace();
System.exit(1);
}
System.exit(0);
}
private static class WordCountTransformer implements Transformer> {
private KeyValueStore store;
@Override
public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
this.store = context.getStateStore(STORE_NAME);
}
@Override
public KeyValue transform(String key, String word) {
if (word == null || word.isEmpty()) {
return null;
}
Long currentCount = store.get(word);
long newCount = (currentCount == null) ? 1L : currentCount + 1L;
// This update is exactly-once due to transaction semantics
store.put(word, newCount);
return KeyValue.pair(word, word + ":" + newCount);
}
@Override
public void close() {
// Cleanup if needed
}
}
}
Create additional output topic
Create the topic for the stateful processing example output.
/opt/kafka/bin/kafka-topics.sh --create \
--topic word-count-output \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 3
Monitor and troubleshoot exactly-once processing
Create monitoring utility
Implement monitoring capabilities to track exactly-once processing metrics and transaction state.
package com.example;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateStore;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ExactlyOnceMonitor {
private final KafkaStreams streams;
private final ScheduledExecutorService scheduler;
public ExactlyOnceMonitor(KafkaStreams streams) {
this.streams = streams;
this.scheduler = Executors.newScheduledThreadPool(1);
}
public void startMonitoring() {
scheduler.scheduleAtFixedRate(() -> {
try {
logApplicationState();
logMetrics();
} catch (Exception e) {
System.err.println("Monitoring error: " + e.getMessage());
}
}, 0, 30, TimeUnit.SECONDS);
}
private void logApplicationState() {
KafkaStreams.State state = streams.state();
System.out.println("Application state: " + state);
if (state == KafkaStreams.State.ERROR) {
System.err.println("Application is in ERROR state - check logs");
}
}
private void logMetrics() {
// Log key metrics for exactly-once processing
streams.metrics().forEach((metricName, metric) -> {
String name = metricName.name();
if (name.contains("transaction") || name.contains("commit") || name.contains("abort")) {
System.out.println("Metric " + name + ": " + metric.metricValue());
}
});
}
public void shutdown() {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Run the application with monitoring
Start the exactly-once processing application and verify transaction behavior.
# Run the basic exactly-once application
java -cp target/classes:$(mvn dependency:build-classpath -q -Dmdep.outputFile=/dev/stdout) com.example.ExactlyOnceStreamsApp
Test exactly-once behavior
Produce test messages and verify that processing happens exactly once, even with failures.
# In a new terminal, produce test messages
echo "test message 1" | /opt/kafka/bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
echo "test message 2" | /opt/kafka/bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
echo "duplicate test" | /opt/kafka/bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
# Consume from output topic to verify processing
/opt/kafka/bin/kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning --isolation-level read_committed
Verify your setup
Test the exactly-once processing setup and verify transaction isolation.
# Check Kafka topics
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# Verify transaction state topic exists
/opt/kafka/bin/kafka-topics.sh --describe --topic __transaction_state --bootstrap-server localhost:9092
# Check consumer group information
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group exactly-once-app
# Monitor broker logs for transaction activity
tail -f /opt/kafka/logs/server.log | grep -i transaction
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| TransactionTimeoutException | Transaction timeout too low | Increase the producer transaction.timeout.ms in the Streams config; it must not exceed the broker's transaction.max.timeout.ms |
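| ProducerFencedException | A newer producer with the same transactional.id took over, typically after a rebalance or a transaction that exceeded transaction.timeout.ms | Keep application.id identical across instances of the same application (Kafka Streams derives transactional.id itself), give different applications different application.ids, and make sure processing finishes within the transaction timeout |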
| Invalid transaction state | Corrupted transaction log | Check __transaction_state topic replication and cleanup |
| High commit latency | Frequent small transactions | Tune commit.interval.ms and batch size settings |
| Consumer lag increasing | Read committed isolation waiting for transactions | Monitor transaction completion time and timeout settings |
Next steps
- Configure Kafka Streams state stores and RocksDB optimization for high-performance streaming applications
- Set up Kafka Streams testing framework with TopologyTestDriver for automated stream processing validation
- Configure Kafka Streams exactly-once monitoring with Prometheus and Grafana dashboards
- Implement Kafka Streams error handling with dead letter queues and retry mechanisms
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Kafka Streams Exactly-Once Processing Setup Script
# Usage: ./install_kafka_streams_eos.sh [kafka_broker_host] [kafka_version]

# Positional arguments with defaults; fixed install location and service account.
KAFKA_HOST="${1:-localhost:9092}"
KAFKA_VERSION="${2:-3.6.0}"
INSTALL_DIR="/opt/kafka-streams-eos"
SERVICE_USER="kafkastreams"

# Color codes
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Error cleanup: invoked by the ERR trap below. Removes the service, the unit
# file, the service user and the install directory, then exits with status 1.
cleanup() {
    echo -e "${RED}[ERROR] Installation failed. Cleaning up...${NC}"
    systemctl stop kafka-streams-eos 2>/dev/null || true
    systemctl disable kafka-streams-eos 2>/dev/null || true
    rm -f /etc/systemd/system/kafka-streams-eos.service
    userdel -r "$SERVICE_USER" 2>/dev/null || true
    # Quoted and guarded so an empty or unset INSTALL_DIR can never expand to
    # a bare "rm -rf".
    rm -rf -- "${INSTALL_DIR:?}"
    echo -e "${RED}Cleanup completed${NC}"
    exit 1
}
trap cleanup ERR
# Usage message
# Print the usage text and exit.
#   $1 - exit status (optional, default 1 so error paths keep failing)
usage() {
    echo "Usage: $0 [kafka_broker_host] [kafka_version]"
    echo " kafka_broker_host: Kafka broker address (default: localhost:9092)"
    echo " kafka_version: Kafka version (default: 3.6.0)"
    exit "${1:-1}"
}
# An explicit help request is not a failure, so it exits with status 0.
if [[ "${1:-}" == "-h" || "${1:-}" == "--help" ]]; then
    usage 0
fi
# Check prerequisites
echo -e "${YELLOW}[1/10] Checking prerequisites...${NC}"
if [[ $EUID -ne 0 ]]; then
    echo -e "${RED}This script must be run as root${NC}"
    exit 1
fi
# Detect distribution and pick the package-manager commands and the Java 11
# package name. PKG_UPDATE only refreshes package metadata on every family; it
# does not upgrade installed packages.
if [ -f /etc/os-release ]; then
    . /etc/os-release
    # ID / PRETTY_NAME are optional in os-release; default them so "set -u"
    # does not abort on an unbound variable.
    case "${ID:-}" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_INSTALL="apt install -y"
            PKG_UPDATE="apt update"
            JAVA_PKG="openjdk-11-jdk"
            ;;
        almalinux|rocky|centos|rhel|ol|fedora)
            # Older releases of this family ship yum only.
            if command -v dnf &>/dev/null; then
                PKG_MGR="dnf"
                PKG_INSTALL="dnf install -y"
                PKG_UPDATE="dnf makecache --refresh"
            else
                PKG_MGR="yum"
                PKG_INSTALL="yum install -y"
                PKG_UPDATE="yum makecache"
            fi
            JAVA_PKG="java-11-openjdk-devel"
            ;;
        amzn)
            PKG_MGR="yum"
            PKG_INSTALL="yum install -y"
            PKG_UPDATE="yum makecache"
            JAVA_PKG="java-11-openjdk-devel"
            ;;
        *)
            echo -e "${RED}Unsupported distro: ${ID:-unknown}${NC}"
            exit 1
            ;;
    esac
else
    echo -e "${RED}Cannot detect distribution${NC}"
    exit 1
fi
echo -e "${GREEN}Detected: ${PRETTY_NAME:-${ID:-unknown}}${NC}"
# Step 2: run the package-manager update command selected during distro detection.
echo -e "${YELLOW}[2/10] Updating package manager...${NC}"
$PKG_UPDATE

# Step 3: install the JDK, Maven and download tools.
# PKG_INSTALL is left unquoted on purpose so it splits into command + options.
echo -e "${YELLOW}[3/10] Installing Java 11 and Maven...${NC}"
$PKG_INSTALL $JAVA_PKG maven curl wget

# Report the version string printed on the first line of "java -version".
java_version=$(java -version 2>&1 | head -n 1 | cut -d'"' -f2)
echo -e "${GREEN}Java installed: $java_version${NC}"

# Step 4: create the system account only when it does not exist yet.
echo -e "${YELLOW}[4/10] Creating service user...${NC}"
id "$SERVICE_USER" &>/dev/null || useradd -r -s /bin/false -d "$INSTALL_DIR" "$SERVICE_USER"

# Step 5: source tree, log directory and config directory.
echo -e "${YELLOW}[5/10] Creating project structure...${NC}"
mkdir -p "$INSTALL_DIR"/{src/main/java/com/example,logs,config}
# Create pom.xml
# The heredoc delimiter is unquoted, so $KAFKA_VERSION is substituted while the
# file is written, whereas the escaped \${kafka.version} is emitted literally as
# the Maven property reference ${kafka.version}.
# The POM declares kafka-streams and slf4j-simple, compiles for Java 11, and
# runs maven-shade-plugin in the package phase with
# com.example.ExactlyOnceStreamsApp as the manifest main class.
echo -e "${YELLOW}[6/10] Creating Maven configuration...${NC}"
cat > $INSTALL_DIR/pom.xml << EOF
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-streams-eos</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<kafka.version>$KAFKA_VERSION</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>\${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.ExactlyOnceStreamsApp</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
EOF
# Create Kafka Streams application
# The heredoc delimiter is quoted ('EOF'), so the Java source is written
# verbatim with no shell expansion.
echo -e "${YELLOW}[7/10] Creating Kafka Streams application...${NC}"
cat > "$INSTALL_DIR/src/main/java/com/example/ExactlyOnceStreamsApp.java" << 'EOF'
package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ExactlyOnceStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Basic Kafka Streams configuration
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, System.getProperty("bootstrap.servers", "localhost:9092"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Exactly-once semantics configuration
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Consumer configuration for exactly-once
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Transaction configuration.
        // transactional.id is intentionally not set: Kafka Streams assigns it itself when
        // exactly-once processing is enabled.
        props.put(StreamsConfig.producerPrefix(org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
        props.put(StreamsConfig.producerPrefix(org.apache.kafka.clients.producer.ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60000);

        StreamsBuilder builder = new StreamsBuilder();
        // Define stream processing topology
        KStream<String, String> inputStream = builder.stream("input-topic");
        KStream<String, String> processedStream = inputStream
            // Records with a null value would make toUpperCase() throw and kill the
            // stream thread, so they are dropped first.
            .filter((key, value) -> value != null)
            .mapValues(value -> {
                // Simulate processing with transformation
                return "processed-" + value.toUpperCase();
            });
        processedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);
        // Attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                System.out.println("Shutting down Kafka Streams application...");
                streams.close();
                latch.countDown();
            }
        });
        try {
            System.out.println("Starting Kafka Streams application with exactly-once semantics...");
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.err.println("Error running streams application: " + e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }
}
EOF
# Step 8: build the JAR from inside the project directory (quiet Maven output).
echo -e "${YELLOW}[8/10] Building application...${NC}"
cd "$INSTALL_DIR"
mvn -q clean package

# Write the properties file, one key per line; only the broker address comes
# from the script arguments.
printf '%s\n' \
    "bootstrap.servers=$KAFKA_HOST" \
    "application.id=exactly-once-app" \
    "processing.guarantee=exactly_once_v2" \
    "commit.interval.ms=1000" \
    > "$INSTALL_DIR/config/application.properties"
# Create systemd service
# The unit runs the application JAR as $SERVICE_USER, passes the broker address
# as the bootstrap.servers system property, restarts on failure after 10s, and
# appends stdout/stderr to files under $INSTALL_DIR/logs.
# The heredoc delimiter is unquoted, so $SERVICE_USER, $INSTALL_DIR,
# $KAFKA_HOST and the JAVA_HOME command substitution are all expanded while the
# unit file is being written; the file contains the resolved values.
echo -e "${YELLOW}[9/10] Creating systemd service...${NC}"
cat > /etc/systemd/system/kafka-streams-eos.service << EOF
[Unit]
Description=Kafka Streams Exactly-Once Processing Application
After=network.target
Requires=network.target
[Service]
Type=simple
User=$SERVICE_USER
Group=$SERVICE_USER
WorkingDirectory=$INSTALL_DIR
Environment=JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
ExecStart=/usr/bin/java -Dbootstrap.servers=$KAFKA_HOST -jar $INSTALL_DIR/target/kafka-streams-eos-1.0.0.jar
Restart=on-failure
RestartSec=10
StandardOutput=append:$INSTALL_DIR/logs/application.log
StandardError=append:$INSTALL_DIR/logs/error.log
[Install]
WantedBy=multi-user.target
EOF
# Hand the whole install tree to the service account.
chown -R "$SERVICE_USER:$SERVICE_USER" "$INSTALL_DIR"
# Directories: rwxr-xr-x.
chmod 755 "$INSTALL_DIR" "$INSTALL_DIR/logs"
# Regular files, including the unit file: rw-r--r--.
chmod 644 \
    "$INSTALL_DIR/pom.xml" \
    "$INSTALL_DIR/src/main/java/com/example/ExactlyOnceStreamsApp.java" \
    "$INSTALL_DIR/config/application.properties" \
    /etc/systemd/system/kafka-streams-eos.service
# Make systemd pick up the new unit and enable it at boot. The service is not
# started here; that is left to the operator.
systemctl daemon-reload
systemctl enable kafka-streams-eos
# Step 10: sanity checks on the toolchain, the build output and the service
# registration, followed by the operator instructions.
echo -e "${YELLOW}[10/10] Running verification checks...${NC}"
# Verify Java installation
if ! java -version &>/dev/null; then
    echo -e "${RED}Java verification failed${NC}"
    exit 1
fi
# Verify Maven installation
if ! mvn -version &>/dev/null; then
    echo -e "${RED}Maven verification failed${NC}"
    exit 1
fi
# Verify JAR file exists
if [[ ! -f "$INSTALL_DIR/target/kafka-streams-eos-1.0.0.jar" ]]; then
    echo -e "${RED}Application JAR not found${NC}"
    exit 1
fi
# Verify service is enabled
if ! systemctl is-enabled kafka-streams-eos &>/dev/null; then
    echo -e "${RED}Service not enabled${NC}"
    exit 1
fi
echo -e "${GREEN}✓ Installation completed successfully!${NC}"
echo ""
echo "Next steps:"
echo "1. Ensure Kafka cluster is configured with exactly-once support"
echo "2. Create input-topic and output-topic with replication factor 3"
echo "3. Start the service: systemctl start kafka-streams-eos"
# The unit appends stdout/stderr to files under $INSTALL_DIR/logs, so the
# application output is found there rather than in the journal.
echo "4. Monitor logs: tail -f $INSTALL_DIR/logs/application.log $INSTALL_DIR/logs/error.log"
echo "5. Check status: systemctl status kafka-streams-eos"
echo ""
echo "Application installed at: $INSTALL_DIR"
echo "Service user: $SERVICE_USER"
echo "Kafka broker: $KAFKA_HOST"
Review the script before running. Execute as root with: sudo bash install_kafka_streams_eos.sh [kafka_broker_host] [kafka_version]