Set up Confluent Schema Registry with Avro serialization to manage schemas and ensure data compatibility in your Kafka streaming applications. This guide covers installation, schema management, and producer/consumer configuration.
Prerequisites
- Apache Kafka running on port 9092
- Java 8 or higher installed
- Basic understanding of Kafka topics and producers/consumers
What this solves
Schema Registry provides centralized schema management for Kafka topics, preventing data format mismatches between producers and consumers. Avro serialization gives you compact binary encoding with schema evolution support, essential for production streaming applications where data formats change over time.
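On the wire, Confluent's Avro serializer prepends a small header to every message: a magic byte (0x00) followed by the 4-byte big-endian ID of the schema in the registry, then the Avro binary payload. The consumer uses that ID to fetch the schema it needs. A quick sketch of decoding the header with standard shell tools (the sample bytes are fabricated for illustration):

```shell
# Simulate a serialized message whose schema ID is 42 (octal 052):
printf '\000\000\000\000\052' > /tmp/msg.bin
# Skip the magic byte, read the 4 ID bytes, and combine them big-endian:
schema_id=$(od -An -tu1 -j1 -N4 /tmp/msg.bin |
  awk '{print $1*16777216 + $2*65536 + $3*256 + $4}')
echo "schema id: $schema_id"
```

This framing is why a consumer configured with the wrong registry URL fails to deserialize even structurally valid Avro data.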
Step-by-step installation
Install Java and required dependencies
Schema Registry requires Java 8 or higher to run properly.
sudo apt update
sudo apt install -y openjdk-11-jdk wget curl
Create dedicated user and directories
Run Schema Registry as a non-root user for security and create the necessary directories.
sudo useradd --system --create-home --shell /bin/false kafka
sudo mkdir -p /opt/schema-registry /var/log/schema-registry
sudo chown kafka:kafka /var/log/schema-registry
Download and install Confluent Schema Registry
Download the Confluent Community package, which includes Schema Registry, and extract it into the installation directory.
cd /tmp
wget https://packages.confluent.io/archive/7.5/confluent-community-2.13-7.5.0.tar.gz
tar -xzf confluent-community-2.13-7.5.0.tar.gz
sudo mv confluent-7.5.0/* /opt/schema-registry/
sudo chown -R kafka:kafka /opt/schema-registry
Configure Schema Registry properties
Create the main configuration file at /opt/schema-registry/etc/schema-registry/schema-registry.properties with Kafka connection settings and the storage backend. A replication factor of 1 suits a single-broker setup; raise it in production.
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=localhost:9092
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=1
schema.registry.group.id=schema-registry
# Security settings
kafkastore.security.protocol=PLAINTEXT
# Schema compatibility settings
schema.compatibility.level=BACKWARD
# Logging
log4j.configuration=file:/opt/schema-registry/etc/schema-registry/log4j.properties
Configure logging
Set up the log4j configuration at /opt/schema-registry/etc/schema-registry/log4j.properties for proper log management and rotation.
log4j.rootLogger=INFO, stdout, file
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/schema-registry/schema-registry.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
Create systemd service file
Set up Schema Registry as a system service for automatic startup and management. Create /etc/systemd/system/schema-registry.service:
[Unit]
Description=Confluent Schema Registry
After=network.target kafka.service
Requires=kafka.service
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/opt/schema-registry/bin/schema-registry-start /opt/schema-registry/etc/schema-registry/schema-registry.properties
ExecStop=/opt/schema-registry/bin/schema-registry-stop
Restart=on-failure
RestartSec=5
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Environment=SCHEMA_REGISTRY_HEAP_OPTS="-Xmx512M"
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Start and enable Schema Registry
Enable the service to start automatically on boot and start it now.
sudo systemctl daemon-reload
sudo systemctl enable schema-registry
sudo systemctl start schema-registry
sudo systemctl status schema-registry
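The service can take a few seconds to come up after systemctl start. Rather than sleeping for a fixed interval, a small polling helper can wait until the REST API answers (a sketch; adjust the URL and retry count to taste):

```shell
# Poll a URL until it responds successfully, up to a retry limit
wait_for_url() {
  local url=$1 tries=${2:-30}
  local i
  for ((i = 0; i < tries; i++)); do
    curl -sf "$url" > /dev/null && return 0
    sleep 1
  done
  return 1
}

# wait_for_url http://localhost:8081/subjects && echo "Schema Registry is up"
```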
Configure Avro serialization settings
Install Avro tools and libraries
Download the Avro tools jar, a command-line utility for inspecting and converting Avro data and schemas.
cd /opt/schema-registry
sudo -u kafka wget https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.11.3/avro-tools-1.11.3.jar -P lib/
Create example Avro schema
Define a sample user schema to demonstrate Avro serialization patterns, and save it as user.avsc.
{
"type": "record",
"name": "User",
"namespace": "com.example.avro",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "username",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "created_at",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "active",
"type": "boolean",
"default": true
}
]
}
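Before registering, it's worth confirming the file is at least well-formed JSON (full Avro validation happens when the registry parses the schema). This assumes the schema above was saved as user.avsc:

```shell
# Quick syntax check of the schema file using Python's JSON parser
test -f user.avsc && python3 -m json.tool < user.avsc > /dev/null \
  && echo "user.avsc is valid JSON" \
  || echo "user.avsc missing or not valid JSON"
```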
Register the schema
Register your Avro schema with the Schema Registry using the REST API.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"com.example.avro\", \"fields\": [{\"name\": \"id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"active\", \"type\": \"boolean\", \"default\": true}]}"}' \
http://localhost:8081/subjects/user-value/versions
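Hand-escaping the schema into the request body is error-prone. A sketch that builds the payload from an .avsc file instead (assumes python3 is available; any JSON encoder works):

```shell
# Works for any .avsc file; shown here with a minimal sample schema.
cat > /tmp/sample.avsc << 'EOF'
{"type": "record", "name": "User", "namespace": "com.example.avro",
 "fields": [{"name": "id", "type": "long"}]}
EOF
# Wrap the raw schema text in the {"schema": "..."} envelope the API expects:
payload=$(python3 -c 'import json,sys; print(json.dumps({"schema": sys.stdin.read()}))' < /tmp/sample.avsc)
echo "$payload"
# Register it with:
# curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
#   --data "$payload" http://localhost:8081/subjects/user-value/versions
```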
Create and manage schemas
List all registered schemas
View all subjects and their registered schemas in the registry.
curl -X GET http://localhost:8081/subjects
curl -X GET http://localhost:8081/subjects/user-value/versions
Set compatibility levels
Configure schema evolution compatibility to control how schemas can change over time.
# Set global compatibility level
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config
# Set subject-specific compatibility
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "FULL"}' \
http://localhost:8081/config/user-value
Test schema compatibility
Validate schema changes before registering new versions to prevent breaking changes.
# Test compatibility of a new schema version
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"com.example.avro\", \"fields\": [{\"name\": \"id\", \"type\": \"long\"}, {\"name\": \"username\", \"type\": \"string\"}, {\"name\": \"email\", \"type\": \"string\"}, {\"name\": \"created_at\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}, {\"name\": \"active\", \"type\": \"boolean\", \"default\": true}, {\"name\": \"phone\", \"type\": [\"null\", \"string\"], \"default\": null}]}"}' \
http://localhost:8081/compatibility/subjects/user-value/versions/latest
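The endpoint returns a JSON body like {"is_compatible": true}. A sketch of gating a CI or deployment step on that flag (the response below is a canned sample, not a live call):

```shell
# Sample response from the compatibility endpoint:
resp='{"is_compatible": true}'
# Extract the flag and fail the step if the new schema would break consumers:
compatible=$(python3 -c 'import json,sys; print(json.loads(sys.argv[1])["is_compatible"])' "$resp")
if [ "$compatible" = "True" ]; then
  echo "safe to register"
else
  echo "incompatible schema, aborting" >&2
  exit 1
fi
```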
Producer and consumer configuration
Configure Java producer with Avro
Set up a Kafka producer to serialize messages using the registered Avro schema.
# Kafka cluster connection
bootstrap.servers=localhost:9092
acks=all
retries=3
max.in.flight.requests.per.connection=1
# Avro serialization
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
# Performance settings
batch.size=16384
linger.ms=5
buffer.memory=33554432
compression.type=snappy
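Two more serializer settings worth knowing for production. By default the Confluent serializer auto-registers any schema it serializes with, which can let unreviewed schemas slip into the registry; these KafkaAvroSerializer options tighten that up:

```properties
# Only use schemas that were registered ahead of time; fail fast otherwise
auto.register.schemas=false
# Serialize against the latest registered version of the subject's schema
use.latest.version=true
```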
Configure Java consumer with Avro
Set up a Kafka consumer to deserialize Avro messages using the schema registry.
# Kafka cluster connection
bootstrap.servers=localhost:9092
group.id=avro-consumer-group
auto.offset.reset=earliest
enable.auto.commit=false
# Avro deserialization
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url=http://localhost:8081
specific.avro.reader=true
# Performance settings
fetch.min.bytes=1024
fetch.max.wait.ms=500
max.poll.records=500
Create sample Java producer code
Example producer implementation that uses the registered User schema.
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.util.Properties;

public class AvroProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        // Load the User schema defined earlier (saved as user.avsc)
        Schema userSchema = new Schema.Parser().parse(new java.io.File("user.avsc"));

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

        // Create an Avro record matching the registered schema
        GenericRecord user = new GenericData.Record(userSchema);
        user.put("id", 1001L);
        user.put("username", "john_doe");
        user.put("email", "john@example.com");
        user.put("created_at", System.currentTimeMillis());
        user.put("active", true);

        ProducerRecord<String, GenericRecord> record =
                new ProducerRecord<>("user-events", "user-1001", user);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.printf("Sent record to topic=%s partition=%d offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });
        producer.close();
    }
}
Test with command line tools
Use Kafka's built-in console tools to test Avro message production and consumption.
# Create topic
kafka-topics.sh --create --topic user-events \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1
# Start Avro console consumer
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
--topic user-events \
--property schema.registry.url=http://localhost:8081 \
--from-beginning
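The matching kafka-avro-console-producer (shipped in the same Confluent bin directory) publishes JSON-encoded records that must match the value schema. The user.avsc path below assumes the schema file saved earlier; on some Confluent versions the flag is --broker-list rather than --bootstrap-server:

```shell
# One JSON object per line, matching the User schema:
cat > /tmp/user.json << 'EOF'
{"id": 1001, "username": "john_doe", "email": "john@example.com", "created_at": 1700000000000, "active": true}
EOF
# Sanity-check the record parses before piping it in:
python3 -m json.tool < /tmp/user.json > /dev/null && echo "record ok"

# kafka-avro-console-producer --bootstrap-server localhost:9092 \
#   --topic user-events \
#   --property schema.registry.url=http://localhost:8081 \
#   --property value.schema="$(cat user.avsc)" < /tmp/user.json
```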
Verify your setup
# Check Schema Registry status
sudo systemctl status schema-registry
curl -X GET http://localhost:8081/subjects
# Verify schema registration
curl -X GET http://localhost:8081/subjects/user-value/versions/1
# Check compatibility settings
curl -X GET http://localhost:8081/config
curl -X GET http://localhost:8081/config/user-value
# Test connectivity (telnet is often absent; curl reports the HTTP status)
curl -s -o /dev/null -w "HTTP %{http_code}\n" http://localhost:8081/subjects
Common issues
| Symptom | Cause | Fix |
|---|---|---|
| Schema Registry won't start | Kafka not running | Start Kafka first: sudo systemctl start kafka |
| "Subject not found" error | Schema not registered | Register schema using REST API or check subject name |
| Serialization errors | Schema mismatch | Verify schema compatibility and field types |
| Connection refused on 8081 | Service not listening | Check logs: sudo journalctl -u schema-registry -f |
| OutOfMemoryError | Insufficient heap size | Increase SCHEMA_REGISTRY_HEAP_OPTS in systemd service |
| Schema evolution errors | Incompatible changes | Use compatibility test endpoint before registering |
Next steps
- Configure Kafka Streams state stores and RocksDB optimization for advanced stream processing
- Set up Kafka Streams testing framework with TopologyTestDriver for automated testing
- Set up Kafka Schema Registry security and authentication
- Configure Kafka Connect with Schema Registry and Avro
- Implement Kafka schema evolution strategies for production
Automated install script
Run this script to automate the entire setup:
#!/usr/bin/env bash
set -euo pipefail
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# Configuration
SCHEMA_REGISTRY_VERSION="7.5.0"
JAVA_VERSION="11"
SCHEMA_REGISTRY_USER="kafka"
SCHEMA_REGISTRY_HOME="/opt/schema-registry"
LOG_DIR="/var/log/schema-registry"
# Usage
usage() {
echo "Usage: $0 [kafka_bootstrap_servers]"
echo "Example: $0 localhost:9092"
exit 1
}
# Cleanup on error
cleanup() {
echo -e "${RED}Error occurred. Cleaning up...${NC}"
systemctl stop schema-registry 2>/dev/null || true
systemctl disable schema-registry 2>/dev/null || true
rm -f /etc/systemd/system/schema-registry.service
systemctl daemon-reload
}
trap cleanup ERR
# Check prerequisites
check_prerequisites() {
if [[ $EUID -ne 0 ]]; then
echo -e "${RED}This script must be run as root${NC}"
exit 1
fi
if ! command -v wget &> /dev/null; then
echo -e "${RED}wget is required but not installed${NC}"
exit 1
fi
}
# Detect distribution
detect_distro() {
if [ -f /etc/os-release ]; then
. /etc/os-release
case "$ID" in
ubuntu|debian)
PKG_MGR="apt"
PKG_INSTALL="apt install -y"
PKG_UPDATE="apt update"
JAVA_HOME="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk-amd64"
;;
almalinux|rocky|centos|rhel|ol|fedora)
PKG_MGR="dnf"
PKG_INSTALL="dnf install -y"
PKG_UPDATE="dnf update -y"
JAVA_HOME="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk"
;;
amzn)
PKG_MGR="yum"
PKG_INSTALL="yum install -y"
PKG_UPDATE="yum update -y"
JAVA_HOME="/usr/lib/jvm/java-${JAVA_VERSION}-openjdk"
;;
*)
echo -e "${RED}Unsupported distro: $ID${NC}"
exit 1
;;
esac
else
echo -e "${RED}Cannot detect distribution${NC}"
exit 1
fi
}
# Main installation
main() {
local kafka_servers="${1:-localhost:9092}"
echo -e "${GREEN}Installing Kafka Schema Registry with Avro serialization${NC}"
check_prerequisites
detect_distro
echo -e "${YELLOW}[1/8] Installing Java and dependencies...${NC}"
$PKG_UPDATE
if [[ "$PKG_MGR" == "apt" ]]; then
$PKG_INSTALL openjdk-${JAVA_VERSION}-jdk wget curl
else
$PKG_INSTALL java-${JAVA_VERSION}-openjdk-devel wget curl
fi
echo -e "${YELLOW}[2/8] Creating dedicated user and directories...${NC}"
if ! id "$SCHEMA_REGISTRY_USER" &>/dev/null; then
useradd --system --create-home --shell /bin/false "$SCHEMA_REGISTRY_USER"
fi
mkdir -p "$SCHEMA_REGISTRY_HOME" "$LOG_DIR"
chown "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_USER" "$LOG_DIR"
echo -e "${YELLOW}[3/8] Downloading and installing Confluent Schema Registry...${NC}"
cd /tmp
wget -q "https://packages.confluent.io/archive/${SCHEMA_REGISTRY_VERSION%.*}/confluent-community-2.13-${SCHEMA_REGISTRY_VERSION}.tar.gz"
tar -xzf "confluent-community-2.13-${SCHEMA_REGISTRY_VERSION}.tar.gz"
# Remove existing installation if present
rm -rf "$SCHEMA_REGISTRY_HOME"/*
mv "confluent-${SCHEMA_REGISTRY_VERSION}"/* "$SCHEMA_REGISTRY_HOME/"
chown -R "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_USER" "$SCHEMA_REGISTRY_HOME"
echo -e "${YELLOW}[4/8] Configuring Schema Registry properties...${NC}"
mkdir -p "$SCHEMA_REGISTRY_HOME/etc/schema-registry"
cat > "$SCHEMA_REGISTRY_HOME/etc/schema-registry/schema-registry.properties" << EOF
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=$kafka_servers
kafkastore.topic=_schemas
kafkastore.topic.replication.factor=1
schema.registry.group.id=schema-registry
# Security settings
kafkastore.security.protocol=PLAINTEXT
# Schema compatibility settings
schema.compatibility.level=BACKWARD
EOF
echo -e "${YELLOW}[5/8] Configuring logging...${NC}"
cat > "$SCHEMA_REGISTRY_HOME/etc/schema-registry/log4j.properties" << EOF
log4j.rootLogger=INFO, stdout, file
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=$LOG_DIR/schema-registry.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
EOF
chown -R "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_USER" "$SCHEMA_REGISTRY_HOME/etc"
echo -e "${YELLOW}[6/8] Creating systemd service file...${NC}"
cat > /etc/systemd/system/schema-registry.service << EOF
[Unit]
Description=Confluent Schema Registry
After=network.target
Wants=network.target
[Service]
Type=simple
User=$SCHEMA_REGISTRY_USER
Group=$SCHEMA_REGISTRY_USER
ExecStart=$SCHEMA_REGISTRY_HOME/bin/schema-registry-start $SCHEMA_REGISTRY_HOME/etc/schema-registry/schema-registry.properties
ExecStop=$SCHEMA_REGISTRY_HOME/bin/schema-registry-stop
Restart=on-failure
RestartSec=5
Environment=JAVA_HOME=$JAVA_HOME
Environment=SCHEMA_REGISTRY_HEAP_OPTS="-Xmx512M"
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
EOF
echo -e "${YELLOW}[7/8] Installing Avro tools...${NC}"
mkdir -p "$SCHEMA_REGISTRY_HOME/lib"
sudo -u "$SCHEMA_REGISTRY_USER" wget -q "https://repo1.maven.org/maven2/org/apache/avro/avro-tools/1.11.3/avro-tools-1.11.3.jar" -P "$SCHEMA_REGISTRY_HOME/lib/"
# Create example schema directory
mkdir -p "$SCHEMA_REGISTRY_HOME/schemas"
cat > "$SCHEMA_REGISTRY_HOME/schemas/user.avsc" << EOF
{
"type": "record",
"name": "User",
"namespace": "com.example.avro",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "username",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "created_at",
"type": "long",
"logicalType": "timestamp-millis"
}
]
}
EOF
chown -R "$SCHEMA_REGISTRY_USER:$SCHEMA_REGISTRY_USER" "$SCHEMA_REGISTRY_HOME/schemas"
echo -e "${YELLOW}[8/8] Starting Schema Registry service...${NC}"
systemctl daemon-reload
systemctl enable schema-registry
systemctl start schema-registry
# Wait for service to start
sleep 5
# Verification
echo -e "${YELLOW}Verifying installation...${NC}"
if systemctl is-active --quiet schema-registry; then
echo -e "${GREEN}✓ Schema Registry service is running${NC}"
else
echo -e "${RED}✗ Schema Registry service failed to start${NC}"
exit 1
fi
# Test API endpoint
if curl -s -f http://localhost:8081/subjects >/dev/null 2>&1; then
echo -e "${GREEN}✓ Schema Registry API is responding${NC}"
else
echo -e "${YELLOW}⚠ Schema Registry API not yet responding (may need more time)${NC}"
fi
echo -e "${GREEN}Installation completed successfully!${NC}"
echo -e "${YELLOW}Schema Registry is running on http://localhost:8081${NC}"
echo -e "${YELLOW}Logs are available in: $LOG_DIR/schema-registry.log${NC}"
echo -e "${YELLOW}Example schema created in: $SCHEMA_REGISTRY_HOME/schemas/user.avsc${NC}"
}
# Run with arguments
if [[ $# -gt 1 ]]; then
usage
fi
main "$@"
Review the script before running. It must run as root, so execute it with: sudo bash install.sh