Set up Kafka Streams testing framework with TopologyTestDriver for automated stream processing validation

Intermediate 45 min May 15, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Configure a complete testing framework for Kafka Streams applications using TopologyTestDriver to validate stream processing logic with automated tests and mock data pipelines.

Prerequisites

  • Java 11 or later
  • Maven or Gradle build tool
  • Basic understanding of Apache Kafka concepts

What this solves

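Testing Kafka Streams applications against a running cluster is complex and expensive. TopologyTestDriver, shipped in the kafka-streams-test-utils artifact, runs a topology in-process with no broker at all. You pipe records into named input topics, the driver pushes them synchronously through the topology, and you read the results from output topics and state stores. With no network and no background threads involved, tests are fast and deterministic, which makes the driver a good fit for automated unit tests of stream processing logic, topology wiring, and data transformations.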

Step-by-step installation

Install Java Development Kit

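This guide targets Java 11 or later; the commands below install OpenJDK 17 and Maven. The distribution gradle packages are not suitable for the build script used later: Debian and Ubuntu package an old 4.4 release, and the AlmaLinux/Rocky Linux 9 base repositories do not provide Gradle. If you prefer Gradle, install a current release from gradle.org and run gradle wrapper once in the project directory to create ./gradlew.

# Ubuntu 24.04 / Debian 12
sudo apt update
sudo apt install -y openjdk-17-jdk maven

# AlmaLinux 9 / Rocky Linux 9
sudo dnf update -y
sudo dnf install -y java-17-openjdk-devel maven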

Download and install Apache Kafka

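The TopologyTestDriver tests in this guide do not need a running broker, because Maven or Gradle downloads the Kafka Streams libraries. A local Kafka distribution is only needed if you also want to run StreamProcessor against a real cluster. The 2.13 in the archive name is the Scala version of the broker build and does not affect the Java Streams API; the release (2.8.2) matches the Kafka version used in the build files.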

cd /opt
sudo wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
sudo tar -xzf kafka_2.13-2.8.2.tgz
sudo mv kafka_2.13-2.8.2 kafka
sudo chown -R $(whoami):$(whoami) /opt/kafka

Set up environment variables

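Append the Java and Kafka paths to ~/.bashrc so they persist across shells, then reload it. The JAVA_HOME shown is the Debian/Ubuntu path on amd64; on AlmaLinux and Rocky Linux it is typically /usr/lib/jvm/java-17-openjdk.

cat >> ~/.bashrc <<'EOF'
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin:$JAVA_HOME/bin
EOF
source ~/.bashrc
java -version
echo $KAFKA_HOME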

Create Maven project structure

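Generate a Maven project with the standard src/main/java and src/test/java layout. The quickstart archetype also creates placeholder App.java and AppTest.java files; delete them after replacing pom.xml, since the generated test may target a different JUnit version than the one the new pom declares.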

mkdir -p kafka-streams-testing
cd kafka-streams-testing
mvn archetype:generate -DgroupId=com.example.streams \
    -DartifactId=kafka-streams-test \
    -DarchetypeArtifactId=maven-archetype-quickstart \
    -DinteractiveMode=false

Configure Maven dependencies

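Replace the generated pom.xml with the following. TopologyTestDriver ships in the kafka-streams-test-utils artifact, which is only needed in test scope. The compiler level is set to 11 so the project builds on any JDK from 11 upward, matching the prerequisites.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.streams</groupId>
    <artifactId>kafka-streams-test</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <kafka.version>2.8.2</kafka.version>
        <junit.version>5.8.2</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>${kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M7</version>
            </plugin>
        </plugins>
    </build>
</project>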
Create Gradle build configuration

Alternative Gradle setup for projects preferring Gradle over Maven.

plugins {
    id 'java'
    id 'application'
}

group = 'com.example.streams'
version = '1.0.0'

java {
    sourceCompatibility = '11'
    targetCompatibility = '11'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:kafka-streams:2.8.2'
    implementation 'org.slf4j:slf4j-simple:1.7.36'
    
    testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.2'
    testImplementation 'org.junit.jupiter:junit-jupiter:5.8.2'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

test {
    useJUnitPlatform()
    testLogging {
        events "passed", "skipped", "failed"
        exceptionFormat "full"
    }
}

application {
    mainClass = 'com.example.streams.StreamProcessor'
}

Create stream processing application

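Build a sample word-count application to test and save it as src/main/java/com/example/streams/StreamProcessor.java. The topology and configuration are exposed through static methods so the tests can build the same topology without starting a KafkaStreams instance.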

package com.example.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class StreamProcessor {
    
    public static Topology createTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
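        // Consumed with the default String serdes configured in getStreamsConfig()
        KStream<String, String> textLines = builder.stream("text-input");
        
        // Split into lowercase words, drop empty tokens, re-key by word, and count
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .filter((key, word) -> word.length() > 0)
            .groupBy((key, word) -> word)
            .count();
            
        // Write the (word, count) updates with a Long value serde
        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));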
        
        return builder.build();
    }
    
    public static Properties getStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }
    
    public static void main(String[] args) {
        Properties props = getStreamsConfig();
        Topology topology = createTopology();
        
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
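To see where the expected values in the tests come from, the word-count logic can be mirrored in plain Java. The WordCountOracle class below is hypothetical (it is not part of Kafka or of the application above): it applies the same lowercase, split("\\W+"), and non-empty filter steps and keeps a running count per word. It assumes every count change appears as its own output record, which is what the TopologyTestDriver tests below assert; on a real cluster, record caching may collapse some intermediate updates.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical test helper (not part of Kafka): mirrors StreamProcessor's
 * word-count logic so expected outputs can be derived rather than hand-written.
 */
class WordCountOracle {

    private WordCountOracle() {
    }

    /**
     * Expected (word, count) updates in processing order, assuming every
     * count change is emitted as its own output record.
     */
    static List<Map.Entry<String, Long>> expectedUpdates(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (String line : lines) {
            // Same tokenization as flatMapValues(... split("\\W+")) followed by filter(length > 0)
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // merge() returns the new running count for this word
                long next = counts.merge(word, 1L, Long::sum);
                updates.add(Map.entry(word, next));
            }
        }
        return updates;
    }

    /** Latest count per word: the table view of the update stream. */
    static Map<String, Long> finalCounts(List<Map.Entry<String, Long>> updates) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> update : updates) {
            table.put(update.getKey(), update.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> updates =
            expectedUpdates(List.of("hello world", "world test hello"));
        System.out.println(updates);              // [hello=1, world=1, world=2, test=1, hello=2]
        System.out.println(finalCounts(updates)); // {hello=2, world=2, test=1}
    }
}
```

For the two lines used in shouldCountWordsCorrectly, the oracle produces the same five updates that test reads from the output topic, and finalCounts gives the latest-value-per-key view that readKeyValuesToMap() returns.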

Implement TopologyTestDriver tests

Create unit tests that drive the topology through TopologyTestDriver and save them as src/test/java/com/example/streams/StreamProcessorTest.java.

package com.example.streams;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.*;

class StreamProcessorTest {
    
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, Long> outputTopic;
    
    @BeforeEach
    void setUp() {
        Properties props = StreamProcessor.getStreamsConfig();
        Topology topology = StreamProcessor.createTopology();
        // A fresh driver per test, so no state carries over between tests
        testDriver = new TopologyTestDriver(topology, props);
        
        inputTopic = testDriver.createInputTopic(
            "text-input",
            new StringSerializer(),
            new StringSerializer()
        );
        
        outputTopic = testDriver.createOutputTopic(
            "word-count-output",
            new StringDeserializer(),
            new LongDeserializer()
        );
    }
    
    @AfterEach
    void tearDown() {
        testDriver.close();
    }
    
    @Test
    void shouldCountWordsCorrectly() {
        // Given
        inputTopic.pipeInput("key1", "hello world");
        inputTopic.pipeInput("key2", "world test hello");
        
        // When & Then: one (word, running count) update per word, in input order
        TestRecord<String, Long> record1 = outputTopic.readRecord();
        assertEquals("hello", record1.key());
        assertEquals(Long.valueOf(1), record1.value());
        
        TestRecord<String, Long> record2 = outputTopic.readRecord();
        assertEquals("world", record2.key());
        assertEquals(Long.valueOf(1), record2.value());
        
        TestRecord<String, Long> record3 = outputTopic.readRecord();
        assertEquals("world", record3.key());
        assertEquals(Long.valueOf(2), record3.value());
        
        TestRecord<String, Long> record4 = outputTopic.readRecord();
        assertEquals("test", record4.key());
        assertEquals(Long.valueOf(1), record4.value());
        
        TestRecord<String, Long> record5 = outputTopic.readRecord();
        assertEquals("hello", record5.key());
        assertEquals(Long.valueOf(2), record5.value());
        
        assertTrue(outputTopic.isEmpty());
    }
    
    @Test
    void shouldIgnoreEmptyStrings() {
        // Given
        inputTopic.pipeInput("key1", "");
        inputTopic.pipeInput("key2", "   ");
        inputTopic.pipeInput("key3", "valid");
        
        // When & Then: only the non-empty word produces an update
        TestRecord<String, Long> record = outputTopic.readRecord();
        assertEquals("valid", record.key());
        assertEquals(Long.valueOf(1), record.value());
        
        assertTrue(outputTopic.isEmpty());
    }
    
    @Test
    void shouldHandleSpecialCharacters() {
        // Given: punctuation acts as a word separator (split on \W+)
        inputTopic.pipeInput("key1", "hello,world!test@example");
        
        // When: drain the output into a map holding the latest count per word
        Map<String, Long> counts = outputTopic.readKeyValuesToMap();
        
        // Then: all four words were separated and each was counted once
        assertEquals(Map.of("hello", 1L, "world", 1L, "test", 1L, "example", 1L), counts);
        assertTrue(outputTopic.isEmpty());
    }
}
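The shouldIgnoreEmptyStrings test depends on how String.split treats edge cases, which is easy to get wrong. The small stdlib-only program below (SplitEdgeCases is a hypothetical name) runs the same tokenizer as StreamProcessor over a few inputs. An empty string yields a single empty token, a whitespace-only string yields no tokens at all, and a string that starts with a non-word character yields a leading empty token. That last case is a good candidate for an extra test, since only the filter step keeps an empty "word" out of the counts.

```java
import java.util.Arrays;

/**
 * Hypothetical demo: runs the tokenizer used by StreamProcessor over edge-case
 * inputs to show which ones produce empty tokens.
 */
class SplitEdgeCases {

    /** Same tokenization as the topology's flatMapValues step, before the filter. */
    static String[] tokens(String line) {
        return line.toLowerCase().split("\\W+");
    }

    public static void main(String[] args) {
        String[] samples = {"", "   ", " leading", "hello,world!test@example"};
        for (String sample : samples) {
            String[] result = tokens(sample);
            // Print the token count as well, because an array holding one empty
            // string and an empty array both render as "[]"
            System.out.println("\"" + sample + "\" -> " + result.length
                + " token(s) " + Arrays.toString(result));
        }
    }
}
```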

Create advanced testing scenarios

Add tests for record timestamps, batched input, and wall-clock time advancement in src/test/java/com/example/streams/AdvancedStreamTest.java.

package com.example.streams;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

class AdvancedStreamTest {
    
    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, Long> outputTopic;
    
    @BeforeEach
    void setUp() {
        testDriver = new TopologyTestDriver(
            StreamProcessor.createTopology(),
            StreamProcessor.getStreamsConfig()
        );
        
        inputTopic = testDriver.createInputTopic(
            "text-input",
            new StringSerializer(),
            new StringSerializer()
        );
        
        // The driver captures all output, so the output topic can be created up front
        outputTopic = testDriver.createOutputTopic(
            "word-count-output",
            new StringDeserializer(),
            new LongDeserializer()
        );
    }
    
    @AfterEach
    void tearDown() {
        testDriver.close();
    }
    
    @Test
    void shouldHandleTimestampedRecords() {
        // Given
        Instant timestamp = Instant.parse("2023-01-01T12:00:00Z");
        
        // When
        inputTopic.pipeInput("key1", "timestamped message", timestamp);
        
        // Then: the first count update carries the input record's timestamp
        TestRecord<String, Long> record = outputTopic.readRecord();
        assertEquals(timestamp, record.getRecordTime());
    }
    
    @Test
    void shouldProcessBatchRecords() {
        // Given
        List<TestRecord<String, String>> inputRecords = Arrays.asList(
            new TestRecord<>("key1", "batch one"),
            new TestRecord<>("key2", "batch two"),
            new TestRecord<>("key3", "batch three")
        );
        
        // When
        inputTopic.pipeRecordList(inputRecords);
        
        // Then: two words per line, one count update per word
        assertEquals(6, outputTopic.getQueueSize());
    }
    
    @Test
    void shouldAdvanceTimeCorrectly() {
        // Given
        Duration timeAdvancement = Duration.ofMinutes(1);
        
        // When
        inputTopic.pipeInput("key1", "time test");
        testDriver.advanceWallClockTime(timeAdvancement);
        inputTopic.pipeInput("key2", "time test again");
        
        // Then: the topology has no wall-clock punctuators, so advancing
        // wall-clock time must leave the aggregated counts unchanged
        Map<String, Long> counts = outputTopic.readKeyValuesToMap();
        assertEquals(Map.of("time", 2L, "test", 2L, "again", 1L), counts);
    }
}
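The tests above either pass an explicit Instant or let TestInputTopic choose the timestamp. The createInputTopic overload that also takes a start Instant and an auto-advance Duration makes implicit timestamps predictable: as we understand TestInputTopic, each record piped without a timestamp receives the topic's current time, which then moves forward by the auto-advance interval, while records with an explicit timestamp leave that clock alone, and TestInputTopic.advanceTime(Duration) moves it manually. This event-time clock is separate from testDriver.advanceWallClockTime(...), which drives wall-clock punctuators. The sketch below models that bookkeeping in plain Java so expected record times can be computed for assertions; InputTopicClock is a hypothetical class, not part of kafka-streams-test-utils.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model (not a Kafka class) of how a TestInputTopic created with
 * a start time and an auto-advance interval assigns implicit timestamps.
 */
class InputTopicClock {
    private Instant currentTime;
    private final Duration autoAdvance;

    InputTopicClock(Instant start, Duration autoAdvance) {
        this.currentTime = start;
        this.autoAdvance = autoAdvance;
    }

    /** Timestamp for a record piped without one; the clock then auto-advances. */
    Instant nextImplicitTimestamp() {
        Instant timestamp = currentTime;
        currentTime = currentTime.plus(autoAdvance);
        return timestamp;
    }

    /** Models a manual advanceTime(Duration) call on the input topic. */
    void advanceTime(Duration advance) {
        currentTime = currentTime.plus(advance);
    }

    public static void main(String[] args) {
        InputTopicClock clock = new InputTopicClock(
            Instant.parse("2023-01-01T12:00:00Z"), Duration.ofSeconds(1));
        System.out.println(clock.nextImplicitTimestamp()); // 2023-01-01T12:00:00Z
        System.out.println(clock.nextImplicitTimestamp()); // 2023-01-01T12:00:01Z
        clock.advanceTime(Duration.ofMinutes(1));
        System.out.println(clock.nextImplicitTimestamp()); // 2023-01-01T12:01:02Z
    }
}
```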

Configure test automation with Maven

Set up automated test execution and reporting with Maven surefire plugin.

cd kafka-streams-test
mvn clean compile
mvn test
# Run specific test class
mvn test -Dtest=StreamProcessorTest

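Run a single test class in a fresh JVM

With reuseForks=false, Surefire starts a new JVM for each test class, isolating test classes from one another at the cost of slower runs.

mvn test -Dtest=StreamProcessorTest -DforkCount=1 -DreuseForks=false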

Configure test automation with Gradle

Alternative Gradle configuration for automated testing and continuous integration.

# Build and run all tests
./gradlew test

Run specific test class

./gradlew test --tests StreamProcessorTest

Generate coverage reports

Add the JaCoCo plugin to your existing build.gradle:

apply plugin: 'jacoco'

jacoco {
    toolVersion = "0.8.7"
}

jacocoTestReport {
    reports {
        xml.required = false
        csv.required = false
        html.outputLocation = layout.buildDirectory.dir('jacocoHtml')
    }
}

Then run the tests and build the HTML report, which is written to build/jacocoHtml:

./gradlew test jacocoTestReport

Set up continuous integration testing

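Create a GitHub Actions workflow that runs the tests on pushes to main and develop and on pull requests targeting main. GitHub only picks up workflow files stored under .github/workflows/ in the repository, for example .github/workflows/kafka-streams-tests.yml.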

name: Kafka Streams Tests

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    
    strategy:
      matrix:
        java-version: [11, 17]
    
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up JDK ${{ matrix.java-version }}
      uses: actions/setup-java@v4
      with:
        java-version: ${{ matrix.java-version }}
        distribution: 'temurin'
    
    - name: Cache Maven dependencies
      uses: actions/cache@v4
      with:
        path: ~/.m2
        key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
    
    - name: Run tests
      run: mvn -B clean test
    
    - name: Generate test report
      if: always()
      run: mvn -B surefire-report:report-only
    
    - name: Upload test results
      uses: actions/upload-artifact@v4
      if: always()
      with:
        name: test-results-java-${{ matrix.java-version }}
        path: target/surefire-reports/

Verify your setup

# Check Java installation
java -version
javac -version
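java -version reports the default runtime, but Maven and Gradle can pick up a different JDK through JAVA_HOME. A small class such as the hypothetical JavaVersionCheck below, launched with the same environment (for example with java JavaVersionCheck.java, the single-file source launcher available since Java 11), prints which JVM is actually running and exits non-zero if it is older than the Java 11 baseline this guide assumes. Runtime.version().feature() is available from Java 10 onward.

```java
/**
 * Hypothetical helper: reports the running JVM and fails fast if it is older
 * than the Java 11 baseline assumed by this guide.
 */
class JavaVersionCheck {
    static final int MINIMUM_FEATURE_VERSION = 11;

    /** True if the given feature release (e.g. 17) meets the minimum. */
    static boolean isSupported(int featureVersion) {
        return featureVersion >= MINIMUM_FEATURE_VERSION;
    }

    public static void main(String[] args) {
        int feature = Runtime.version().feature();
        System.out.println("java.home    = " + System.getProperty("java.home"));
        System.out.println("Java feature = " + feature);
        if (!isSupported(feature)) {
            System.err.println("Java " + MINIMUM_FEATURE_VERSION + " or later is required");
            System.exit(1);
        }
    }
}
```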

Verify Maven build

mvn --version
mvn clean compile

Run all tests

mvn test

Check test results

ls -la target/surefire-reports/
cat target/surefire-reports/TEST-*.xml

Common issues

Symptom | Cause | Fix
--- | --- | ---
Tests fail with ClassNotFoundException | Missing kafka-streams-test-utils dependency | Add the test-utils dependency to pom.xml with test scope
TopologyTestDriver doesn't start | Invalid Streams configuration | Ensure APPLICATION_ID_CONFIG is set in the test properties
Input/output topics not found | Topic names don't match the topology | Verify that the names passed to createInputTopic and createOutputTopic match those used in the StreamsBuilder
Serialization errors in tests | Mismatched serializers/deserializers | Use the same serde types for the test topics as the topology uses
Maven build fails | Java version incompatibility | Ensure Java 11+ and matching maven.compiler properties
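Serde mismatches surface differently depending on direction. Kafka's StringSerializer writes UTF-8 bytes, LongSerializer writes exactly 8 big-endian bytes, and LongDeserializer rejects any payload that is not 8 bytes long. The stdlib-only sketch below re-creates those encodings (SerdeMismatchDemo and its methods are hypothetical names, and IllegalArgumentException stands in for Kafka's SerializationException) to show that decoding a string payload as a Long fails loudly, while decoding a Long payload as a String silently produces unreadable text, which typically shows up as a confusing assertion failure rather than an exception.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical stdlib re-creation of two Kafka wire formats: StringSerializer
 * (UTF-8 bytes) and LongSerializer (8 bytes, big-endian).
 */
class SerdeMismatchDemo {

    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serializeLong(long value) {
        // ByteBuffer defaults to big-endian, the same byte order LongSerializer uses
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long deserializeLong(byte[] data) {
        // Like LongDeserializer, refuse anything that is not exactly 8 bytes
        // (Kafka throws SerializationException; IllegalArgumentException stands in here)
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes for a Long, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Matching formats round-trip cleanly
        System.out.println(deserializeLong(serializeLong(2L)));

        // Long bytes decoded as a String: no exception, just control characters
        String garbled = deserializeString(serializeLong(2L));
        System.out.println("decoded as String, equals \"2\"? " + "2".equals(garbled));

        // String bytes decoded as a Long: rejected because "hello" is 5 bytes
        try {
            deserializeLong(serializeString("hello"));
        } catch (IllegalArgumentException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```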
