Set up Confluent Schema Registry with Apache Kafka to manage Avro schemas for data serialization. Configure producers and consumers with schema evolution and compatibility rules for production data processing workflows.
Prerequisites
- Apache Kafka cluster running
- Java 8 or higher installed
- Python 3 with pip available
- Root or sudo access
What this solves
Kafka Schema Registry provides centralized schema management for Avro, JSON, and Protobuf data formats in Apache Kafka. It ensures data compatibility across producers and consumers, enables schema evolution, and prevents data corruption from incompatible schema changes in production environments.
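Under the hood, a serialized message does not carry its schema; it carries a small header referencing the schema ID stored in the registry. A minimal, stdlib-only sketch of the Confluent wire format (one magic byte, a 4-byte big-endian schema ID, then the Avro-encoded payload) shows why producers and consumers can agree on structure without shipping schemas in every record; the function names are illustrative:

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format version marker

def frame_message(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro-encoded payload with the Confluent wire-format header."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def parse_message(raw: bytes) -> tuple:
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"Unknown magic byte: {magic}")
    return schema_id, raw[5:]

framed = frame_message(42, b"avro-bytes")
print(parse_message(framed))  # (42, b'avro-bytes')
```

A consumer uses the schema ID from the header to fetch (and cache) the writer's schema from the registry before decoding the payload.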
Step-by-step installation
Update system packages
Start by updating your package manager to ensure you have the latest security patches.
sudo apt update && sudo apt upgrade -y
Install Java runtime environment
Schema Registry requires Java 8 or higher. Install OpenJDK 11 for optimal performance and long-term support.
sudo apt install -y openjdk-11-jdk
java -version
Create dedicated user for Schema Registry
Create a system user for running Schema Registry service with minimal privileges for security.
sudo useradd -r -s /bin/false -d /opt/schema-registry schema-registry
sudo mkdir -p /opt/schema-registry /var/log/schema-registry
sudo chown -R schema-registry:schema-registry /opt/schema-registry /var/log/schema-registry
Download and install Confluent Schema Registry
Download Confluent Platform 7.5.1, which bundles Schema Registry. Check packages.confluent.io for newer releases.
cd /tmp
wget https://packages.confluent.io/archive/7.5/confluent-7.5.1.tar.gz
tar -xzf confluent-7.5.1.tar.gz
sudo mv confluent-7.5.1 /opt/confluent
sudo chown -R schema-registry:schema-registry /opt/confluent
Configure Schema Registry properties
Create the main configuration file with Kafka bootstrap servers and storage topic settings.
# Kafka cluster connection
kafkastore.bootstrap.servers=localhost:9092
# Schema Registry listeners
listeners=http://0.0.0.0:8081
# Topic for storing schemas
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=3
# Leader eligibility (formerly master.eligibility)
leader.eligibility=true
# Schema compatibility level
schema.compatibility.level=backward
# Kafka store authentication (if the Kafka cluster uses SASL)
kafkastore.security.protocol=SASL_SSL
kafkastore.sasl.mechanism=PLAIN
kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="schema-registry" password="your-password";
# HTTPS listener SSL configuration (if serving the REST API over SSL)
ssl.truststore.location=/opt/confluent/etc/schema-registry/truststore.jks
ssl.truststore.password=your-truststore-password
ssl.keystore.location=/opt/confluent/etc/schema-registry/keystore.jks
ssl.keystore.password=your-keystore-password
ssl.key.password=your-key-password
Create systemd service file
Set up Schema Registry as a system service for automatic startup and management.
[Unit]
Description=Confluent Schema Registry
After=network.target
Requires=network.target
[Service]
Type=simple
User=schema-registry
Group=schema-registry
ExecStart=/opt/confluent/bin/schema-registry-start /opt/confluent/etc/schema-registry/schema-registry.properties
ExecStop=/opt/confluent/bin/schema-registry-stop
Restart=always
RestartSec=5
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=SCHEMA_REGISTRY_HEAP_OPTS="-Xms512M -Xmx1G"
StandardOutput=append:/var/log/schema-registry/schema-registry.log
StandardError=append:/var/log/schema-registry/schema-registry-error.log
[Install]
WantedBy=multi-user.target
Configure log rotation
Set up log rotation to prevent disk space issues from accumulating log files.
/var/log/schema-registry/*.log {
    daily
    missingok
    rotate 7
    compress
    notifempty
    copytruncate
}
Start and enable Schema Registry
Enable the service to start automatically on boot and start it immediately.
sudo systemctl daemon-reload
sudo systemctl enable schema-registry
sudo systemctl start schema-registry
sudo systemctl status schema-registry
Configure Avro schema management
Install Schema Registry CLI tools
Install the command-line tools for managing schemas and interacting with the registry.
sudo apt install -y curl jq python3-pip
pip3 install "confluent-kafka[avro]"
Create sample Avro schema
Define a user profile schema to demonstrate schema registration and evolution.
{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example.user",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
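Before registering, it can help to sanity-check that the schema file parses as valid JSON and contains the fields you expect; the registry performs full Avro validation later, but catching a typo locally is faster. A small stdlib-only sketch (the helper name is an assumption, not a registry API):

```python
import json

def check_record_schema(schema_str: str, required_fields: set) -> list:
    """Parse an Avro record schema and report any missing field names.

    This only validates JSON structure and field presence, not full
    Avro semantics (the registry does that on registration).
    """
    schema = json.loads(schema_str)
    if schema.get("type") != "record":
        raise ValueError("expected a record schema")
    present = {f["name"] for f in schema.get("fields", [])}
    return sorted(required_fields - present)

schema_str = """
{"type": "record", "name": "UserProfile", "namespace": "com.example.user",
 "fields": [{"name": "user_id", "type": "long"},
            {"name": "username", "type": "string"},
            {"name": "email", "type": "string"},
            {"name": "created_at",
             "type": {"type": "long", "logicalType": "timestamp-millis"}}]}
"""
print(check_record_schema(schema_str, {"user_id", "username", "email", "created_at"}))  # []
```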
Register the schema
Register your first schema version with the Schema Registry using the REST API.
curl -X POST http://localhost:8081/subjects/user-profile-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\": \"record\", \"name\": \"UserProfile\", \"namespace\": \"com.example.user\", \"fields\": [{\"name\": \"user_id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\"}]}"
}'
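The subject name user-profile-value is not arbitrary: under the registry's default TopicNameStrategy, value schemas for a topic are registered under <topic>-value and key schemas under <topic>-key. A one-function sketch of the mapping (the topic name here is illustrative):

```python
def subject_name(topic: str, is_key: bool = False) -> str:
    """Default TopicNameStrategy: '<topic>-key' for keys, '<topic>-value' for values."""
    return f"{topic}-{'key' if is_key else 'value'}"

print(subject_name("user-profile"))        # user-profile-value
print(subject_name("user-profile", True))  # user-profile-key
```

Confluent clients also support RecordNameStrategy and TopicRecordNameStrategy, which derive the subject from the Avro record's full name instead of (or in addition to) the topic.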
# List all subjects
curl http://localhost:8081/subjects
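Hand-escaping the schema into the request body, as in the curl example above, is error-prone. A stdlib-only sketch that builds the registration payload with json.dumps instead, so quotes are escaped for you (the function name is illustrative):

```python
import json

def registration_payload(schema_str: str) -> str:
    """Build the Schema Registry POST body: the schema embedded as a JSON string."""
    # Round-tripping through json.loads/json.dumps both validates the schema
    # text as JSON and produces the escaped, compact form the API expects.
    compact = json.dumps(json.loads(schema_str), separators=(",", ":"))
    return json.dumps({"schema": compact})

body = registration_payload('{"type": "long"}')
print(body)
```

You can then pass the result to curl with -d "$(python3 make_payload.py)" or post it directly from Python; make_payload.py is a hypothetical wrapper script.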
Configure schema compatibility
Set backward compatibility to ensure new schema versions can read old data.
# Set global compatibility level
curl -X PUT http://localhost:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Set subject-specific compatibility
curl -X PUT http://localhost:8081/config/user-profile-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Check compatibility settings
curl http://localhost:8081/config
curl http://localhost:8081/config/user-profile-value
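BACKWARD compatibility means a consumer using the new schema can still read data written with the previous schema. A deliberately simplified model of that rule, checking only added fields and defaults (real Avro resolution also handles type promotion, unions, and aliases; this is not the registry's actual checker):

```python
def backward_compatible(old_fields: dict, new_fields: dict) -> bool:
    """Simplified BACKWARD check: every field in the new reader schema must
    either exist in the old writer schema or carry a default value.

    Fields are modeled as {name: {"type": ..., "default": ...}} dicts.
    """
    for name, spec in new_fields.items():
        if name not in old_fields and "default" not in spec:
            return False  # new required field: old data cannot be read
    return True

v1 = {"user_id": {"type": "long"}, "username": {"type": "string"}}
# Adding an optional field with a default preserves BACKWARD compatibility
v2 = dict(v1, phone_number={"type": ["null", "string"], "default": None})
# Adding a field without a default breaks it
v3 = dict(v1, score={"type": "int"})

print(backward_compatible(v1, v2))  # True
print(backward_compatible(v1, v3))  # False
```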
Configure Kafka producers with Avro
Create Python producer example
Build a producer that serializes data using the registered Avro schema.
#!/usr/bin/env python3
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.serializer import SerializerError
import time

# Schema Registry and Kafka configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}

# Avro schema (must match the registered schema)
value_schema_str = """
{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example.user",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""
key_schema_str = '{"type": "string"}'

def delivery_report(err, msg):
    """Callback for message delivery reports."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

def main():
    # Parse the schema strings and create the AvroProducer
    key_schema = avro.loads(key_schema_str)
    value_schema = avro.loads(value_schema_str)
    avro_producer = AvroProducer(conf, default_key_schema=key_schema,
                                 default_value_schema=value_schema)

    # Sample user data
    users = [
        {
            "user_id": 1001,
            "username": "alice_smith",
            "email": "alice@example.com",
            "created_at": int(time.time() * 1000)
        },
        {
            "user_id": 1002,
            "username": "bob_jones",
            "email": "bob@example.com",
            "created_at": int(time.time() * 1000)
        }
    ]

    try:
        for user in users:
            # Produce Avro-encoded messages; topic 'user-profile' maps to the
            # subject 'user-profile-value' under the default TopicNameStrategy
            avro_producer.produce(topic='user-profile',
                                  key=user['username'],
                                  value=user,
                                  callback=delivery_report)
            print(f"Produced user: {user['username']}")
        # Wait for outstanding messages to be delivered
        avro_producer.flush()
    except SerializerError as e:
        print(f"Message serialization failed: {e}")
    except Exception as e:
        print(f"Error: {e}")

if __name__ == '__main__':
    main()
Create Python consumer example
Build a consumer that deserializes Avro data using the Schema Registry.
#!/usr/bin/env python3
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Consumer configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'user-profile-consumer',
    'auto.offset.reset': 'earliest'
}

def main():
    # Create AvroConsumer and subscribe to the topic
    avro_consumer = AvroConsumer(conf)
    avro_consumer.subscribe(['user-profile'])

    try:
        while True:
            try:
                # Poll for messages
                msg = avro_consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    print(f"Consumer error: {msg.error()}")
                    continue

                # Message successfully consumed and deserialized
                print("Received message:")
                print(f"  Key: {msg.key()}")
                print(f"  Value: {msg.value()}")
                print(f"  Topic: {msg.topic()}")
                print(f"  Partition: {msg.partition()}")
                print(f"  Offset: {msg.offset()}")
                print("-" * 50)
            except SerializerError as e:
                print(f"Message deserialization failed: {e}")
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close consumer to commit final offsets and leave the group
        avro_consumer.close()

if __name__ == '__main__':
    main()
Set correct permissions for scripts
Save the producer script as /opt/schema-registry/avro-producer.py and the consumer as /opt/schema-registry/avro-consumer.py, then set ownership and make them executable.
sudo chown schema-registry:schema-registry /opt/schema-registry/avro-producer.py /opt/schema-registry/avro-consumer.py
sudo chmod 755 /opt/schema-registry/avro-producer.py /opt/schema-registry/avro-consumer.py
Implement schema evolution
Create evolved schema version
Add optional fields to demonstrate backward-compatible schema evolution.
{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example.user",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at",
     "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "phone_number", "type": ["null", "string"], "default": null},
    {"name": "status",
     "type": {"type": "enum", "name": "UserStatus",
              "symbols": ["ACTIVE", "INACTIVE", "PENDING"]},
     "default": "ACTIVE"}
  ]
}
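When a consumer running the evolved schema reads a record written with version 1, Avro schema resolution fills the missing fields from their defaults. A simplified, dict-based sketch of that resolution step (not the real Avro decoder, which works on binary data):

```python
# Defaults the evolved reader schema declares for its new fields
NEW_FIELD_DEFAULTS = {
    "phone_number": None,   # ["null", "string"] with default null
    "status": "ACTIVE",     # enum field default
}

def resolve_old_record(old_record: dict, defaults: dict) -> dict:
    """Fill fields missing from an old record using the new schema's defaults."""
    resolved = dict(old_record)
    for name, default in defaults.items():
        resolved.setdefault(name, default)
    return resolved

v1_record = {"user_id": 1001, "username": "alice_smith",
             "email": "alice@example.com", "created_at": 1700000000000}
print(resolve_old_record(v1_record, NEW_FIELD_DEFAULTS))
```

This is exactly why BACKWARD compatibility requires defaults on added fields: without them there is nothing to fill in when old data arrives.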
Test schema compatibility
Verify the new schema version is compatible before registering it.
# Test compatibility of new schema
curl -X POST http://localhost:8081/compatibility/subjects/user-profile-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d @- <<'EOF'
{
"schema": "{\"type\": \"record\", \"name\": \"UserProfile\", \"namespace\": \"com.example.user\", \"fields\": [{\"name\": \"user_id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\"}, {\"name\": \"phone_number\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"status\", \"type\": {\"type\": \"enum\", \"name\": \"UserStatus\", \"symbols\": [\"ACTIVE\", \"INACTIVE\", \"PENDING\"]}, \"default\": \"ACTIVE\"}]}"
}
EOF
Register evolved schema
Register the new schema version if compatibility check passes.
# Register schema version 2
curl -X POST http://localhost:8081/subjects/user-profile-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d @- <<'EOF'
{
"schema": "{\"type\": \"record\", \"name\": \"UserProfile\", \"namespace\": \"com.example.user\", \"fields\": [{\"name\": \"user_id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": \"long\", \"logicalType\": \"timestamp-millis\"}, {\"name\": \"phone_number\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"status\", \"type\": {\"type\": \"enum\", \"name\": \"UserStatus\", \"symbols\": [\"ACTIVE\", \"INACTIVE\", \"PENDING\"]}, \"default\": \"ACTIVE\"}]}"
}
EOF
# List all versions for the subject
curl http://localhost:8081/subjects/user-profile-value/versions
Configure security and monitoring
Configure firewall rules
Allow Schema Registry traffic through the firewall on port 8081.
sudo ufw allow 8081/tcp comment "Schema Registry"
sudo ufw reload
sudo ufw status numbered
Configure JMX monitoring
Enable JMX metrics for monitoring Schema Registry performance.
# JMX is enabled through the JMX_PORT environment variable, not a properties
# entry; add this to the [Service] section of the systemd unit:
#   Environment=JMX_PORT=9999
# Additional monitoring settings (ConfluentMetricsReporter ships with the
# Confluent enterprise packages; skip these lines on the community edition)
metrics.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
confluent.metrics.reporter.topic=_confluent-metrics
Create health check script
Monitor Schema Registry health and schema availability.
#!/bin/bash
# Schema Registry health check script
REGISTRY_URL="http://localhost:8081"
LOG_FILE="/var/log/schema-registry/health-check.log"

log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> "$LOG_FILE"
}

# Check if Schema Registry is responding
if curl -s -f "$REGISTRY_URL/subjects" > /dev/null; then
    log_message "SUCCESS: Schema Registry is healthy"
    # Count registered subjects
    SUBJECT_COUNT=$(curl -s "$REGISTRY_URL/subjects" | jq length)
    log_message "INFO: $SUBJECT_COUNT subjects registered"
    exit 0
else
    log_message "ERROR: Schema Registry health check failed"
    systemctl status schema-registry >> "$LOG_FILE"
    exit 1
fi
sudo chown schema-registry:schema-registry /opt/schema-registry/health-check.sh
sudo chmod 755 /opt/schema-registry/health-check.sh
# Add to crontab to run the health check every 5 minutes
echo "*/5 * * * * /opt/schema-registry/health-check.sh" | sudo crontab -u schema-registry -
Verify your setup
# Check Schema Registry status
sudo systemctl status schema-registry
# Verify Schema Registry is listening
sudo ss -tlnp | grep :8081
# Test API endpoints
curl http://localhost:8081/subjects
curl http://localhost:8081/config
curl http://localhost:8081/subjects/user-profile-value/versions
# Test producer and consumer
cd /opt/schema-registry
python3 avro-producer.py
# In another terminal
cd /opt/schema-registry
python3 avro-consumer.py
# Check logs
sudo tail -f /var/log/schema-registry/schema-registry.log
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Schema Registry won't start | Kafka not running or incorrect bootstrap.servers | Start Kafka first, verify connection: systemctl status kafka |
| Schema compatibility errors | Breaking changes in schema evolution | Add default values for new fields, check compatibility: curl http://localhost:8081/compatibility/subjects/topic/versions/latest |
| Producer serialization fails | Schema mismatch or registry unreachable | Verify schema registration: curl http://localhost:8081/subjects |
| Consumer deserialization errors | Schema evolution without proper compatibility | Check schema versions: curl http://localhost:8081/subjects/topic/versions |
| Permission denied errors | Incorrect file ownership | Fix ownership: sudo chown -R schema-registry:schema-registry /opt/schema-registry |
| Out of memory errors | Insufficient JVM heap size | Increase heap in service file: SCHEMA_REGISTRY_HEAP_OPTS="-Xms1G -Xmx2G" |
Next steps
- Set up Apache Kafka cluster for production Schema Registry deployment
- Configure Prometheus monitoring for Schema Registry JMX metrics
- Build Kafka Streams applications with Avro schema integration
- Configure Kafka Connect with Schema Registry for database integration
- Enable SSL/TLS encryption for Schema Registry security
Running this in production?
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Default values
KAFKA_BOOTSTRAP_SERVERS="${1:-localhost:9092}"
SCHEMA_REGISTRY_PORT="${2:-8081}"
CONFLUENT_VERSION="7.5.1"
# Usage function
usage() {
echo "Usage: $0 [kafka_bootstrap_servers] [schema_registry_port]"
echo "Example: $0 localhost:9092 8081"
echo "Example: $0 kafka1:9092,kafka2:9092,kafka3:9092 8081"
exit 1
}
# Logging functions
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Cleanup function
cleanup() {
log_error "Installation failed. Cleaning up..."
systemctl stop schema-registry 2>/dev/null || true
systemctl disable schema-registry 2>/dev/null || true
rm -f /etc/systemd/system/schema-registry.service
rm -rf /opt/confluent /opt/schema-registry /var/log/schema-registry
userdel -r schema-registry 2>/dev/null || true
exit 1
}
trap cleanup ERR
# Check if running as root or with sudo
if [[ $EUID -ne 0 ]]; then
log_error "This script must be run as root or with sudo"
exit 1
fi
# Detect distribution
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update && apt upgrade -y"
JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
JAVA_PACKAGE="openjdk-11-jdk"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
JAVA_HOME="/usr/lib/jvm/java-11-openjdk"
JAVA_PACKAGE="java-11-openjdk java-11-openjdk-devel"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
JAVA_HOME="/usr/lib/jvm/java-11-openjdk"
JAVA_PACKAGE="java-11-openjdk java-11-openjdk-devel"
;;
*)
log_error "Unsupported distribution: $ID"
exit 1
;;
esac
else
log_error "Cannot detect OS distribution"
exit 1
fi
log_info "[1/10] Updating system packages..."
eval $PKG_UPDATE
log_info "[2/10] Installing Java Runtime Environment..."
eval "$PKG_INSTALL $JAVA_PACKAGE wget tar"
# Verify Java installation
if ! java -version >/dev/null 2>&1; then
log_error "Java installation failed"
exit 1
fi
log_info "[3/10] Creating dedicated Schema Registry user..."
useradd -r -s /bin/false -d /opt/schema-registry schema-registry 2>/dev/null || true
mkdir -p /opt/schema-registry /var/log/schema-registry
chown -R schema-registry:schema-registry /opt/schema-registry /var/log/schema-registry
chmod 755 /opt/schema-registry /var/log/schema-registry
log_info "[4/10] Downloading Confluent Schema Registry..."
cd /tmp
wget -q "https://packages.confluent.io/archive/7.5/confluent-${CONFLUENT_VERSION}.tar.gz"
tar -xzf "confluent-${CONFLUENT_VERSION}.tar.gz"
mv "confluent-${CONFLUENT_VERSION}" /opt/confluent
chown -R schema-registry:schema-registry /opt/confluent
chmod -R 755 /opt/confluent
log_info "[5/10] Creating configuration directory..."
mkdir -p /opt/confluent/etc/schema-registry
chown -R schema-registry:schema-registry /opt/confluent/etc/schema-registry
chmod 755 /opt/confluent/etc/schema-registry
log_info "[6/10] Configuring Schema Registry properties..."
cat > /opt/confluent/etc/schema-registry/schema-registry.properties << EOF
# Kafka cluster connection
kafkastore.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS}
# Schema Registry listeners
listeners=http://0.0.0.0:${SCHEMA_REGISTRY_PORT}
# Topic for storing schemas
kafkastore.topic=_schemas
# Leader eligibility (formerly master.eligibility)
leader.eligibility=true
# Schema compatibility level
schema.compatibility.level=backward
# Debug mode
debug=false
EOF
chown schema-registry:schema-registry /opt/confluent/etc/schema-registry/schema-registry.properties
chmod 644 /opt/confluent/etc/schema-registry/schema-registry.properties
log_info "[7/10] Creating systemd service file..."
cat > /etc/systemd/system/schema-registry.service << EOF
[Unit]
Description=Confluent Schema Registry
After=network.target
Requires=network.target
[Service]
Type=simple
User=schema-registry
Group=schema-registry
ExecStart=/opt/confluent/bin/schema-registry-start /opt/confluent/etc/schema-registry/schema-registry.properties
Restart=always
RestartSec=5
Environment=JAVA_HOME=${JAVA_HOME}
Environment=SCHEMA_REGISTRY_HEAP_OPTS="-Xms512M -Xmx1G"
StandardOutput=append:/var/log/schema-registry/schema-registry.log
StandardError=append:/var/log/schema-registry/schema-registry-error.log
KillMode=process
TimeoutStopSec=300
[Install]
WantedBy=multi-user.target
EOF
chmod 644 /etc/systemd/system/schema-registry.service
log_info "[8/10] Configuring log rotation..."
cat > /etc/logrotate.d/schema-registry << EOF
/var/log/schema-registry/*.log {
    daily
    missingok
    rotate 7
    compress
    notifempty
    copytruncate
}
EOF
chmod 644 /etc/logrotate.d/schema-registry
log_info "[9/10] Starting and enabling Schema Registry service..."
systemctl daemon-reload
systemctl enable schema-registry
systemctl start schema-registry
# Wait for service to start
sleep 10
log_info "[10/10] Verifying installation..."
if systemctl is-active --quiet schema-registry; then
log_info "Schema Registry service is running"
else
log_error "Schema Registry service failed to start"
systemctl status schema-registry
exit 1
fi
# Test Schema Registry REST API
if command -v curl >/dev/null 2>&1; then
if curl -sf "http://localhost:${SCHEMA_REGISTRY_PORT}/subjects" >/dev/null 2>&1; then
log_info "Schema Registry REST API is responding"
else
log_warn "Schema Registry REST API test failed - service may still be starting"
fi
else
eval "$PKG_INSTALL curl"
sleep 5
if curl -sf "http://localhost:${SCHEMA_REGISTRY_PORT}/subjects" >/dev/null 2>&1; then
log_info "Schema Registry REST API is responding"
else
log_warn "Schema Registry REST API test failed - service may still be starting"
fi
fi
log_info "Schema Registry installation completed successfully!"
log_info "Service status: $(systemctl is-active schema-registry)"
log_info "Access Schema Registry at: http://localhost:${SCHEMA_REGISTRY_PORT}"
log_info "Configuration file: /opt/confluent/etc/schema-registry/schema-registry.properties"
log_info "Logs location: /var/log/schema-registry/"
log_info ""
log_info "Useful commands:"
log_info " View status: sudo systemctl status schema-registry"
log_info " View logs: sudo tail -f /var/log/schema-registry/schema-registry.log"
log_info " List subjects: curl http://localhost:${SCHEMA_REGISTRY_PORT}/subjects"
Review the script before running. Execute with: sudo bash install.sh [kafka_bootstrap_servers] [schema_registry_port]