Skip to content

Commit

Permalink
[FLINK-36905] Update Chinese doc on serialization to reflect the late…
Browse files Browse the repository at this point in the history
…st changes in Flink 2.0
  • Loading branch information
X-czh committed Jan 18, 2025
1 parent 26d3426 commit 5c58487
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ to specify the state's name, as well as information about the type of the state.
It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:

{{< tabs "ee215ff6-2e21-4a40-a1b4-7f114560546f" >}}
{{< tab "Java" >}}
```java
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};

Expand All @@ -54,20 +52,6 @@ ListStateDescriptor<Tuple2<String, Integer>> descriptor =

checkpointedState = getRuntimeContext().getListState(descriptor);
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}

val descriptor = new ListStateDescriptor[(String, Integer)](
"state-name",
new CustomTypeSerializer)
)

checkpointedState = getRuntimeContext.getListState(descriptor)
```
{{< /tab >}}
{{< /tabs >}}

## State serializers and schema evolution

Expand All @@ -84,10 +68,10 @@ mind is how the serialization schema can be changed in the future.
When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state
type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases:

1. Data schema of the state type has evolved, i.e. adding or removing a field from a POJO that is used as state.
2. Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded.
3. Configuration of the serializer has changed.
1. Data schema of the state type has evolved, i.e. adding or removing a field from a POJO that is used as state.
2. Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded.
3. Configuration of the serializer has changed.

In order for the new execution to have information about the *written schema* of state and detect whether or not the
schema has changed, upon taking a savepoint of an operator's state, a *snapshot* of the state serializer needs to be
written along with the state bytes. This is abstracted a `TypeSerializerSnapshot`, explained in the next subsection.
Expand Down Expand Up @@ -129,15 +113,15 @@ restored execution of an operator, the old serializer snapshot is provided to th
This method returns a `TypeSerializerSchemaCompatibility` representing the result of the compatibility resolution,
which can be one of the following:

1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result signals that the new serializer is compatible,
meaning that the new serializer has identical schema with the previous serializer. It is possible that the new
serializer has been reconfigured in the `resolveSchemaCompatibility` method so that it is compatible.
2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result signals that the new serializer has a
different serialization schema, and it is possible to migrate from the old schema by using the previous serializer
(which recognizes the old schema) to read bytes into state objects, and then rewriting the object back to bytes with
the new serializer (which recognizes the new schema).
3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result signals that the new serializer has a
different serialization schema, but it is not possible to migrate from the old schema.
1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result signals that the new serializer is compatible,
meaning that the new serializer has identical schema with the previous serializer. It is possible that the new
serializer has been reconfigured in the `resolveSchemaCompatibility` method so that it is compatible.
2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result signals that the new serializer has a
different serialization schema, and it is possible to migrate from the old schema by using the previous serializer
(which recognizes the old schema) to read bytes into state objects, and then rewriting the object back to bytes with
the new serializer (which recognizes the new schema).
3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result signals that the new serializer has a
different serialization schema, but it is not possible to migrate from the old schema.

The last bit of detail is how the previous serializer is obtained in the case that migration is required.
Another important role of a serializer's `TypeSerializerSnapshot` is that it serves as a factory to restore
Expand All @@ -151,48 +135,48 @@ To wrap up, this section concludes how Flink, or more specifically the state bac
abstractions. The interaction is slightly different depending on the state backend, but this is orthogonal
to the implementation of state serializers and their serializer snapshots.

#### Off-heap state backends (e.g. `RocksDBStateBackend`)

1. **Register new state with a state serializer that has schema _A_**
- the registered `TypeSerializer` for the state is used to read / write state on every state access.
- State is written in schema *A*.
2. **Take a savepoint**
- The serializer snapshot is extracted via the `TypeSerializer#snapshotConfiguration` method.
- The serializer snapshot is written to the savepoint, as well as the already-serialized state bytes (with schema *A*).
3. **Restored execution re-accesses restored state bytes with new state serializer that has schema _B_**
- The previous state serializer's snapshot is restored.
- State bytes are not deserialized on restore, only loaded back to the state backends (therefore, still in schema *A*).
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
#### Off-heap state backends (e.g. `EmbeddedRocksDBStateBackend`)

