Wade Waldron

Staff Software Practice Lead

Transforming Data in Flink

Overview

The real power of Flink comes from its ability to transform data in a distributed streaming pipeline. It provides a variety of operators that enable both the transformation and the distribution of data, including common functions such as map, flatMap, and filter as well as more advanced techniques. This video shows a few of the basic operations that can be performed on a Flink data stream and how they might fit into a streaming pipeline.

Topics:

  • Flink DataStream Operators
  • Process Functions and Keyed Process Functions
  • Map
  • FlatMap
  • Filter
  • KeyBy
  • Reduce

Code

ProcessFunction - Mapping Elements

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class MyProcessFunction
  extends ProcessFunction<Input, Output> {
  @Override
  public void processElement(
    Input input,
    ProcessFunction<Input, Output>.Context ctx,
    Collector<Output> collector
  ) {
    // Emit exactly one Output for each Input (a one-to-one mapping).
    collector.collect(new Output(input));
  }
}

ProcessFunction - Flattening Mapped Elements

public class MyProcessFunction 
  extends ProcessFunction<Input[], Output> {
  @Override
  public void processElement(
    Input[] collection,
    ProcessFunction<Input[], Output>.Context ctx,
    Collector<Output> collector
  ) {
    // Emit one Output per array entry, flattening the stream.
    for (Input input : collection) {
      collector.collect(new Output(input));
    }
  }
}

ProcessFunction - Filtering Elements

public class MyProcessFunction
  extends ProcessFunction<Input, Input> {
  @Override
  public void processElement(
    Input input,
    ProcessFunction<Input, Input>.Context ctx,
    Collector<Input> collector
  ) {
    // 'condition' stands in for any boolean test on the input.
    if (condition) {
      collector.collect(input);
    }
  }
}

Process

stream.process(new MyProcessFunction());
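
To see where this call fits, here is a minimal sketch of a complete pipeline. The sample data, the Input constructor, and the job name are illustrative rather than taken from the video:

StreamExecutionEnvironment env =
  StreamExecutionEnvironment.getExecutionEnvironment();

env
  .fromElements(new Input("a"), new Input("b")) // hypothetical source data
  .process(new MyProcessFunction())             // invoked once per element
  .print();                                     // write results to stdout

env.execute("process-function-example");        // nothing runs until execute()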

Map

stream.map(input -> new Output(input));
DataStream<Double> doubles = integers.map(
	input -> Double.valueOf(input) / 2
);

FlatMap

// Note: a lambda that writes to a Collector needs a type hint such as
// .returns(...) because Java erases the Collector's generic type.
stream.flatMap((collection, collector) -> {
	for (Input input : collection) {
		collector.collect(new Output(input));
	}
});
DataStream<Integer> letterCount = sentences
	.map(input -> input.split(" "))
	.flatMap((String[] words, Collector<Integer> collector) -> {
		for (String word : words) {
			collector.collect(word.length());
		}
	})
	.returns(Types.INT); // from org.apache.flink.api.common.typeinfo.Types

Filter

stream.filter(input -> condition);
DataStream<Integer> evenIntegers = integers
	.filter(input -> input % 2 == 0);

KeyBy

stream.keyBy(
	input -> input.getKey()
);

KeyedProcessFunction

public class MyKeyedProcessFunction
  extends KeyedProcessFunction<String, Input, Output> {
  @Override
  public void processElement(
    Input input,
    KeyedProcessFunction<String, Input, Output>.Context ctx,
    Collector<Output> collector) {
      // The key of the current element is available through the context.
      String key = ctx.getCurrentKey();
      ...
  }
}
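
A KeyedProcessFunction is attached with the same process call, but only after keyBy, since Flink needs to know how to partition the elements first. A minimal sketch:

stream
  .keyBy(input -> input.getKey())
  .process(new MyKeyedProcessFunction());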

Reduce

stream
  .keyBy(input -> input.key)
  .reduce((s1, s2) -> s1.merge(s2));
DataStream<Tuple2<String, Integer>> totalCountsByItemId =
	itemIdsAndCounts
		.keyBy(tuple -> tuple.f0)
		.reduce((l1, l2) -> new Tuple2<>(l1.f0, l1.f1 + l2.f1));
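
Putting keyBy and reduce together, here is a small runnable sketch of the order example described in the video. The item names and quantities are invented for illustration. Note that reduce emits a running result for each incoming element:

StreamExecutionEnvironment env =
  StreamExecutionEnvironment.getExecutionEnvironment();

env
  .fromElements(
    Tuple2.of("shirt", 2),
    Tuple2.of("hat", 1),
    Tuple2.of("shirt", 3))
  .keyBy(tuple -> tuple.f0)                               // partition by item id
  .reduce((l1, l2) -> new Tuple2<>(l1.f0, l1.f1 + l2.f1)) // running sum per key
  .print();                                               // (shirt,2) (hat,1) (shirt,5)

env.execute("reduce-example");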

Resources

Use the promo code FLINKJAVA101 to get $25 of free Confluent Cloud usage


Transforming Data in Flink

Hi, I'm Wade from Confluent. In this video, we're going to see a few of the ways that we can transform data as it moves through a Flink data stream.

Previously, we've compared a Flink data stream to a plumbing system. The water pump is similar to our Flink source. The sink that the water drains into is similar to our Flink data sink, and the pipes in between represent all of the transformative operations in our stream. But what are those operations? What form do they take? And what can we use them to accomplish?

The Process Function

The primary building block of our transformations is something known as a ProcessFunction. A ProcessFunction takes an Input and converts it to an Output. At its core, this is very simple, but it's also flexible enough that we can do a lot with it. In fact, most of the operations we'll be looking at in this section can be translated into a basic ProcessFunction.

Much of the power of the ProcessFunction comes from the signature of the processElement method that it implements. The method takes an element that matches the first type defined in the generic signature, a context that contains some helper methods and objects, and a collector, which is basically a collection of the second type in the generic signature. This collector is what provides a lot of flexibility. Rather than returning a single element, we can return multiple elements or none at all. This allows us to do some interesting things.

The most basic use case is to simply map from one type to another, as we see here. Each time we receive an element, we convert it from the input to the output and then collect the result. But what if the input type was actually an array? In that case, we could convert each element in the array to the appropriate output type and collect each of the results. This would essentially take a stream of arrays and flatten it into a stream of elements of the new type. Of course, we could also do the reverse and take a single element and expand it into multiple results. Furthermore, we might choose to omit some results. Each time an element is handled by our function, we could check some condition and only collect the result if the condition is true. This would allow us to filter out some of the records in the stream.

Attaching the Process Function

Once we have defined our ProcessFunction, the next step is to attach it to our stream. We do this by calling process on the stream and passing it an instance of the function. When we later execute the stream, this function will be called for each element. If you are familiar with functional programming, these operations might look familiar. Essentially, we've implemented a map, a flatMap, and a filter. Because these specific operations are both powerful and common, they've been implemented to reduce the amount of boilerplate you need.

Map and Flat Map

The map function allows you to pass a Java lambda that will convert from the input type to the output type. For example, if you had a stream of integers and you wanted to divide each one by two, you could do that with a map function. The result would be a new stream that contains doubles instead of integers. The flatMap function takes a lambda that converts from the input type and pushes the result into a collector of the output type. This can be used to produce a flattened stream. In this example, we start with a stream of sentences. We use a map function to split each sentence into an array of words. Then we use a flatMap function to collapse the array into a count of the number of letters in each word. We could do all of this in a single step using just a flatMap. However, this shows the power of composing multiple operations together.

Filter

The filter function takes a predicate that converts each element to a boolean value. If the boolean value is true, then the element is left in the stream. If it's false, then the element is omitted. Here we start with a stream of integers. We then filter that stream to omit all of the odd numbers.

KeyBy

Another powerful operation we can perform on our data stream is keyBy. You can think of this as a little like a groupBy in a database. Essentially, it groups all elements with a specific key. KeyBy isn't like the other operations we've seen so far. It isn't transforming the data. Instead, it's providing instructions for how the data should be routed in the Flink cluster. Specifically, it partitions the stream and redistributes the data across the cluster. As a result, it is handled quite differently from something like a ProcessFunction. When we use keyBy, it causes a shuffling effect as each element is potentially moved to a different task manager. The resulting network hops can be expensive. However, once the shuffling has finished, you end up with a partitioned stream where each partition contains elements with the same key.

Downstream Operations

When we use keyBy, it has an impact on downstream operations. Our ProcessFunction becomes a KeyedProcessFunction. We can then obtain the key for the current element using the getCurrentKey method in the context object.

Reduce

The keyBy also gives us access to some new downstream operations, such as the reduce function. It allows us to take the current element in the stream and combine it with the previous element using a function that we define. This can be useful for computations such as counting all the elements with the same key.

Complex Example

For a more complex example, imagine a stream that contains line items for many orders, stored as a Tuple2 of a string and an integer. Each tuple contains the identifier of the item as a string and the quantity as an integer. If we wanted to do a sum of all of the quantities grouped by item ID, we could first use a keyBy to partition our stream according to the ID. We could then use reduce to do a sum of each of the counts.

This gives us a short tour of some of the operations available in our Flink data stream. There's more beyond what we've covered here, so make sure to check out the documentation for a deeper look. But this gives us enough that we should be able to start trying out some of these functions in the hands-on exercises. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.