course: Apache Kafka® 101

Hands On: ksqlDB

6 min
Danica Fine

Senior Developer Advocate (Presenter)

Hands On: ksqlDB

In this exercise, you’ll learn how to manipulate your data using ksqlDB. Up until now, we’ve been producing data to and reading data from an Apache Kafka topic without any intermediate steps. With data transformation and aggregation, we can do so much more!

In the last exercise, we created a Datagen Source Connector to produce a stream of orders data to a Kafka topic, serializing it in Avro using Confluent Schema Registry along the way. This exercise relies on the data that our connector is producing, so if you haven’t completed the previous exercise, we encourage you to do so beforehand.

Before we start, make sure that your Datagen Source Connector is still up and running.

  1. From the cluster landing page in the Confluent Cloud Console, select ksqlDB from the left-hand menu. Select Create cluster myself.

    ksqlDB-create-cluster-myself

  2. Choose Global access on the “Access Control” page, select Continue, give the ksqlDB cluster a name, and then select Launch cluster. When the ksqlDB cluster is finished provisioning, you’ll be taken to the ksqlDB editor. From this UI, you can enter SQL statements to access, aggregate, and transform your data.
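
    If you’d like to double-check that the orders topic from the previous exercise is visible to ksqlDB before continuing, you can run a quick statement from the editor (a minimal check; it simply lists the Kafka topics the ksqlDB cluster can see):

    SHOW TOPICS;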

  3. Add the orders topic from the previous exercise to the ksqlDB cluster. Register it as a stream by running:

    CREATE STREAM orders_stream WITH (
      KAFKA_TOPIC='orders',        -- existing topic written to by the Datagen Source Connector
      VALUE_FORMAT='AVRO',         -- schema is fetched from Schema Registry, so no column list is needed
      PARTITIONS=6,
      TIMESTAMP='ordertime');      -- use the ordertime field for time-based processing

    ksqlDB-cluster

  4. When the stream has been created, navigate to the Streams tab and select the orders_stream to view more details about the stream, including fields and their types, how the data is serialized, and information about the underlying topic.

    orders-stream
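
    The same field and type information is also available from the editor itself; a quick equivalent sketch:

    DESCRIBE ORDERS_STREAM;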

  5. Click Query stream to be taken directly back to the editor, where the editor will be automatically populated with a query for that stream of data. If your Datagen connector is still running (which it should be), you’ll also see data being output in real time at the bottom of the screen.

    query-stream
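
    The auto-populated query is typically a push query over the stream, along the lines of this sketch (it emits every row as it arrives):

    SELECT * FROM ORDERS_STREAM EMIT CHANGES;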

  6. The ordertime field of each message is in milliseconds since epoch, which isn’t very friendly to look at. To transform this field into something more readable, execute the following push query; the EMIT CHANGES clause tells ksqlDB to keep outputting results as new messages are written to the underlying Kafka topic:

    SELECT 
        TIMESTAMPTOSTRING(ORDERTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS ORDERTIME_FORMATTED,
        orderid,
        itemid,
        orderunits,
        address->city, 
        address->state,
        address->zipcode
    FROM ORDERS_STREAM
    EMIT CHANGES;

    This query also extracts the nested data from the address struct.

  7. The results look good, so add a CREATE STREAM line to the beginning of the query to persist the results to a new stream.

    CREATE STREAM ORDERS_STREAM_TS AS
    SELECT 
        TIMESTAMPTOSTRING(ORDERTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS ORDERTIME_FORMATTED,
        orderid,
        itemid,
        orderunits,
        address->city, 
        address->state,
        address->zipcode 
    FROM ORDERS_STREAM
    EMIT CHANGES;
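
    Once the new stream is registered, you can spot-check its contents with a short push query; a minimal sketch (the LIMIT clause simply stops the query after a few rows):

    SELECT * FROM ORDERS_STREAM_TS EMIT CHANGES LIMIT 5;
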
  8. Using data aggregations, you can determine how many orders are made per state. ksqlDB also makes it easy to window data, so you can determine how many orders are made per state per week. Run the following ksqlDB statement to create a table:

    CREATE TABLE STATE_COUNTS AS 
    SELECT 
      address->state,
      COUNT_DISTINCT(ORDERID) AS DISTINCT_ORDERS
    FROM ORDERS_STREAM
    WINDOW TUMBLING (SIZE 7 DAYS) 
    GROUP BY address->state;
  9. When the table has been created, navigate to the Tables tab above to see details about the table.
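
    You can pull up the same information from the editor as well; a quick sketch:

    SHOW TABLES;
    DESCRIBE STATE_COUNTS;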

  10. Click Query table to be taken back to the ksqlDB editor and start an open-ended query that will provide an output for each update to the table; as new orders are made, you’ll see an ever-increasing number of orders per state per defined window period.
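
    The generated query is a push query against the table, along the lines of this sketch; note that windowed tables also expose WINDOWSTART and WINDOWEND columns for each row:

    SELECT * FROM STATE_COUNTS EMIT CHANGES;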

  11. To see just the latest values from a table, rather than running an open-ended query, we can execute a pull query. We can see a current snapshot containing the states that had more than two orders per one-week period by running the following:

    SELECT
        *
    FROM STATE_COUNTS
    WHERE DISTINCT_ORDERS > 2;

    pull-query
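
    Depending on your ksqlDB version, you can also bound a pull query by the window itself using the WINDOWSTART system column; a sketch with an illustrative date:

    SELECT *
    FROM STATE_COUNTS
    WHERE DISTINCT_ORDERS > 2
      AND WINDOWSTART >= '2024-01-01T00:00:00';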

We've only scratched the surface of what you can do with ksqlDB, but we’ve shown you how to create streams and tables as well as how to transform and aggregate your data. With these tools in your toolbelt, you’ll be able to hit the ground running with your next ksqlDB application. If you’d like to learn about ksqlDB in more depth, definitely check out the ksqlDB 101 and Inside ksqlDB courses.

Note

A final note to you as we wrap up the exercises for this course: Don’t forget to delete your resources and cluster in order to avoid exhausting the free Confluent Cloud usage that is provided to you.
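
If you’d also like to remove the ksqlDB objects created in this exercise before deleting the cluster, here is a minimal sketch (you may first need to terminate the persistent queries backing them; SHOW QUERIES; lists their IDs for use with TERMINATE):

    DROP TABLE STATE_COUNTS DELETE TOPIC;
    DROP STREAM ORDERS_STREAM_TS DELETE TOPIC;
    DROP STREAM ORDERS_STREAM;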



Hands On: ksqlDB (Video Transcript)

In this exercise, you'll learn how to manipulate your data using ksqlDB. Up until now, we've been producing data to and reading data from a Kafka topic without any extra steps. With data transformation and aggregation thrown into the mix, we can do so much more. In the last exercise, we created a Datagen Source Connector to produce a stream of orders data to a Kafka topic, serializing it in Avro using Confluent Schema Registry along the way. This exercise relies on the data that our connector is producing, so if you haven't completed the previous exercise, do so now. And when you're ready, we can dive in. Before we start, you should make sure that your Datagen Source Connector is still up and running.

Let's look more closely at the data that our connector is generating by first creating a ksqlDB application. Navigate to ksqlDB and select Add Application. It will take a few minutes to provision your ksqlDB application, but once it's done, you'll be taken to the ksqlDB editor. From this UI, you can enter SQL statements to access, aggregate, and transform your data.

To begin, let's use the orders topic from the previous exercise and write a query to register this data as a stream. Since the data from the previous exercise was serialized in Avro, we need to let ksqlDB know how to parse that data using the value format configuration. An awesome benefit of this is that we don't need to specify the schema of the stream, since, as a consumer, ksqlDB can simply get the schema directly from Schema Registry. Contrast this with using JSON or delimited data, where we'd need to type in every single field and data type, with all the time and inevitable errors that would entail. Note how we've also specified a value for the timestamp parameter. Because we're going to process the data with some time-based logic, it's important that ksqlDB understands which logical time field within the data to use. Click Run Query to create a new stream object, which will read the data from the underlying orders topic.

After the stream has been created, you'll see it populated in the sidebar with some quick reference details. Now we can go to the Streams tab and see the ksqlDB stream that was created reading from the orders topic. By selecting the stream, we can see more details about it, including the fields and their types, how the data is serialized, and information on the topic itself. Click Query Stream to be taken directly back to the editor, where the editor will be automatically populated with a query for that stream of data. If your Datagen connector is still running (which it should be), you'll also see data being output in real time at the bottom of the screen.

You'll notice that the time for each order is in milliseconds since epoch, which isn't very friendly to look at. Let's write a query to transform this data so that the timestamp field is more human readable. For now, we'll use a push query, telling ksqlDB to continue to output results as new messages are written to the underlying Kafka topic. We specify a push query using EMIT CHANGES. In addition to formatting the timestamp, we'll also extract the nested data from the address struct. After running this query, we see that the results look great: the messages' timestamps are formatted exactly as we want them to be, and the address fields are no longer nested.

Let's persist this result to a topic by creating a new stream from it. To do so, we'll add the CREATE STREAM syntax to the beginning of the query. Click Run Query again to register this new stream. Note that compared to the last ksqlDB query, where we were creating a stream that read from an underlying topic, in this case we're creating a new topic, which will be populated with the data described in the SELECT portion of the ksqlDB statement. When we query this stream, you'll start to see data with the newly formatted timestamp column.

All right, up until this point we've been exploring data transformations. Let's switch gears and see how data aggregation works. Each order has an associated state from its address, so we can easily conduct an aggregation to see how many orders are made per state. Further, using windowing, we can conduct this aggregation over windows of a specific size; now we can see the number of orders per state per seven-day period. Running this query will create a table. Again, you'll see the new table populated in the right-hand sidebar, and we can navigate to the Tables tab and see this newly created table.

Let's query it. When we run a query against a table with EMIT CHANGES, we are starting an open-ended query that will provide an output for each update to the table. Thus, as new orders are made, you'll see an ever-increasing number of orders per state per defined window period. To see just the latest values from a table, rather than running an open-ended query, we can execute a pull query. We can see a current snapshot containing the states that had more than two orders per one-week period by running the following.

This is really just the surface of what you can do with ksqlDB. We've shown you how to create streams and tables as well as how to transform and aggregate your data. With these tools in your tool belt, you'll really be able to hit the ground running with your next ksqlDB application.

A final note to you as we wrap up the exercises for this course: don't forget to delete your resources and cluster in order to avoid exhausting the free Confluent Cloud usage that's provided to you. I hope you'll join us for another course as you continue on with your Kafka journey. See you there.