Commit 1a7c0f1

[FLINK-38461] Introduce SinkUpsertMaterializerV2
1 parent c39aff1 commit 1a7c0f1

20 files changed, +820 -92 lines changed

docs/layouts/shortcodes/generated/execution_config_configuration.html

Lines changed: 18 additions & 0 deletions
@@ -272,6 +272,24 @@
             <td><p>Enum</p></td>
             <td>Because of the disorder of ChangeLog data caused by Shuffle in distributed system, the data received by Sink may not be the order of global upsert. So add upsert materialize operator before upsert sink. It receives the upstream changelog records and generate an upsert view for the downstream.<br />By default, the materialize operator will be added when a distributed disorder occurs on unique keys. You can also choose no materialization(NONE) or force materialization(FORCE).<br /><br />Possible values:<ul><li>"NONE"</li><li>"AUTO"</li><li>"FORCE"</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).<br /></td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).<br /></td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.sink.upsert-materialize-strategy.type</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">LEGACY</td>
+            <td><p>Enum</p></td>
+            <td>Which strategy of SinkUpsertMaterializer to use. Supported strategies:<br />LEGACY: Simple implementation based on ValueState&lt;List&gt; (the original implementation).<br />MAP: SequencedMultiSetState implementation based on a combination of several MapState maintaining ordering and fast lookup properties.<br />VALUE: Similar to LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.<br />ADAPTIVE: Alternate between MAP and VALUE depending on the number of entries for the given key starting with VALUE and switching to MAP upon reaching threshold.high value (and back to VALUE, when reaching low).<br /><br />Possible values:<ul><li>"LEGACY"</li><li>"MAP"</li><li>"VALUE"</li><li>"ADAPTIVE"</li></ul></td>
+        </tr>
         <tr>
             <td><h5>table.exec.sort.async-merge-enabled</h5><br> <span class="label label-primary">Batch</span></td>
             <td style="word-wrap: break-word;">true</td>
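
Note on the threshold defaults documented in this file: when `threshold.high` or `threshold.low` is left unset, the option text says Flink falls back to state-backend specific values (400/300 for the hashmap state backend, 50/40 for RocksDB and the rest). The following is a minimal sketch of that fallback rule only. The class `AdaptiveThresholdDefaults`, its `resolve` method, and the backend name strings are hypothetical and are not taken from this commit; how Flink actually detects the state backend is not shown in this diff.

```java
/** Hypothetical helper, not part of Flink: applies the documented fallback defaults. */
final class AdaptiveThresholdDefaults {

    /** Effective thresholds after applying defaults. */
    static final class Thresholds {
        final long low;
        final long high;

        Thresholds(long low, long high) {
            this.low = low;
            this.high = high;
        }

        @Override
        public String toString() {
            return "Thresholds{low=" + low + ", high=" + high + "}";
        }
    }

    /**
     * Resolves the effective thresholds. A null argument means "not configured".
     * The backend name comparison is an assumption made for this sketch.
     */
    static Thresholds resolve(String stateBackend, Long configuredLow, Long configuredHigh) {
        boolean hashmap = "hashmap".equalsIgnoreCase(stateBackend);
        // Documented defaults: 300/400 for hashmap, 40/50 for RocksDB and the rest.
        long low = configuredLow != null ? configuredLow : (hashmap ? 300L : 40L);
        long high = configuredHigh != null ? configuredHigh : (hashmap ? 400L : 50L);
        return new Thresholds(low, high);
    }

    public static void main(String[] args) {
        // Nothing configured on the hashmap backend: both defaults apply.
        System.out.println(resolve("hashmap", null, null));
        // Only threshold.high configured on another backend: low falls back to its default.
        System.out.println(resolve("rocksdb", null, 100L));
    }
}
```

Each threshold is resolved independently here, so setting only one of the two options leaves the other at its backend default. Whether Flink validates that the low value stays below the high value is not stated in the option descriptions.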

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java

Lines changed: 105 additions & 0 deletions
@@ -159,6 +159,74 @@ public class ExecutionConfigOptions {
                                     + "or force materialization(FORCE).")
                     .build());

+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.low")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy="
+                                                            + SinkUpsertMaterializeStrategy.ADAPTIVE
+                                                            + ", defines the number of entries per key when the implementation is changed from "
+                                                            + SinkUpsertMaterializeStrategy.MAP
+                                                            + " to "
+                                                            + SinkUpsertMaterializeStrategy.VALUE
+                                                            + ". "
+                                                            + "If not specified, Flink uses state-backend specific defaults (300 for hashmap state backend and 40 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<Long>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
+                    key("table.exec.sink.upsert-materialize-strategy.adaptive.threshold.high")
+                            .longType()
+                            .noDefaultValue()
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "When using strategy="
+                                                            + SinkUpsertMaterializeStrategy.ADAPTIVE
+                                                            + ", defines the number of entries per key when the implementation is changed from "
+                                                            + SinkUpsertMaterializeStrategy.VALUE
+                                                            + " to "
+                                                            + SinkUpsertMaterializeStrategy.MAP
+                                                            + ". "
+                                                            + "If not specified, Flink uses state-backend specific defaults (400 for hashmap state backend and 50 for RocksDB and the rest).")
+                                            .linebreak()
+                                            .build());
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<SinkUpsertMaterializeStrategy>
+            TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
+                    key("table.exec.sink.upsert-materialize-strategy.type")
+                            .enumType(SinkUpsertMaterializeStrategy.class)
+                            .defaultValue(SinkUpsertMaterializeStrategy.LEGACY)
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "Which strategy of SinkUpsertMaterializer to use. Supported strategies:")
+                                            .linebreak()
+                                            .text(
+                                                    SinkUpsertMaterializeStrategy.LEGACY
+                                                            + ": Simple implementation based on ValueState<List> (the original implementation).")
+                                            .linebreak()
+                                            .text(
+                                                    SinkUpsertMaterializeStrategy.MAP
+                                                            + ": SequencedMultiSetState implementation based on a combination of several MapState maintaining ordering and fast lookup properties.")
+                                            .linebreak()
+                                            .text(
+                                                    SinkUpsertMaterializeStrategy.VALUE
+                                                            + ": Similar to LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.")
+                                            .linebreak()
+                                            .text(
+                                                    SinkUpsertMaterializeStrategy.ADAPTIVE
+                                                            + ": Alternate between MAP and VALUE depending on the number of entries for the given key starting with VALUE and switching to MAP upon reaching threshold.high value (and back to VALUE, when reaching low).")
+                                            .build());
+
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     public static final ConfigOption<SinkKeyedShuffle> TABLE_EXEC_SINK_KEYED_SHUFFLE =
             key("table.exec.sink.keyed-shuffle")
@@ -928,6 +996,43 @@ public enum RetryStrategy {
         FIXED_DELAY
     }

+    /** SinkUpsertMaterializer strategy. */
+    @PublicEvolving
+    public enum SinkUpsertMaterializeStrategy {
+        /**
+         * Simple implementation based on {@code ValueState<List>} (the original implementation).
+         *
+         * <ul>
+         *   <li>optimal for cases with history under approx. 100 elements
+         *   <li>limited TTL support (per key granularity, i.e. no expiration for old history
+         *       elements)
+         * </ul>
+         */
+        LEGACY,
+        /**
+         * OrderedMultiSetState-based implementation based on a combination of several MapState
+         * maintaining ordering and fast lookup properties.
+         *
+         * <ul>
+         *   <li>faster and more memory-efficient on long histories
+         *   <li>slower on short histories
+         *   <li>currently, no TTL support (to be added in the future)
+         *   <li>requires more space
+         * </ul>
+         */
+        MAP,
+        /**
+         * Similar to LEGACY, but compatible with MAP and therefore allows to switch to ADAPTIVE.
+         */
+        VALUE,
+        /**
+         * Alternate between MAP and VALUE depending on the number of entries for the given key
+         * starting with VALUE and switching to MAP upon reaching threshold.high value (and back to
+         * VALUE, when reaching low).
+         */
+        ADAPTIVE
+    }
+
     /** Determine if CAST operates using the legacy behaviour or the new one. */
     @Deprecated
     public enum LegacyCastBehaviour implements DescribedEnum {
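
Note on the ADAPTIVE strategy described in the enum Javadoc: it starts with VALUE, switches to MAP upon reaching `threshold.high`, and switches back to VALUE when reaching `threshold.low`. Because the two thresholds differ, a key whose entry count hovers between them keeps its current representation instead of flipping on every change. The sketch below illustrates only that hysteresis. The class `AdaptiveStrategySketch` and its method names are hypothetical, the inclusive comparisons (`>=` and `<=`) are an assumption drawn from the word "reaching", and the requirement that low be below high is also an assumption; none of this is the operator code from this commit.

```java
/** Hypothetical sketch, not Flink code: hysteresis between two per-key representations. */
final class AdaptiveStrategySketch {

    /** The two representations that ADAPTIVE alternates between. */
    enum Mode { VALUE, MAP }

    private final long low;
    private final long high;
    // Per the Javadoc, ADAPTIVE starts with VALUE.
    private Mode mode = Mode.VALUE;

    AdaptiveStrategySketch(long low, long high) {
        // Assumption of this sketch: the thresholds must leave a gap between them.
        if (low >= high) {
            throw new IllegalArgumentException("low must be smaller than high");
        }
        this.low = low;
        this.high = high;
    }

    /** Reports the representation to use after the entry count for a key changed. */
    Mode onSizeChanged(long entries) {
        if (mode == Mode.VALUE && entries >= high) {
            mode = Mode.MAP; // history grew long: switch to the map-based layout
        } else if (mode == Mode.MAP && entries <= low) {
            mode = Mode.VALUE; // history shrank: switch back to the value-based layout
        }
        return mode;
    }

    public static void main(String[] args) {
        // Thresholds taken from the documented non-hashmap defaults (low=40, high=50).
        AdaptiveStrategySketch sketch = new AdaptiveStrategySketch(40L, 50L);
        for (long size : new long[] {10, 49, 50, 45, 41, 40, 45}) {
            System.out.println(size + " entries: " + sketch.onSizeChanged(size));
        }
    }
}
```

With a single threshold, a key oscillating around that value would trigger a conversion on nearly every update; the gap between low and high is what avoids that cost.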

flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java

Lines changed: 2 additions & 0 deletions
@@ -190,6 +190,8 @@ public String toString() {
                 + Arrays.toString(indexMapping)
                 + ", mutableRow="
                 + row
+                + ", isNullAtNonProjected="
+                + isNullAtNonProjected
                 + '}';
     }
