Wade Waldron

Staff Software Practice Lead

Serializers & Deserializers

Overview

Serialization is a critical topic when working with Flink because it often serializes things you might not expect. Flink has two different types of serialization: internal and external. And within each type, there are multiple formats that might be used. Depending on your choices, serialization can have an impact on message size, flexibility, schema evolution, and more. This video outlines the different ways that Flink uses serializers and shows you how to implement a few of the basics.

Topics:

  • Serialization and Deserialization
  • Internal and External Serialization
  • POJO serialization vs Kryo serialization
  • JSON Serialization

Code

A Simple POJO

public class Person {
	// Public fields are serialized directly by the POJO serializer.
	public String name;

	// Private fields are fine as long as they have standard getters and setters.
	private String email;

	// A public, parameterless constructor is required.
	public Person() {}

	public String getEmail() {return email;}
	public void setEmail(String email) {this.email = email;}
}

Registering Kryo Serializers

env.getConfig().registerKryoType(MyCustomType.class);

Disabling Kryo Serialization

env.getConfig().disableGenericTypes();

JsonSerializationSchema

JsonSerializationSchema<MyClass> serializer = 
	new JsonSerializationSchema<>();

JsonDeserializationSchema

JsonDeserializationSchema<MyClass> deserializer = 
	new JsonDeserializationSchema<>(MyClass.class);

Custom Object Mapper

JsonSerializationSchema<MyClass> serializer = 
	new JsonSerializationSchema<>(() -> 
		new ObjectMapper()
			.registerModule(new JavaTimeModule())
	);

Resources

Use the promo code FLINKJAVA101 to get $25 of free Confluent Cloud usage


Serializers & Deserializers

Hi, I'm Wade from Confluent. In this video, we're going to learn how Flink handles serialization and deserialization. This is a critical topic in Flink, because it often serializes more than you expect.

When we work with Flink data streams, we need to consider serialization and deserialization from two different perspectives. As data is pulled into the stream and pushed out the other side, it may need to undergo a serialization process. We can think of this as external serialization, because the objects are being used by an external system. However, Flink also does a significant amount of internal serialization. When data is transferred between operators in Flink, it needs to be serialized. Furthermore, the operators themselves typically include a user function that needs to be sent to the task managers to be executed. This means that not only do we need to serialize the data moving between the operators, we need to serialize the user function as well. Depending on how they are used, classes that the user function depends on may also need to be serializable. Sometimes this requires us to use factory methods, static objects, and other techniques to avoid serialization issues.

Internal serialization can also be tricky, because although Flink is very good at serializing most Java types, it can be more efficient if the types follow certain rules. In particular, if a class meets the criteria of a Plain Old Java Object, or POJO, then Flink can serialize it in a more efficient manner. A POJO is defined by the following criteria: it must be public, it must have a parameterless default constructor, all fields must be either public or accessible through standard getter and setter functions, and all fields must be supported by an appropriate serializer. If your class meets these criteria, then Flink can use the faster POJO serializer.
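The criteria above can be sketched as a quick reflection check. This is a simplified, hypothetical helper, not Flink's actual analysis — Flink's TypeExtractor also verifies that each field type has a supported serializer, which is skipped here:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical helper mirroring part of Flink's POJO rules.
public class PojoCheck {

    public static boolean looksLikePojo(Class<?> clazz) {
        // Rule 1: the class must be public.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Rule 2: it must have a public, parameterless constructor.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every field is public, or reachable through a
        // standard getter/setter pair.
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isPublic(mods)) {
                continue;
            }
            String suffix = Character.toUpperCase(field.getName().charAt(0))
                    + field.getName().substring(1);
            try {
                clazz.getMethod("get" + suffix);
                clazz.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    // Meets the rules: public, no-arg constructor, getter/setter for the private field.
    public static class Person {
        public String name;
        private String email;
        public Person() {}
        public String getEmail() { return email; }
        public void setEmail(String email) { this.email = email; }
    }

    // Breaks the rules: private field with no getter/setter,
    // so Flink would fall back to Kryo for this type.
    public static class NotAPojo {
        private String hidden;
        public NotAPojo() {}
    }
}
```

Running `looksLikePojo(Person.class)` returns true, while `looksLikePojo(NotAPojo.class)` returns false because of the inaccessible `hidden` field.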
If your class doesn't meet the criteria, then Flink falls back to the slower Kryo serializer. This can reduce performance by as much as 75%. You can improve Kryo serialization by registering your types with the serializer. This brings Kryo performance closer to what you get from the POJO serializer, but the POJO serializer will still be faster. Alternatively, if you want to prevent Flink from using Kryo, you can disable it completely. In that case, you'll need to ensure your types meet the POJO requirements, or you'll need to register your own serializers. If you decide to use an alternative serializer, Avro is considered a good choice.

Another advantage of POJOs is that they support state schema evolution out of the box. This means you can alter some properties of a message without having to worry about compatibility. For example, fields can be removed, and the values of those fields will be dropped in future checkpoints and savepoints. Make sure to check out our Flink 101 course for a deeper exploration of both checkpoints and savepoints. Fields can also be added. In that case, the new field will be initialized to the default value defined by Java. Schema evolution is a critical consideration when working with message-driven systems, and the fact that the POJO serializer includes built-in support for it is a significant benefit. Combined with the performance benefits and general ease of use, this is why Plain Old Java Objects are the recommended way to define your types in Flink.

That takes care of data inside Flink, but what about data from the outside? External data typically interfaces with Flink through one of the many Flink connectors, such as the Kafka connector. This data can take many forms, including JSON, CSV, Protobuf, Avro, and more. Flink has several built-in serialization schemas for handling the more popular formats.
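The "default value defined by Java" for an added field is the same default an uninitialized field gets in plain Java: zero for numeric primitives, false for booleans, and null for object references. A minimal illustration (plain Java, standing in for a state restore — `EventV2` is a hypothetical evolved type):

```java
public class Defaults {

    // Imagine V1 of this class had only "id"; state was checkpointed,
    // then the class evolved to V2 with two new fields.
    public static class EventV2 {
        public String id;     // existed in V1
        public int retries;   // added in V2 -> restored as 0
        public String note;   // added in V2 -> restored as null
    }

    public static void main(String[] args) {
        // Stand-in for restoring V1 state into the V2 class:
        // fields absent from the old state keep Java's defaults.
        EventV2 restored = new EventV2();
        System.out.println(restored.retries); // 0
        System.out.println(restored.note);    // null
    }
}
```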
The JsonSerializationSchema can be used to convert a Java object into JSON, and if we want to convert the JSON back into a Java object, we can use the JsonDeserializationSchema. Sometimes, we may need to provide a custom ObjectMapper for the schema. For example, if we wanted to use the JavaTimeModule, we'd need a custom mapper. In that case, we have to register it with the schema using a factory method. We do this because parts of the ObjectMapper, such as the JavaTimeModule, aren't serializable. Remember that many components in a Flink job are going to be serialized internally, including the external schema. By using a factory method, the mapper doesn't have to be serialized. Instead, it's created on demand whenever it is needed. Of course, this all deals specifically with JSON serialization. If you're planning to serialize using something other than JSON, you'll need to consult the documentation for the corresponding serializers. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.
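Why a factory rather than the mapper itself? The schema object is Java-serialized when the job is shipped to the cluster, and a serializable lambda can cross that boundary even when the object it creates cannot. A minimal sketch using only the JDK — the `NotSerializableMapper`, `DirectSchema`, and `FactorySchema` classes are hypothetical stand-ins for an ObjectMapper and the two schema designs, not Flink code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class FactoryDemo {

    // Stand-in for ObjectMapper + JavaTimeModule: not Serializable.
    public static class NotSerializableMapper {
        public String write(Object o) { return String.valueOf(o); }
    }

    // Holding the mapper directly makes the whole schema unserializable.
    public static class DirectSchema implements Serializable {
        final NotSerializableMapper mapper = new NotSerializableMapper();
    }

    // A serializable factory interface, similar in spirit to the
    // supplier the JsonSerializationSchema constructor accepts.
    public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Holding only the factory keeps the schema serializable; the mapper
    // is created on demand after the schema arrives at the task manager.
    public static class FactorySchema implements Serializable {
        final SerializableSupplier<NotSerializableMapper> factory =
                NotSerializableMapper::new;
        transient NotSerializableMapper mapper;

        NotSerializableMapper mapper() {
            if (mapper == null) mapper = factory.get();
            return mapper;
        }
    }

    // Try to Java-serialize an object, reporting success or failure.
    public static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Serializing a `DirectSchema` throws NotSerializableException, while a `FactorySchema` serializes cleanly — the same reason the real schema takes a mapper factory instead of a mapper.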