Get Started Free
‹ Back to courses
course: Event Sourcing and Event Storage with Apache Kafka®

Hands On: Trying Out Event Sourcing with Confluent Cloud

3 min
Anna Mcdonald

Anna McDonald

Principal Customer Success Technical Architect (Presenter)

Ben Stopford

Ben Stopford

Lead Technologist, Office of the CTO (Author)

Hands On: Trying Out Event Sourcing with Confluent Cloud

Simulate event sourcing by completing this exercise, which uses ksqlDB on Confluent Cloud. You'll begin by creating a stream of e-commerce events backed by an Apache Kafka topic. Then you'll write a streaming query to transform them into a table in ksqlDB. Finally, you'll query your summary table to fetch the contents of the shopping cart back to your application.

  1. Sign up for Confluent Cloud if you have not already done so, and respond to the verification email to confirm your account.

  2. On Confluent Cloud, open the shopping_cart_database that you set up in the module Thinking in Events (if you haven't set it up yet, please return there for instructions and a discount code). If you started your ksqlDB instance then, the status field should now say "up" and not "provisioning".

  3. Open the query editor and set two additional parameters under Add query properties:

    • auto.offset.reset = Earliest. This ensures that you always return to the start of a Kafka topic when you run a new query.
    • ksql.streams.commit.interval.ms = 3000. This overwrites the default of thirty seconds, making your queries more responsive.
  4. In the editor, create a stream to hold your shopping cart events with the following ksqlDB command:

    CREATE STREAM shopping_cart_events (customer VARCHAR, item VARCHAR, qty INT)
    WITH (kafka_topic='shopping_cart_events', value_format='json', partitions=1);
  5. Select Run query and look for the output below the editor confirming that the event stream was created.

    {
      "@type": "currentStatus",
      "statementText": "CREATE STREAM SHOPPING_CART_EVENTS (CUSTOMER STRING, ITEM STRING, QTY INTEGER) WITH (KAFKA_TOPIC='shopping_cart_events', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');",
      "commandId": "stream/`SHOPPING_CART_EVENTS`/create",
      "commandStatus": {
        "status": "SUCCESS",
        "message": "Stream created",
        "queryId": null
      },
      "commandSequenceNumber": 2,
      "warnings": []
    }
  6. Create four INSERT statements to push events into the shopping_cart_events topic:

    --add two pairs of pants
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'pants', 2);
    --add a t-shirt
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 't-shirts', 1);
    --remove one pair of pants
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'pants', -1);
    --add a hat
    INSERT INTO shopping_cart_events (customer, item, qty)
    VALUES ('bob', 'hats', 1);	

    Select Run query.

  7. Use a SELECT statement to return your four events:

    SELECT * FROM shopping_cart_events EMIT CHANGES;

    Click Run query to see the results and then click Stop to terminate the push query.

  8. Create a ksqlDB query to transform your events into a summary table by using a SUM on the qty field, with an associated GROUP BY that aggregates the events by item:

    CREATE TABLE current_shopping_cart WITH (KEY_FORMAT='JSON') AS
      SELECT customer, item, SUM(qty) as total_qty 
      FROM   shopping_cart_events 
      GROUP BY customer, item 
      EMIT CHANGES;

    Click Run Query.

    The result is a summary table that reflects how many items are in the shopping cart right now. The removals should cancel the items that were added.

  9. Now run a query against the table just like you would with a regular database table:

    SELECT * FROM current_shopping_cart EMIT CHANGES;

    Click Run Query.

    You should see three records, not four, i.e. one record per item. Specifically, you should see only one pair of pants, as the two events for pants have been summarized into one row.

    Click Stop to terminate the push query.

  10. Go to Cluster settings on the left-hand side menu, then click Delete cluster. Enter your cluster's name, then select Continue.

  11. Delete your ksqlDB application by clicking ksqlDB on the left-hand side menu, then selecting Delete under Actions. Enter your application's name, then click Continue.

In this exercise, you used event sourcing to recreate the current state of a shopping cart from an unbounded event stream containing all user cart interactions.

Use the promo code EVENTS101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.

Hands On: Trying Out Event Sourcing with Confluent Cloud

Go ahead and open the shopping cart database. Now that we know the status field says up and not provisioning. The first thing we need to do is set two configuration parameters. The offset reset should be set to earliest. This ensures that we will always return to the start of a Kafka topic when we run a new query. Next, we set the parameter ksql.streams.commit.interval.ms which we will configure to three seconds. This overwrites the default of 30 seconds, making our queries a bit more responsive. The first thing to do is to create a stream to hold our shopping cart events. The stream is materialized in a Kafka topic of the same name. So let's go ahead and run that statement. We can now see the output that confirms that the shopping cart event stream was created. We create four insert statements that push events into the shopping cart events topic. The first event adds two pairs of pants to the cart. The next adds a t-shirt. Then we remove one of the pairs of pants. Finally, we add a hat. So let's go ahead and run that query. Run the select statement. We should see four events returned. Indeed. We see four events were returned. We create the ksqlDB query that transforms the events into the summary table. To do this, we must include a SUM on the quantity field with an associated GROUP BY that aggregates the events by item. The result will be a table that reflects how many items are in the shopping cart right now. So the removal should cancel the items we added. Now go ahead and run that query. There you go. Our current shopping cart summary table has been created. Run a query against the table, just like you would query a regular database table. Select start from the current shopping cart, admitting changes. So we expect to see three records, not four. One record per item with only one pair of pants, as the two events for pants have been summarized into one row. That's what we see. There is one hat, one pair of pants, one t-shirt. So, in summary whether your name is Sanjana or Bob, we simulate events coming from the user, adding and removing items from their shopping cart. We store those events in a Kafka topic. We then created a streaming query to transform them into a table in ksqlDB. This all happens at right time. Finally, we query the summary table to get the contents of the shopping cart back to our application. And that's it. In the next module, we'll be discussing incorporating event storage into your system design.