This tutorial demonstrates how to build Kafka producer and consumer applications in Java that use Schema Registry for message schema management. You'll learn how to configure your Java applications to serialize and deserialize records, ensuring type safety and schema evolution compatibility. By the end of this tutorial, you'll have working applications that produce and consume device temperature reading records.
The applications in this tutorial use Avro-formatted messages. To use Protobuf or JSON Schema instead, you would swap in a different serializer and deserializer, but the applications would otherwise be structured the same way.
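As a rough illustration of that swap, the sketch below lists the Confluent serializer and deserializer class names for each format. The `FormatSerdes` class and its `Serde` record are hypothetical helpers written for this example and are not part of the tutorial code. The Protobuf and JSON Schema classes also ship in separate library artifacts from the Avro ones, so the build dependencies would change as well.

```java
import java.util.Map;

// Hypothetical helper, not part of the tutorial repository: maps a schema
// format to the Confluent serializer/deserializer class names that would be
// set as value.serializer / value.deserializer in the client configuration.
final class FormatSerdes {
    record Serde(String serializer, String deserializer) {}

    static final Map<String, Serde> BY_FORMAT = Map.of(
        "avro", new Serde(
            "io.confluent.kafka.serializers.KafkaAvroSerializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer"),
        "protobuf", new Serde(
            "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer",
            "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"),
        "json-schema", new Serde(
            "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer",
            "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer"));

    public static void main(String[] args) {
        // Print the serializer that each format would plug into the producer.
        BY_FORMAT.forEach((format, serde) ->
            System.out.println(format + " -> " + serde.serializer()));
    }
}
```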
The steps in this tutorial outline how to set up the required Kafka infrastructure and run the provided producer / consumer applications. For a deeper look at the application source code, refer to the Code explanation section at the bottom.
The following steps use Confluent Cloud. To run the tutorial locally with Docker, skip to the Docker instructions section at the bottom.
- Java 17
- A Confluent Cloud account
- The Confluent CLI installed on your machine
- Clone the confluentinc/tutorials repository and navigate into its top-level directory:
  git clone git@github.com:confluentinc/tutorials.git
  cd tutorials
Log in to your Confluent Cloud account:
confluent login --prompt --save
Install a CLI plugin that will streamline the creation of resources in Confluent Cloud:
confluent plugin install confluent-quickstart
Run the plugin from the top-level directory of the tutorials repository to create the Confluent Cloud resources needed for this tutorial.
Note: You may specify a different cloud provider (gcp or azure) or region. You can find supported regions in a given cloud provider by running confluent kafka region list --cloud <CLOUD>.
confluent quickstart \
--environment-name kafka-sr-env \
--kafka-cluster-name kafka-sr-cluster \
--create-kafka-key \
--create-sr-key \
--kafka-java-properties-file ./schema-registry-java/src/main/resources/cloud.properties
The plugin should complete in under a minute.
Create the topic for the application:
confluent kafka topic create readings
Compile the application from the top-level tutorials repository directory:
./gradlew schema-registry-java:shadowJar
Navigate into the application's home directory:
cd schema-registry-java
Run the producer application, passing the Kafka client configuration file generated when you created Confluent Cloud resources:
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/cloud.properties
Validate that you see temperature reading Avro records in the readings topic.
confluent kafka topic consume readings \
--value-format avro \
--from-beginning
You should see output similar to the following:
{"deviceId":"3","temperature":99.5231}
{"deviceId":"3","temperature":70.56588}
{"deviceId":"1","temperature":99.817894}
{"deviceId":"1","temperature":98.89636}
{"deviceId":"0","temperature":96.56193}
{"deviceId":"2","temperature":97.53318}
{"deviceId":"2","temperature":75.94116}
{"deviceId":"0","temperature":74.87793}
{"deviceId":"0","temperature":76.37975}
{"deviceId":"0","temperature":83.31611}
These messages correspond to the Avro schema in src/main/avro/temp-reading.avsc:
{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "TempReading",
  "fields": [
    { "name": "deviceId", "type": "string" },
    { "name": "temperature", "type": "float" }
  ]
}
Run the consumer application:
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroConsumer \
./src/main/resources/cloud.properties
You should see output similar to the following:
[main] INFO - Consumed event: key = 4, value = {"deviceId": "4", "temperature": 99.00065}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 95.12411}
[main] INFO - Consumed event: key = 0, value = {"deviceId": "0", "temperature": 99.8184}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 92.55404}
[main] INFO - Consumed event: key = 3, value = {"deviceId": "3", "temperature": 79.467354}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 77.81964}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 87.234375}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 78.16981}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 97.42639}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 98.66289}
When you are finished, delete the kafka-sr-env environment. First, find its environment ID, which has the form env-123456:
confluent environment list
Then delete the environment, including all resources created for this tutorial:
confluent environment delete <ENVIRONMENT ID>

Docker instructions
- Java 17
- Docker running via Docker Desktop or Docker Engine
- Docker Compose. Ensure that the command docker compose version succeeds.
- Clone the confluentinc/tutorials repository and navigate into its top-level directory:
  git clone git@github.com:confluentinc/tutorials.git
  cd tutorials
Start Kafka and Schema Registry with the following command run from the top-level tutorials repository directory:
docker compose -f ./docker/docker-compose-kafka-sr.yml up -d
Open a shell in the broker container:
docker exec -it broker /bin/bash
Create the topic for the application:
kafka-topics --bootstrap-server localhost:9092 --create --topic readings
Exit the broker container by entering Ctrl+D.
On your local machine, compile the app:
./gradlew schema-registry-java:shadowJar
Navigate into the application's home directory:
cd schema-registry-java
Run the producer application, passing the local.properties Kafka client configuration file that points to the broker's bootstrap servers endpoint at localhost:9092 and Schema Registry at http://localhost:8081:
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroProducer \
./src/main/resources/local.properties
Validate that you see temperature reading Avro records in the readings topic. Open a shell in the Schema Registry container:
docker exec -it schema-registry /bin/bash
Run a console consumer:
kafka-avro-console-consumer \
--topic readings \
--bootstrap-server broker:29092 \
--property schema.registry.url=http://localhost:8081 \
--from-beginning
You should see output similar to this:
{"deviceId":"2","temperature":96.71052551269531}
{"deviceId":"2","temperature":78.42681121826172}
{"deviceId":"3","temperature":95.85462951660156}
{"deviceId":"2","temperature":83.17869567871094}
{"deviceId":"3","temperature":79.87565612792969}
{"deviceId":"1","temperature":79.03103637695312}
{"deviceId":"0","temperature":87.11306762695312}
{"deviceId":"0","temperature":76.37906646728516}
{"deviceId":"3","temperature":75.17118072509766}
{"deviceId":"2","temperature":84.00798034667969}
These messages correspond to the Avro schema in src/main/avro/temp-reading.avsc:
{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "TempReading",
  "fields": [
    { "name": "deviceId", "type": "string" },
    { "name": "temperature", "type": "float" }
  ]
}
Run the consumer application:
java -cp ./build/libs/schema-registry-java-standalone.jar \
io.confluent.developer.AvroConsumer \
./src/main/resources/local.properties
You should see output similar to the following:
[main] INFO - Consumed event: key = 4, value = {"deviceId": "4", "temperature": 99.00065}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 95.12411}
[main] INFO - Consumed event: key = 0, value = {"deviceId": "0", "temperature": 99.8184}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 92.55404}
[main] INFO - Consumed event: key = 3, value = {"deviceId": "3", "temperature": 79.467354}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 77.81964}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 87.234375}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 78.16981}
[main] INFO - Consumed event: key = 2, value = {"deviceId": "2", "temperature": 97.42639}
[main] INFO - Consumed event: key = 1, value = {"deviceId": "1", "temperature": 98.66289}
From your local machine, stop the broker and Schema Registry containers. From the top-level tutorials directory:
docker compose -f ./docker/docker-compose-kafka-sr.yml down

Code explanation
This section summarizes the application source files in src/main/java/io/confluent/developer/.
The AvroProducer class demonstrates how to produce Avro-encoded messages to a Kafka topic using Schema Registry. The producer:
- Loads configuration: Reads Kafka and Schema Registry connection properties from a file.
- Configures serializers: Sets the key serializer to StringSerializer and the value serializer to KafkaAvroSerializer. The KafkaAvroSerializer automatically registers the Avro schema with Schema Registry on first use and embeds a schema ID in each message.
- Creates producer instance: Instantiates a KafkaProducer parameterized with String keys and TempReading (the generated Avro class) values.
- Generates and sends records: Creates random temperature readings corresponding to random device IDs and produces messages to the readings topic.
- Cleans up: Flushes any pending records and closes the producer to ensure all messages are sent before the application terminates.
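The first two producer steps can be sketched with the standard library alone. This is a minimal illustration, not the tutorial's actual source: `ProducerConfigSketch` is a hypothetical class, the inline properties text is an assumed stand-in for the contents of local.properties, and the configuration keys are written as plain strings where the real application would more likely use the constants from the Kafka client library.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch of the "load configuration" and "configure serializers"
// steps. It only builds the Properties object; creating a KafkaProducer from
// it would require the Kafka and Confluent client libraries.
final class ProducerConfigSketch {

    static Properties build(Reader connectionSettings) {
        Properties props = new Properties();
        try {
            // Step 1: read broker and Schema Registry connection settings.
            props.load(connectionSettings);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Step 2: String keys, Avro values backed by Schema Registry.
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "io.confluent.kafka.serializers.KafkaAvroSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed stand-in for local.properties; the real file may hold more.
        String local = "bootstrap.servers=localhost:9092\n"
                     + "schema.registry.url=http://localhost:8081\n";
        Properties props = build(new StringReader(local));
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the real application, a `Properties` object built this way would be passed to the `KafkaProducer` constructor.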
The AvroConsumer class demonstrates how to consume Avro-encoded messages from a Kafka topic using Schema Registry. The consumer:
- Loads configuration: Reads Kafka and Schema Registry connection properties from a properties file.
- Configures deserializers: Sets the key deserializer to StringDeserializer and the value deserializer to KafkaAvroDeserializer. The KafkaAvroDeserializer automatically retrieves the schema from Schema Registry using the schema ID embedded in each message.
- Sets consumer properties:
  - AUTO_OFFSET_RESET_CONFIG is set to "earliest" to read from the beginning of the topic
  - GROUP_ID_CONFIG is set to "avro-consumer-group" to identify this consumer group
  - SPECIFIC_AVRO_READER_CONFIG is set to true to deserialize messages into the specific TempReading class rather than a generic GenericRecord. See here for details.
- Subscribes and polls: Subscribes to the readings topic and enters a polling loop that continuously fetches records in batches. Each consumed record is logged with its key and deserialized TempReading value.
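To make the "schema ID embedded in each message" concrete, here is a small sketch of how that ID could be read from a raw message value. It assumes the default Confluent wire format, in which the value starts with a zero magic byte followed by a 4-byte big-endian schema ID and then the Avro payload. `WireFormatSketch`, its methods, and the sample bytes are hypothetical. The real deserializer does this internally, so application code does not normally need it.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the framing assumed above:
// [magic byte 0x0][4-byte big-endian schema ID][Avro-encoded payload]
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    // Extracts the schema ID that a deserializer would look up in Schema Registry.
    static int schemaId(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("value is too short to hold a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    // Builds a framed value for demonstration; the payload is not interpreted.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
            .put(MAGIC_BYTE)
            .putInt(schemaId)
            .put(payload)
            .array();
    }

    public static void main(String[] args) {
        // Placeholder payload bytes and an arbitrary schema ID.
        byte[] framed = frame(100001, new byte[] {1, 2, 3});
        System.out.println("schema ID = " + schemaId(framed));
    }
}
```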