1. **Register new state with a state serializer that has schema _A_**
- the registered `TypeSerializer` for the state is used to read / write state on every state access.
- State is written in schema *A*.
2. **Take a savepoint**
- The serializer snapshot is extracted via the `TypeSerializer#snapshotConfiguration` method.
- The serializer snapshot is written to the savepoint, as well as the already-serialized state bytes (with schema *A*).
3. **Restored execution re-accesses restored state bytes with new state serializer that has schema _B_**
- The previous state serializer's snapshot is restored.
- State bytes are not deserialized on restore, only loaded back to the state backends (therefore, still in schema *A*).
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
`TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
4. **Migrate state bytes in backend from schema _A_ to schema _B_**
- If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is
4. **Migrate state bytes in backend from schema _A_ to schema _B_**
- If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is
performed. The previous state serializer which recognizes schema _A_ will be obtained from the serializer snapshot, via
`TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects, which in turn
are re-written again with the new serializer, which recognizes schema _B_ to complete the migration. All entries
of the accessed state is migrated all-together before processing continues.
- If the resolution signals incompatibility, then the state access fails with an exception.
#### Heap state backends (e.g. `MemoryStateBackend`, `FsStateBackend`)

1. **Register new state with a state serializer that has schema _A_**
- the registered `TypeSerializer` is maintained by the state backend.
2. **Take a savepoint, serializing all state with schema _A_**
- The serializer snapshot is extracted via the `TypeSerializer#snapshotConfiguration` method.
- The serializer snapshot is written to the savepoint.
- State objects are now serialized to the savepoint, written in schema _A_.
3. **On restore, deserialize state into objects in heap**
- The previous state serializer's snapshot is restored.
- The previous serializer, which recognizes schema _A_, is obtained from the serializer snapshot, via
`TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects, which in turn
are re-written again with the new serializer, which recognizes schema _B_ to complete the migration. All entries
of the accessed state is migrated all-together before processing continues.
- If the resolution signals incompatibility, then the state access fails with an exception.

#### Heap state backends (e.g. `HashMapStateBackend`)

1. **Register new state with a state serializer that has schema _A_**
- the registered `TypeSerializer` is maintained by the state backend.
2. **Take a savepoint, serializing all state with schema _A_**
- The serializer snapshot is extracted via the `TypeSerializer#snapshotConfiguration` method.
- The serializer snapshot is written to the savepoint.
- State objects are now serialized to the savepoint, written in schema _A_.
3. **On restore, deserialize state into objects in heap**
- The previous state serializer's snapshot is restored.
- The previous serializer, which recognizes schema _A_, is obtained from the serializer snapshot, via
`TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects.
- From now on, all of the state is already deserialized.
4. **Restored execution re-accesses previous state with new state serializer that has schema _B_**
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
- From now on, all of the state is already deserialized.
4. **Restored execution re-accesses previous state with new state serializer that has schema _B_**
- Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the
`TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
- If the compatibility check signals that migration is required, nothing happens in this case since for
heap backends, all state is already deserialized into objects.
- If the resolution signals incompatibility, then the state access fails with an exception.
5. **Take another savepoint, serializing all state with schema _B_**
- Same as step 2., but now state bytes are all in schema _B_.
- If the compatibility check signals that migration is required, nothing happens in this case since for
heap backends, all state is already deserialized into objects.
- If the resolution signals incompatibility, then the state access fails with an exception.
5. **Take another savepoint, serializing all state with schema _B_**
- Same as step 2., but now state bytes are all in schema _B_.

## Predefined convenient `TypeSerializerSnapshot` classes

Expand All @@ -211,9 +195,9 @@ essentially meaning that the serialization schema of the serializer is solely de
There will only be 2 possible results of the compatibility resolution when using the `SimpleTypeSerializerSnapshot`
as your serializer's snapshot class:

- `TypeSerializerSchemaCompatibility.compatibleAsIs()`, if the new serializer class remains identical, or
- `TypeSerializerSchemaCompatibility.incompatible()`, if the new serializer class is different then the previous one.
- `TypeSerializerSchemaCompatibility.compatibleAsIs()`, if the new serializer class remains identical, or
- `TypeSerializerSchemaCompatibility.incompatible()`, if the new serializer class is different then the previous one.

Below is an example of how the `SimpleTypeSerializerSnapshot` is used, using Flink's `IntSerializer` as an example:

```java
Expand Down Expand Up @@ -284,13 +268,14 @@ public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot
}
```


When implementing a new serializer snapshot as a subclass of `CompositeTypeSerializerSnapshot`,
the following three methods must be implemented:
* `#getCurrentOuterSnapshotVersion()`: This method defines the version of
the current outer serializer snapshot's serialized binary format.
* `#getNestedSerializers(TypeSerializer)`: Given the outer serializer, returns its nested serializers.
* `#createOuterSerializerWithNestedSerializers(TypeSerializer[])`:
Given the nested serializers, create an instance of the outer serializer.
* `#getCurrentOuterSnapshotVersion()`: This method defines the version of
the current outer serializer snapshot's serialized binary format.
* `#getNestedSerializers(TypeSerializer)`: Given the outer serializer, returns its nested serializers.
* `#createOuterSerializerWithNestedSerializers(TypeSerializer[])`:
Given the nested serializers, create an instance of the outer serializer.

