Configure Kafka Connect for database integration with JDBC connectors and CDC

Intermediate · 45 min · Apr 03, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up Kafka Connect with JDBC connectors for real-time database integration and configure Debezium for change data capture. Monitor connector performance and troubleshoot common integration issues.

Prerequisites

  • Apache Kafka cluster running
  • PostgreSQL or MySQL database
  • Java 17 or higher
  • Administrative access to database systems
  • Network connectivity between services

What this solves

Kafka Connect enables real-time integration between databases and Apache Kafka by streaming data changes automatically. This tutorial shows you how to configure JDBC source and sink connectors for database integration, set up Debezium for change data capture (CDC), and monitor connector performance in production environments.

Step-by-step configuration

Install required packages and dependencies

Install Java 17 and the database client packages needed for JDBC connectivity.

# Ubuntu / Debian
sudo apt update
sudo apt install -y openjdk-17-jdk wget curl unzip postgresql-client mysql-client

# AlmaLinux / Rocky Linux
sudo dnf update -y
sudo dnf install -y java-17-openjdk wget curl unzip postgresql mysql

Download and install Confluent Platform

Download Confluent Platform which includes Kafka Connect with enhanced connectors and monitoring capabilities.

cd /opt
sudo wget https://packages.confluent.io/archive/7.5/confluent-7.5.0.tar.gz
sudo tar -xzf confluent-7.5.0.tar.gz
sudo mv confluent-7.5.0 confluent
sudo chown -R $(whoami):$(whoami) /opt/confluent

Configure environment variables

Add the Confluent Platform variables to your shell profile so they persist across sessions.

cat >> ~/.bashrc << 'EOF'
export CONFLUENT_HOME=/opt/confluent
export PATH=$PATH:$CONFLUENT_HOME/bin
# JAVA_HOME path shown for Ubuntu/Debian; adjust on RHEL-family systems
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
EOF
source ~/.bashrc

Download JDBC drivers

Download database-specific JDBC drivers for PostgreSQL, MySQL, and SQL Server integration.

cd $CONFLUENT_HOME/share/java/kafka-connect-jdbc
wget https://jdbc.postgresql.org/download/postgresql-42.7.1.jar
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-j-8.2.0.jar
wget https://github.com/Microsoft/mssql-jdbc/releases/download/v12.4.2/mssql-jdbc-12.4.2.jre11.jar

Configure Kafka Connect distributed mode

Create /opt/confluent/etc/kafka/connect-distributed.properties for distributed mode. A replication factor of 3 assumes at least three brokers; use 1 on a single-broker test cluster.

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/opt/confluent/share/java,/opt/confluent/share/confluent-hub-components
rest.port=8083
rest.advertised.host.name=localhost
rest.advertised.port=8083
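The connector JSON examples later in this guide embed passwords in plain text. Kafka Connect ships a FileConfigProvider that lets you keep credentials in a separate file instead; adding these lines to the worker properties (the secrets path is an example) enables placeholders like ${file:/opt/confluent/etc/secrets.properties:db_password} in connector configs:

```properties
# Resolve ${file:...} placeholders from a local properties file
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
```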

Install Debezium connectors

Install Debezium connectors for change data capture from various database systems.

confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.4.0
confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.0
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.4.0

Create systemd service for Kafka Connect

Create /etc/systemd/system/kafka-connect.service so Kafka Connect starts automatically and restarts on failure.

[Unit]
Description=Apache Kafka Connect
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=confluent
Group=confluent
ExecStart=/opt/confluent/bin/connect-distributed /opt/confluent/etc/kafka/connect-distributed.properties
Restart=on-failure
RestartSec=10
KillMode=process
TimeoutSec=300
Environment=JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
Environment=CONFLUENT_HOME=/opt/confluent

[Install]
WantedBy=multi-user.target

Create dedicated user and set permissions

Create a dedicated user for running Kafka Connect with appropriate permissions.

sudo useradd -r -s /bin/false confluent
sudo chown -R confluent:confluent /opt/confluent
sudo chmod -R 755 /opt/confluent
sudo chmod 644 /etc/systemd/system/kafka-connect.service

Start Kafka Connect service

Enable and start the Kafka Connect service to begin processing connector configurations.

sudo systemctl daemon-reload
sudo systemctl enable kafka-connect
sudo systemctl start kafka-connect
sudo systemctl status kafka-connect
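The worker can take several seconds to expose its REST API after start. A small helper to wait for it before registering connectors, a sketch assuming curl is installed (the function name, URL, and timeout are ours, not part of Connect):

```shell
#!/usr/bin/env bash
# wait_for_connect URL [ATTEMPTS]: poll the Connect REST root once per second
# until it answers, or give up after ATTEMPTS tries (default 60).
wait_for_connect() {
  local url=$1 tries=${2:-60}
  for _ in $(seq "$tries"); do
    if curl -sf "$url" >/dev/null; then
      echo "Connect is up at $url"
      return 0
    fi
    sleep 1
  done
  echo "Connect did not come up at $url" >&2
  return 1
}
```

For example, run wait_for_connect http://localhost:8083/ before the curl registrations below.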

Configure JDBC connectors

Create JDBC source connector for PostgreSQL

Save the following connector definition as /tmp/postgres-source.json to stream data from PostgreSQL tables to Kafka topics, then register it with the Connect REST API.

{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/production",
    "connection.user": "kafka_user",
    "connection.password": "secure_password_123",
    "table.whitelist": "users,orders,products",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "poll.interval.ms": 1000,
    "batch.max.rows": 500,
    "table.poll.interval.ms": 5000
  }
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/postgres-source.json http://localhost:8083/connectors
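A malformed definition fails only after the POST, so it helps to catch bad JSON locally first. A hypothetical helper (the function names are ours, not part of Connect), assuming python3 and curl are available:

```shell
#!/usr/bin/env bash
# validate_config FILE: check the file is valid JSON with the fields the
# Connect REST API requires ("name" and "config.connector.class").
validate_config() {
  python3 - "$1" <<'PY'
import json, sys
cfg = json.load(open(sys.argv[1]))
assert "name" in cfg, "missing name"
assert "connector.class" in cfg.get("config", {}), "missing connector.class"
print("ok")
PY
}

# register_connector FILE: validate, then POST to the worker and print the
# HTTP status code (201 = created, 409 = a connector with that name exists).
register_connector() {
  validate_config "$1" || return 1
  curl -s -o /dev/null -w '%{http_code}\n' -X POST \
    -H "Content-Type: application/json" \
    --data @"$1" "${CONNECT_URL:-http://localhost:8083}/connectors"
}
```

Usage: register_connector /tmp/postgres-source.json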

Create JDBC sink connector

Save the following as /tmp/postgres-sink.json to write Kafka topic data to a target database table, then register it.

{
  "name": "postgres-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/analytics",
    "connection.user": "kafka_sink_user",
    "connection.password": "sink_password_456",
    "topics": "user-events,order-events",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "table.name.format": "kafka_${topic}"
  }
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/postgres-sink.json http://localhost:8083/connectors
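By default a sink connector stops on the first record it cannot process. If you would rather route poison messages aside and keep going, Kafka Connect's error-handling options can be added to the sink config above (the dead-letter topic name here is an example):

```json
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-postgres-sink",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true"
```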

Configure Debezium CDC

Prepare PostgreSQL for CDC

Enable logical replication in PostgreSQL for Debezium change data capture.

sudo -u postgres psql -c "ALTER SYSTEM SET wal_level = logical;"
sudo -u postgres psql -c "ALTER SYSTEM SET max_replication_slots = 10;"
sudo -u postgres psql -c "ALTER SYSTEM SET max_wal_senders = 10;"
sudo systemctl restart postgresql

Create replication user

Create a dedicated PostgreSQL user with replication privileges for Debezium.

sudo -u postgres psql << EOF
CREATE USER debezium_user WITH REPLICATION PASSWORD 'debezium_pass_789';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
EOF

Configure Debezium PostgreSQL connector

Save the following as /tmp/debezium-postgres.json to capture all data changes from the listed PostgreSQL tables, then register it.

{
  "name": "debezium-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "debezium_pass_789",
    "database.dbname": "production",
    "topic.prefix": "postgres-server",
    "table.include.list": "public.users,public.orders,public.products",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    "snapshot.mode": "initial",
    "time.precision.mode": "adaptive_time_microseconds",
    "decimal.handling.mode": "precise"
  }
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/debezium-postgres.json http://localhost:8083/connectors

Configure Debezium MySQL connector

Save the following as /tmp/debezium-mysql.json to capture changes from MySQL's binlog, then register it.

{
  "name": "debezium-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "mysql_debezium_pass",
    "database.server.id": "184054",
    "topic.prefix": "mysql-server",
    "table.include.list": "inventory.customers,inventory.orders",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "include.schema.changes": "true",
    "snapshot.mode": "when_needed",
    "binlog.buffer.size": "32768"
  }
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/debezium-mysql.json http://localhost:8083/connectors
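Debezium wraps every change in an envelope with before, after, and op fields (c = create, u = update, d = delete, r = snapshot read). With the JSON converter's schemas disabled, as in the worker config above, the message value is the bare envelope. A quick way to inspect it, shown here against a hand-written sample event rather than a live consumer:

```shell
#!/usr/bin/env bash
# Decode the op and after fields of a Debezium change event with python3.
# The sample below is trimmed and hand-written, not captured output.
EVENT='{"before":null,"after":{"id":1,"name":"alice"},"source":{"table":"users"},"op":"c"}'

echo "$EVENT" | python3 -c '
import json, sys
e = json.load(sys.stdin)
print("op:", e["op"])        # prints: op: c
print("after:", e["after"])
'
```

The same decoder works piped from kafka-console-consumer on a Debezium topic.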

Monitor connector performance

Configure JMX monitoring

Enable JMX metrics collection for detailed connector performance monitoring.

# Add these lines to the environment that launches Kafka Connect
# (for example via Environment= lines in the systemd unit)
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999"
export JMX_PORT=9999

Set up connector monitoring scripts

Create a scripts directory, then save the following as /opt/confluent/scripts/monitor-connectors.sh to track connector status and flag failures.

sudo mkdir -p /opt/confluent/scripts

#!/bin/bash

CONNECT_URL="http://localhost:8083"

echo "=== Kafka Connect Cluster Status ==="
curl -s $CONNECT_URL/ | jq .

echo -e "\n=== Active Connectors ==="
curl -s $CONNECT_URL/connectors | jq .

echo -e "\n=== Connector Status Details ==="
for connector in $(curl -s $CONNECT_URL/connectors | jq -r '.[]'); do
    echo "--- $connector ---"
    curl -s $CONNECT_URL/connectors/$connector/status | jq .
done

echo -e "\n=== Connectors Not Running ==="
for connector in $(curl -s $CONNECT_URL/connectors | jq -r '.[]'); do
    status=$(curl -s $CONNECT_URL/connectors/$connector/status | jq -r '.connector.state')
    if [ "$status" != "RUNNING" ]; then
        echo "ALERT: $connector is in $status state"
    fi
done
sudo chmod +x /opt/confluent/scripts/monitor-connectors.sh
sudo chown confluent:confluent /opt/confluent/scripts/monitor-connectors.sh
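The status endpoint reports a state for the connector and for each task. This sketch (the function name is ours) reads one status document on stdin and fails when anything is not RUNNING, so it can back a cron alert; python3 is assumed available:

```shell
#!/usr/bin/env bash
# check_connector_status: read /connectors/<name>/status JSON on stdin,
# print RUNNING or FAILED, and exit non-zero when anything is unhealthy.
check_connector_status() {
  python3 -c '
import json, sys
s = json.load(sys.stdin)
states = [s["connector"]["state"]] + [t["state"] for t in s.get("tasks", [])]
bad = [x for x in states if x != "RUNNING"]
print("FAILED" if bad else "RUNNING")
sys.exit(1 if bad else 0)
'
}

# Usage against a live worker:
# curl -s http://localhost:8083/connectors/postgres-source-connector/status | check_connector_status
```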

Configure log monitoring

Configure /opt/confluent/etc/kafka/connect-log4j.properties for file-based logging with daily rotation, and create the log directory.

log4j.rootLogger=INFO, stdout, connectAppender

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=/var/log/kafka-connect/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=WARN
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=WARN
sudo mkdir -p /var/log/kafka-connect
sudo chown confluent:confluent /var/log/kafka-connect
sudo chmod 755 /var/log/kafka-connect

Verify your setup

Check that Kafka Connect is running and connectors are functioning properly.

# Check Kafka Connect service status
sudo systemctl status kafka-connect

List all connectors

curl -s http://localhost:8083/connectors | jq .

Check specific connector status

curl -s http://localhost:8083/connectors/debezium-postgres-connector/status | jq .

Verify topics created by connectors

kafka-topics --bootstrap-server localhost:9092 --list | grep -E "(postgres-|mysql-server|connect-)"

Check for task errors

curl -s http://localhost:8083/connectors/postgres-source-connector/status | jq '.tasks[0].trace'

Monitor data flow

kafka-console-consumer --bootstrap-server localhost:9092 --topic postgres-users --from-beginning --max-messages 5
Note: If you haven't set up Kafka yet, follow our Apache Kafka installation guide first to establish the messaging infrastructure.

Common issues

Symptom | Cause | Fix
Connector fails to start | Database connection issues or missing JDBC driver | Check the connection URL and credentials, and verify the JDBC driver is on the plugin path
Debezium connector stuck in SNAPSHOT mode | Large table or insufficient database permissions | Use snapshot.mode=schema_only or grant the required SELECT permissions
High memory usage | Large batch sizes or insufficient JVM heap | Reduce batch.max.rows and increase the JVM heap (e.g. -Xmx4g)
Replication slot errors | PostgreSQL replication slot conflicts | Drop the existing slot: SELECT pg_drop_replication_slot('debezium_slot');
Schema evolution failures | Incompatible schema changes | Use auto.evolve=true or resolve Schema Registry compatibility settings
Connector tasks frequently restart | Database connection timeouts or deadlocks | Tune connection.timeout.ms and poll.interval.ms
Warning: Never use chmod 777 on connector configuration files. Database credentials should have minimal required permissions, not admin access.

Next steps

Automated install script

Run this to automate the entire setup

Need help?

Don't want to manage this yourself?

We provide managed DevOps services for businesses that depend on uptime, from initial setup to ongoing operations.