Configure Kafka Schema Registry with Avro serialization for data processing

Intermediate · 25 min · Apr 17, 2026
Ubuntu 24.04 Debian 12 AlmaLinux 9 Rocky Linux 9

Set up Confluent Schema Registry with Apache Kafka to manage Avro schemas for data serialization. Configure producers and consumers with schema evolution and compatibility rules for production data processing workflows.

Prerequisites

  • Apache Kafka cluster running
  • Java 8 or higher installed
  • Python 3 with pip available
  • Root or sudo access

What this solves

Kafka Schema Registry provides centralized schema management for Avro, JSON, and Protobuf data formats in Apache Kafka. It ensures data compatibility across producers and consumers, enables schema evolution, and prevents data corruption from incompatible schema changes in production environments.

Step-by-step installation

Update system packages

Start by updating your system packages to ensure you have the latest security patches.

# Debian/Ubuntu
sudo apt update && sudo apt upgrade -y

# RHEL-family (AlmaLinux/Rocky)
sudo dnf update -y

Install Java runtime environment

Schema Registry requires Java 8 or higher. Install OpenJDK 11 for optimal performance and long-term support.

# Debian/Ubuntu
sudo apt install -y openjdk-11-jdk

# RHEL-family (AlmaLinux/Rocky)
sudo dnf install -y java-11-openjdk java-11-openjdk-devel

java -version

Create dedicated user for Schema Registry

Create a system user for running Schema Registry service with minimal privileges for security.

sudo useradd -r -s /bin/false -d /opt/schema-registry schema-registry
sudo mkdir -p /opt/schema-registry /var/log/schema-registry
sudo chown -R schema-registry:schema-registry /opt/schema-registry /var/log/schema-registry

Download and install Confluent Schema Registry

Download the latest Confluent Platform which includes Schema Registry, or use the open-source version.

cd /tmp
wget https://packages.confluent.io/archive/7.5/confluent-7.5.1.tar.gz
tar -xzf confluent-7.5.1.tar.gz
sudo mv confluent-7.5.1 /opt/confluent
sudo chown -R schema-registry:schema-registry /opt/confluent

Configure Schema Registry properties

Create the main configuration file at /opt/confluent/etc/schema-registry/schema-registry.properties with the Kafka bootstrap servers and storage topic settings.

# Kafka cluster connection
kafkastore.bootstrap.servers=localhost:9092

# Schema Registry listeners
listeners=http://0.0.0.0:8081

# Topic for storing schemas
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=3

# Leader eligibility (this instance may become the writer)
master.eligibility=true

# Default schema compatibility level
schema.compatibility.level=BACKWARD

# Authentication for the Kafka connection (only if your cluster uses SASL)
#kafkastore.security.protocol=SASL_SSL
#kafkastore.sasl.mechanism=PLAIN
#kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="schema-registry" password="your-password";

# TLS for the REST listener (only if you serve HTTPS)
#ssl.truststore.location=/opt/confluent/etc/schema-registry/truststore.jks
#ssl.truststore.password=your-truststore-password
#ssl.keystore.location=/opt/confluent/etc/schema-registry/keystore.jks
#ssl.keystore.password=your-keystore-password
#ssl.key.password=your-key-password
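Before starting the service, it can help to sanity-check the properties file. A minimal stdlib-only sketch that parses Java-style properties and flags missing keys — the path handling and the required-key list are assumptions based on this setup, not part of Schema Registry itself:

```python
def parse_properties(text):
    """Parse 'key=value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        key, sep, value = line.partition('=')
        if sep:
            props[key.strip()] = value.strip()
    return props

# Keys this guide's configuration relies on (an assumption for this setup)
REQUIRED = ['kafkastore.bootstrap.servers', 'listeners', 'kafkastore.topic']

sample = """
# Kafka cluster connection
kafkastore.bootstrap.servers=localhost:9092
listeners=http://0.0.0.0:8081
kafkastore.topic=_schemas
"""

props = parse_properties(sample)
missing = [k for k in REQUIRED if k not in props]
print("missing keys:", missing)
```

In practice you would read the real file with `open(...).read()` instead of the inline sample.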

Create systemd service file

Set up Schema Registry as a systemd service at /etc/systemd/system/schema-registry.service for automatic startup and management.

[Unit]
Description=Confluent Schema Registry
After=network.target
Requires=network.target

[Service]
Type=simple
User=schema-registry
Group=schema-registry
ExecStart=/opt/confluent/bin/schema-registry-start /opt/confluent/etc/schema-registry/schema-registry.properties
ExecStop=/opt/confluent/bin/schema-registry-stop
Restart=always
RestartSec=5
# On RHEL-family systems, use the matching JVM path under /usr/lib/jvm
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=SCHEMA_REGISTRY_HEAP_OPTS="-Xms512M -Xmx1G"
StandardOutput=append:/var/log/schema-registry/schema-registry.log
StandardError=append:/var/log/schema-registry/schema-registry-error.log

[Install]
WantedBy=multi-user.target

Configure log rotation

Set up log rotation at /etc/logrotate.d/schema-registry to prevent disk space issues from accumulating log files. Because the systemd unit writes logs via append redirection (and defines no ExecReload), use copytruncate instead of signaling the service.

/var/log/schema-registry/*.log {
    daily
    missingok
    rotate 7
    compress
    notifempty
    copytruncate
}

Start and enable Schema Registry

Enable the service to start automatically on boot and start it immediately.

sudo systemctl daemon-reload
sudo systemctl enable schema-registry
sudo systemctl start schema-registry
sudo systemctl status schema-registry

Configure Avro schema management

Install Schema Registry CLI tools

Install the command-line tools for managing schemas and interacting with the registry.

# Debian/Ubuntu
sudo apt install -y curl jq python3-pip

# RHEL-family (AlmaLinux/Rocky)
sudo dnf install -y curl jq python3-pip

# Python client with Avro support (both families)
pip3 install "confluent-kafka[avro]"

Create sample Avro schema

Define a user profile schema to demonstrate schema registration and evolution.

{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example.user",
  "fields": [
    {
      "name": "user_id",
      "type": "long"
    },
    {
      "name": "username",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "created_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
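The timestamp-millis logical type stores epoch milliseconds in a long, interpreted as UTC. A quick stdlib check of what such a value means and how it round-trips (the sample value is illustrative):

```python
from datetime import datetime, timezone

# created_at uses the timestamp-millis logical type: a long holding
# milliseconds since the Unix epoch, in UTC
created_at = 1713355200000  # example value

dt = datetime.fromtimestamp(created_at / 1000, tz=timezone.utc)
print(dt.isoformat())  # -> 2024-04-17T12:00:00+00:00

# Round-trip back to milliseconds
assert int(dt.timestamp() * 1000) == created_at
```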

Register the schema

Register your first schema version with the Schema Registry using the REST API.

curl -X POST http://localhost:8081/subjects/user-profile-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\": \"record\", \"name\": \"UserProfile\", \"namespace\": \"com.example.user\", \"fields\": [{\"name\": \"user_id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}]}"
  }'
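The schema field in the request body is itself a JSON string, which is why the curl payload is full of escaped quotes. A stdlib sketch of building that payload programmatically, using the example schema from above:

```python
import json

# The Avro schema as a normal Python dict (v1 of the example schema)
schema = {
    "type": "record",
    "name": "UserProfile",
    "namespace": "com.example.user",
    "fields": [
        {"name": "user_id", "type": "long"},
        {"name": "username", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "created_at",
         "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}

# The REST API expects {"schema": "<escaped JSON string>"}, so the schema
# is serialized twice: once to a string, then embedded in the outer body
payload = json.dumps({"schema": json.dumps(schema)})

# Round-trip check: the inner string parses back to the original schema
inner = json.loads(json.loads(payload)["schema"])
assert inner == schema
```

The resulting `payload` is exactly what you would pass to curl's `-d` flag, with the escaping handled for you.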

List all subjects

curl http://localhost:8081/subjects

Configure schema compatibility

Set backward compatibility to ensure new schema versions can read old data.

# Set global compatibility level
curl -X PUT http://localhost:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

Set subject-specific compatibility

curl -X PUT http://localhost:8081/config/user-profile-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

Check compatibility settings

curl http://localhost:8081/config
curl http://localhost:8081/config/user-profile-value
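For records, the core BACKWARD rule is that a consumer on the new schema must be able to read data written with the old one, which means any field added in the new version must declare a default. A stdlib-only sketch of that single rule (the registry's real checker covers many more cases, such as type promotions and removals):

```python
def added_fields_have_defaults(old_schema, new_schema):
    """Return the names of fields that are new in new_schema but lack a
    default value -- an empty list means the addition is backward-compatible."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return [f["name"] for f in new_schema["fields"]
            if f["name"] not in old_names and "default" not in f]

v1 = {"fields": [{"name": "user_id", "type": "long"}]}
v2_ok = {"fields": [
    {"name": "user_id", "type": "long"},
    {"name": "phone_number", "type": ["null", "string"], "default": None},
]}
v2_bad = {"fields": [
    {"name": "user_id", "type": "long"},
    {"name": "phone_number", "type": "string"},  # no default: breaking
]}

print(added_fields_have_defaults(v1, v2_ok))   # -> []
print(added_fields_have_defaults(v1, v2_bad))  # -> ['phone_number']
```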

Configure Kafka producers with Avro

Create Python producer example

Build a producer that serializes data using the registered Avro schema.

#!/usr/bin/env python3
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from confluent_kafka.avro.serializer import SerializerError
import time

# Schema Registry and Kafka configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}

# Avro schema (must match the registered schema)
value_schema_str = """
{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example.user",
  "fields": [
    {"name": "user_id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""
key_schema_str = '{"type": "string"}'

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)


def delivery_report(err, msg):
    """Callback for message delivery reports."""
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')


def main():
    # Create the AvroProducer; schemas are registered and cached automatically
    avro_producer = AvroProducer(conf,
                                 default_key_schema=key_schema,
                                 default_value_schema=value_schema)

    # Sample user data
    users = [
        {"user_id": 1001, "username": "alice_smith",
         "email": "alice@example.com", "created_at": int(time.time() * 1000)},
        {"user_id": 1002, "username": "bob_jones",
         "email": "bob@example.com", "created_at": int(time.time() * 1000)},
    ]

    try:
        for user in users:
            # Produce Avro-encoded messages
            avro_producer.produce(topic='user-profiles',
                                  key=user['username'],
                                  value=user,
                                  callback=delivery_report)
            print(f"Produced user: {user['username']}")
        # Wait for all messages to be delivered
        avro_producer.flush()
    except SerializerError as e:
        print(f"Message serialization failed: {e}")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == '__main__':
    main()

Create Python consumer example

Build a consumer that deserializes Avro data using the Schema Registry.

#!/usr/bin/env python3
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Consumer configuration
conf = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'user-profile-consumer',
    'auto.offset.reset': 'earliest'
}


def main():
    # Create the AvroConsumer and subscribe to the topic
    avro_consumer = AvroConsumer(conf)
    avro_consumer.subscribe(['user-profiles'])

    try:
        while True:
            try:
                # Poll for messages
                msg = avro_consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    print(f"Consumer error: {msg.error()}")
                    continue

                # Message successfully consumed and deserialized
                print("Received message:")
                print(f"  Key: {msg.key()}")
                print(f"  Value: {msg.value()}")
                print(f"  Topic: {msg.topic()}")
                print(f"  Partition: {msg.partition()}")
                print(f"  Offset: {msg.offset()}")
                print("-" * 50)
            except SerializerError as e:
                print(f"Message deserialization failed: {e}")
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Error: {e}")
    finally:
        avro_consumer.close()


if __name__ == '__main__':
    main()

Set correct permissions for scripts

Save the scripts as /opt/schema-registry/avro-producer.py and /opt/schema-registry/avro-consumer.py, then make them owned by the schema-registry user and executable.

sudo chown schema-registry:schema-registry /opt/schema-registry/avro-producer.py /opt/schema-registry/avro-consumer.py
sudo chmod 755 /opt/schema-registry/avro-producer.py /opt/schema-registry/avro-consumer.py

Implement schema evolution

Create evolved schema version

Add optional fields to demonstrate backward-compatible schema evolution.

{
  "type": "record",
  "name": "UserProfile",
  "namespace": "com.example.user",
  "fields": [
    {
      "name": "user_id",
      "type": "long"
    },
    {
      "name": "username",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "created_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "phone_number",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "UserStatus",
        "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
      },
      "default": "ACTIVE"
    }
  ]
}
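When a consumer using this evolved schema reads a record produced under v1, the Avro deserializer fills the missing fields from the schema defaults. A stdlib sketch of that reader-side behavior (the real work happens inside the Avro library during schema resolution):

```python
def fill_defaults(record, schema):
    """Return a copy of record with schema defaults applied to any
    field the writer did not include (sketch of reader-side
    schema resolution)."""
    out = dict(record)
    for field in schema["fields"]:
        if field["name"] not in out and "default" in field:
            out[field["name"]] = field["default"]
    return out

# Field list of the evolved (v2) schema, simplified
v2_fields = {"fields": [
    {"name": "user_id", "type": "long"},
    {"name": "username", "type": "string"},
    {"name": "phone_number", "type": ["null", "string"], "default": None},
    {"name": "status", "type": "UserStatus", "default": "ACTIVE"},
]}

# A record written before phone_number and status existed
old_record = {"user_id": 1001, "username": "alice_smith"}
print(fill_defaults(old_record, v2_fields))
# -> {'user_id': 1001, 'username': 'alice_smith', 'phone_number': None, 'status': 'ACTIVE'}
```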

Test schema compatibility

Verify the new schema version is compatible before registering it.

# Test compatibility of new schema
curl -X POST http://localhost:8081/compatibility/subjects/user-profile-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d @- <<'EOF'
{
  "schema": "{\"type\": \"record\", \"name\": \"UserProfile\", \"namespace\": \"com.example.user\", \"fields\": [{\"name\": \"user_id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"phone_number\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"status\", \"type\": {\"type\": \"enum\", \"name\": \"UserStatus\", \"symbols\": [\"ACTIVE\", \"INACTIVE\", \"PENDING\"]}, \"default\": \"ACTIVE\"}]}"
}
EOF

Register evolved schema

Register the new schema version if compatibility check passes.

# Register schema version 2
curl -X POST http://localhost:8081/subjects/user-profile-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d @- <<'EOF'
{
  "schema": "{\"type\": \"record\", \"name\": \"UserProfile\", \"namespace\": \"com.example.user\", \"fields\": [{\"name\": \"user_id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"phone_number\", \"type\": [\"null\", \"string\"], \"default\": null}, {\"name\": \"status\", \"type\": {\"type\": \"enum\", \"name\": \"UserStatus\", \"symbols\": [\"ACTIVE\", \"INACTIVE\", \"PENDING\"]}, \"default\": \"ACTIVE\"}]}"
}
EOF

List all versions for the subject

curl http://localhost:8081/subjects/user-profile-value/versions

Configure security and monitoring

Configure firewall rules

Allow Schema Registry traffic through the firewall on port 8081.

# Debian/Ubuntu (ufw)
sudo ufw allow 8081/tcp comment "Schema Registry"
sudo ufw reload
sudo ufw status numbered

# RHEL-family (firewalld)
sudo firewall-cmd --permanent --add-port=8081/tcp
sudo firewall-cmd --reload
sudo firewall-cmd --list-ports

Configure JMX monitoring

Enable JMX metrics for monitoring Schema Registry performance. The start script reads the JMX port from the JMX_PORT environment variable, so set it in the systemd unit rather than the properties file.

# In the [Service] section of the systemd unit
Environment=JMX_PORT=9999

Additional monitoring settings in schema-registry.properties (the ConfluentMetricsReporter requires Confluent Platform):

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
confluent.metrics.reporter.topic=_confluent-metrics

Create health check script

Monitor Schema Registry health and schema availability with a script at /opt/schema-registry/health-check.sh.

#!/bin/bash
# Schema Registry health check script

REGISTRY_URL="http://localhost:8081"
LOG_FILE="/var/log/schema-registry/health-check.log"

log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" >> "$LOG_FILE"
}

# Check if Schema Registry is responding
if curl -s -f "$REGISTRY_URL/subjects" > /dev/null; then
    log_message "SUCCESS: Schema Registry is healthy"
    # Count registered subjects
    SUBJECT_COUNT=$(curl -s "$REGISTRY_URL/subjects" | jq length)
    log_message "INFO: $SUBJECT_COUNT subjects registered"
    exit 0
else
    log_message "ERROR: Schema Registry health check failed"
    systemctl status schema-registry >> "$LOG_FILE"
    exit 1
fi

sudo chown schema-registry:schema-registry /opt/schema-registry/health-check.sh
sudo chmod 755 /opt/schema-registry/health-check.sh

Add to crontab for regular health checks

echo "*/5 * * * * /opt/schema-registry/health-check.sh" | sudo crontab -u schema-registry -

Verify your setup

# Check Schema Registry status
sudo systemctl status schema-registry

Verify Schema Registry is listening

sudo ss -tlnp | grep :8081

Test API endpoints

curl http://localhost:8081/subjects
curl http://localhost:8081/config
curl http://localhost:8081/subjects/user-profile-value/versions

Test producer and consumer

cd /opt/schema-registry
python3 avro-producer.py

In another terminal

cd /opt/schema-registry
python3 avro-consumer.py

Check logs

sudo tail -f /var/log/schema-registry/schema-registry.log

Common issues

Symptom: Schema Registry won't start
Cause: Kafka not running or incorrect bootstrap servers setting
Fix: Start Kafka first, then verify: systemctl status kafka

Symptom: Schema compatibility errors
Cause: Breaking changes in schema evolution
Fix: Add default values for new fields; check compatibility: curl http://localhost:8081/compatibility/subjects/topic/versions/latest

Symptom: Producer serialization fails
Cause: Schema mismatch or registry unreachable
Fix: Verify schema registration: curl http://localhost:8081/subjects

Symptom: Consumer deserialization errors
Cause: Schema evolution without proper compatibility
Fix: Check schema versions: curl http://localhost:8081/subjects/topic/versions

Symptom: Permission denied errors
Cause: Incorrect file ownership
Fix: Fix ownership: sudo chown -R schema-registry:schema-registry /opt/schema-registry

Symptom: Out of memory errors
Cause: Insufficient JVM heap size
Fix: Increase heap in the service file: SCHEMA_REGISTRY_HEAP_OPTS="-Xms1G -Xmx2G"
Never use chmod 777. It gives every user on the system full access to your files. Schema Registry files should be owned by the schema-registry user with 644 permissions for config files and 755 for executables.

Next steps

Setting up Schema Registry once is straightforward. The harder part is keeping it monitored, backed up, and consistent across environments: plan for HTTPS listeners and authentication, subject-level compatibility policies, and regular backups of the _schemas topic, which holds every registered schema.