Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

KTable

This module defines the KTable, explains how it differs from a KStream, and covers its basic operations, as well as its GlobalKTable variant.

Update Streams

The module Basic Operations defined event streams and mentioned that keys across records in event streams are completely independent of one another, even if they are identical.

Update Streams are the exact opposite: if a new record comes in with the same key as an existing record, the existing record will be overwritten.

This means that keys are required when using a KTable, whereas they aren't when using a KStream. By overwriting records, a KTable creates a completely different data structure from a KStream, even given the same source records.

Defining a KTable

To define a KTable, you use a StreamsBuilder, as with a KStream, but you call builder.table instead of builder.stream. With the builder.table method, you provide an inputTopic, along with a Materialized configuration object specifying your SerDes (this replaces the Consumed object that you use with a KStream):

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> firstKTable =
    builder.table(inputTopic,
        Materialized.with(Serdes.String(), Serdes.String()));
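
To see the overwrite semantics end to end, here is a minimal sketch using the TopologyTestDriver from kafka-streams-test-utils; the topic name, store name, and records are all hypothetical, chosen only for illustration:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class KTableOverwriteSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Materialize the table under a store name so we can query it below.
        KTable<String, String> regions = builder.table(
            "user-regions",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("regions-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "user-regions", new StringSerializer(), new StringSerializer());

            input.pipeInput("alice", "asia");
            input.pipeInput("bob", "europe");
            input.pipeInput("alice", "americas"); // same key: overwrites "asia"

            KeyValueStore<String, String> store = driver.getKeyValueStore("regions-store");
            System.out.println(store.get("alice")); // americas (latest value wins)
            System.out.println(store.get("bob"));   // europe
        }
    }
}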

KTable Operations

The KTable API has operations similar to those of the KStream API, including mapping and filtering.

Mapping

As with KStream, mapValues transforms values. A KTable has no map operation, however, since changing the key of a table row would require repartitioning; to transform keys as well, first convert the table to a stream with toStream:

firstKTable.mapValues(value -> ..)
firstKTable.toStream().map((key, value) -> ..)
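
For example (a sketch; the transformations themselves are arbitrary):

// Uppercase every value; each change flows downstream as an update.
KTable<String, String> upperCased =
    firstKTable.mapValues(value -> value.toUpperCase());

// Swap key and value by going through a stream first
// (KeyValue is org.apache.kafka.streams.KeyValue).
KStream<String, String> reKeyed =
    firstKTable.toStream().map((key, value) -> KeyValue.pair(value, key));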

Filtering

As with KStream, the filter operation lets you supply a predicate, and only records that match the predicate are forwarded to the next node in the topology:

firstKTable.filter((key, value) -> ..)
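
For instance (a sketch; the predicate is arbitrary):

// Keep only updates with a non-blank value; updates that fail the
// predicate are dropped rather than forwarded downstream.
KTable<String, String> nonBlank =
    firstKTable.filter((key, value) -> value != null && !value.trim().isEmpty());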

GlobalKTable

A GlobalKTable is built using the builder.globalTable method on the StreamsBuilder. As with a regular KTable, you pass in a Materialized configuration object with the SerDes:

StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, String> globalKTable =
    builder.globalTable(inputTopic,
        Materialized.with(Serdes.String(), Serdes.String()));

The main difference between a KTable and a GlobalKTable is that a KTable shards its data across Kafka Streams instances, while a GlobalKTable gives each instance a full copy of the data. You typically use a GlobalKTable with lookup data. There are also some idiosyncrasies regarding joins between a GlobalKTable and a KStream; we'll cover these later in the course.
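
For instance (the topic names here are hypothetical), small, slowly changing reference data suits a GlobalKTable, while high-volume keyed data stays in a sharded KTable:

// Sharded: each Kafka Streams instance sees only its assigned partitions.
KTable<String, String> ordersByCustomer =
    builder.table("orders", Materialized.with(Serdes.String(), Serdes.String()));

// Fully replicated: every instance holds a complete copy, handy for lookups.
GlobalKTable<String, String> countryCodes =
    builder.globalTable("country-codes",
        Materialized.with(Serdes.String(), Serdes.String()));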


KTable (Transcript)

Hi, I'm Sophie Blee-Goldman with Confluent. In this module, we're going to talk about KTables.

In the previous module, we talked about event streams, which are sequences of key-value pairs that are independent of each other. An update stream is also a sequence of key-value pairs, but instead of independent events that each represent a single physical event, each record in an update stream is an update that is applied to a previous value. So records that come in with the same key are actually updating the value of the previous record with that same key. Here, we see two updates, one for the key A and one for the key B. In this update stream, as opposed to the event stream earlier, we only have two final events at the end: one is the latest value of A, and one is the latest value of B. Each incoming event replaces the previous one.

So how do we actually define a KTable? It's similar to a KStream, and again, you can use the StreamsBuilder. The same builder can be used for any number of KStreams, and it can also be used for any number of KTables in the same application. The difference is that for a KTable we use the builder.table syntax. Again, you pass in the name of a topic, and this is the topic that the table will be representing. Unlike a KStream, a KTable can only be subscribed to a single topic at once, and that is the topic that the table really represents.

Like we said, a table represents the latest value of each record, so unlike a stream, it needs to store all of those values somewhere so that it knows what the latest value was at any given time. That means a KTable is going to be backed by a state store, which we'll get into a little bit in a later module. A state store is just a copy of the events in the topic that the table is built from, stored on disk, so that you can look up and track the latest value of each of the records in that topic. To build a KTable, you just call builder.table, pass in the topic, and again, pass in a configuration object that tells Kafka Streams what the types are and how to read the events in that input topic.

Now, an interesting thing about the KTable that is different from the KStream is that a KTable, by default, does not forward every change. In the case of a KStream, every event is its own event with its own meaning, so we always forward new events. With a KTable, all we really care about is the latest value for each key, so it doesn't really matter what each individual incoming event is. We might buffer the updates for the table in a cache, like you might have with any other key-value store, and only when that cache gets flushed do the updates get forwarded further down the processor topology. By default, the cache gets flushed every 30 seconds, which is the commit interval. That's something that you, the user, can define, and it determines how quickly you see updates from the KTable. You can set it to zero and see every update, or you can set it longer and only get the updates when you need them. You'll see an example of the KTable in the exercise later.
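Here's a minimal sketch of that cache and commit configuration; the application ID and bootstrap servers are placeholders:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-example");    // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

// The cache is flushed on commit; the default commit interval is 30 seconds.
// Setting it to 0 forwards every single table update downstream.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);

// Disabling the record cache entirely has a similar effect.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);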
Now, like the KStream, many of the same simple operators apply. You can map values with the mapValues operator: you specify how the value changes, and you can change the type, but you should not be changing the original record itself. To re-map entire key-value records, you would first convert the table to a stream. There's also a filter operator for KTables, where you can choose to completely drop updates as they come in. For example, if you think the data is corrupted or invalid for whatever reason, a filter might be an appropriate way to keep anything you don't want from causing an update.

Now, unlike the KStream, there's actually a special kind of KTable, and this is called a global KTable. To understand the difference between a KTable and a global KTable, you need to dig a little deeper into the Kafka Streams architecture, and this really has to do with partitioning. A topic, which we discussed earlier, lives on a broker, and it can contain any events that the user defines as related, anything from just a few events to hundreds of thousands, or even millions or more. That's a lot of data, so what Kafka does is divide these topics up into partitions. A partition is just a logical subset of the data in a topic, and a topic is partitioned by key, which means that all events with the same key end up in the same partition.

In Kafka Streams, partitions are important because Kafka Streams will only deal with one partition at a time. A typical KTable will only see the subset of data for one partition of that topic at a time. This is what allows Kafka Streams to scale: you can increase your number of instances, and each one will handle only some subset of the total data, as opposed to trying to handle all one million records on a single node or on your local laptop. That's what a KTable does.

A global KTable, on the other hand, actually does hold all the records across all partitions. This can be useful when you want a view of all the data across the entire topic. Usually this is for data that is smaller in size and fairly static, something that is not updated that frequently: things like zip codes or country codes, which don't really change in a continuous way the way that event streams do.

So now we'll see an exercise about how to use KTables in Kafka Streams.