Big data messaging with Kafka, Part 2

Use Kafka's partitions, message offsets, and consumer groups to handle up to millions of messages per day

In Part 1 you developed a couple of small-scale producer/consumer applications using Kafka. From these exercises you should be familiar with the basics of the Kafka messaging system. In Part 2 you'll learn how to use partitions to distribute load and scale your application horizontally, handling up to millions of messages per day. You'll also learn how Kafka uses message offsets to track and manage complex message processing, and how to protect your Kafka messaging system against failure should a consumer go down. We'll develop the example application from Part 1 for both publish-subscribe and point-to-point use cases.

Partitions in Kafka

Topics in Kafka can be subdivided into partitions. For example, while creating a topic named Demo, you might configure it to have three partitions. The server would create three log files, one for each of the demo partitions. When a producer published a message to the topic, it would assign a partition ID for that message. The server would then append the message to the log file for that partition only.

If you then started two consumers, the server might assign partitions 1 and 2 to the first consumer, and partition 3 to the second consumer. Each consumer would read only from its assigned partitions. You can see the Demo topic configured for three partitions in Figure 1.

A partitioned topic in Apache Kafka

Figure 1. A partitioned topic in Apache Kafka

To expand the scenario, imagine a Kafka cluster with two brokers, housed in two machines. When you partitioned the demo topic, you would configure it to have two partitions and two replicas. For this type of configuration, the Kafka server would assign the two partitions to the two brokers in your cluster. Each broker would be the leader for one of the partitions.

When a producer published a message, it would go to the partition leader. The leader would take the message and append it to the log file on the local machine. The second broker would passively replicate that commit log to its own machine. If the partition leader went down, the second broker would become the new leader and start serving client requests. In the same way, when a consumer sent a request to a partition, that request would go first to the partition leader, which would return the requested messages.

Benefits of partitioning

Consider the benefits of partitioning a Kafka-based messaging system:

  1. Scalability: In a system with just one partition, messages published to a topic are stored in a log file, which exists on a single machine. The number of messages for a topic must fit into a single commit log file, and the size of messages stored can never be more than that machine's disk space. Partitioning a topic lets you scale your system by storing messages on different machines in a cluster. If you wanted to store 30 gigabytes (GB) of messages for the Demo topic, for instance, you could build a Kafka cluster of three machines, each with 10 GB of disk space. Then you would configure the topic to have three partitions.
  2. Server-load balancing: Having multiple partitions lets you spread message requests across brokers. For example, If you had a topic that processed 1 million messages per second, you could divide it into 100 partitions and add 100 brokers to your cluster. Each broker would be the leader for single partition, responsible for responding to just 10,000 client requests per second.
  3. Consumer-load balancing: Similar to server-load balancing, hosting multiple consumers on different machine lets you spread the consumer load. Let's say you wanted to consume 1 million messages per second from a topic with 100 partitions. You could create 100 consumers and run them in parallel. The Kafka server would assign one partition to each of the consumers, and each consumer would process 10,000 messages in parallel. Since Kafka assigns each partition to only one consumer, within the partition each message would be consumed in order.

Two ways to partition

The producer is responsible for deciding what partition a message will go to. The producer has two options for controlling this assignment:

  • Custom partitioner: You can create a class implementing the org.apache.kafka.clients.producer.Partitioner interface. This custom Partitioner will implement the business logic to decide where messages are sent.
  • DefaultPartitioner: If you don't create a custom partitioner class, then by default the org.apache.kafka.clients.producer.internals.DefaultPartitioner class will be used. The default partitioner is good enough for most cases, providing three options:
    1. Manual: When you create a ProducerRecord, use the overloaded constructor new ProducerRecord(topicName, partitionId,messageKey,message) to specify a partition ID.
    2. Hashing(Locality sensitive): When you create a ProducerRecord, specify a messageKey, by calling new ProducerRecord(topicName,messageKey,message). DefaultPartitioner will use the hash of the key to ensure that all messages for the same key go to same producer. This is the easiest and most common approach.
    3. Spraying(Random Load Balancing): If you don't want to control which partition messages go to, simply call new ProducerRecord(topicName, message) to create your ProducerRecord. In this case the partitioner will send messages to all the partitions in round-robin fashion, ensuring a balanced server load.

Partitioning a Kafka application