The above example is a `CompositeTypeSerializerSnapshot` where there are no extra information to be snapshotted
apart from the nested serializers' snapshots. Therefore, its outer snapshot version can be expected to never
Expand All @@ -300,9 +285,9 @@ that needs to be persisted along with the nested component serializer. An exampl
the nested element serializer.

In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`:
* `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
* `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
* `#resolveOuterSchemaCompatibility(TypeSerializerSnapshot)`: checks the compatibility based on the outer snapshot information.
* `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
* `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
* `#resolveOuterSchemaCompatibility(TypeSerializerSnapshot)`: checks the compatibility based on the outer snapshot information.

By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to
read / write, and therefore have empty default implementations for the above methods. If the subclass
Expand All @@ -311,6 +296,7 @@ has outer snapshot information, then all three methods must be implemented.
Below is an example of how the `CompositeTypeSerializerSnapshot` is used for composite serializer snapshots
that do have outer snapshot information, using Flink's `GenericArraySerializer` as an example:


```java
public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {

Expand Down Expand Up @@ -365,6 +351,7 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
}
```


There are two important things to notice in the above code snippet. First of all, since this
`CompositeTypeSerializerSnapshot` implementation has outer snapshot information that is written as part of the snapshot,
the outer snapshot version, as defined by `getCurrentOuterSnapshotVersion()`, must be upticked whenever the
Expand All @@ -387,8 +374,8 @@ Flink restores serializer snapshots by first instantiating the `TypeSerializerSn
along with the snapshot bytes). Therefore, to avoid being subject to unintended classname changes or instantiation
failures, `TypeSerializerSnapshot` classes should:

- avoid being implemented as anonymous classes or nested classes,
- have a public, nullary constructor for instantiation
- avoid being implemented as anonymous classes or nested classes,
- have a public, nullary constructor for instantiation

#### 2. Avoid sharing the same `TypeSerializerSnapshot` class across different serializers

Expand Down Expand Up @@ -431,19 +418,19 @@ and could be problematic once you want to upgrade serializer classes or perform
To be future-proof and have flexibility to migrate your state serializers and schema, it is highly recommended to
migrate from the old abstractions. The steps to do this is as follows:

1. Implement a new subclass of `TypeSerializerSnapshot`. This will be the new snapshot for your serializer.
2. Return the new `TypeSerializerSnapshot` as the serializer snapshot for your serializer in the
`TypeSerializer#snapshotConfiguration()` method.
3. Restore the job from the savepoint that existed before Flink 1.7, and then take a savepoint again.
Note that at this step, the old `TypeSerializerConfigSnapshot` of the serializer must still exist in the classpath,
and the implementation for the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method must not be
removed. The purpose of this process is to replace the `TypeSerializerConfigSnapshot` written in old savepoints
with the newly implemented `TypeSerializerSnapshot` for the serializer.
4. Once you have a savepoint taken with Flink 1.7, the savepoint will contain `TypeSerializerSnapshot` as the
state serializer snapshot, and the serializer instance will no longer be written in the savepoint.
At this point, it is now safe to remove all implementations of the old abstraction (remove the old
`TypeSerializerConfigSnapshot` implementation as will as the
`TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).
1. Implement a new subclass of `TypeSerializerSnapshot`. This will be the new snapshot for your serializer.
2. Return the new `TypeSerializerSnapshot` as the serializer snapshot for your serializer in the
`TypeSerializer#snapshotConfiguration()` method.
3. Restore the job from the savepoint that existed before Flink 1.7, and then take a savepoint again.
Note that at this step, the old `TypeSerializerConfigSnapshot` of the serializer must still exist in the classpath,
and the implementation for the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method must not be
removed. The purpose of this process is to replace the `TypeSerializerConfigSnapshot` written in old savepoints
with the newly implemented `TypeSerializerSnapshot` for the serializer.
4. Once you have a savepoint taken with Flink 1.7, the savepoint will contain `TypeSerializerSnapshot` as the
state serializer snapshot, and the serializer instance will no longer be written in the savepoint.
At this point, it is now safe to remove all implementations of the old abstraction (remove the old
`TypeSerializerConfigSnapshot` implementation as will as the
`TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer).

## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.19

Expand Down
Loading

0 comments on commit 5c58487

Please sign in to comment.