Getting started with Kafka Schema Registry on DigitalOcean

Published on December 10, 2025

Imagine your online store is processing orders smoothly. Then your order processing system starts rejecting customer data. Someone on your team changed a field name in the order data, and now your producer and consumer apps no longer “speak the same language.” What was once a working pipeline is now throwing error messages and blocking customer purchases.

This scenario is surprisingly common in distributed systems. Applications that rely on shared data pipelines are fragile when it comes to format changes. Even a small mismatch, like renaming customer_email to email, can cause services to fail.

That’s where Kafka Schema Registry comes in. Schema Registry acts as a central authority for data formats, ensuring that producers (apps sending data) and consumers (apps reading data) always agree on message structure. It makes evolving your data models safer, so you can add new fields without breaking existing services.

In this tutorial, you’ll learn how to set up Kafka Schema Registry on DigitalOcean, step by step. We’ll walk you through building a simple order processing system where:

  • A web store produces order events.
  • A fulfillment service consumes and processes them.
  • Schema Registry enforces the rules so data changes don’t break your apps.

By the end, you’ll know how to produce and consume Avro-encoded messages with Schema Registry, evolve your schemas safely, and monitor your setup on a managed Kafka cluster in DigitalOcean.

Key Takeaways

Before diving into the implementation, here’s what you’ll learn:

  • Schema Registry ensures data consistency by enforcing schemas that producers and consumers must follow, preventing deserialization errors from field name mismatches.
  • Schema evolution works safely with Schema Registry: you can add optional fields without breaking existing consumers, enabling zero-downtime updates.
  • DigitalOcean Managed Kafka includes Schema Registry on dedicated CPU plans, eliminating the need to run and maintain separate infrastructure.
  • Avro serialization reduces message size compared to JSON while maintaining schema validation, improving throughput and storage efficiency.
  • Schema versioning prevents breaking changes by tracking schema history and enforcing compatibility rules automatically.

What is Kafka Schema Registry?

Apache Kafka moves data between applications efficiently. It lets applications publish and subscribe to streams of records, making it a backbone for real-time systems. But while Kafka handles message transport, it doesn’t enforce rules about message structure. Producers can send anything, and it’s up to your applications to ensure consumers can decode it.

That’s where Schema Registry fits in as the central authority for your Kafka topics. It stores schemas (formal descriptions of your data structure, like the order data in our example) in a centralized service. These schemas are typically defined in formats like Avro, Protobuf, or JSON Schema.

Here’s why this matters:

  • Consistency – Producers must write messages that follow a schema, and consumers must read messages according to that same schema. This prevents mismatches like order_id vs id from causing deserialization errors.
  • Versioning – Over time, your business evolves. Maybe you add a shipping_address field or support discount codes. Schema Registry allows you to evolve schemas safely, ensuring new versions remain compatible with old ones.
  • Validation – Messages are checked against the schema before being accepted. Invalid messages never make it into your topic, keeping your data stream clean.

When a producer sends a message, it registers or references a schema stored in Schema Registry. Consumers automatically fetch that schema (if they don’t already have it cached) and use it to decode messages.
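
For context, the serializers used later in this tutorial typically frame each Avro message with a small header that identifies the registered schema. The sketch below is a minimal illustration (not part of the tutorial’s pipeline) that decodes that header using only Python’s standard library; it assumes the common Confluent wire format of one magic byte followed by a 4-byte schema ID.

import struct

def peek_schema_id(raw_message: bytes) -> int:
    """Read the schema ID from an Avro-framed Kafka message.

    Assumes the Confluent wire format: 1 magic byte (0), a 4-byte
    big-endian schema ID, then the Avro-encoded payload.
    """
    magic, schema_id = struct.unpack(">bI", raw_message[:5])
    if magic != 0:
        raise ValueError("Not a Schema Registry framed message")
    return schema_id

# Example: a framed message whose payload was written with schema ID 1
framed = b"\x00\x00\x00\x00\x01" + b"<avro-encoded payload>"
print(peek_schema_id(framed))  # -> 1

This is how consumers know which schema version to fetch from the registry, even when several versions of a topic’s schema coexist.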

DigitalOcean’s Managed Kafka service (available on dedicated CPU plans) includes Schema Registry out of the box. That means you can focus on building your applications instead of running and maintaining extra infrastructure. For more information on setting up Kafka clusters, see the DigitalOcean Managed Kafka documentation.

An Example: Building an order processing system

To see Schema Registry in action, we’ll build a simple order processing system.

Here’s the scenario:

  • The web store produces order messages whenever a customer checks out. These messages contain details like the order ID, customer email, total amount, and date.
  • The fulfillment service consumes these messages, processes the orders, and prepares them for shipment.
  • Schema Registry ensures that both producer and consumer agree on the structure of the order message.

Later, we’ll evolve our schema to add optional fields like shipping_address and discount_code. Without Schema Registry, these changes could cause errors. With Schema Registry, they work reliably.

Step 1: Create a Kafka cluster with Schema Registry

We’ll start by creating a Kafka cluster on DigitalOcean with Schema Registry enabled. You can do this with Terraform (recommended) or via the DigitalOcean Control Panel.

First, create a file called kafka-cluster.tf:

# Configure the DigitalOcean provider
terraform {
  required_providers {
    digitalocean = {
      source  = "digitalocean/digitalocean"
      version = "~> 2.0"
    }
  }
}

provider "digitalocean" {
  token = var.do_token
}

# Variables
variable "do_token" {
  type        = string
  description = "Your DigitalOcean API token"
  sensitive   = true
}

# Create Kafka cluster with Schema Registry
resource "digitalocean_database_cluster" "kafka_cluster" {
  engine     = "kafka"
  version    = "3.8"
  name       = "orders-kafka-cluster"
  node_count = 3
  region     = "nyc3"
  size       = "gd-2vcpu-8gb"
}

# Create topics for our example
resource "digitalocean_database_kafka_topic" "orders" {
  cluster_id         = digitalocean_database_cluster.kafka_cluster.id
  name               = "orders"
  partition_count    = 3
  replication_factor = 3
}

# Create our order schema
resource "digitalocean_database_kafka_schema_registry" "order_schema" {
  cluster_id   = digitalocean_database_cluster.kafka_cluster.id
  subject_name = "orders-value"
  schema_type  = "avro"
  schema       = jsonencode({
    "type": "record",
    "name": "Order",
    "fields": [
      {"name": "order_id", "type": "string"},
      {"name": "customer_email", "type": "string"},
      {"name": "total_amount", "type": "double"},
      {"name": "order_date", "type": "string"}
    ]
  })
}

# Output connection details
output "kafka_host" {
  value = digitalocean_database_cluster.kafka_cluster.host
}

output "kafka_port" {
  value = digitalocean_database_cluster.kafka_cluster.port
}

output "schema_registry_url" {
  value = "https://${digitalocean_database_cluster.kafka_cluster.host}:25065"
}

output "kafka_user" {
  value = digitalocean_database_cluster.kafka_cluster.user
}

output "kafka_password" {
  value     = digitalocean_database_cluster.kafka_cluster.password
  sensitive = true
}

Note: You’ll need to create a Personal Access Token in your DigitalOcean account to use as the do_token value. Follow the instructions in the DigitalOcean API documentation to generate your token. For more on using Terraform with DigitalOcean, see our Terraform with DigitalOcean tutorial.

Deploy the cluster:

# Set your DigitalOcean token
export TF_VAR_do_token="your_digitalocean_token_here"

# Initialize and apply
terraform init
terraform apply

After a few minutes, Terraform will output the connection details for your Kafka cluster and Schema Registry.
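
If you’d rather not copy those values by hand into the Python scripts later in this tutorial, a small optional helper can read them straight from Terraform. This is a sketch, assuming it runs from the directory containing kafka-cluster.tf; the helper name is illustrative.

import json
import subprocess

def load_terraform_outputs() -> dict:
    """Return Terraform outputs (including sensitive ones) as a plain dict."""
    raw = subprocess.run(
        ["terraform", "output", "-json"],
        capture_output=True, text=True, check=True,
    ).stdout
    return {name: entry["value"] for name, entry in json.loads(raw).items()}

outputs = load_terraform_outputs()
print(outputs["kafka_host"], outputs["kafka_port"])
print(outputs["schema_registry_url"])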

Alternative: Using the DigitalOcean control panel

If you prefer a click-based setup:

  1. In the DigitalOcean console, go to Databases → Create Database.
  2. Select Kafka, version 3.8.
  3. Choose a General Purpose plan (Schema Registry requires dedicated CPU; Basic plans don’t support it).
  4. Pick a region and cluster size, e.g. New York, 6 vCPU / 24 GB RAM, with at least 150 GiB of storage.
  5. After the cluster is created, go to the Settings tab and enable Schema Registry.

Either way, you now have a Kafka cluster with Schema Registry enabled that is ready to store schemas and enforce data rules.

Step 2: Verify Schema Registry setup

Once your cluster is deployed, let’s confirm that Schema Registry is working.

Run:

# Get your cluster details
terraform output

Test your Schema Registry connection by running the following command, replacing “your-kafka-host” and “username:password” with the actual values from your terraform output (you can view the password with terraform output -raw kafka_password):

curl -u "username:password" "https://your-kafka-host:25065/subjects"

You should see something like:

["orders-value"]

This tells you that:

  • Schema Registry is running and reachable.
  • You have one subject (orders-value) registered, which corresponds to the schema we defined for order messages.

A subject is essentially a namespace under which schemas are versioned. If you update your schema later, you’ll see multiple versions under the same subject.
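
You can also inspect a subject’s version history directly. The sketch below uses Python’s requests library against the standard Schema Registry REST API; the host and credentials are placeholders you’d replace with your own values.

import requests

REGISTRY_URL = "https://your-kafka-host:25065"  # placeholder
AUTH = ("your-username", "your-password")       # placeholder

# List all versions registered under the orders-value subject
versions = requests.get(
    f"{REGISTRY_URL}/subjects/orders-value/versions",
    auth=AUTH,  # add verify="ca-certificate.crt" if your environment needs it
).json()
print(versions)  # e.g. [1] before any schema updates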

Download SSL certificates

Before running your producer and consumer applications, you’ll need to download the CA certificate from your DigitalOcean Kafka cluster:

  1. In the DigitalOcean control panel, navigate to your Kafka cluster
  2. Go to the Connection Details tab
  3. Download the following file to your project directory:
    • ca-certificate.crt (Certificate Authority)

Note: For detailed instructions on downloading and configuring SSL certificates for your Kafka cluster, see the DigitalOcean Managed Kafka documentation.

Make sure this certificate file is in the same directory as your Python scripts, or update the file path in your configuration accordingly.
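
Optionally, you can confirm that the certificate and credentials work by listing the cluster’s topics from Python (after installing the confluent-kafka package shown in Step 3). This is a quick sketch using confluent-kafka’s AdminClient; the host, username, and password are placeholders for your Terraform output values.

from confluent_kafka.admin import AdminClient

conf = {
    'bootstrap.servers': 'your-kafka-host:your-kafka-port',  # placeholder
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',                        # placeholder
    'sasl.password': 'your-password',                        # placeholder
    'ssl.ca.location': 'ca-certificate.crt',
}

# A successful call confirms the CA certificate and credentials are valid
metadata = AdminClient(conf).list_topics(timeout=10)
print("Topics:", list(metadata.topics.keys()))  # expect to see 'orders'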

Step 3: Create an order producer (Python)

Now that the infrastructure is ready, let’s create a Python producer that sends orders into Kafka. This example uses the confluent-kafka library, which provides robust support for Schema Registry integration. For more Python examples with DigitalOcean services, see our Python tutorials.

Create a file called order_producer.py from the example below, replacing the placeholder values your-username, your-password, and your-kafka-host:your-kafka-port with the values from your Terraform output:

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
from datetime import datetime

# Configuration (replace with your actual values)
kafka_config = {
    'bootstrap.servers': 'your-kafka-host:your-kafka-port',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',
    'sasl.password': 'your-password',
    'ssl.ca.location': 'ca-certificate.crt',
}

schema_registry_config = {
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
}

# Our order schema (matches what we created in Terraform)
order_schema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_email", "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
"""

def create_producer():
    """Create producer with Avro serialization"""
    # Create Schema Registry client
    schema_registry_client = SchemaRegistryClient(schema_registry_config)
    
    # Create Avro serializer
    avro_serializer = AvroSerializer(
        schema_registry_client,
        order_schema
    )
    
    # Create producer
    producer = Producer(kafka_config)
    
    return producer, avro_serializer

def send_order(producer, avro_serializer, order_data):
    """Send order to Kafka topic"""
    try:
        serialized_value = avro_serializer(
            order_data, 
            SerializationContext('orders', MessageField.VALUE)
        )
        
        producer.produce(
            topic='orders', 
            value=serialized_value
        )
        producer.flush()
        print(f"✅ Sent order: {order_data['order_id']}")
    except Exception as e:
        print(f"❌ Error sending order: {e}")

# Example usage
if __name__ == "__main__":
    producer, avro_serializer = create_producer()
    
    # Send sample orders
    orders = [
        {
            "order_id": "ORDER-001",
            "customer_email": "john@example.com", 
            "total_amount": 99.99,
            "order_date": "2025-01-15"
        },
        {
            "order_id": "ORDER-002",
            "customer_email": "jane@example.com",
            "total_amount": 149.50,
            "order_date": "2025-01-15"
        }
    ]
    
    for order in orders:
        send_order(producer, avro_serializer, order)
    
    print("All orders sent!")

Now, install the required packages:

pip install 'confluent-kafka[avro]'

Run the producer:

python3 order_producer.py

You should see output like:

✅ Sent order: ORDER-001
✅ Sent order: ORDER-002
All orders sent!

So, what’s happening behind the scenes?

  • Each order is validated against the schema.
  • If the data doesn’t match the schema (say you forget order_id), it won’t be sent; the sketch after this list shows what that looks like.
  • Valid messages are serialized into Avro format and stored in Kafka.
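
Here is a minimal sketch of that validation in action. It reuses create_producer() from order_producer.py, and the incomplete order below is deliberately missing order_id; the serializer is expected to raise an error before anything reaches Kafka.

from confluent_kafka.serialization import SerializationContext, MessageField
from order_producer import create_producer  # reuses the setup from this step

producer, avro_serializer = create_producer()

bad_order = {
    "customer_email": "john@example.com",  # note: order_id is missing
    "total_amount": 10.0,
    "order_date": "2025-01-15",
}

try:
    avro_serializer(bad_order, SerializationContext('orders', MessageField.VALUE))
except Exception as e:
    print(f"Rejected before reaching Kafka: {e}")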

Step 4: Create an order consumer (Python)

Next, let’s build the fulfillment service, a consumer that reads orders from Kafka.

Create a file called order_consumer.py:

order_consumer.py
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
import json

# Configuration (same as producer)
kafka_config = {
    'bootstrap.servers': 'your-kafka-host:your-kafka-port',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'your-username',
    'sasl.password': 'your-password',
    'ssl.ca.location': 'ca-certificate.crt',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
}

schema_registry_config = {
    'url': 'https://your-kafka-host:25065',
    'basic.auth.user.info': 'your-username:your-password'
}

# Our order schema (matches what we created in Terraform)
order_schema = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "customer_email", "type": "string"},
    {"name": "total_amount", "type": "double"},
    {"name": "order_date", "type": "string"}
  ]
}
"""

def create_consumer():
    """Create consumer with Avro deserialization"""
    # Create Schema Registry client
    schema_registry_client = SchemaRegistryClient(schema_registry_config)
    
    # Create Avro deserializer
    avro_deserializer = AvroDeserializer(
        schema_registry_client,
        order_schema
    )
    
    # Create consumer
    consumer = Consumer(kafka_config)
    
    return consumer, avro_deserializer

def process_order(order_data):
    """Process received order (your business logic here)"""
    print(f"📦 Processing order {order_data['order_id']}")
    print(f"   Customer: {order_data['customer_email']}")
    print(f"   Amount: ${order_data['total_amount']}")
    print(f"   Date: {order_data['order_date']}")
    print("   Order processed successfully! ✅\n")

# Example usage
if __name__ == "__main__":
    consumer, avro_deserializer = create_consumer()
    consumer.subscribe(['orders'])
    
    print("🚀 Order consumer started. Waiting for orders...")
    print("Press Ctrl+C to stop\n")
    
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            
            if msg is None:
                continue
            if msg.error():
                print(f"❌ Consumer error: {msg.error()}")
                continue
            
            # Deserialize the message
            try:
                order_data = avro_deserializer(
                    msg.value(),
                    SerializationContext('orders', MessageField.VALUE)
                )
                process_order(order_data)
            except Exception as e:
                print(f"❌ Error deserializing message: {e}")
            
    except KeyboardInterrupt:
        print("👋 Shutting down consumer...")
    finally:
        consumer.close()

Now, let’s run the consumer:

python3 order_consumer.py

Expected output:

🚀 Order consumer started. Waiting for orders...

📦 Processing order ORDER-001
   Customer: john@example.com
   Amount: $99.99
   Date: 2025-01-15
   Order processed successfully!

The consumer automatically retrieves the schema from Schema Registry, deserializes the message, and makes sure the data structure is exactly what it expects.
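
A useful variation: because every message carries the ID of the schema it was written with, the consumer doesn’t strictly need a local copy of the schema string. The hedged sketch below builds a deserializer that relies entirely on the writer’s schema fetched from the registry, reusing the schema_registry_config defined in order_consumer.py.

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from order_consumer import schema_registry_config  # defined earlier in this step

schema_registry_client = SchemaRegistryClient(schema_registry_config)

# No reader schema supplied: each message is decoded with the writer's
# schema, looked up by the schema ID embedded in the message itself.
avro_deserializer = AvroDeserializer(schema_registry_client)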

Step 5: Test schema evolution

Now for the magic: evolving schemas safely.

Let’s add two new optional fields: shipping_address and discount_code. Update your Terraform schema:

resource "digitalocean_database_kafka_schema_registry" "order_schema" {
  cluster_id   = digitalocean_database_cluster.kafka_cluster.id
  subject_name = "orders-value"
  schema_type  = "avro"
  schema       = jsonencode({
    "type": "record",
    "name": "Order", 
    "fields": [
      {"name": "order_id", "type": "string"},
      {"name": "customer_email", "type": "string"},
      {"name": "total_amount", "type": "double"},
      {"name": "order_date", "type": "string"},
      {"name": "shipping_address", "type": ["null", "string"], "default": null},
      {"name": "discount_code", "type": ["null", "string"], "default": null}
    ]
  })
}

Apply the update:

terraform apply

Now try sending a new order. In order_producer.py, first update the order_schema string to include the two new optional fields (matching the Terraform definition above), then update the orders list:

order_producer.py
orders = [
    {
        "order_id": "ORDER-003",
        "customer_email": "bob@example.com",
        "total_amount": 199.99,
        "order_date": "2025-01-15",
        "shipping_address": "123 Main St, City, State",
        "discount_code": "SAVE10"
    }
]

Run:

python3 order_producer.py

What happens in the background?

  • The producer validates the new order against the updated schema.
  • The old consumer still works, because Schema Registry ensures backward compatibility.
  • The new consumer can read both old and new messages without errors.

No downtime. No broken services. Just smooth schema evolution.
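
If you want to confirm that a change is compatible before applying it, Schema Registry exposes a compatibility-check endpoint. The following sketch uses Python’s requests library against the standard REST API; the host and credentials are placeholders, and the candidate schema matches the updated Terraform definition above.

import json
import requests

REGISTRY_URL = "https://your-kafka-host:25065"  # placeholder
AUTH = ("your-username", "your-password")       # placeholder

candidate_schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "customer_email", "type": "string"},
        {"name": "total_amount", "type": "double"},
        {"name": "order_date", "type": "string"},
        {"name": "shipping_address", "type": ["null", "string"], "default": None},
        {"name": "discount_code", "type": ["null", "string"], "default": None},
    ],
}

# Ask the registry whether this schema is compatible with the latest version
resp = requests.post(
    f"{REGISTRY_URL}/compatibility/subjects/orders-value/versions/latest",
    auth=AUTH,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(candidate_schema)},
)
print(resp.json())  # expect {"is_compatible": true}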

Step 6: Monitor your setup

You can monitor Schema Registry directly via its API:

# List all schemas
curl -u "username:password" \
  "https://your-kafka-host:25065/subjects"

# Get latest schema version
curl -u "username:password" \
  "https://your-kafka-host:25065/subjects/orders-value/versions/latest"

You’ll see the schema JSON returned, including version numbers.

In addition, DigitalOcean’s control panel gives you:

  • Cluster health at a glance.
  • Message throughput per topic.
  • Lag monitoring, to see if consumers are falling behind.

Together, these tools help you keep your pipeline reliable as you scale. For more on monitoring and observability, see the DigitalOcean Monitoring documentation.
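
The same checks can be scripted from Python, for example as part of a periodic health check. This sketch uses the SchemaRegistryClient from confluent-kafka; the URL and credentials are the same placeholders used earlier.

from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({
    'url': 'https://your-kafka-host:25065',                 # placeholder
    'basic.auth.user.info': 'your-username:your-password',  # placeholder
})

# List every subject, then report the latest registered version of each
for subject in client.get_subjects():
    latest = client.get_latest_version(subject)
    print(f"{subject}: version {latest.version}, schema ID {latest.schema_id}")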

Troubleshooting common issues with Schema Registry

Even with managed infrastructure, you may run into common issues. Here’s a quick reference table for resolving them:

Issue: Schema Registry not found
Cause: You’re likely on a Basic Kafka plan; Schema Registry requires a General Purpose (dedicated CPU) plan.
Solution: In the Control Panel, go to your cluster’s Settings and confirm Schema Registry is enabled.

Issue: SSL connection failed
Cause: The CA certificate hasn’t been downloaded, or the configured path doesn’t point to it.
Solution: Ensure the file path in your Python config points to the ca-certificate.crt file downloaded from DigitalOcean.

Issue: Schema compatibility error
Cause: New fields were added without defaults; Schema Registry enforces compatibility rules.
Solution: Make new fields optional with ["null", "string"] or provide a default value.

Pro tip: When evolving schemas, always test with a consumer running before deploying to production.

Frequently Asked Questions

1. What is Kafka Schema Registry, and why do I need it?

Kafka Schema Registry is a centralized service that stores and manages data schemas for Kafka topics. It ensures that producers and consumers agree on message structure, preventing errors from schema mismatches. Without Schema Registry, a simple field rename like changing customer_email to email can break your entire pipeline. Schema Registry validates messages before they enter topics and handles schema versioning automatically.

2. Is Schema Registry available on all DigitalOcean Kafka plans?

No. Schema Registry requires a General Purpose (dedicated CPU) plan. Basic plans don’t support Schema Registry. When creating your Kafka cluster in the DigitalOcean Control Panel, select a General Purpose plan to enable Schema Registry. You can find more details in the DigitalOcean Managed Kafka documentation.

3. How does Schema Registry handle schema evolution and compatibility?

Schema Registry enforces compatibility rules when you update schemas. By default, it uses backward compatibility, meaning new schema versions must be readable by consumers using older versions. To add new fields safely, make them optional with ["null", "string"] or provide default values. Breaking changes (like removing required fields) are rejected unless you change the compatibility mode. This prevents production outages from schema updates.
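
If you do need to make a breaking change, the compatibility mode can be adjusted per subject through the REST API. The sketch below is illustrative only (host and credentials are placeholders); relaxing compatibility should be a deliberate, reviewed decision.

import requests

REGISTRY_URL = "https://your-kafka-host:25065"  # placeholder
AUTH = ("your-username", "your-password")       # placeholder

# Inspect the current compatibility level for the subject
# (may return an error if no subject-level override has been set yet)
print(requests.get(f"{REGISTRY_URL}/config/orders-value", auth=AUTH).json())

# Temporarily allow incompatible changes (use with care)
resp = requests.put(
    f"{REGISTRY_URL}/config/orders-value",
    auth=AUTH,
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"compatibility": "NONE"},
)
print(resp.json())  # e.g. {"compatibility": "NONE"}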

4. Can I use Schema Registry with Protobuf or JSON Schema instead of Avro?

Yes. DigitalOcean’s Schema Registry supports Avro, Protobuf, and JSON Schema formats. This tutorial uses Avro because it’s widely adopted and provides efficient binary serialization. Protobuf offers similar benefits with a different schema definition language, while JSON Schema is more human-readable but less efficient. Choose based on your team’s preferences and existing tooling.

5. What happens if I send a message that doesn’t match the schema?

Schema Registry validates messages before they’re accepted. If a message doesn’t match the registered schema, the producer will receive an error and the message won’t be written to the topic. This keeps your data stream clean and prevents downstream consumers from encountering deserialization errors. Always validate your data structure matches the schema before sending.

6. How do I monitor Schema Registry performance and usage?

You can monitor Schema Registry through its REST API using curl commands to list subjects and view schema versions. Additionally, DigitalOcean’s control panel provides cluster health metrics, message throughput per topic, and consumer lag monitoring. For production deployments, consider setting up alerts for schema compatibility errors and consumer lag thresholds.

What you’ve built

Congratulations! You now have a working Kafka pipeline with Schema Registry on DigitalOcean.

Next steps

Now that your foundation is in place, here are some ideas to take it further:

  • Add new fields to your schema, like payment_status or delivery_date.
  • Experiment with different schema formats such as Protobuf or JSON Schema.
  • Integrate DigitalOcean monitoring with alerts so you know when consumers lag.
  • Scale your Kafka cluster to handle higher throughput using the DigitalOcean Control Panel.

From here, you can expand your pipeline into a complete event-driven architecture. With Schema Registry on DigitalOcean, you can build confidently, knowing your data integrity is protected.

Ready to go deeper? Continue learning and evolving your architecture with DigitalOcean Managed Kafka and the tutorials linked throughout this guide.

About the author(s)

Zach Peirce – Author
Anish Singh Walia – Editor, Sr Technical Writer