For the simple producer/consumer example in Part 1, we used a DefaultPartitioner. Now we'll try creating a custom partitioner instead. For this example, let's assume that we have a retail site that consumers can use to order products anywhere in the world. Based on usage, we know that most consumers are in either the United States or India. We want to partition our application to send orders from the US or India to their own respective consumers, while orders from anywhere else will go to a third consumer.

To start, we'll create a CountryPartitioner that implements the org.apache.kafka.clients.producer.Partitioner interface. We must implement the following methods:

  1. Kafka will call configure() when we initialize the Partitioner class, with a Map of configuration properties. This method initializes functions specific to the application's business logic, such as connecting to a database. In this case we want a fairly generic partitioner that takes countryName as a property. We can then use configProperties.put("partitions.0","USA") to map the flow of messages to partitions. In the future we can use this format to change which countries get their own partition.
  2. The Producer API calls partition() once for every message. In this case we'll use it to read the message and parse the name of the country from the message. If the name of the country is in the countryToPartitionMap, it will return partitionId stored in the Map. If not, it will hash the value of the country and use it to calculate which partition it should go to.
  3. We call close() to shut down the partitioner. Using this method ensures that any resources acquired during initialization are cleaned up during shutdown.

Note that when Kafka calls configure(), the Kafka producer will pass all the properties that we've configured for the producer to the Partitioner class. It is essential that we read only those properties that start with partitions., parse them to get the partitionId, and store the ID in countryToPartitionMap.

Below is our custom implementation of the Partitioner interface.

Listing 1. CountryPartitioner

    public class CountryPartitioner implements Partitioner {
        private static Map<String,Integer> countryToPartitionMap;

        public void configure(Map<String, ?> configs) {
            System.out.println("Inside CountryPartitioner.configure " + configs);
            countryToPartitionMap = new HashMap<String, Integer>();
            for(Map.Entry<String,?> entry: configs.entrySet()){
                    String keyName = entry.getKey();
                    String value = (String)entry.getValue();
                    System.out.println( keyName.substring(11));
                    int paritionId = Integer.parseInt(keyName.substring(11));

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                             Cluster cluster) {
            List partitions = cluster.availablePartitionsForTopic(topic);
            String valueStr = (String)value;
            String countryName = ((String) value).split(":")[0];
                //If the country is mapped to particular partition return it
                return countryToPartitionMap.get(countryName);
            }else {
                //If no country is mapped to particular partition distribute between remaining partitions
                int noOfPartitions = cluster.topics().size();
                return  value.hashCode()%noOfPartitions + countryToPartitionMap.size() ;

        public void close() {}

The Producer class in Listing 2 (below) is very similar to our simple producer from Part 1, with two changes marked in bold:

  1. We set a config property with a key equal to the value of ProducerConfig.PARTITIONER_CLASS_CONFIG, which matches the fully qualified name of our CountryPartitioner class. We also set countryName to partitionId, thus mapping the properties that we want to pass to CountryPartitioner.
  2. We pass an instance of a class implementing the org.apache.kafka.clients.producer.Callback interface as a second argument to the producer.send() method. The Kafka client will call its onCompletion() method once a message is successfully published, attaching a RecordMetadata object. We'll be able to use this object to find out which partition a message was sent to, as well as the offset assigned to the published message.

Listing 2. A partitioned producer

public class Producer {
    private static Scanner in;
    public static void main(String[] argv)throws Exception {
        if (argv.length != 1) {
            System.err.println("Please specify 1 parameters ");
        String topicName = argv[0];
        in = new Scanner(;
        System.out.println("Enter message(type exit to quit)");

        //Configure the Producer
        Properties configProperties = new Properties();

        org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
        String line = in.nextLine();
        while(!line.equals("exit")) {
            ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, null, line);
            producer.send(rec, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset());
            line = in.nextLine();

Assigning partitions to consumers

The Kafka server guarantees that a partition is assigned to only one consumer, thereby guaranteeing the order of message consumption. You can manually assign a partition or have it assigned automatically.

If your business logic demands more control, then you'll need to manually assign partitions. In this case you would use KafkaConsumer.assign(<listOfPartitions>) to pass a list of partitions that each consumer was interested in to the Kakfa server.

Having partitions assigned automatically is the default and most common choice. In this case, the Kafka server will assign a partition to each consumer, and will reassign partitions to scale for new consumers.

1 2 3 Page 1
Shop Tech Products at Amazon