Configure Confluent Schema Registry with Avro serialization for production Kafka deployments. Includes schema evolution, producer/consumer integration, and SSL security configuration.
Prerequisites
- Apache Kafka cluster running
- Java 11 or higher
- Administrative access
- 4GB+ RAM recommended
- Network access to Kafka brokers
What this solves
Schema Registry provides centralized schema management for Kafka topics, enabling schema evolution and data compatibility across your streaming applications. This tutorial sets up Confluent Schema Registry with Avro serialization, authentication, and SSL security for production data processing pipelines.
Step-by-step installation
Install Java and dependencies
Schema Registry requires Java 11 or higher. Install the required dependencies first.
sudo apt update
sudo apt install -y openjdk-11-jdk wget curl unzip
Create dedicated user for Schema Registry
Run Schema Registry under a dedicated user account for security isolation.
sudo useradd --system --home-dir /opt/schema-registry --shell /bin/false --create-home schema-registry
Download and install Confluent Platform
Download the Confluent Platform which includes Schema Registry, or install just the Schema Registry component.
cd /tmp
wget https://packages.confluent.io/archive/7.5/confluent-7.5.0.tar.gz
tar -xzf confluent-7.5.0.tar.gz
sudo mv confluent-7.5.0 /opt/confluent
sudo chown -R schema-registry:schema-registry /opt/confluent
Configure Schema Registry properties
Create the main configuration file, /opt/confluent/etc/schema-registry/schema-registry.properties, with the connection details for your Kafka cluster.
# Schema Registry listeners
listeners=http://0.0.0.0:8081
# Kafka cluster connection
kafkastore.bootstrap.servers=localhost:9092
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=3
# Schema Registry identity
schema.registry.group.id=schema-registry
# Security settings
kafkastore.security.protocol=PLAINTEXT
schema.registry.inter.instance.protocol=http
# Compatibility settings (avro.compatibility.level is the older, deprecated name for schema.compatibility.level)
avro.compatibility.level=BACKWARD
schema.compatibility.level=BACKWARD
# Response media type settings
response.mediatype.preferred=application/vnd.schemaregistry.v1+json
response.mediatype.default=application/vnd.schemaregistry.v1+json
Configure Schema Registry with SSL
Enable SSL encryption for production deployments. In the same properties file, replace the plaintext listener and Kafka connection settings with the SSL settings below.
# HTTPS listeners
listeners=https://0.0.0.0:8081
schema.registry.inter.instance.protocol=https
# SSL keystore configuration
ssl.keystore.location=/opt/confluent/ssl/schema-registry.keystore.jks
ssl.keystore.password=changeme
ssl.key.password=changeme
# SSL truststore configuration
ssl.truststore.location=/opt/confluent/ssl/schema-registry.truststore.jks
ssl.truststore.password=changeme
# SSL client authentication (mutual TLS)
ssl.client.authentication=REQUIRED
# Kafka SSL connection
kafkastore.bootstrap.servers=localhost:9093
kafkastore.security.protocol=SSL
kafkastore.ssl.keystore.location=/opt/confluent/ssl/client.keystore.jks
kafkastore.ssl.keystore.password=changeme
kafkastore.ssl.key.password=changeme
kafkastore.ssl.truststore.location=/opt/confluent/ssl/client.truststore.jks
kafkastore.ssl.truststore.password=changeme
Create SSL certificates
Generate SSL certificates for Schema Registry and client authentication.
sudo mkdir -p /opt/confluent/ssl
cd /opt/confluent/ssl
# Generate CA certificate
sudo keytool -genkey -keyalg RSA -alias ca-cert -keystore ca.keystore.jks -validity 365 -dname "CN=example.com,OU=IT,O=Example,L=City,S=State,C=US" -storepass changeme -keypass changeme
# Export CA certificate
sudo keytool -export -alias ca-cert -file ca-cert -keystore ca.keystore.jks -storepass changeme
# Create truststore and import CA
sudo keytool -import -alias ca-cert -file ca-cert -keystore schema-registry.truststore.jks -storepass changeme -noprompt
# Generate server certificate
sudo keytool -genkey -keyalg RSA -alias schema-registry -keystore schema-registry.keystore.jks -validity 365 -dname "CN=schema-registry.example.com,OU=IT,O=Example,L=City,S=State,C=US" -storepass changeme -keypass changeme
# Sign server certificate with CA
sudo keytool -certreq -alias schema-registry -file schema-registry.csr -keystore schema-registry.keystore.jks -storepass changeme
sudo keytool -gencert -alias ca-cert -infile schema-registry.csr -outfile schema-registry.crt -keystore ca.keystore.jks -storepass changeme -validity 365
sudo keytool -import -alias ca-cert -file ca-cert -keystore schema-registry.keystore.jks -storepass changeme -noprompt
sudo keytool -import -alias schema-registry -file schema-registry.crt -keystore schema-registry.keystore.jks -storepass changeme
# Set permissions
sudo chown -R schema-registry:schema-registry /opt/confluent/ssl
sudo chmod 600 /opt/confluent/ssl/*.jks
Create systemd service
Create a systemd unit file, /etc/systemd/system/schema-registry.service, to manage Schema Registry startup and automatic restarts.
[Unit]
Description=Confluent Schema Registry
After=network.target
Requires=network.target
[Service]
Type=simple
User=schema-registry
Group=schema-registry
ExecStart=/opt/confluent/bin/schema-registry-start /opt/confluent/etc/schema-registry/schema-registry.properties
ExecStop=/opt/confluent/bin/schema-registry-stop
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=schema-registry
KillMode=process
TimeoutStopSec=300
# Security settings
NoNewPrivileges=yes
PrivateTmp=yes
ProtectSystem=strict
ProtectHome=yes
ReadWritePaths=/opt/confluent/logs
# Environment
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=SCHEMA_REGISTRY_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=SCHEMA_REGISTRY_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
[Install]
WantedBy=multi-user.target
Configure log directory and permissions
Set up logging directory with correct ownership and permissions.
sudo mkdir -p /opt/confluent/logs
sudo chown schema-registry:schema-registry /opt/confluent/logs
sudo chmod 755 /opt/confluent/logs
Configure authentication
Enable basic authentication for Schema Registry access control.
# Enable authentication
authentication.method=BASIC
authentication.realm=SchemaRegistry
authentication.roles=admin,developer,readonly
# JAAS configuration
schema.registry.resource.extension.class=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension
confluent.schema.registry.auth.mechanism=JETTY_AUTH
schema.registry.auth.mechanism=JETTY_AUTH
Create user credentials file
Define users and their access levels for Schema Registry.
# Format: username: password,role1,role2
admin: admin123,admin
producer: producer123,developer
consumer: consumer123,readonly
app-user: app-secret,developer
Enable and start Schema Registry
Start the Schema Registry service and enable it to start on boot.
sudo systemctl daemon-reload
sudo systemctl enable schema-registry
sudo systemctl start schema-registry
sudo systemctl status schema-registry
Configure Avro schema management
Create sample Avro schema
Define an Avro schema for your data structure and save it as /tmp/user-schema.json, the path used by the registration command. A logical type such as timestamp-millis annotates the type itself, so it is nested inside "type" rather than set on the field, where Avro would ignore it.
{
  "namespace": "com.example.users",
  "type": "record",
  "name": "User",
  "version": "1",
  "fields": [
    {
      "name": "id",
      "type": "long",
      "doc": "User unique identifier"
    },
    {
      "name": "username",
      "type": "string",
      "doc": "User login name"
    },
    {
      "name": "email",
      "type": "string",
      "doc": "User email address"
    },
    {
      "name": "created_at",
      "type": {"type": "long", "logicalType": "timestamp-millis"},
      "doc": "User creation timestamp"
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null,
      "doc": "Optional user metadata"
    }
  ]
}
Register schema with Schema Registry
Upload the schema to Schema Registry using the REST API.
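The registration endpoint does not take a raw Avro schema as the request body. It expects a JSON envelope whose `schema` field carries the schema serialized as a string. The sketch below shows one way to build that envelope in Python; the helper name `build_registration_payload` and the shortened example schema are illustrative assumptions, not part of any Schema Registry client.

```python
import json


def build_registration_payload(schema: dict) -> str:
    """Wrap an Avro schema in the {"schema": "<string>"} envelope."""
    # The inner json.dumps turns the schema into a string; the outer one
    # escapes that string so it can sit inside the JSON request body.
    return json.dumps({"schema": json.dumps(schema)})


# Shortened, hypothetical schema used only to demonstrate the envelope.
example_schema = {
    "namespace": "com.example.users",
    "type": "record",
    "name": "User",
    "fields": [{"name": "id", "type": "long"}],
}

payload = build_registration_payload(example_schema)
print(payload)
```

Writing the returned string to a file gives a request body that curl can send with `--data @<file>`.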
# Register schema for a subject (topic). The API expects {"schema": "<schema as a string>"}, so wrap the file with jq first
jq -c '{schema: tojson}' /tmp/user-schema.json |
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @- \
http://localhost:8081/subjects/users-value/versions
# Register with authentication
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-u admin:admin123 \
--data '{"schema": "{\"namespace\": \"com.example.users\", \"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"metadata\", \"type\": [\"null\", {\"type\": \"map\", \"values\": \"string\"}], \"default\": null}]}"}' \
http://localhost:8081/subjects/users-value/versions
Configure producer and consumer integration
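It can help to know what the Avro serializer actually writes to a topic. Confluent's serializers frame each message as a magic byte (0), the schema ID as a 4-byte big-endian integer, and then the Avro-encoded payload. The sketch below splits that 5-byte header from a message; `parse_confluent_header` is a hypothetical helper name and the sample bytes are made up for illustration.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every message in the Confluent wire format


def parse_confluent_header(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_payload) for a framed message."""
    if len(message) < 5:
        raise ValueError("message too short for the 5-byte header")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Made-up message: header for schema ID 42 followed by one payload byte.
framed = struct.pack(">bI", MAGIC_BYTE, 42) + b"\x02"
schema_id, avro_payload = parse_confluent_header(framed)
print(schema_id, avro_payload)
```

A consumer that hits an unexpected magic byte is usually reading data that was not produced with a Schema Registry serializer.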
Java producer configuration
Configure a Kafka producer to use Avro serialization with Schema Registry.
# Kafka cluster connection
bootstrap.servers=localhost:9092
# Schema Registry settings
schema.registry.url=http://localhost:8081
schema.registry.basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=producer:producer123
# Avro serialization
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
# Producer settings
acks=all
retries=2147483647
max.in.flight.requests.per.connection=1
enable.idempotence=true
compression.type=snappy
# Schema evolution
auto.register.schemas=false
use.latest.version=true
Java consumer configuration
Configure a Kafka consumer to deserialize Avro messages using Schema Registry.
# Kafka cluster connection
bootstrap.servers=localhost:9092
# Schema Registry settings
schema.registry.url=http://localhost:8081
schema.registry.basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=consumer:consumer123
# Avro deserialization
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
# Consumer settings
group.id=user-consumer-group
auto.offset.reset=earliest
enable.auto.commit=false
# Avro specific settings
specific.avro.reader=true
Python client configuration
Configure Python producer and consumer using confluent-kafka with Avro support.
# Install Python dependencies
pip install "confluent-kafka[avro]" requests
import json
from confluent_kafka import avro
# AvroProducer is the legacy API; newer client releases recommend AvroSerializer instead
from confluent_kafka.avro import AvroProducer
# Producer configuration, including the Schema Registry URL and credentials
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',
    'schema.registry.basic.auth.credentials.source': 'USER_INFO',
    'schema.registry.basic.auth.user.info': 'producer:producer123'
}
# Load schema
with open('/tmp/user-schema.json', 'r') as f:
    user_schema = json.load(f)
# Create producer
producer = AvroProducer(producer_conf, default_value_schema=avro.loads(json.dumps(user_schema)))
# Send message
user_data = {
    'id': 12345,
    'username': 'john_doe',
    'email': 'john@example.com',
    'created_at': 1703097600000,
    'metadata': {'role': 'user', 'department': 'engineering'}
}
producer.produce(topic='users', value=user_data, key='user_12345')
# Block until the message is delivered
producer.flush()
Configure schema evolution
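Under BACKWARD compatibility, consumers using the new schema must be able to read data written with the previous one. For record schemas the most common consequence is that any field added in the new version needs a default. The following is a deliberately simplified sketch of that single rule (it ignores type changes, aliases, and nested records, all of which the registry's real check covers); `fields_breaking_backward` is an illustrative name, not a registry or client function.

```python
from typing import List


def fields_breaking_backward(old_schema: dict, new_schema: dict) -> List[str]:
    """List fields added in new_schema that have no default value."""
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        # A reader on the new schema cannot fill a field that old data
        # lacks unless the schema supplies a default for it.
        if field["name"] not in old_names and "default" not in field
    ]


v1 = {"type": "record", "name": "User",
      "fields": [{"name": "id", "type": "long"}]}
v2_with_default = {"type": "record", "name": "User",
                   "fields": [{"name": "id", "type": "long"},
                              {"name": "last_login", "type": ["null", "long"],
                               "default": None}]}
v2_without_default = {"type": "record", "name": "User",
                      "fields": [{"name": "id", "type": "long"},
                                 {"name": "last_login", "type": "long"}]}

ok_result = fields_breaking_backward(v1, v2_with_default)
bad_result = fields_breaking_backward(v1, v2_without_default)
print(ok_result, bad_result)
```

Removing a field is allowed under BACKWARD, which is why the sketch only inspects fields that are new in the later version.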
Enable compatibility checking
Configure compatibility levels to control schema evolution policies.
# Set global compatibility level
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-u admin:admin123 \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config
# Set subject-specific compatibility
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-u admin:admin123 \
--data '{"compatibility": "BACKWARD_TRANSITIVE"}' \
http://localhost:8081/config/users-value
Create evolved schema version
Add a new optional field to demonstrate backward-compatible schema evolution, and save this version as /tmp/user-schema-v2.json. The new field is a union with null and has a default, so consumers on this version can still read records written with version 1.
{
  "namespace": "com.example.users",
  "type": "record",
  "name": "User",
  "version": "2",
  "fields": [
    {
      "name": "id",
      "type": "long",
      "doc": "User unique identifier"
    },
    {
      "name": "username",
      "type": "string",
      "doc": "User login name"
    },
    {
      "name": "email",
      "type": "string",
      "doc": "User email address"
    },
    {
      "name": "created_at",
      "type": {"type": "long", "logicalType": "timestamp-millis"},
      "doc": "User creation timestamp"
    },
    {
      "name": "last_login",
      "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}],
      "default": null,
      "doc": "Last login timestamp (new field)"
    },
    {
      "name": "metadata",
      "type": ["null", {"type": "map", "values": "string"}],
      "default": null,
      "doc": "Optional user metadata"
    }
  ]
}
Register evolved schema
Upload the new schema version and verify compatibility.
# Test compatibility before registering. The API expects {"schema": "<schema as a string>"}, so wrap the file with jq first
jq -c '{schema: tojson}' /tmp/user-schema-v2.json |
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-u admin:admin123 \
--data @- \
http://localhost:8081/compatibility/subjects/users-value/versions/latest
# Register new version if compatible
jq -c '{schema: tojson}' /tmp/user-schema-v2.json |
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
-u admin:admin123 \
--data @- \
http://localhost:8081/subjects/users-value/versions
Verify your setup
# Check Schema Registry health
curl http://localhost:8081/
# List all subjects
curl -u admin:admin123 http://localhost:8081/subjects
# Get latest schema version
curl -u admin:admin123 http://localhost:8081/subjects/users-value/versions/latest
# Check compatibility settings
curl -u admin:admin123 http://localhost:8081/config
# Verify service status
sudo systemctl status schema-registry
# Check logs
journalctl -u schema-registry -f
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Schema Registry won't start | Kafka not available | Ensure Kafka cluster is running and accessible |
| SSL connection fails | Certificate mismatch | Verify certificate CN matches hostname |
| Schema registration fails | Authentication required | Check credentials in client configuration |
| Compatibility check fails | Breaking schema changes | Give every newly added field a default value |
| Consumer deserialization error | Schema not found | Verify the subject name follows the topic name, e.g. users-value for topic users |
| Permission denied errors | Incorrect file ownership | chown -R schema-registry:schema-registry /opt/confluent |
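
For the "Schema not found" row: with the default subject naming strategy (TopicNameStrategy), serializers and deserializers look up schemas under `<topic>-key` and `<topic>-value`. A small sketch of that mapping, useful for checking which subject a client will request; `subject_for` is an illustrative helper, not a client library function.

```python
def subject_for(topic: str, is_key: bool = False) -> str:
    """Subject name used by the default TopicNameStrategy."""
    suffix = "key" if is_key else "value"
    return f"{topic}-{suffix}"


value_subject = subject_for("users")
key_subject = subject_for("users", is_key=True)
print(value_subject, key_subject)
```

If the subject printed here does not appear in the output of the `/subjects` endpoint, the schema was registered under a different name than the client expects.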
Next steps
- Set up Kafka cluster with monitoring
- Configure Kafka Connect for database integration
- Build Kafka Streams applications for real-time processing
- Monitor Kafka and Schema Registry with Prometheus
- Configure Kafka SASL authentication and ACLs
Automated install script
Run this to automate the entire setup
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Configuration
CONFLUENT_VERSION="7.5.0"
SCHEMA_REGISTRY_USER="schema-registry"
SCHEMA_REGISTRY_HOME="/opt/schema-registry"
CONFLUENT_HOME="/opt/confluent"
SSL_DIR="/opt/confluent/ssl"
KAFKA_BOOTSTRAP_SERVERS="${1:-localhost:9092}"
DOMAIN_NAME="${2:-schema-registry.example.com}"
# Usage message
usage() {
    echo "Usage: $0 [kafka_bootstrap_servers] [domain_name]"
    echo "Example: $0 localhost:9092 schema-registry.example.com"
    exit 1
}
# Error handling
cleanup() {
    echo -e "${RED}[ERROR] Installation failed. Cleaning up...${NC}"
    systemctl stop confluent-schema-registry 2>/dev/null || true
    systemctl disable confluent-schema-registry 2>/dev/null || true
    rm -f /etc/systemd/system/confluent-schema-registry.service
    rm -rf "$CONFLUENT_HOME" "$SCHEMA_REGISTRY_HOME"
    userdel "$SCHEMA_REGISTRY_USER" 2>/dev/null || true
}
trap cleanup ERR
# Check if running as root
if [[ $EUID -ne 0 ]]; then
    echo -e "${RED}[ERROR] This script must be run as root${NC}"
    exit 1
fi
# Auto-detect distribution
echo -e "${YELLOW}[1/10] Detecting distribution...${NC}"
if [ -f /etc/os-release ]; then
    . /etc/os-release
    case "$ID" in
        ubuntu|debian)
            PKG_MGR="apt"
            PKG_UPDATE="apt update"
            PKG_INSTALL="apt install -y"
            JAVA_PKG="openjdk-11-jdk"
            FIREWALL_CMD="ufw"
            ;;
        almalinux|rocky|centos|rhel|ol|fedora)
            PKG_MGR="dnf"
            PKG_UPDATE="dnf update -y"
            PKG_INSTALL="dnf install -y"
            JAVA_PKG="java-11-openjdk java-11-openjdk-devel"
            FIREWALL_CMD="firewall-cmd"
            ;;
        amzn)
            PKG_MGR="yum"
            PKG_UPDATE="yum update -y"
            PKG_INSTALL="yum install -y"
            JAVA_PKG="java-11-openjdk java-11-openjdk-devel"
            FIREWALL_CMD="firewall-cmd"
            ;;
        *)
            echo -e "${RED}[ERROR] Unsupported distribution: $ID${NC}"
            exit 1
            ;;
    esac
else
    echo -e "${RED}[ERROR] Cannot detect distribution${NC}"
    exit 1
fi
echo -e "${GREEN}Detected: $PRETTY_NAME${NC}"
# Update system and install dependencies
echo -e "${YELLOW}[2/10] Installing Java and dependencies...${NC}"
$PKG_UPDATE
$PKG_INSTALL $JAVA_PKG wget curl unzip tar
# Create dedicated user
echo -e "${YELLOW}[3/10] Creating dedicated user...${NC}"
if ! id "$SCHEMA_REGISTRY_USER" &>/dev/null; then
    useradd --system --home-dir "$SCHEMA_REGISTRY_HOME" --shell /bin/false --create-home "$SCHEMA_REGISTRY_USER"
fi
# Download and install Confluent Platform
echo -e "${YELLOW}[4/10] Downloading Confluent Platform...${NC}"
cd /tmp
if [ ! -f "confluent-${CONFLUENT_VERSION}.tar.gz" ]; then
    wget "https://packages.confluent.io/archive/7.5/confluent-${CONFLUENT_VERSION}.tar.gz"
fi
echo -e "${YELLOW}[5/10] Installing Confluent Platform...${NC}"
tar -xzf "confluent-${CONFLUENT_VERSION}.tar.gz"
if [ -d "$CONFLUENT_HOME" ]; then
    rm -rf "$CONFLUENT_HOME"
fi
mv "confluent-${CONFLUENT_VERSION}" "$CONFLUENT_HOME"
chown -R "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_USER" "$CONFLUENT_HOME"
# Create SSL directory and certificates
echo -e "${YELLOW}[6/10] Creating SSL certificates...${NC}"
mkdir -p "$SSL_DIR"
cd "$SSL_DIR"
# Generate CA certificate
keytool -genkey -keyalg RSA -alias ca-cert -keystore ca.keystore.jks -validity 365 \
-dname "CN=$DOMAIN_NAME,OU=IT,O=Example,L=City,S=State,C=US" \
-storepass changeme -keypass changeme
# Export CA certificate
keytool -export -alias ca-cert -file ca-cert -keystore ca.keystore.jks -storepass changeme
# Create truststore and import CA
keytool -import -alias ca-cert -file ca-cert -keystore schema-registry.truststore.jks \
-storepass changeme -noprompt
# Generate server certificate
keytool -genkey -keyalg RSA -alias schema-registry -keystore schema-registry.keystore.jks -validity 365 \
-dname "CN=$DOMAIN_NAME,OU=IT,O=Example,L=City,S=State,C=US" \
-storepass changeme -keypass changeme
# Create client certificates
keytool -genkey -keyalg RSA -alias client -keystore client.keystore.jks -validity 365 \
-dname "CN=client,OU=IT,O=Example,L=City,S=State,C=US" \
-storepass changeme -keypass changeme
cp schema-registry.truststore.jks client.truststore.jks
# Set proper ownership and permissions
chown -R "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_USER" "$SSL_DIR"
chmod 750 "$SSL_DIR"
chmod 640 "$SSL_DIR"/*.jks
# Create configuration file
echo -e "${YELLOW}[7/10] Creating configuration...${NC}"
cat > "$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties" << EOF
# Schema Registry listeners
listeners=http://0.0.0.0:8081
# Kafka cluster connection
kafkastore.bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=3
# Schema Registry identity
schema.registry.group.id=schema-registry
# Security settings
kafkastore.security.protocol=PLAINTEXT
schema.registry.inter.instance.protocol=http
# Avro compatibility settings
avro.compatibility.level=BACKWARD
schema.compatibility.level=BACKWARD
# Response cache settings
response.mediatype.preferred=application/vnd.schemaregistry.v1+json
response.mediatype.default=application/vnd.schemaregistry.v1+json
EOF
chown "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_USER" "$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties"
chmod 640 "$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties"
# Create systemd service
echo -e "${YELLOW}[8/10] Creating systemd service...${NC}"
# Resolve JAVA_HOME from the installed java binary, because the JVM path differs between distributions
JAVA_HOME_DIR="$(dirname "$(dirname "$(readlink -f "$(command -v java)")")")"
cat > /etc/systemd/system/confluent-schema-registry.service << EOF
[Unit]
Description=Confluent Schema Registry
After=network.target
[Service]
Type=simple
User=$SCHEMA_REGISTRY_USER
Group=$SCHEMA_REGISTRY_USER
Environment=JAVA_HOME=$JAVA_HOME_DIR
ExecStart=$CONFLUENT_HOME/bin/schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
ExecStop=$CONFLUENT_HOME/bin/schema-registry-stop
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
# Configure firewall
echo -e "${YELLOW}[9/10] Configuring firewall...${NC}"
if command -v ufw >/dev/null 2>&1; then
    ufw allow 8081/tcp
elif command -v firewall-cmd >/dev/null 2>&1; then
    firewall-cmd --permanent --add-port=8081/tcp
    firewall-cmd --reload
fi
# Enable and start service
systemctl daemon-reload
systemctl enable confluent-schema-registry
systemctl start confluent-schema-registry
# Verification
echo -e "${YELLOW}[10/10] Verifying installation...${NC}"
sleep 10
if systemctl is-active --quiet confluent-schema-registry; then
    echo -e "${GREEN}✓ Schema Registry service is running${NC}"
else
    echo -e "${RED}✗ Schema Registry service failed to start${NC}"
    exit 1
fi
# -f makes curl return a non-zero exit code on HTTP errors, not only on connection failures
if curl -sf http://localhost:8081/subjects >/dev/null; then
    echo -e "${GREEN}✓ Schema Registry API is responding${NC}"
else
    echo -e "${RED}✗ Schema Registry API is not responding${NC}"
    exit 1
fi
echo -e "${GREEN}Installation completed successfully!${NC}"
echo -e "${YELLOW}Schema Registry is available at: http://localhost:8081${NC}"
echo -e "${YELLOW}SSL certificates are located at: $SSL_DIR${NC}"
echo -e "${YELLOW}Configuration file: $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties${NC}"
echo ""
echo "To test the installation:"
echo " curl http://localhost:8081/subjects"
echo ""
echo "To enable SSL, update the configuration file with SSL settings and restart the service."
Review the script before running. It checks for root privileges, so execute it with: sudo bash install.sh