Set up Kafka Connect with JDBC connectors for real-time database integration and configure Debezium for change data capture. Monitor connector performance and troubleshoot common integration issues.
Prerequisites
- Apache Kafka cluster running
- PostgreSQL or MySQL database
- Java 17 or higher
- Administrative access to database systems
- Network connectivity between services
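To confirm the services are reachable before you start, a quick bash-only port check works (the hosts and ports below are assumptions for a local setup; adjust to yours):
timeout 3 bash -c '</dev/tcp/localhost/9092' && echo "Kafka broker reachable" || echo "Kafka broker NOT reachable"
timeout 3 bash -c '</dev/tcp/localhost/5432' && echo "PostgreSQL reachable" || echo "PostgreSQL NOT reachable"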
What this solves
Kafka Connect enables real-time integration between databases and Apache Kafka by streaming data changes automatically. This tutorial shows you how to configure JDBC source and sink connectors for database integration, set up Debezium for change data capture (CDC), and monitor connector performance in production environments.
Step-by-step configuration
Install required packages and dependencies
Install Java 17, the database client tools, and jq (used by the monitoring scripts later in this guide).
sudo apt update
sudo apt install -y openjdk-17-jdk wget curl unzip jq postgresql-client mysql-client
Download and install Confluent Platform
Download Confluent Platform, which bundles Kafka Connect, the JDBC connector, and the confluent-hub CLI used later to install Debezium.
cd /opt
sudo wget https://packages.confluent.io/archive/7.5/confluent-7.5.0.tar.gz
sudo tar -xzf confluent-7.5.0.tar.gz
sudo mv confluent-7.5.0 confluent
sudo chown -R $(whoami):$(whoami) /opt/confluent
Configure environment variables
Set up environment variables for Confluent Platform. Persist them in a profile script so they survive new shell sessions, then load them into the current shell.
sudo tee /etc/profile.d/confluent.sh > /dev/null << 'EOF'
export CONFLUENT_HOME=/opt/confluent
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export PATH="$PATH:/opt/confluent/bin"
EOF
source /etc/profile.d/confluent.sh
Download JDBC drivers
Download database-specific JDBC drivers for PostgreSQL, MySQL, and SQL Server integration.
cd $CONFLUENT_HOME/share/java/kafka-connect-jdbc
wget https://jdbc.postgresql.org/download/postgresql-42.7.1.jar
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.2.0/mysql-connector-j-8.2.0.jar
wget https://github.com/Microsoft/mssql-jdbc/releases/download/v12.4.2/mssql-jdbc-12.4.2.jre11.jar
Configure Kafka Connect distributed mode
Create the worker configuration for Kafka Connect in distributed mode and save it as /opt/confluent/etc/kafka/connect-distributed.properties (the tarball ships a template at that path you can overwrite). The replication factors below assume at least three brokers; set them to 1 on a single-broker development cluster.
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/opt/confluent/share/java,/opt/confluent/share/confluent-hub-components
rest.port=8083
rest.advertised.host.name=localhost
rest.advertised.port=8083
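Kafka Connect creates its three internal topics on first start, but creating them up front makes the settings explicit; the internal topics must be compacted. A sketch assuming a local broker and the topic names configured above (use --replication-factor 1 on a single-broker cluster):
kafka-topics --bootstrap-server localhost:9092 --create --topic connect-offsets --partitions 25 --replication-factor 3 --config cleanup.policy=compact
kafka-topics --bootstrap-server localhost:9092 --create --topic connect-configs --partitions 1 --replication-factor 3 --config cleanup.policy=compact
kafka-topics --bootstrap-server localhost:9092 --create --topic connect-status --partitions 5 --replication-factor 3 --config cleanup.policy=compact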
Install Debezium connectors
Install Debezium connectors for change data capture from various database systems.
confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.4.0
confluent-hub install --no-prompt debezium/debezium-connector-mysql:2.4.0
confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.4.0
Create systemd service for Kafka Connect
Configure Kafka Connect as a systemd service for automatic startup and management. Save the following unit file as /etc/systemd/system/kafka-connect.service.
[Unit]
Description=Apache Kafka Connect
Requires=network.target
After=network.target
[Service]
Type=simple
User=confluent
Group=confluent
ExecStart=/opt/confluent/bin/connect-distributed /opt/confluent/etc/kafka/connect-distributed.properties
Restart=on-failure
RestartSec=10
KillMode=process
TimeoutSec=300
Environment=JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
Environment=CONFLUENT_HOME=/opt/confluent
[Install]
WantedBy=multi-user.target
Create dedicated user and set permissions
Create a dedicated user for running Kafka Connect with appropriate permissions.
sudo useradd -r -s /bin/false confluent
sudo chown -R confluent:confluent /opt/confluent
sudo chmod -R 755 /opt/confluent
sudo chmod 644 /etc/systemd/system/kafka-connect.service
Start Kafka Connect service
Enable and start the Kafka Connect service to begin processing connector configurations.
sudo systemctl daemon-reload
sudo systemctl enable kafka-connect
sudo systemctl start kafka-connect
sudo systemctl status kafka-connect
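Once the service reports active, the REST API on port 8083 should answer, and the JDBC and Debezium plugins installed above should appear in the plugin list:
curl -s http://localhost:8083/ | jq .
curl -s http://localhost:8083/connector-plugins | jq -r '.[].class'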
Configure JDBC connectors
Create JDBC source connector for PostgreSQL
Configure a JDBC source connector to stream data from PostgreSQL tables to Kafka topics. Save the following as /tmp/postgres-source.json, then register it with the Connect REST API using the curl command below.
{
"name": "postgres-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://localhost:5432/production",
"connection.user": "kafka_user",
"connection.password": "secure_password_123",
"table.whitelist": "users,orders,products",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "postgres-",
"poll.interval.ms": 1000,
"batch.max.rows": 500,
"table.poll.interval.ms": 5000
}
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/postgres-source.json http://localhost:8083/connectors
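After registering the connector, confirm that it and its task are RUNNING and that topics with the configured prefix have been created:
curl -s http://localhost:8083/connectors/postgres-source-connector/status | jq '.connector.state, .tasks[].state'
kafka-topics --bootstrap-server localhost:9092 --list | grep '^postgres-'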
Create JDBC sink connector
Configure a sink connector to write Kafka topic data to a target database table and save it as /tmp/postgres-sink.json. The JDBC sink needs schema-bearing records to create tables and map columns, so with the worker-level JSON converters above (schemas.enable=false) either produce records with embedded schemas or override the converters for this connector, as shown after this step.
{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://localhost:5432/analytics",
"connection.user": "kafka_sink_user",
"connection.password": "sink_password_456",
"topics": "user-events,order-events",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_key",
"table.name.format": "kafka_${topic}"
}
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/postgres-sink.json http://localhost:8083/connectors
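If your topics carry plain JSON without embedded schemas, the sink connector will fail with errors about missing schemas. One option is to override the converters for this connector only; a sketch of the extra keys that could be added to the "config" block above (producers must then write records in the {"schema": ..., "payload": ...} envelope):
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"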
Configure Debezium CDC
Prepare PostgreSQL for CDC
Enable logical replication in PostgreSQL for Debezium change data capture.
sudo -u postgres psql -c "ALTER SYSTEM SET wal_level = logical;"
sudo -u postgres psql -c "ALTER SYSTEM SET max_replication_slots = 10;"
sudo -u postgres psql -c "ALTER SYSTEM SET max_wal_senders = 10;"
sudo systemctl restart postgresql
Create replication user
Create a dedicated PostgreSQL user with replication privileges for Debezium.
sudo -u postgres psql << EOF
CREATE USER debezium_user WITH REPLICATION PASSWORD 'debezium_pass_789';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_user;
EOF
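With the pgoutput plugin, Debezium can create the publication itself only when the connector user has sufficient privileges on the tables; since debezium_user is not a superuser or table owner here, it is safer to create the publication up front (the name and table list match the connector config in the next step):
sudo -u postgres psql -d production -c "CREATE PUBLICATION debezium_publication FOR TABLE public.users, public.orders, public.products;"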
Configure Debezium PostgreSQL connector
Set up the Debezium connector to capture data changes from PostgreSQL tables and save it as /tmp/debezium-postgres.json. Debezium 2.x uses topic.prefix (which replaced database.server.name) to name the change-event topics.
{
"name": "debezium-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "debezium_user",
"database.password": "debezium_pass_789",
"database.dbname": "production",
"database.server.name": "postgres-server",
"table.include.list": "public.users,public.orders,public.products",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_publication",
"snapshot.mode": "initial",
"time.precision.mode": "adaptive_time_microseconds",
"decimal.handling.mode": "precise"
}
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/debezium-postgres.json http://localhost:8083/connectors
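To confirm change events are flowing, consume the change topic for one of the captured tables. Debezium names topics <topic.prefix>.<schema>.<table>, so the users table lands on postgres-server.public.users; with snapshot.mode=initial, existing rows appear as read events:
kafka-console-consumer --bootstrap-server localhost:9092 --topic postgres-server.public.users --from-beginning --max-messages 3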
Configure Debezium MySQL connector
Set up change data capture for MySQL using Debezium's binlog reader. Before registering the connector, make sure the MySQL user has the privileges sketched below, then save the config as /tmp/debezium-mysql.json.
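A sketch of the required grants, assuming the user and password from the connector config and that binary logging is already enabled in ROW format on the server:
mysql -u root -p << 'EOF'
CREATE USER IF NOT EXISTS 'debezium'@'%' IDENTIFIED BY 'mysql_debezium_pass';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
-- log_bin should be ON and binlog_format ROW for CDC to work
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
EOF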
{
"name": "debezium-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "mysql_debezium_pass",
"database.server.id": "184054",
"database.server.name": "mysql-server",
"table.include.list": "inventory.customers,inventory.orders",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"include.schema.changes": "true",
"snapshot.mode": "when_needed",
"binlog.buffer.size": "32768"
}
}
curl -X POST -H "Content-Type: application/json" --data @/tmp/debezium-mysql.json http://localhost:8083/connectors
Monitor connector performance
Configure JMX monitoring
Enable JMX metrics collection for detailed connector performance monitoring. Export these variables in the environment that launches the Connect worker; when the worker runs under systemd, use the drop-in shown after this step.
# JMX configuration for the Connect worker
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999"
export JMX_PORT=9999
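These exports only affect processes started from your shell; when Connect runs under systemd, set them through a drop-in instead (a sketch, reusing port 9999):
sudo systemctl edit kafka-connect
# In the editor that opens, add the following lines, then save:
# [Service]
# Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999"
sudo systemctl daemon-reload
sudo systemctl restart kafka-connect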
Set up connector monitoring scripts
Create a monitoring script to track connector status and flag failed tasks. Create the directory with sudo mkdir -p /opt/confluent/scripts and save the following as /opt/confluent/scripts/monitor-connectors.sh.
#!/bin/bash
CONNECT_URL="http://localhost:8083"
echo "=== Kafka Connect Cluster Status ==="
curl -s $CONNECT_URL/ | jq .
echo -e "\n=== Active Connectors ==="
curl -s $CONNECT_URL/connectors | jq .
echo -e "\n=== Connector Status Details ==="
for connector in $(curl -s $CONNECT_URL/connectors | jq -r '.[]'); do
echo "--- $connector ---"
curl -s $CONNECT_URL/connectors/$connector/status | jq .
done
echo -e "\n=== Failed Tasks ==="
for connector in $(curl -s $CONNECT_URL/connectors | jq -r '.[]'); do
status=$(curl -s $CONNECT_URL/connectors/$connector/status | jq -r '.connector.state')
if [ "$status" != "RUNNING" ]; then
echo "ALERT: $connector is in $status state"
fi
done
sudo chown confluent:confluent /opt/confluent/scripts/monitor-connectors.sh
sudo chmod +x /opt/confluent/scripts/monitor-connectors.sh
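To run the health check on a schedule and keep a log of the results, a cron entry such as the following can be used (the five-minute interval and log path are assumptions):
echo "*/5 * * * * confluent /opt/confluent/scripts/monitor-connectors.sh >> /var/log/kafka-connect/connector-health.log 2>&1" | sudo tee /etc/cron.d/kafka-connect-monitor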
Configure log monitoring
Set up file-based logging and log rotation for Kafka Connect by editing /opt/confluent/etc/kafka/connect-log4j.properties.
log4j.rootLogger=INFO, stdout, connectAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=/var/log/kafka-connect/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask=WARN
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=WARN
sudo mkdir -p /var/log/kafka-connect
sudo chown confluent:confluent /var/log/kafka-connect
sudo chmod 755 /var/log/kafka-connect
Verify your setup
Check that Kafka Connect is running and connectors are functioning properly.
# Check Kafka Connect service status
sudo systemctl status kafka-connect
# List all connectors
curl -s http://localhost:8083/connectors | jq .
# Check a specific connector's status
curl -s http://localhost:8083/connectors/debezium-postgres-connector/status | jq .
# Verify topics created by connectors
kafka-topics --bootstrap-server localhost:9092 --list | grep -E "(postgres-|mysql-server|connect-)"
# Inspect the error trace of the first source task (null unless the task has failed)
curl -s http://localhost:8083/connectors/postgres-source-connector/status | jq '.tasks[0].trace'
# Monitor data flow
kafka-console-consumer --bootstrap-server localhost:9092 --topic postgres-users --from-beginning --max-messages 5
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Connector fails to start | Database connection issues or missing JDBC driver | Check connection URL, credentials, and verify JDBC driver is in plugin path |
| Debezium connector stuck in SNAPSHOT mode | Large table or insufficient database permissions | Use snapshot.mode=schema_only or grant proper SELECT permissions |
| High memory usage | Large batch sizes or insufficient JVM heap | Reduce batch.max.rows and increase JVM heap with -Xmx4g |
| Replication slot errors | PostgreSQL replication slot conflicts | Drop existing slot: SELECT pg_drop_replication_slot('debezium_slot'); |
| Schema evolution failures | Incompatible schema changes | Use auto.evolve=true or handle schema registry compatibility |
| Connector tasks frequently restart | Database connection timeouts or deadlocks | Tune connection.timeout.ms and poll.interval.ms settings |
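Once the underlying cause is fixed, most of these conditions clear after restarting the connector or the failed task through the Connect REST API, for example:
# Restart the whole connector, then an individual failed task (task 0) if needed
curl -X POST http://localhost:8083/connectors/debezium-postgres-connector/restart
curl -X POST http://localhost:8083/connectors/debezium-postgres-connector/tasks/0/restart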
Next steps
- Set up Prometheus and Grafana monitoring for Kafka Connect metrics
- Implement Spark streaming with Kafka for real-time analytics
- Configure Kafka Schema Registry with Avro for data serialization
- Build Kafka Streams applications for real-time data processing
- Set up Kafka Connect cluster with high availability and load balancing
Automated install script
Run this script to automate the entire setup.
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# Default configuration
CONFLUENT_VERSION="7.5.0"
CONFLUENT_HOME="/opt/confluent"
KAFKA_CONNECT_USER="confluent"
# Usage information
usage() {
echo "Usage: $0 [options]"
echo "Options:"
echo " -h, --help Show this help message"
echo " -v, --version Confluent Platform version (default: 7.5.0)"
echo " -d, --directory Installation directory (default: /opt/confluent)"
exit 1
}
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
-h|--help)
usage
;;
-v|--version)
CONFLUENT_VERSION="$2"
shift 2
;;
-d|--directory)
CONFLUENT_HOME="$2"
shift 2
;;
*)
echo -e "${RED}Unknown option: $1${NC}"
usage
;;
esac
done
# Cleanup function for rollback
cleanup() {
echo -e "${RED}[ERROR] Installation failed. Cleaning up...${NC}"
sudo systemctl stop kafka-connect 2>/dev/null || true
sudo systemctl disable kafka-connect 2>/dev/null || true
sudo rm -f /etc/systemd/system/kafka-connect.service
sudo userdel -r $KAFKA_CONNECT_USER 2>/dev/null || true
sudo rm -rf $CONFLUENT_HOME
exit 1
}
trap cleanup ERR
# Check if running with sudo/root
if [[ $EUID -ne 0 ]]; then
echo -e "${RED}This script must be run as root or with sudo${NC}"
exit 1
fi
# Auto-detect distribution and set package manager
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update"
JAVA_HOME_PATH="/usr/lib/jvm/java-17-openjdk-amd64"
DB_PACKAGES="postgresql-client mysql-client"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
JAVA_HOME_PATH="/usr/lib/jvm/java-17-openjdk"
DB_PACKAGES="postgresql mysql"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
JAVA_HOME_PATH="/usr/lib/jvm/java-17-openjdk"
DB_PACKAGES="postgresql mysql"
;;
*)
echo -e "${RED}Unsupported distribution: $ID${NC}"
exit 1
;;
esac
else
echo -e "${RED}Cannot detect distribution${NC}"
exit 1
fi
echo -e "${GREEN}[1/8] Installing required packages and dependencies...${NC}"
$PKG_UPDATE
$PKG_INSTALL $JAVA_PKG wget curl unzip jq $DB_PACKAGES
echo -e "${GREEN}[2/8] Downloading and installing Confluent Platform...${NC}"
cd /opt
wget -q "https://packages.confluent.io/archive/${CONFLUENT_VERSION%.*}/confluent-${CONFLUENT_VERSION}.tar.gz"
tar -xzf "confluent-${CONFLUENT_VERSION}.tar.gz"
mv "confluent-${CONFLUENT_VERSION}" confluent
rm -f "confluent-${CONFLUENT_VERSION}.tar.gz"
echo -e "${GREEN}[3/8] Creating dedicated user and setting permissions...${NC}"
useradd -r -s /bin/false $KAFKA_CONNECT_USER || true
chown -R $KAFKA_CONNECT_USER:$KAFKA_CONNECT_USER $CONFLUENT_HOME
chmod -R 755 $CONFLUENT_HOME
echo -e "${GREEN}[4/8] Configuring environment variables...${NC}"
cat > /etc/profile.d/confluent.sh << EOF
export CONFLUENT_HOME=$CONFLUENT_HOME
export JAVA_HOME=$JAVA_HOME_PATH
export PATH="\$PATH:$CONFLUENT_HOME/bin"
EOF
export CONFLUENT_HOME=$CONFLUENT_HOME
export JAVA_HOME=$JAVA_HOME_PATH
export PATH=$PATH:$CONFLUENT_HOME/bin
echo -e "${GREEN}[5/8] Downloading JDBC drivers...${NC}"
JDBC_DIR="$CONFLUENT_HOME/share/java/kafka-connect-jdbc"
mkdir -p $JDBC_DIR
cd $JDBC_DIR
# Download JDBC drivers
wget -q https://jdbc.postgresql.org/download/postgresql-42.7.1.jar
wget -q https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.2.0/mysql-connector-j-8.2.0.jar
wget -q https://github.com/Microsoft/mssql-jdbc/releases/download/v12.4.2/mssql-jdbc-12.4.2.jre11.jar
chown -R $KAFKA_CONNECT_USER:$KAFKA_CONNECT_USER $JDBC_DIR
chmod 644 $JDBC_DIR/*.jar
echo -e "${GREEN}[6/8] Configuring Kafka Connect distributed mode...${NC}"
cat > $CONFLUENT_HOME/etc/kafka/connect-distributed.properties << EOF
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=$CONFLUENT_HOME/share/java,$CONFLUENT_HOME/share/confluent-hub-components
rest.port=8083
rest.advertised.host.name=localhost
rest.advertised.port=8083
EOF
chown $KAFKA_CONNECT_USER:$KAFKA_CONNECT_USER $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
chmod 644 $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
echo -e "${GREEN}[7/8] Installing Debezium connectors...${NC}"
mkdir -p $CONFLUENT_HOME/share/confluent-hub-components
chown $KAFKA_CONNECT_USER:$KAFKA_CONNECT_USER $CONFLUENT_HOME/share/confluent-hub-components
for connector in debezium/debezium-connector-postgresql:2.4.0 debezium/debezium-connector-mysql:2.4.0 debezium/debezium-connector-sqlserver:2.4.0; do
  sudo -u $KAFKA_CONNECT_USER $CONFLUENT_HOME/bin/confluent-hub install --no-prompt --component-dir $CONFLUENT_HOME/share/confluent-hub-components --worker-configs $CONFLUENT_HOME/etc/kafka/connect-distributed.properties "$connector"
done
echo -e "${GREEN}[8/8] Creating systemd service and starting Kafka Connect...${NC}"
cat > /etc/systemd/system/kafka-connect.service << EOF
[Unit]
Description=Apache Kafka Connect
Requires=network.target
After=network.target
[Service]
Type=simple
User=$KAFKA_CONNECT_USER
Group=$KAFKA_CONNECT_USER
ExecStart=$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties
Restart=on-failure
RestartSec=10
KillMode=process
TimeoutSec=300
Environment=JAVA_HOME=$JAVA_HOME_PATH
Environment=CONFLUENT_HOME=$CONFLUENT_HOME
[Install]
WantedBy=multi-user.target
EOF
chmod 644 /etc/systemd/system/kafka-connect.service
systemctl daemon-reload
systemctl enable kafka-connect
# Configure firewall based on distribution
if command -v firewall-cmd &> /dev/null; then
firewall-cmd --permanent --add-port=8083/tcp
firewall-cmd --reload
elif command -v ufw &> /dev/null; then
ufw allow 8083/tcp
fi
echo -e "${GREEN}Kafka Connect installation completed successfully!${NC}"
echo -e "${YELLOW}Note: Make sure Kafka is running before starting Kafka Connect${NC}"
echo -e "${YELLOW}To start Kafka Connect: sudo systemctl start kafka-connect${NC}"
echo -e "${YELLOW}To check status: sudo systemctl status kafka-connect${NC}"
echo -e "${YELLOW}REST API will be available at: http://localhost:8083${NC}"
# Verification checks
echo -e "${GREEN}Running verification checks...${NC}"
if systemctl is-enabled kafka-connect &>/dev/null; then
echo -e "${GREEN}✓ Kafka Connect service is enabled${NC}"
else
echo -e "${RED}✗ Kafka Connect service is not enabled${NC}"
fi
if [ -f "$CONFLUENT_HOME/bin/connect-distributed" ]; then
echo -e "${GREEN}✓ Confluent Platform is installed${NC}"
else
echo -e "${RED}✗ Confluent Platform installation failed${NC}"
fi
if [ -f "$JDBC_DIR/postgresql-42.7.1.jar" ]; then
echo -e "${GREEN}✓ JDBC drivers are installed${NC}"
else
echo -e "${RED}✗ JDBC drivers installation failed${NC}"
fi
echo -e "${GREEN}Installation completed! You can now configure your database connectors.${NC}"
Review the script before running. Execute with: sudo bash install.sh (pass --version or --directory to override the defaults).