[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[jira] [Created] (FLINK-11073) Make serializers immutable / provide option TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer

Tzu-Li (Gordon) Tai created FLINK-11073:

             Summary: Make serializers immutable / provide option TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer
                 Key: FLINK-11073
             Project: Flink
          Issue Type: Improvement
          Components: Type Serialization System
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai
             Fix For: 1.8.0

h2. Motivation

Right now, when a new serializer is provided to the old serializer (or, to be more specific, the old serializer's snapshot) for state schema compatibility checks, if the new serializer is reconfigurable so that it may be compatible, the only possible way to do this is reconfigure the new serializer in-place and return {{TypeSerializerSchemaCompatibility.compatibleAsIs()}} as the result of the compatibility check.

One solid example is the {{KryoSerializer}}. The {{KryoSerializer}} contains as configuration a map of serialized classes to their registered ids. This mapping may change on restore executions, and the new {{KryoSerializer}} must reconfigure this mapping to match with the previous execution before the new {{KryoSerializer}} can be used for state access.
Right now, this is performed by directly mutating the map in the new serializer instance.

This mutative behaviour is fragile, especially when taking into account scale down / up scenarios which could easily result in mismatching state serializer configurations across TMs.
h2. Proposed Approach
 # The {{TypeSerializerSchemaCompatibility}} result class should be extended to contain an option {{compatibleWithReconfiguredSerializer(TypeSerializer)}}, which would wrap a new instance of a reconfigured version of the new serializer.
 # Callers of the compatibility check needs to be aware of this case and respect it, using the provided reconfigured serializer instance when one is provided. In Flink, there are two places which performs compatibility checks on serializers: 1) composite serializers which contain nested serializers, and therefore needs to check compatibility of its nested serializers, and 2) in state backends, checking the compatibility of the new serializer with the old serializer.
 # Introduce {{CompositeTypeSerializerSnapshot}} to encapsulate logic of handling reconfiguration of nested serializers: if a composite serializer has a nested serializer that returns a new reconfigured instance of itself, than the result of the compatibility check on the composite serializer should also wrap a reconfigured version of the composite serializer that holds the reconfigured nested serializer. This logic should be captured in a base abstract class, say {{CompositeTypeSerializerSnapshot}} so that it can be commonly shared by many of Flink's composite serializers.
 # For composite serializers that is still using the legacy, less-powerful {{TypeSerializerConfigSnapshot}} and {{CompatibilityResult}} abstractions, while its nested serializer is signaling that it has reconfigured itself, this should be detected and an error is thrown complaining that the outer composite serializer needs to be upgraded to use the new serializer snapshot and compatibility abstractions. This approach follows the same way we handled bridging the new {{TypeSerializerSchemaCompatibility}} and old {{CompatibilityResult}} class in Flink 1.7.

This message was sent by Atlassian JIRA