Implement Kafka Streams exactly-once processing semantics with Java applications

Advanced 45 min May 19, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Configure Kafka cluster and Java applications for exactly-once processing semantics with transaction state management, idempotent producers, and EOS isolation levels for reliable stream processing.

Prerequisites

  • Running Kafka cluster version 2.8+
  • Java 11 or higher
  • Maven build tool
  • Sufficient disk space for transaction logs

What this solves

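Kafka Streams exactly-once semantics (EOS) ensures that the results of processing each input record (output records, state store updates, and committed offsets) are written atomically, so they are reflected exactly once even if the application fails or restarts mid-processing. This prevents duplicate results and keeps state consistent, which matters for financial transactions, inventory management, and other workloads where duplicates would produce incorrect results. The guarantee covers reads from and writes to Kafka; side effects outside Kafka, such as calls to external systems, can still run more than once when work is retried.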
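
To make the failure mode concrete, here is a toy, in-memory model of it. It uses no Kafka APIs; the class `DeliveryModel` and its `run` method are hypothetical names invented for illustration. The model crashes once after a result has been produced but before the input offset is committed, then restarts from the last committed offset. Without an atomic commit the result appears twice; with one, the aborted attempt leaves nothing behind.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: contrasts at-least-once with an atomic (transactional) commit. No Kafka APIs are used. */
public class DeliveryModel {

    /** Processes input, crashing once at index crashAt before that record's offset is committed. */
    static List<String> run(List<String> input, int crashAt, boolean atomic) {
        List<String> output = new ArrayList<>(); // stands in for the output topic
        int committedOffset = 0;                 // stands in for the committed consumer offset
        boolean crashed = false;
        int pos = committedOffset;
        while (pos < input.size()) {
            String result = "PROCESSED-" + input.get(pos);
            if (!crashed && pos == crashAt) {
                crashed = true;
                if (!atomic) {
                    output.add(result); // output was written, but the offset was never committed
                }
                // atomic case: the open transaction is aborted, so neither change becomes visible
                pos = committedOffset;  // restart from the last committed offset
                continue;
            }
            output.add(result);
            committedOffset = pos + 1;  // output and offset advance together
            pos = committedOffset;
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c");
        System.out.println("at-least-once: " + run(input, 1, false));
        System.out.println("exactly-once:  " + run(input, 1, true));
    }
}
```

In the at-least-once run the record at index 1 shows up twice in the output; in the atomic run each record shows up once.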

Prerequisites

You need a running Kafka cluster at version 2.8.0 or later, Java 11 or higher, and Maven for building the application. The examples use a replication factor of 3 with a minimum in-sync replica count of 2, so the cluster needs at least three brokers.

Install Java and Maven

Install the required development tools for building Kafka Streams applications.

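# Ubuntu 24.04 / Debian 12
# If openjdk-11-jdk is not available in your release's repositories,
# a newer JDK such as openjdk-17-jdk also meets the Java 11+ requirement.
sudo apt update
sudo apt install -y openjdk-11-jdk maven

# AlmaLinux 9 / Rocky Linux 9
sudo dnf install -y java-11-openjdk-devel maven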

Configure Kafka cluster for exactly-once support

Enable transaction support in Kafka brokers

Configure your Kafka brokers to support transactions by adding the following settings to each broker's server.properties file.

# Transaction coordinator state log (__transaction_state topic)
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.num.partitions=50

# Upper bound on the transaction timeout a producer may request
transaction.max.timeout.ms=900000

# Log segment and retention settings (broker-wide defaults for topic logs)
log.segment.bytes=1073741824
log.retention.hours=168

# Note: enable.idempotence and transaction.timeout.ms are producer (client)
# settings. They have no effect in server.properties and are set in the
# Streams application configuration instead.
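
Before restarting brokers, it can help to sanity-check the file. The following is a minimal sketch that parses server.properties text with `java.util.Properties` and reports two kinds of mistakes: a minimum ISR larger than the replication factor, and producer-only keys (`enable.idempotence`, `transaction.timeout.ms`) placed in a broker file. The class name `BrokerTxnConfigCheck` and the choice of rules are assumptions made for this example, not an official Kafka tool.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: checks a few transaction-related broker settings. */
public class BrokerTxnConfigCheck {

    /** Returns human-readable problems found in the given server.properties text. */
    static List<String> check(String serverProperties) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> problems = new ArrayList<>();

        // Fall back to the documented broker defaults (3 and 2) when a key is absent
        int rf = Integer.parseInt(p.getProperty("transaction.state.log.replication.factor", "3"));
        int minIsr = Integer.parseInt(p.getProperty("transaction.state.log.min.isr", "2"));
        if (minIsr > rf) {
            problems.add("transaction.state.log.min.isr (" + minIsr
                + ") exceeds transaction.state.log.replication.factor (" + rf + ")");
        }

        // Producer-side keys do nothing in a broker file
        for (String clientKey : List.of("enable.idempotence", "transaction.timeout.ms")) {
            if (p.containsKey(clientKey)) {
                problems.add(clientKey + " is a producer setting, not a broker setting");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
            "transaction.state.log.replication.factor=3",
            "transaction.state.log.min.isr=2",
            "transaction.max.timeout.ms=900000",
            "enable.idempotence=true");
        check(sample).forEach(System.out::println);
    }
}
```

For the sample input, the only reported problem is the misplaced `enable.idempotence` key.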

Restart Kafka brokers

Restart your Kafka brokers to apply the transaction configuration changes.

sudo systemctl restart kafka
sudo systemctl status kafka

Create topics with proper replication

Create input and output topics with sufficient replication for exactly-once processing.

/opt/kafka/bin/kafka-topics.sh --create \
  --topic input-topic \
  --bootstrap-server localhost:9092 \
  --partitions 6 \
  --replication-factor 3

/opt/kafka/bin/kafka-topics.sh --create \
  --topic output-topic \
  --bootstrap-server localhost:9092 \
  --partitions 6 \
  --replication-factor 3

Implement Kafka Streams application with EOS

Create Maven project structure

Set up a new Maven project for the Kafka Streams application with exactly-once semantics.

mkdir -p kafka-streams-eos/src/main/java/com/example
cd kafka-streams-eos

Configure Maven dependencies

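Create the pom.xml file with Kafka Streams dependencies and Java 11 configuration.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-streams-eos</artifactId>
    <version>1.0.0</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <kafka.version>3.6.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>2.0.9</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.11.0</version>
            </plugin>
        </plugins>
    </build>
</project>
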
Implement exactly-once Kafka Streams application

Create the main application class with exactly-once processing semantics and transaction configuration.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class ExactlyOnceStreamsApp {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        
        // Basic Kafka Streams configuration
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        // Enable exactly-once semantics
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
        // Transaction and idempotent producer settings
        props.put(StreamsConfig.PRODUCER_PREFIX + "enable.idempotence", "true");
        props.put(StreamsConfig.PRODUCER_PREFIX + "acks", "all");
        props.put(StreamsConfig.PRODUCER_PREFIX + "retries", "10");
        props.put(StreamsConfig.PRODUCER_PREFIX + "max.in.flight.requests.per.connection", "1");
        
        // Consumer configuration for exactly-once
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        
        // State store and changelog configuration
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams-eos");
        
        // Transaction timeout (a producer setting, so it needs the producer prefix) and commit interval
        props.put(StreamsConfig.PRODUCER_PREFIX + "transaction.timeout.ms", 30000);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Define the processing topology
        KStream<String, String> inputStream = builder.stream("input-topic");
        
        // Transform and process messages with exactly-once semantics
        KStream<String, String> processedStream = inputStream
            .filter((key, value) -> value != null && !value.isEmpty())
            .mapValues(value -> "PROCESSED-" + value.toUpperCase() + "-" + System.currentTimeMillis())
            .peek((key, value) -> System.out.println("Processing: " + key + " -> " + value));
        
        // Output to result topic with exactly-once guarantee
        processedStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        final CountDownLatch latch = new CountDownLatch(1);
        
        // Attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        
        try {
            streams.start();
            System.out.println("Kafka Streams application started with exactly-once semantics");
            latch.await();
        } catch (Throwable e) {
            System.err.println("Error starting streams: " + e.getMessage());
            System.exit(1);
        }
        System.exit(0);
    }
}
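
One way to test the per-record logic without a broker is to pull it out of the topology into a pure function. The sketch below mirrors the filter and mapValues steps from the application above; `RecordLogic` and `process` are hypothetical names, and passing the timestamp as a parameter (instead of calling `System.currentTimeMillis()` inside the function) is a design choice made here so that results are reproducible in tests.

```java
import java.util.Optional;

/** Hypothetical helper: the topology's per-record logic as a pure function. */
public class RecordLogic {

    /** Returns the transformed value, or empty if the filter step would drop the record. */
    static Optional<String> process(String value, long nowMs) {
        if (value == null || value.isEmpty()) {
            return Optional.empty(); // dropped by the filter step
        }
        return Optional.of("PROCESSED-" + value.toUpperCase() + "-" + nowMs);
    }

    public static void main(String[] args) {
        System.out.println(process("test message 1", 1700000000000L).orElse("<filtered>"));
        System.out.println(process("", 1700000000000L).orElse("<filtered>"));
    }
}
```

The topology could then call such a function from its lambdas, keeping the Streams wiring thin and the logic unit-testable.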

Build the application

Compile and package the Kafka Streams application using Maven.

mvn clean package

Configure transaction state and isolation levels

Create advanced configuration class

Implement a configuration utility for managing exactly-once settings and transaction parameters.

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class ExactlyOnceConfig {
    
    public static Properties getStreamsConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        
        // Application identity
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.CLIENT_ID_CONFIG, applicationId + "-client");
        
        // Serialization
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        
        // Exactly-once semantics configuration
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        
        // Producer configuration for exactly-once
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG, "all");
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        
        // Consumer configuration for read_committed isolation
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // Transaction timeout (a producer setting) and commit interval
        props.put(StreamsConfig.PRODUCER_PREFIX + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30000);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        
        // State store configuration
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams/" + applicationId);
        
        // Performance tuning
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
        
        return props;
    }
    
    public static void validateConfiguration(Properties props) {
        String processingGuarantee = props.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
        if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
            throw new IllegalArgumentException("Processing guarantee must be exactly_once_v2");
        }
        
        String isolationLevel = props.getProperty(
            StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.ISOLATION_LEVEL_CONFIG
        );
        if (!"read_committed".equals(isolationLevel)) {
            throw new IllegalArgumentException("Isolation level must be read_committed for EOS");
        }
        
        System.out.println("Exactly-once configuration validated successfully");
    }
}
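
The timeout values above depend on each other: under exactly-once processing, Kafka Streams rejects a configuration whose commit interval is longer than the producer's transaction timeout, and brokers reject a producer whose transaction timeout exceeds `transaction.max.timeout.ms`. The sketch below expresses those two relationships as a standalone check. `EosTimeoutCheck` is a hypothetical class for illustration; it mirrors, but does not replace, the validation Kafka performs itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks how the EOS-related timeouts relate to one another. */
public class EosTimeoutCheck {

    /** Returns a list of problems; an empty list means the three values are consistent. */
    static List<String> problems(long commitIntervalMs, long transactionTimeoutMs, long brokerMaxTimeoutMs) {
        List<String> problems = new ArrayList<>();
        if (transactionTimeoutMs < commitIntervalMs) {
            problems.add("transaction.timeout.ms (" + transactionTimeoutMs
                + ") is lower than commit.interval.ms (" + commitIntervalMs + ")");
        }
        if (transactionTimeoutMs > brokerMaxTimeoutMs) {
            problems.add("transaction.timeout.ms (" + transactionTimeoutMs
                + ") exceeds the broker's transaction.max.timeout.ms (" + brokerMaxTimeoutMs + ")");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values used in this guide: 1 s commit interval, 30 s transaction timeout, 15 min broker maximum
        System.out.println(problems(1000, 30000, 900000));
        // A commit interval longer than the transaction timeout
        System.out.println(problems(60000, 30000, 900000));
    }
}
```

With the values from this guide the list is empty; the second call reports one problem.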

Create stateful processing example

Implement a more complex example with state stores to demonstrate exactly-once semantics with stateful operations.

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class StatefulExactlyOnceApp {
    
    private static final String STORE_NAME = "word-count-store";
    
    public static void main(String[] args) {
        Properties props = ExactlyOnceConfig.getStreamsConfig(
            "stateful-exactly-once-app", 
            "localhost:9092"
        );
        
        ExactlyOnceConfig.validateConfiguration(props);
        
        StreamsBuilder builder = new StreamsBuilder();
        
        // Add state store to the topology
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(STORE_NAME),
            Serdes.String(),
            Serdes.Long()
        ).withCachingEnabled();
        
        builder.addStateStore(storeBuilder);
        
        // Create input stream
        KStream<String, String> inputStream = builder.stream("input-topic");
        
        // Process with stateful transformations
        KStream<String, String> wordCountStream = inputStream
            .flatMapValues(value -> {
                if (value == null) return java.util.Collections.emptyList();
                return java.util.Arrays.asList(value.toLowerCase().split("\\W+"));
            })
            // transform() is deprecated since Kafka 3.3 in favor of process(); it still compiles with 3.6.0
            .transform(() -> new WordCountTransformer(), STORE_NAME);
        
        // Output results
        wordCountStream.to("word-count-output", Produced.with(Serdes.String(), Serdes.String()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        
        final CountDownLatch latch = new CountDownLatch(1);
        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }));
        
        try {
            streams.start();
            System.out.println("Stateful exactly-once application started");
            latch.await();
        } catch (Exception e) {
            System.err.println("Error: " + e.getMessage());
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }
    
    private static class WordCountTransformer implements Transformer<String, String, KeyValue<String, String>> {
        private KeyValueStore<String, Long> store;
        
        @Override
        public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
            this.store = context.getStateStore(STORE_NAME);
        }
        
        @Override
        public KeyValue<String, String> transform(String key, String word) {
            if (word == null || word.isEmpty()) {
                return null;
            }
            
            Long currentCount = store.get(word);
            long newCount = (currentCount == null) ? 1L : currentCount + 1L;
            
            // This update is exactly-once due to transaction semantics
            store.put(word, newCount);
            
            return KeyValue.pair(word, word + ":" + newCount);
        }
        
        @Override
        public void close() {
            // Cleanup if needed
        }
    }
}
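
The tokenizing and counting steps can be tried in isolation. The sketch below reproduces them in plain Java, with a `LinkedHashMap` standing in for the state store; `WordCountLogic` is a hypothetical name and nothing here touches Kafka. It also shows why the transformer skips empty words: `split("\\W+")` yields an empty first token when the text starts with punctuation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: the word-count logic with a map standing in for the state store. */
public class WordCountLogic {

    static Map<String, Long> count(String... values) {
        Map<String, Long> store = new LinkedHashMap<>(); // stand-in for the persistent key-value store
        for (String value : values) {
            if (value == null) {
                continue; // same null handling as the flatMapValues step
            }
            for (String word : value.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // leading punctuation produces an empty first token
                }
                store.merge(word, 1L, Long::sum); // get, increment, put
            }
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(count("Hello, world", "...hello again"));
    }
}
```

In the real application the same read-modify-write happens against the state store, and the changelog write is part of the same transaction as the output record and the offset commit.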

Create additional output topic

Create the topic for the stateful processing example output.

/opt/kafka/bin/kafka-topics.sh --create \
  --topic word-count-output \
  --bootstrap-server localhost:9092 \
  --partitions 6 \
  --replication-factor 3

Monitor and troubleshoot exactly-once processing

Create monitoring utility

Implement monitoring capabilities to track exactly-once processing metrics and transaction state.

package com.example;

import org.apache.kafka.streams.KafkaStreams;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExactlyOnceMonitor {
    private final KafkaStreams streams;
    private final ScheduledExecutorService scheduler;
    
    public ExactlyOnceMonitor(KafkaStreams streams) {
        this.streams = streams;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }
    
    public void startMonitoring() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                logApplicationState();
                logMetrics();
            } catch (Exception e) {
                System.err.println("Monitoring error: " + e.getMessage());
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
    
    private void logApplicationState() {
        KafkaStreams.State state = streams.state();
        System.out.println("Application state: " + state);
        
        if (state == KafkaStreams.State.ERROR) {
            System.err.println("Application is in ERROR state - check logs");
        }
    }
    
    private void logMetrics() {
        // Log key metrics for exactly-once processing
        streams.metrics().forEach((metricName, metric) -> {
            String name = metricName.name();
            if (name.contains("transaction") || name.contains("commit") || name.contains("abort")) {
                System.out.println("Metric " + name + ": " + metric.metricValue());
            }
        });
    }
    
    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

Run the application with monitoring

Start the exactly-once processing application and verify transaction behavior.

# Run the basic exactly-once application
java -cp target/classes:$(mvn dependency:build-classpath -q -Dmdep.outputFile=/dev/stdout) com.example.ExactlyOnceStreamsApp

Test exactly-once behavior

Produce test messages and verify that processing happens exactly once, even with failures.

# In a new terminal, produce test messages
echo "test message 1" | /opt/kafka/bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
echo "test message 2" | /opt/kafka/bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
echo "duplicate test" | /opt/kafka/bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092

# Consume from output topic to verify processing

/opt/kafka/bin/kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning --isolation-level read_committed
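
The effect of `--isolation-level read_committed` can be pictured with a toy model. In the sketch below, each record carries the state of the transaction that wrote it. A read_committed reader skips aborted records and stops at the first record of a still-open transaction, which is also why such a consumer can appear to lag while a transaction is in flight. `IsolationModel` and everything in it are hypothetical; the model does not reproduce Kafka's actual log format or control records.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of consumer isolation levels; no Kafka APIs are used. */
public class IsolationModel {

    enum TxnState { COMMITTED, ABORTED, OPEN }

    static final class Rec {
        final String value;
        final TxnState state;

        Rec(String value, TxnState state) {
            this.value = value;
            this.state = state;
        }
    }

    /** Returns the values a reader would see under the given isolation level. */
    static List<String> read(List<Rec> log, boolean readCommitted) {
        List<String> visible = new ArrayList<>();
        for (Rec r : log) {
            if (readCommitted) {
                if (r.state == TxnState.OPEN) {
                    break;    // nothing at or after the first open transaction is returned yet
                }
                if (r.state == TxnState.ABORTED) {
                    continue; // aborted records are filtered out
                }
            }
            visible.add(r.value);
        }
        return visible;
    }

    static List<Rec> sampleLog() {
        return List.of(
            new Rec("a", TxnState.COMMITTED),
            new Rec("b", TxnState.ABORTED),
            new Rec("c", TxnState.COMMITTED),
            new Rec("d", TxnState.OPEN),
            new Rec("e", TxnState.COMMITTED));
    }

    public static void main(String[] args) {
        System.out.println("read_uncommitted: " + read(sampleLog(), false));
        System.out.println("read_committed:   " + read(sampleLog(), true));
    }
}
```

In this model the read_uncommitted reader sees all five records, while the read_committed reader sees only `a` and `c` until the transaction that wrote `d` completes.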

Verify your setup

Test the exactly-once processing setup and verify transaction isolation.

# Check Kafka topics
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# Verify transaction state topic exists

/opt/kafka/bin/kafka-topics.sh --describe --topic __transaction_state --bootstrap-server localhost:9092

# Check consumer group information

/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group exactly-once-app

# Monitor broker logs for transaction activity

tail -f /opt/kafka/logs/server.log | grep -i transaction

Common issues

Symptom | Cause | Fix
TransactionTimeoutException | Transaction timeout too low | Increase the producer's transaction.timeout.ms in the Streams config, keeping it at or below the broker's transaction.max.timeout.ms
ProducerFencedException | Another producer with the same transactional.id took over, typically after a rebalance or a transaction timeout | Do not set transactional.id yourself; Streams generates it. Instances of one application share the same application.id, and separate applications need different ones. If fencing follows timeouts, raise transaction.timeout.ms
Invalid transaction state | Corrupted transaction log | Check __transaction_state topic replication and cleanup
High commit latency | Frequent small transactions | Increase commit.interval.ms and tune producer batch.size and linger.ms
Consumer lag increasing | Read committed isolation waiting for transactions | Monitor transaction completion time and timeout settings

Warning: Exactly-once semantics requires careful configuration of timeouts and replication factors. Test thoroughly with failure scenarios to ensure your configuration handles broker failures and network partitions correctly.
