
Tim Berglund

VP Developer Relations

Kafka Consumers

Using the consumer API is similar in principle to the producer. You use a class called KafkaConsumer to connect to the cluster (passing a configuration map to specify the address of the cluster, security, and other parameters). Then you use that connection to subscribe to one or more topics. When messages are available on those topics, they come back in a collection called ConsumerRecords, which contains individual instances of messages in the form of ConsumerRecord objects. A ConsumerRecord object represents the key/value pair of a single Apache Kafka message.

try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(TOPIC));

    while (true) {
        // poll() blocks for up to the given timeout and returns whatever
        // batch of records arrived in the meantime (possibly empty)
        ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, Payment> record : records) {
            String key = record.key();
            Payment value = record.value();
            System.out.printf("key = %s, value = %s%n", key, value);
        }
    }
}
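Note that modern Kafka clients take a java.time.Duration for the poll timeout (the older poll(long) overload was deprecated and later removed), and the endless while loop is the idiomatic shape of a consumer: poll, process the batch, poll again.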

KafkaConsumer manages connection pooling and the network protocol just like KafkaProducer does, but there is a much bigger story on the read side than just the network plumbing. First of all, Kafka is different from legacy message queues in that reading a message does not destroy it; it is still there to be read by any other consumer that might be interested in it. In fact, it’s perfectly normal in Kafka for many consumers to read from one topic. This one small fact has a positively disproportionate impact on the kinds of software architectures that emerge around Kafka, which is a topic covered very well elsewhere.
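Because reads are non-destructive, two applications can each see every message on a topic simply by presenting different group.id values. Here is a minimal, self-contained sketch of that idea; the broker address, the "payments" topic, and the group names are illustrative assumptions, not from the original:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class IndependentReader {

    // Each distinct group.id gets its own view of the topic: every group
    // receives every message, because consuming does not remove anything.
    static KafkaConsumer<String, String> consumerInGroup(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        String groupId = args.length > 0 ? args[0] : "audit-log";
        try (KafkaConsumer<String, String> consumer = consumerInGroup(groupId)) {
            consumer.subscribe(Collections.singletonList("payments"));
            while (true) {
                for (ConsumerRecord<String, String> record :
                         consumer.poll(Duration.ofMillis(100))) {
                    System.out.printf("[%s] key = %s, value = %s%n",
                                      groupId, record.key(), record.value());
                }
            }
        }
    }
}

Run this twice with different group names (say, "fraud-detector" and "audit-log") and both processes print every message on the topic.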

Also, consumers need to be able to handle the scenario in which the rate of message consumption from a topic combined with the computational cost of processing a single message are together too high for a single instance of the application to keep up. That is, consumers need to scale. In Kafka, scaling consumer groups is more or less automatic.
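Nothing in the consumer code itself changes when you scale; the only configuration involved is the consumer group name. A minimal sketch (the group name "payment-processor" is illustrative):

// Every instance of the application presents the same group.id.
// Start one copy and it owns all of the topic's partitions; start a
// second copy with this same setting and Kafka automatically rebalances
// the partitions across both instances.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-processor");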



Consumers

Hey, Tim Berglund here, to talk to you about Kafka consumers. Using the consumer API is similar in principle to the producer. In Java, you use a class called KafkaConsumer to connect to the cluster. You pass it a map of configuration parameters to specify the address of the cluster, or the address of a few brokers in the cluster, security parameters, any other kind of network tweaks you wanna make, or any other config that you need to do. Then you use that connection to subscribe to one or more topics. So we wanna tell the consumer, "Hey, this topic is the one that I wanna see new messages for." That can be a list of topics, or, for the stout-hearted, it can be a regular expression that matches some topics. These are all possible to do. When messages are available on those topics, they come back in a collection called, again in the Java case, ConsumerRecords, and ConsumerRecords contains individual instances of messages in the form of instances of an object called ConsumerRecord. So three classes, that's really your surface area for the consumer. It seems so simple, right? And that ConsumerRecord object, again, is the actual key/value pair of a single message.

KafkaConsumer manages connection pooling to the cluster and keeps up to date with cluster metadata. Did a node fail? Did a partition get reassigned? All that kind of stuff that you don't wanna think about, it does that and manages the network protocol, just like the KafkaProducer class does in the producer case. But there's a much bigger story on the read side than just the network plumbing. Consumers are a lot more complicated. So let's talk through some of that.

First, a reminder: Kafka is different from legacy message queues in that reading a message does not destroy it. It's still there to be read by any other consumer that might be interested in it. In fact, it's perfectly normal in Kafka for many consumers to read from one topic. There's just nothing different that you have to do. You just write those different consumers, you have them subscribe to that topic, and away they go, they're all reading. And as they read, they don't destroy, they just read.

Now there's another thing consumers need to worry about. On the producer side, if I've got lots of event sources, I can have lots of producers writing into one topic, and as long as that topic is partitioned adequately, Kafka says, "Fine. Okay, there's all these producers. I got this." On the read side, what if I have one program that needs to be able to handle the scenario where the rate of message consumption from a topic, combined with the computational cost of processing a single message in that topic, are together too much for a single instance of the application to be able to keep up with? This is to say nothing of fault tolerance, right? I'm just kind of throwing that concern to the wind, but just in terms of scalability, I might need multiple consumers reading a topic. Put differently, consumers need to scale. In Kafka, scaling consumer groups is more or less automatic. This is really neat.

A single instance of a consuming application will always receive the messages from all of the partitions in the topic it's subscribed to. Remember, topics are probably things that have more than one partition. If I've just got one instance of my consuming application, it's gonna get all those messages from all those partitions. Each message in each partition will come in order; messages between the partitions may arrive roughly in order, but I don't have any guarantee of the ordering between partitions. I'm just strictly ordered inside a partition. They all come to that one instance.

If I add a second instance of that consuming application, so same program running on a different server, a new container, whatever it is, doesn't matter, just the same program connecting, presenting the same group ID configuration parameter to the cluster, that triggers an automatic rebalancing process in which the Kafka cluster and those client nodes work together. They will attempt to distribute partitions fairly between the two instances, now that there are two instances of the application. That rebalancing process repeats each time you add or remove a consumer group instance, whether that's by an operator-initiated scale-out, like I'm trying to add nodes because we're scaling and that's all good news, or something bad happens and a node dies, and we need to reshuffle and see where the work gets assigned. That makes each consuming application horizontally and elastically scalable by default.

And I emphasize, there is nothing you do in the API to make this happen. There's a parameter called group ID that you have to specify when you're creating the consumer, but you can't instantiate the consumer without specifying it. So it's not like you get a medal for picking the group ID. You have to do it. Then there's nothing else you have to do to make this scale-out happen. You can just keep adding instances to that consumer application for as long as you want, or actually, as long as you have topic partitions to distribute to them. So if you had a topic with 10 partitions, you could deploy as many as 10 consumer group instances and expect them all to participate in processing events. If you deployed an 11th instance, it would be idle, 'cause there'd be no partition to assign to it.

Now, this rebalance protocol forms the backbone of a number of important Kafka features and provides critical, and otherwise kind of difficult to implement, functionality right out of the box. It might help at this point to consider a contrast with traditional messaging systems. In a traditional message queue, you can often scale the number of consumers, it's not like this is a new idea with Kafka, but then you usually miss out on ordering guarantees altogether. In Kafka, if your keys are non-null, you still have that ordering-by-key guarantee, which is preserved as you scale out the consumer group. In a traditional messaging topic, remember, topic and queue being different things in a legacy messaging system, in a topic you keep ordering guarantees in place, but then you sacrifice the ability to scale out consumers. In Kafka, consumer groups provide the ability to guarantee ordered delivery across an arbitrary number of consumers, so you get horizontal scale in the consuming application with the strongest ordering guarantee that is logically possible to preserve at scale.

So there's a quick tour through Kafka consumers, again, an elemental part of the Kafka ecosystem. And this is an API, so it's going to make a lot more sense if you get your hands dirty with some code.
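If you want to watch the rebalance protocol described above in action, the consumer API accepts a ConsumerRebalanceListener at subscribe time. Here's a minimal sketch, reusing the props, TOPIC, and Payment type from the earlier snippet; the println messages are just illustrative:

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

try (KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
    // The listener fires on every rebalance: when this instance joins or
    // leaves the group, or when another instance does and partitions move.
    consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Called before partitions are taken away -- a good place to
            // commit offsets or flush any in-flight work.
            System.out.println("Revoked: " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Called once the new assignment for this instance is settled.
            System.out.println("Assigned: " + partitions);
        }
    });

    while (true) {
        consumer.poll(Duration.ofMillis(100)).forEach(record ->
            System.out.printf("partition %d, offset %d: %s%n",
                              record.partition(), record.offset(), record.value()));
    }
}

Start a second copy of this program with the same group.id and you'll see both instances log a revocation followed by a fresh, smaller assignment, exactly the automatic redistribution the transcript describes.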