Writing a Kafka Producer in Java

In this tutorial, we create a simple Java example that creates a Kafka producer. You create a new replicated Kafka topic called my-example-topic, then you create a Kafka producer that uses this topic to send records. You will send records with the Kafka producer, first synchronously and later asynchronously. Prerequisites for this tutorial are Kafka basics and Kafka clustering and failover basics.

Create Replicated Kafka Topic

First, you need to create a replicated topic.


#!/usr/bin/env bash
cd ~/kafka-training

## Create topics
kafka/bin/kafka-topics.sh --create \
    --replication-factor 3 \
    --partitions 13 \
    --topic my-example-topic \
    --zookeeper localhost:2181

## List created topics
kafka/bin/kafka-topics.sh --list \
    --zookeeper localhost:2181

Above we create a topic named my-example-topic with 13 partitions and a replication factor of 3. Then we list the Kafka topics.

Run create-topic.sh as follows.

Output from running create-topic.sh

$ ./create-topic.sh
Created topic "my-example-topic".

Gradle Build Script

For this example, we use Gradle to build the project.


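group 'cloudurable-kafka'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8

repositories {
    mavenCentral()  // assumed repository; the original block was truncated here
}

dependencies {
    // The kafka-clients version is missing in the source; append the version you use.
    compile 'org.apache.kafka:kafka-clients:'
    compile 'ch.qos.logback:logback-classic:1.2.2'
}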

Notice that we import the kafka-clients jar file. Apache Kafka uses SLF4J, so to set up logging we use Logback (ch.qos.logback:logback-classic:1.2.2).

Construct a Kafka Producer

To create a Kafka producer, you will need to pass it a list of bootstrap servers (a list of Kafka brokers).

You will also specify a client.id that uniquely identifies this Producer client.

In this example, we are going to send messages with ids. The message body is a string, so we need a record value serializer, as we will send the message body in the Kafka record's value field. The message id (a long) will be sent as the Kafka record's key. You will need to specify a key serializer and a value serializer, which Kafka will use to encode the message id as the Kafka record key and the message body as the Kafka record value.
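To make the role of the two serializers more concrete, here is a rough JDK-only sketch of the byte encodings involved. It does not use the Kafka classes: RecordEncodingSketch, encodeKey, and encodeValue are hypothetical names chosen for illustration. It assumes the usual behavior of LongSerializer (an 8-byte big-endian encoding) and StringSerializer (UTF-8 by default).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative stand-in, not the Kafka serializer classes: shows how a long
// key and a String value become byte arrays before they go into a record.
class RecordEncodingSketch {

    // Encode a long as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encodeKey(long messageId) {
        return ByteBuffer.allocate(Long.BYTES).putLong(messageId).array();
    }

    // Encode the message body as UTF-8 bytes.
    static byte[] encodeValue(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = encodeKey(42L);
        byte[] value = encodeValue("Hello Mom 42");
        System.out.println("key bytes=" + key.length + ", value bytes=" + value.length);
    }
}
```

The point of the sketch is only that keys and values travel as bytes, which is why the producer needs one serializer for each.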

Common Kafka imports and constants

Next, we will import the Kafka packages and define a constant for the topic and a constant for the list of bootstrap servers that the producer will connect to.

KafkaProducerExample.java – imports and constants


package com.cloudurable.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerExample {

    private final static String TOPIC = "my-example-topic";
    private final static String BOOTSTRAP_SERVERS =
            "localhost:9092,localhost:9093,localhost:9094";
}

Notice that KafkaProducerExample imports LongSerializer, which gets configured as the Kafka record key serializer, and imports StringSerializer, which gets configured as the record value serializer. The constant BOOTSTRAP_SERVERS is set to localhost:9092, localhost:9093, and localhost:9094, which are the three Kafka servers that we started up in the last lesson. Go ahead and make sure all three Kafka servers are running. The constant TOPIC is set to the replicated Kafka topic that we just created.

Create Kafka Producer to send records

Now that we have imported the Kafka classes and defined some constants, let’s create a Kafka producer.

KafkaProducerExample.java – Create a Producer to send Records


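public class KafkaProducerExample {
    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}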

To create a Kafka producer, you use java.util.Properties to define certain properties and pass them to the constructor of a KafkaProducer.

Above, KafkaProducerExample.createProducer sets the BOOTSTRAP_SERVERS_CONFIG (“bootstrap.servers”) property to the list of broker addresses we defined earlier. The BOOTSTRAP_SERVERS_CONFIG value is a comma-separated list of host/port pairs that the Producer uses to establish an initial connection to the Kafka cluster. The producer uses all servers in the cluster no matter which ones we list here. This list only specifies the initial Kafka brokers used to discover the full set of servers of the Kafka cluster. If a server in this list is down, the producer will just go to the next broker in the list to discover the full topology of the Kafka cluster.
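The idea of a bootstrap list can be sketched with plain JDK code. This is not how the Kafka client is implemented (its node selection is more involved); BootstrapListSketch, parse, firstReachable, and the reachability predicate are all hypothetical names used only to illustrate that any one live entry is enough to get started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative only: models a comma-separated bootstrap list and a fallback
// to the next entry when one is unreachable.
class BootstrapListSketch {

    // Split "host1:port1,host2:port2" into trimmed host:port entries.
    static List<String> parse(String bootstrapServers) {
        List<String> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    // Return the first entry accepted by the (hypothetical) reachability check.
    static Optional<String> firstReachable(List<String> servers,
                                           Predicate<String> isReachable) {
        return servers.stream().filter(isReachable).findFirst();
    }

    public static void main(String[] args) {
        List<String> servers = parse("localhost:9092, localhost:9093,localhost:9094");
        // Pretend the broker on port 9092 is down.
        Optional<String> contact = firstReachable(servers, s -> !s.endsWith(":9092"));
        System.out.println(contact.orElse("no broker reachable"));
    }
}
```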

The CLIENT_ID_CONFIG (“client.id”) is an id to pass to the server when making requests so the server can track the source of requests beyond just IP/port by passing a producer name for things like server-side request logging.

The KEY_SERIALIZER_CLASS_CONFIG (“key.serializer”) is a Kafka Serializer class for Kafka record keys that implements the Kafka Serializer interface. Notice that we set this to LongSerializer as the message ids in our example are longs.

The VALUE_SERIALIZER_CLASS_CONFIG (“value.serializer”) is a Kafka Serializer class for Kafka record values that implements the Kafka Serializer interface. Notice that we set this to StringSerializer as the message bodies in our example are strings.
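The four settings above are ordinary string properties, so they can also be kept outside the code. The following is a minimal sketch, assuming you want to load them from properties-style text; ProducerPropertiesSketch and load are hypothetical names, and the sketch stops short of creating a producer so that it runs with the JDK alone.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: loads the producer settings discussed above from text
// in java.util.Properties format, using their string property names.
class ProducerPropertiesSketch {

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "bootstrap.servers=localhost:9092,localhost:9093,localhost:9094",
                "client.id=KafkaExampleProducer",
                "key.serializer=org.apache.kafka.common.serialization.LongSerializer",
                "value.serializer=org.apache.kafka.common.serialization.StringSerializer");
        Properties props = load(text);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A Properties object built this way could then be passed to the KafkaProducer constructor in place of the one assembled in createProducer.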

Sending Approaches

The asynchronous approach

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata rm, Exception ex) {...}
});

gives you better throughput than the synchronous approach

RecordMetadata rm = producer.send(record).get();

since in the asynchronous case you do not wait for each acknowledgment before sending the next record.

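Also, with asynchronous sends, ordering is not strictly guaranteed: if a failed batch is retried while later batches are already in flight, records can arrive out of order (unless idempotence is enabled or max.in.flight.requests.per.connection is set to 1). With synchronous sends, each message is sent only after the previous one has been acknowledged, so the order is preserved.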

Another difference is that with synchronous sends you can stop sending messages as soon as an exception occurs, whereas with asynchronous sends some messages will already have been sent before you discover that something is wrong and can react.

Also, note that in the asynchronous approach the number of unacknowledged requests that can be “in flight” on a connection is controlled by the max.in.flight.requests.per.connection parameter.

Apart from the synchronous and asynchronous approaches, you can use the fire-and-forget approach. It uses the same send call but ignores the returned Future and registers no callback: you just send the message and hope that it reaches the broker. Most likely it will, and the producer retries recoverable errors, but there is a chance that some messages will be lost:

producer.send(record);

To summarize:

  • Fire and Forget – the fastest one, but some messages could be lost;
  • Synchronous – slowest, use it if you cannot afford to lose messages;
  • Asynchronous – something in between.
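The three styles can be compared side by side in a small JDK-only sketch. It does not talk to Kafka: the send method below is a hypothetical stand-in that completes a CompletableFuture with a made-up offset, so only the calling patterns (ignore the future, block on it, or attach a callback) carry over to the real producer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: a fake send() that is acknowledged on another thread.
class SendStylesSketch {

    private static final AtomicLong NEXT_OFFSET = new AtomicLong();

    // Hypothetical stand-in for producer.send(record); the returned value is
    // a made-up offset rather than real RecordMetadata.
    static CompletableFuture<Long> send(String record) {
        return CompletableFuture.supplyAsync(NEXT_OFFSET::getAndIncrement);
    }

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        // Fire and forget: ignore the returned future entirely.
        send("a");

        // Synchronous: block until the acknowledgment arrives.
        long offset = send("b").get();
        System.out.println("sync send acknowledged, offset=" + offset);

        // Asynchronous: register a callback and keep going.
        CompletableFuture<Void> done = send("c").thenAccept(
                o -> System.out.println("async send acknowledged, offset=" + o));

        // Wait here only so the demo does not exit before the callback runs.
        done.join();
    }
}
```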

Send records synchronously with Kafka Producer

You can send a record synchronously by calling the producer's send method and then blocking on the Future it returns with get(). Let’s use this approach to send some message ids and messages to the Kafka topic we created earlier.

KafkaProducerExample.java – Send Records Synchronously


public class KafkaProducerExample {

  static void runProducer(final int sendMessageCount) throws Exception {
      final Producer<Long, String> producer = createProducer();
      long time = System.currentTimeMillis();

      try {
          for (long index = time; index < time + sendMessageCount; index++) {
              final ProducerRecord<Long, String> record =
                      new ProducerRecord<>(TOPIC, index,
                                  "Hello Mom " + index);

              RecordMetadata metadata = producer.send(record).get();

              long elapsedTime = System.currentTimeMillis() - time;
              System.out.printf("sent record(key=%s value=%s) " +
                              "meta(partition=%d, offset=%d) time=%d\n",
                      record.key(), record.value(), metadata.partition(),
                      metadata.offset(), elapsedTime);
          }
      } finally {
          producer.flush();
          producer.close();
      }
  }
}

The above just iterates through a for loop, creating a ProducerRecord with an example message ("Hello Mom " + index) as the record value and the for loop index as the record key. For each iteration, runProducer calls the send method of the producer (RecordMetadata metadata = producer.send(record).get()). The send method returns a Java Future, and calling get() on it blocks until the record has been acknowledged.

The returned RecordMetadata contains the partition where the record was written and the offset of the record in that partition.

Notice the call to flush and close. Kafka will auto flush on its own, but you can also call flush explicitly which will send the accumulated records now. It is polite to close the connection when we are done.

Running the Kafka Producer

Next, you define the main method.

KafkaProducerExample.java – Running the Producer


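public static void main(String... args) throws Exception {
    if (args.length == 0) {
        runProducer(5); // default count is an assumption; elided in the source
    } else {
        runProducer(Integer.parseInt(args[0]));
    }
}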

The main method just calls runProducer.

Send records asynchronously with Kafka Producer

Kafka provides an asynchronous send method to send a record to a topic. Let’s use this method to send some message ids and messages to the Kafka topic we created earlier. The big difference here will be that we use a lambda expression to define a callback.

KafkaProducerExample.java – Send Records Asynchronously with Kafka Producer


static void runProducer(final int sendMessageCount) throws InterruptedException {
    final Producer<Long, String> producer = createProducer();
    long time = System.currentTimeMillis();
    final CountDownLatch countDownLatch = new CountDownLatch(sendMessageCount);

    try {
        for (long index = time; index < time + sendMessageCount; index++) {
            final ProducerRecord<Long, String> record =
                    new ProducerRecord<>(TOPIC, index, "Hello Mom " + index);
            producer.send(record, (metadata, exception) -> {
                long elapsedTime = System.currentTimeMillis() - time;
                if (metadata != null) {
                    System.out.printf("sent record(key=%s value=%s) " +
                                    "meta(partition=%d, offset=%d) time=%d\n",
                            record.key(), record.value(), metadata.partition(),
                            metadata.offset(), elapsedTime);
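                } else {
                    exception.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await(25, TimeUnit.SECONDS);
    } finally {
        producer.flush();
        producer.close();
    }
}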

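Notice the use of a CountDownLatch (which, together with TimeUnit, must be imported from java.util.concurrent) so we can send all N messages and then wait until all of them have been acknowledged or the timeout expires.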
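The latch pattern itself can be tried without a broker. Below is a minimal sketch in which a thread pool plays the role of the producer's background thread; LatchSketch and runAndAwait are hypothetical names, and each task simply counts the latch down the way the send callback does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only: waits for N asynchronous completions with a latch.
class LatchSketch {

    // Start `count` tasks and wait up to the timeout for all of them to
    // call countDown(). Returns true if every task finished in time.
    static boolean runAndAwait(int count, long timeout, TimeUnit unit) {
        CountDownLatch latch = new CountDownLatch(count);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < count; i++) {
                // Stands in for the callback that runs when a send completes.
                pool.execute(latch::countDown);
            }
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean allDone = runAndAwait(5, 5, TimeUnit.SECONDS);
        System.out.println("all callbacks completed: " + allDone);
    }
}
```

Note that await with a timeout returns a boolean; checking it tells you whether every callback ran or the wait simply timed out.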

Async Interface Callback and Async Send Method

Kafka defines a Callback interface that you use for asynchronous operations. The callback interface allows code to execute when the request is complete. The callback executes in a background I/O thread, so it should be fast (don’t block it). The onCompletion(RecordMetadata metadata, Exception exception) method is called when the asynchronous operation completes. The metadata is set (not null) if the operation was a success, and the exception is set (not null) if the operation had an error.

The async send method is used to send a record to a topic, and the provided callback gets called when the send is acknowledged. The send method is asynchronous, and when called returns immediately once the record gets stored in the buffer of records waiting to post to the Kafka broker. The send method allows sending many records in parallel without blocking to wait for the response after each one.

Since the send call is asynchronous, it returns a Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
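How get() reports a failure is standard java.util.concurrent.Future behavior: the original error arrives wrapped in an ExecutionException. The sketch below shows this with pre-completed futures instead of a real producer; FutureGetSketch, describe, and the "broker unavailable" error are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Illustrative only: shows how Future.get() surfaces success and failure.
class FutureGetSketch {

    // Block on the future and describe how it completed.
    static String describe(Future<Long> future) {
        try {
            return "ok offset=" + future.get();
        } catch (ExecutionException e) {
            // The error from the asynchronous work is available as the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> success = CompletableFuture.completedFuture(7L);
        CompletableFuture<Long> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(describe(success)); // prints "ok offset=7"
        System.out.println(describe(failure)); // prints "failed: broker unavailable"
    }
}
```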

Conclusion Kafka Producer example

We created a simple example that creates a Kafka producer. First, we created a new replicated Kafka topic; then we created a Kafka producer in Java that uses the replicated topic to send records. We sent records with the Kafka producer using both the sync and async send methods.

Review Kafka Producer

What does the Callback lambda do?

The callback gets notified when the request is complete.

What will happen if the first server is down in the bootstrap list? Can the producer still connect to the other Kafka brokers in the cluster?

The producer will try to contact the next broker on the list. Any broker, once contacted, lets the producer know about the entire Kafka cluster. The producer can connect as long as at least one of the brokers on the list is running. For example, if the cluster has 100 brokers and two of the three brokers in the bootstrap list are down, the producer can still use the 98 remaining brokers.

When would you use Kafka async send vs. sync send?

If you were already using an async code base (Akka, QBit, Reakt, Vert.x) and you wanted to send records quickly.

Why do you need two serializers for a Kafka record?

One of the serializers is for the Kafka record key, and the other is for the Kafka record value.

Amir Masoud Sefidian
Machine Learning Engineer
