course: Apache Kafka® 101

Hands On: Consumers

4 min
Danica Fine

Senior Developer Advocate (Presenter)


In practice, programmatically producing and consuming messages is an important way to interact with your Apache Kafka cluster and put data into motion. In the first exercise of this course, we gained experience consuming from and producing to a Kafka topic using the command line. With that experience under your belt, let’s explore a more programmatic way of consuming messages by writing a consumer script in Python.

We recommend following these exercises in order so that you have everything you need to successfully complete the exercise. If you haven’t already set up your CLI to connect to your Confluent Cloud cluster, take a look at the previous exercise to get up to speed.

  1. Ensure that Python 3, pip, and virtualenv are installed on your machine. If they’re not, check out the prerequisites from the getting started with Python guide.
  2. From a terminal window, activate a new virtual environment and install the confluent-kafka library.
virtualenv env
source env/bin/activate
pip install confluent-kafka
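
If you'd like to confirm that the library installed correctly, a quick optional check (not part of the exercise itself) is to print the client version from Python:

import confluent_kafka

# version() reports the Python client version; libversion() reports the
# version of the underlying librdkafka C library.
print(confluent_kafka.version())
print(confluent_kafka.libversion())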
  3. Determine your cluster endpoint by running:
confluent kafka cluster describe

The output contains an endpoint field; take note of the hostname and port from this value, for example, pkc-abc12.us-central1.gcp.confluent.cloud:9092.

  4. You need an API key and secret in order to proceed. If you need a new one, make note of the cluster ID that was printed in step 3 and use it to run:
confluent api-key create --resource {ID}

Then set the key using:

confluent api-key use {API Key} --resource {ID}
  5. Using your favorite text editor, create a configuration file named config.ini and populate it with the following, substituting in your endpoint, key, and secret values:
    [default]
    bootstrap.servers=< Endpoint >
    security.protocol=SASL_SSL
    sasl.mechanisms=PLAIN
    sasl.username=< API Key >
    sasl.password=< API Secret >
    
    [consumer]
    group.id=python_kafka101_group_1
    # 'auto.offset.reset=earliest' to start reading from the beginning of
    # the topic if no committed offsets exist.
    auto.offset.reset=earliest
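
If you'd like to verify this configuration before writing the consumer itself, here is a minimal optional sketch (check_config.py is a hypothetical helper, not part of the exercise). It merges the two sections of config.ini the same way the consumer script in the next step will, then requests cluster metadata, so a wrong endpoint or bad credentials fail here rather than in your consumer:

#!/usr/bin/env python
from configparser import ConfigParser
from confluent_kafka import Consumer

# Merge the [default] and [consumer] sections into a single client
# configuration, mirroring what consumer.py does in the next step.
config_parser = ConfigParser()
config_parser.read('config.ini')
config = dict(config_parser['default'])
config.update(config_parser['consumer'])

consumer = Consumer(config)

# list_topics() makes a metadata request to the cluster; an unreachable
# endpoint or rejected credentials surface here as an error.
metadata = consumer.list_topics(timeout=10)
print("Connected. Topics visible to this client:", sorted(metadata.topics))

consumer.close()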
  6. Create another file named consumer.py that contains the following:
#!/usr/bin/env python
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Consumer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])
    config.update(config_parser['consumer'])

    # Create Consumer instance
    consumer = Consumer(config)

    # Subscribe to topic
    topic = "poems"
    consumer.subscribe([topic])

    # Poll for new messages from Kafka and print them.
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                print("Waiting...")
            elif msg.error():
                print("ERROR: %s".format(msg.error()))
            else:
                # Extract the (optional) key and value, and print.
                # Guard against a missing key or value: records produced
                # without a key would otherwise raise on decode().
                key = msg.key().decode('utf-8') if msg.key() is not None else ''
                value = msg.value().decode('utf-8') if msg.value() is not None else ''
                print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(
                    topic=msg.topic(), key=key, value=value))
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()
  7. Make the script executable and run:
chmod u+x consumer.py
./consumer.py config.ini
  8. Observe the messages being output and stop the consumer script using Ctrl+C.

This script was deliberately simple, but the steps of configuring your consumer, subscribing to a topic, and polling for events are common across all consumers. If you want more details on additional configuration changes that you may want to play around with in your consumer, feel free to check out the getting started guides on Confluent Developer.
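
As one example of such a change, committing offsets manually is a common variation. The sketch below (illustrative only, with the same placeholder credentials as config.ini) disables automatic offset commits with enable.auto.commit and commits each message's offset explicitly after processing it:

from confluent_kafka import Consumer

# Same connection settings as config.ini, shown inline here for brevity.
config = {
    'bootstrap.servers': '< Endpoint >',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '< API Key >',
    'sasl.password': '< API Secret >',
    'group.id': 'python_kafka101_group_1',
    'auto.offset.reset': 'earliest',
    # Turn off automatic offset commits; the application commits instead.
    'enable.auto.commit': False,
}

consumer = Consumer(config)
consumer.subscribe(['poems'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # ... process the message here ...
        # Synchronously commit this message's offset so it is not
        # redelivered to the group after a restart.
        consumer.commit(message=msg, asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

A synchronous commit trades a little throughput for the certainty that an offset has been recorded before the next message is processed.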


Hands On: Consumers

In practice, programmatically producing and consuming messages is an important way to interact with your Kafka cluster and put your data into motion. To get a better idea of how this is done, let's use Python to explore a programmatic method for consuming messages from a Kafka topic. But before we begin, it's important to note that we recommend following these exercises in order so that you have everything you need to successfully complete the exercise. If you haven't already set up your command line interface to connect to your Confluent Cloud cluster, take a look at the previous exercise to get up to speed. When you're ready, we can dive in.

To begin, you should have Python 3 installed on your machine along with pip and virtualenv. If you're having trouble getting any of that set up, see the exercise notes for a link to detailed instructions. With these prerequisites in place, from a terminal window, we'll get started with a new virtual environment. When that's ready, you'll want to install the confluent-kafka library using pip.

Next, we'll need some information about our Kafka cluster in order to connect to it with a Python client. From the terminal, run confluent kafka cluster describe to get details about your Kafka environment and cluster. We're specifically interested in the endpoint field that's printed. Leave the scheme behind; just copy the host and port.

We also need an API key and secret in order to continue. If you're following along in order, you should have already set up and stored an API key and secret for use. However, if you need a new API key and secret, simply run the api-key create command shown here. This will generate a new key and secret, which you should then store for safekeeping. Tell the CLI to use this new key by running the api-key use command.

Now you'll create a configuration file called config.ini with a few key pieces of information. Endpoint should be exactly the field that we just captured, and you'll need to enter your API key and secret.

Next, in your favorite text editor, let's write a simple consumer script and call it consumer.py. Before we consume the messages from the topic, let's quickly review what the script is doing. First, we're parsing the configuration file that we just created and passing that into a new consumer instance. Then we subscribe to a set of topics. In this case, it's just the poems topic that we've already been working with. Next, we create a loop that will poll for messages from the topic and react, either by printing a helpful status or the Kafka message itself. Finally, with a keyboard interrupt, we end the loop, close the consumer, and exit the script.

Make the script executable, and then run the script by specifying the config file we created. You'll see all the messages that we produced earlier.

So that script was deliberately simple, but the steps of configuring your consumer, subscribing to a topic, and polling for events are pretty much common across all consumers. If you want more details on additional configuration changes that you might want to play around with in your own consumers, feel free to check out the getting started guides on Confluent Developer.