From cf62175d625427cf94960bcfd5ccb0bcd12d2b89 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Wed, 17 Jun 2026 20:54:17 +0800 Subject: [PATCH 1/9] feat: persist native source checkpoint state --- .../operators/GlutenOneInputOperator.java | 2 +- .../operators/GlutenSourceFunction.java | 27 ++++++++++++++++--- .../operators/GlutenTwoInputOperator.java | 2 +- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 09b059f3bc2..3aa941dbf00 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -279,7 +279,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { @Override public void snapshotState(StateSnapshotContext context) throws Exception { // TODO: implement it - task.snapshotState(0); + task.snapshotState(context.getCheckpointId()); super.snapshotState(context); } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 53f36fcf67c..ed77629b2f9 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -32,6 +32,8 @@ import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark; import io.github.zhztheplayer.velox4j.type.RowType; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -43,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -65,6 +68,8 @@ public class GlutenSourceFunction extends RichParallelSourceFunction private SerialTask task; private SourceTaskMetrics taskMetrics; private final Class outClass; + private transient ListState checkpointState; + private transient String[] restoredCheckpointRecords = new String[0]; public GlutenSourceFunction( StatefulPlanNode planNode, @@ -205,15 +210,29 @@ public void close() throws Exception { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { - // TODO: implement it - this.task.snapshotState(0); + checkpointState.clear(); + String[] checkpointRecords = this.task.snapshotState(context.getCheckpointId()); + for (String checkpointRecord : checkpointRecords) { + checkpointState.add(checkpointRecord); + } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { + checkpointState = + context + .getOperatorStateStore() + .getListState( + new ListStateDescriptor<>("gluten-native-source-checkpoint", String.class)); + if (context.isRestored()) { + List records = new ArrayList<>(); + for (String checkpointRecord : checkpointState.get()) { + records.add(checkpointRecord); + } + restoredCheckpointRecords = records.toArray(new String[0]); + } initSession(); - // TODO: implement it - this.task.initializeState(0, null); + this.task.initializeState(0, null, restoredCheckpointRecords); } public String[] notifyCheckpointComplete(long checkpointId) throws Exception { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 3f73ad4bbd8..28791a67c3c 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -267,7 +267,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { @Override public void snapshotState(StateSnapshotContext context) throws Exception { // TODO: implement it - task.snapshotState(0); + task.snapshotState(context.getCheckpointId()); super.snapshotState(context); } From 440510ea51c2e037c63b6ed2deae924cc715d652 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Thu, 18 Jun 2026 14:22:05 +0800 Subject: [PATCH 2/9] fix: sync Gluten task output metrics --- .../runtime/metrics/SourceTaskMetrics.java | 103 ++++++++++++-- .../operators/GlutenOneInputOperator.java | 15 ++- .../operators/GlutenTwoInputOperator.java | 15 ++- .../gluten/util/VectorOutputBridge.java | 8 +- .../api/operators/GlutenStreamFilterTest.java | 24 ++++ .../metrics/SourceTaskMetricsTest.java | 126 ++++++++++++++++++ 6 files changed, 276 insertions(+), 15 deletions(-) create mode 100644 gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetricsTest.java diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java index 5bbbb028b6a..9bf775729b9 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -17,13 +17,18 @@ package org.apache.gluten.table.runtime.metrics; import io.github.zhztheplayer.velox4j.query.SerialTask; +import io.github.zhztheplayer.velox4j.query.SerialTaskStats; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.List; + public class SourceTaskMetrics { private final String keyOperatorType = "operatorType"; @@ -33,26 +38,64 @@ public class SourceTaskMetrics { private final long metricUpdateInterval = 2000; private Counter sourceNumRecordsOut; private Counter sourceNumBytesOut; - private long lastUpdateTime = System.currentTimeMillis(); + private Counter taskNumRecordsIn; + private Counter taskNumRecordsOut; + private Counter taskNumBytesIn; + private Counter taskNumBytesOut; + private long lastUpdateTime = 0; public SourceTaskMetrics(OperatorMetricGroup metricGroup) { sourceNumRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); sourceNumBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + if (metricGroup instanceof InternalOperatorMetricGroup) { + TaskIOMetricGroup taskIOMetricGroup = + ((InternalOperatorMetricGroup) metricGroup).getTaskIOMetricGroup(); + taskNumRecordsIn = taskIOMetricGroup.getNumRecordsInCounter(); + taskNumRecordsOut = taskIOMetricGroup.getNumRecordsOutCounter(); + taskNumBytesIn = taskIOMetricGroup.getNumBytesInCounter(); + taskNumBytesOut = taskIOMetricGroup.getNumBytesOutCounter(); + } + } + + SourceTaskMetrics(Counter sourceNumRecordsOut, Counter sourceNumBytesOut) { + this(sourceNumRecordsOut, sourceNumBytesOut, null, null, null, null); + } + + SourceTaskMetrics( + Counter sourceNumRecordsOut, + Counter sourceNumBytesOut, + Counter taskNumRecordsIn, + Counter taskNumRecordsOut, + Counter taskNumBytesIn, + Counter taskNumBytesOut) { + this.sourceNumRecordsOut = sourceNumRecordsOut; + this.sourceNumBytesOut = sourceNumBytesOut; + this.taskNumRecordsIn = taskNumRecordsIn; + this.taskNumRecordsOut = taskNumRecordsOut; + this.taskNumBytesIn = taskNumBytesIn; + this.taskNumBytesOut = taskNumBytesOut; } public boolean updateMetrics(SerialTask task, String planId) { + return updateMetrics(task.collectStats(), planId); + } + + boolean updateMetrics(SerialTaskStats taskStats, String planId) { long currentTime = System.currentTimeMillis(); if (currentTime - lastUpdateTime < metricUpdateInterval) { return false; } try { - ObjectNode planStats = task.collectStats().planStats(planId); - JsonNode jsonNode = planStats.get(keyOperatorType); - if (jsonNode.asText().equals(sourceOperatorName)) { - long numRecordsOut = planStats.get(keyInputRows).asInt(); - long numBytesOut = planStats.get(keyInputBytes).asInt(); - sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount()); - sourceNumBytesOut.inc(numBytesOut - sourceNumBytesOut.getCount()); + ObjectNode planStats = findSourceStats(taskStats, planId); + if (planStats != null) { + long inputRows = planStats.get(keyInputRows).asLong(); + long inputBytes = planStats.get(keyInputBytes).asLong(); + syncCounter(sourceNumRecordsOut, inputRows); + syncCounter(sourceNumBytesOut, inputBytes); + syncCounter(taskNumRecordsIn, inputRows); + syncCounter(taskNumRecordsOut, inputRows); + syncCounter(taskNumBytesIn, inputBytes); + syncCounter(taskNumBytesOut, inputBytes); } } catch (Exception e) { return false; @@ -60,4 +103,48 @@ public boolean updateMetrics(SerialTask task, String planId) { lastUpdateTime = currentTime; return true; } + + private ObjectNode findSourceStats(SerialTaskStats taskStats, String planId) { + try { + ObjectNode planStats = taskStats.planStats(planId); + if (isSourceStats(planStats)) { + return planStats; + } + } catch (Exception ignored) { + // Fall back to the unique TableScan below. + } + + ObjectNode sourceStats = null; + List allPlanStats = taskStats.planStats(); + for (ObjectNode planStats : allPlanStats) { + if (!isSourceStats(planStats)) { + continue; + } + if (sourceStats != null) { + return null; + } + sourceStats = planStats; + } + return sourceStats; + } + + private boolean isSourceStats(ObjectNode planStats) { + JsonNode operatorType = planStats.get(keyOperatorType); + return operatorType != null + && sourceOperatorName.equals(operatorType.asText()) + && planStats.has(keyInputRows) + && planStats.has(keyInputBytes); + } + + private void syncCounter(Counter counter, long value) { + if (counter == null) { + return; + } + long delta = value - counter.getCount(); + if (delta > 0) { + counter.inc(delta); + } else if (delta < 0) { + counter.dec(-delta); + } + } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 3aa941dbf00..94ea05f5017 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -37,6 +37,8 @@ import io.github.zhztheplayer.velox4j.type.RowType; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -66,6 +68,7 @@ public class GlutenOneInputOperator extends TableStreamOperator private final Class outClass; private transient VectorInputBridge inputBridge; private transient VectorOutputBridge outputBridge; + private transient Counter taskNumRecordsOut; private final GlutenMailboxHolder mailboxHolder = new GlutenMailboxHolder(); public GlutenOneInputOperator( @@ -151,6 +154,10 @@ void initSession() { @Override public void open() throws Exception { super.open(); + if (metrics instanceof InternalOperatorMetricGroup) { + taskNumRecordsOut = + ((InternalOperatorMetricGroup) metrics).getTaskIOMetricGroup().getNumRecordsOutCounter(); + } if (!mailboxHolder().get().isMailboxBound()) { ensureMailboxInitialized(getContainingTask()); } @@ -201,8 +208,12 @@ private void drainTaskOutput() { StatefulWatermark watermark = statefulElement.asWatermark(); output.emitWatermark(new Watermark(watermark.getTimestamp())); } else { - outputBridge.collect( - output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + long emittedRecords = + outputBridge.collect( + output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + if (taskNumRecordsOut != null) { + taskNumRecordsOut.inc(emittedRecords); + } } } finally { statefulElement.close(); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 28791a67c3c..f18dd8cf9c0 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -33,6 +33,8 @@ import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark; import io.github.zhztheplayer.velox4j.type.RowType; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -71,6 +73,7 @@ public class GlutenTwoInputOperator extends AbstractStreamOperator private final Class outClass; private VectorInputBridge inputBridge; private VectorOutputBridge outputBridge; + private transient Counter taskNumRecordsOut; private String description; private final GlutenMailboxHolder mailboxHolder = new GlutenMailboxHolder(); @@ -118,6 +121,10 @@ public String getDescription() { @Override public void open() throws Exception { super.open(); + if (metrics instanceof InternalOperatorMetricGroup) { + taskNumRecordsOut = + ((InternalOperatorMetricGroup) metrics).getTaskIOMetricGroup().getNumRecordsOutCounter(); + } if (!mailboxHolder().get().isMailboxBound()) { ensureMailboxInitialized(getContainingTask()); } @@ -181,8 +188,12 @@ private void drainTaskOutput() { StatefulWatermark watermark = element.asWatermark(); output.emitWatermark(new Watermark(watermark.getTimestamp())); } else { - outputBridge.collect( - output, element.asRecord(), sessionResource.getAllocator(), outputType); + long emittedRecords = + outputBridge.collect( + output, element.asRecord(), sessionResource.getAllocator(), outputType); + if (taskNumRecordsOut != null) { + taskNumRecordsOut.inc(emittedRecords); + } } } finally { element.close(); diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java index 4849f90e946..8d44a18ca2d 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java @@ -40,7 +40,7 @@ public interface VectorOutputBridge extends Serializable { * @param allocator buffer allocator for converting RowVector * @param outputType the RowType schema of the output */ - void collect( + long collect( Output> collector, StatefulRecord record, BufferAllocator allocator, @@ -85,7 +85,7 @@ public RowDataOutputBridge() { } @Override - public void collect( + public long collect( Output> collector, StatefulRecord record, BufferAllocator allocator, @@ -96,6 +96,7 @@ public void collect( for (org.apache.flink.table.data.RowData row : rows) { collector.collect(getOrCreateOutputElement().replace(row)); } + return rows.size(); } private StreamRecord getOrCreateOutputElement() { @@ -119,12 +120,13 @@ public StatefulRecordOutputBridge() { } @Override - public void collect( + public long collect( Output> collector, StatefulRecord record, BufferAllocator allocator, RowType outputType) { collector.collect(getOrCreateOutputElement().replace(record)); + return 1; } private StreamRecord getOrCreateOutputElement() { diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java index 1955ece25ae..11c5c623965 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java @@ -64,6 +64,30 @@ public void testGreaterThanFilter() throws Exception { expectedOutput); } + @Test + public void testTaskOutputMetricsCountEmittedRows() throws Exception { + RexNode filterCondition = + createFilterCondition(SqlTypeName.INTEGER, 2, 18, SqlStdOperatorTable.GREATER_THAN); + PlanNode veloxPlan = createFilterPlan(filterCondition, rowType); + GlutenOneInputOperator operator = createTestOperator(veloxPlan, typeInfo, typeInfo); + + OneInputStreamOperatorTestHarness harness = + createTestHarness(operator, typeInfo, typeInfo); + + processTestData(harness, testData); + + assertThat( + harness + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .getNumRecordsOutCounter() + .getCount()) + .isEqualTo(4); + + harness.close(); + } + @Test public void testLessThanFilter() throws Exception { List expectedOutput = diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetricsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetricsTest.java new file mode 100644 index 00000000000..2ab5ebf74e4 --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetricsTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.metrics; + +import io.github.zhztheplayer.velox4j.query.SerialTaskStats; + +import org.apache.flink.metrics.SimpleCounter; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class SourceTaskMetricsTest { + + @Test + void updateMetricsUsesExactTableScanPlanId() { + SimpleCounter rows = new SimpleCounter(); + SimpleCounter bytes = new SimpleCounter(); + SourceTaskMetrics metrics = new SourceTaskMetrics(rows, bytes); + + assertThat(metrics.updateMetrics(statsWithSingleTableScan(), "scan-1")).isTrue(); + + assertThat(rows.getCount()).isEqualTo(5); + assertThat(bytes.getCount()).isEqualTo(123); + } + + @Test + void updateMetricsSyncsTaskIoCounters() { + SimpleCounter sourceRows = new SimpleCounter(); + SimpleCounter sourceBytes = new SimpleCounter(); + SimpleCounter taskRowsIn = new SimpleCounter(); + SimpleCounter taskRowsOut = new SimpleCounter(); + SimpleCounter taskBytesIn = new SimpleCounter(); + SimpleCounter taskBytesOut = new SimpleCounter(); + SourceTaskMetrics metrics = + new SourceTaskMetrics( + sourceRows, sourceBytes, taskRowsIn, taskRowsOut, taskBytesIn, taskBytesOut); + + assertThat(metrics.updateMetrics(statsWithSingleTableScan(), "scan-1")).isTrue(); + + assertThat(sourceRows.getCount()).isEqualTo(5); + assertThat(sourceBytes.getCount()).isEqualTo(123); + assertThat(taskRowsIn.getCount()).isEqualTo(5); + assertThat(taskRowsOut.getCount()).isEqualTo(5); + assertThat(taskBytesIn.getCount()).isEqualTo(123); + assertThat(taskBytesOut.getCount()).isEqualTo(123); + } + + @Test + void updateMetricsFallsBackToUniqueTableScanWhenPlanIdDiffers() { + SimpleCounter rows = new SimpleCounter(); + SimpleCounter bytes = new SimpleCounter(); + SourceTaskMetrics metrics = new SourceTaskMetrics(rows, bytes); + + assertThat(metrics.updateMetrics(statsWithSingleTableScan(), "flink-source-id")).isTrue(); + + assertThat(rows.getCount()).isEqualTo(5); + assertThat(bytes.getCount()).isEqualTo(123); + } + + @Test + void updateMetricsDoesNotGuessWhenMultipleTableScansExist() { + SimpleCounter rows = new SimpleCounter(); + SimpleCounter bytes = new SimpleCounter(); + SourceTaskMetrics metrics = new SourceTaskMetrics(rows, bytes); + + assertThat(metrics.updateMetrics(statsWithMultipleTableScans(), "flink-source-id")).isTrue(); + + assertThat(rows.getCount()).isZero(); + assertThat(bytes.getCount()).isZero(); + } + + private static SerialTaskStats statsWithSingleTableScan() { + return SerialTaskStats.fromJson( + "{" + + "\"planStats\":[" + + "{" + + "\"planNodeId\":\"scan-1\"," + + "\"operatorType\":\"TableScan\"," + + "\"rawInputRows\":5," + + "\"rawInputBytes\":123" + + "}," + + "{" + + "\"planNodeId\":\"project-1\"," + + "\"operatorType\":\"FilterProject\"," + + "\"rawInputRows\":0," + + "\"rawInputBytes\":0" + + "}" + + "]" + + "}"); + } + + private static SerialTaskStats statsWithMultipleTableScans() { + return SerialTaskStats.fromJson( + "{" + + "\"planStats\":[" + + "{" + + "\"planNodeId\":\"scan-1\"," + + "\"operatorType\":\"TableScan\"," + + "\"rawInputRows\":5," + + "\"rawInputBytes\":123" + + "}," + + "{" + + "\"planNodeId\":\"scan-2\"," + + "\"operatorType\":\"TableScan\"," + + "\"rawInputRows\":7," + + "\"rawInputBytes\":456" + + "}" + + "]" + + "}"); + } +} From 11291c1700cc676e071838de5da90c8a1ef35c45 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Thu, 18 Jun 2026 17:45:40 +0800 Subject: [PATCH 3/9] fix: keep Gluten compatible with older velox4j APIs --- .../runtime/metrics/SourceTaskMetrics.java | 16 ++++++++-- .../operators/GlutenSourceFunction.java | 32 +++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java index 9bf775729b9..cacc8eba9a7 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -27,8 +27,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.util.List; - public class SourceTaskMetrics { private final String keyOperatorType = "operatorType"; @@ -114,8 +112,11 @@ private ObjectNode findSourceStats(SerialTaskStats taskStats, String planId) { // Fall back to the unique TableScan below. } + Iterable allPlanStats = allPlanStats(taskStats); + if (allPlanStats == null) { + return null; + } ObjectNode sourceStats = null; - List allPlanStats = taskStats.planStats(); for (ObjectNode planStats : allPlanStats) { if (!isSourceStats(planStats)) { continue; @@ -128,6 +129,15 @@ private ObjectNode findSourceStats(SerialTaskStats taskStats, String planId) { return sourceStats; } + @SuppressWarnings("unchecked") + private Iterable allPlanStats(SerialTaskStats taskStats) { + try { + return (Iterable) taskStats.getClass().getMethod("planStats").invoke(taskStats); + } catch (Exception ignored) { + return null; + } + } + private boolean isSourceStats(ObjectNode planStats) { JsonNode operatorType = planStats.get(keyOperatorType); return operatorType != null diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index ed77629b2f9..63c3bceb08b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -211,7 +211,7 @@ public void close() throws Exception { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { checkpointState.clear(); - String[] checkpointRecords = this.task.snapshotState(context.getCheckpointId()); + String[] checkpointRecords = snapshotNativeState(context.getCheckpointId()); for (String checkpointRecord : checkpointRecords) { checkpointState.add(checkpointRecord); } @@ -232,7 +232,7 @@ public void initializeState(FunctionInitializationContext context) throws Except restoredCheckpointRecords = records.toArray(new String[0]); } initSession(); - this.task.initializeState(0, null, restoredCheckpointRecords); + initializeNativeState(restoredCheckpointRecords); } public String[] notifyCheckpointComplete(long checkpointId) throws Exception { @@ -245,6 +245,34 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { this.task.notifyCheckpointAborted(checkpointId); } + private String[] snapshotNativeState(long checkpointId) throws Exception { + Object checkpointRecords = + task.getClass().getMethod("snapshotState", long.class).invoke(task, checkpointId); + if (checkpointRecords instanceof String[]) { + return (String[]) checkpointRecords; + } + return new String[0]; + } + + private void initializeNativeState(String[] checkpointRecords) throws Exception { + try { + task.getClass() + .getMethod( + "initializeState", + long.class, + io.github.zhztheplayer.velox4j.stateful.KeyedStateBackendParameters.class, + String[].class) + .invoke(task, 0L, null, checkpointRecords); + } catch (NoSuchMethodException e) { + task.getClass() + .getMethod( + "initializeState", + long.class, + io.github.zhztheplayer.velox4j.stateful.KeyedStateBackendParameters.class) + .invoke(task, 0L, null); + } + } + private void initSession() { if (sessionResource != null) { return; From e5a668a0162ad08455b4a77e033938ae8687cd6e Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Thu, 18 Jun 2026 17:57:39 +0800 Subject: [PATCH 4/9] Revert "fix: keep Gluten compatible with older velox4j APIs" This reverts commit 67a3cccbf6013af0e91c3a5b56e85bc4d869af2b. --- .../runtime/metrics/SourceTaskMetrics.java | 16 ++-------- .../operators/GlutenSourceFunction.java | 32 ++----------------- 2 files changed, 5 insertions(+), 43 deletions(-) diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java index cacc8eba9a7..9bf775729b9 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -27,6 +27,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.List; + public class SourceTaskMetrics { private final String keyOperatorType = "operatorType"; @@ -112,11 +114,8 @@ private ObjectNode findSourceStats(SerialTaskStats taskStats, String planId) { // Fall back to the unique TableScan below. } - Iterable allPlanStats = allPlanStats(taskStats); - if (allPlanStats == null) { - return null; - } ObjectNode sourceStats = null; + List allPlanStats = taskStats.planStats(); for (ObjectNode planStats : allPlanStats) { if (!isSourceStats(planStats)) { continue; @@ -129,15 +128,6 @@ private ObjectNode findSourceStats(SerialTaskStats taskStats, String planId) { return sourceStats; } - @SuppressWarnings("unchecked") - private Iterable allPlanStats(SerialTaskStats taskStats) { - try { - return (Iterable) taskStats.getClass().getMethod("planStats").invoke(taskStats); - } catch (Exception ignored) { - return null; - } - } - private boolean isSourceStats(ObjectNode planStats) { JsonNode operatorType = planStats.get(keyOperatorType); return operatorType != null diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 63c3bceb08b..ed77629b2f9 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -211,7 +211,7 @@ public void close() throws Exception { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { checkpointState.clear(); - String[] checkpointRecords = snapshotNativeState(context.getCheckpointId()); + String[] checkpointRecords = this.task.snapshotState(context.getCheckpointId()); for (String checkpointRecord : checkpointRecords) { checkpointState.add(checkpointRecord); } @@ -232,7 +232,7 @@ public void initializeState(FunctionInitializationContext context) throws Except restoredCheckpointRecords = records.toArray(new String[0]); } initSession(); - initializeNativeState(restoredCheckpointRecords); + this.task.initializeState(0, null, restoredCheckpointRecords); } public String[] notifyCheckpointComplete(long checkpointId) throws Exception { @@ -245,34 +245,6 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { this.task.notifyCheckpointAborted(checkpointId); } - private String[] snapshotNativeState(long checkpointId) throws Exception { - Object checkpointRecords = - task.getClass().getMethod("snapshotState", long.class).invoke(task, checkpointId); - if (checkpointRecords instanceof String[]) { - return (String[]) checkpointRecords; - } - return new String[0]; - } - - private void initializeNativeState(String[] checkpointRecords) throws Exception { - try { - task.getClass() - .getMethod( - "initializeState", - long.class, - io.github.zhztheplayer.velox4j.stateful.KeyedStateBackendParameters.class, - String[].class) - .invoke(task, 0L, null, checkpointRecords); - } catch (NoSuchMethodException e) { - task.getClass() - .getMethod( - "initializeState", - long.class, - io.github.zhztheplayer.velox4j.stateful.KeyedStateBackendParameters.class) - .invoke(task, 0L, null); - } - } - private void initSession() { if (sessionResource != null) { return; From 4e4d4839723312577720185d9b26286d47765e34 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Thu, 18 Jun 2026 17:59:44 +0800 Subject: [PATCH 5/9] ci: update velox4j pin for Flink tests --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index cf607f5f304..d87b7e917e2 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca + cd velox4j && git reset --hard 2bc5a2cb2588631a2e07097936383fff1fd271bf git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From dcc5bb59152f2c7e1ec8d85339d366c1f5870638 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Mon, 22 Jun 2026 12:31:31 +0800 Subject: [PATCH 6/9] ci: update velox4j pin --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index d87b7e917e2..49f000612ff 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 2bc5a2cb2588631a2e07097936383fff1fd271bf + cd velox4j && git reset --hard 5bfa9e69f31cdb95a5b9471b17889ee3cd3b0b1a git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From de4e2343900297b130b509b06c898a9ff246783d Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 09:13:37 +0800 Subject: [PATCH 7/9] ci: fix velox4j ckpt pin --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 49f000612ff..2fab1b5b215 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 5bfa9e69f31cdb95a5b9471b17889ee3cd3b0b1a + cd velox4j && git reset --hard 5bfa9e692ddee42489781d17121ead506751f622 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From 14a044601885759f2321a12240844bb52f2c18f9 Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Tue, 23 Jun 2026 14:26:43 +0800 Subject: [PATCH 8/9] ci: update velox4j ckpt pin --- .github/workflows/flink.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index 2fab1b5b215..7404b4cc308 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 5bfa9e692ddee42489781d17121ead506751f622 + cd velox4j && git reset --hard 8ceb4e5e108588d92b89498bb3f1a3c0c4b029b5 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. From e03c4269a5baff9e5d8aacbfa64fff40fc930c4f Mon Sep 17 00:00:00 2001 From: zhangzhibiao Date: Sat, 27 Jun 2026 13:14:17 +0800 Subject: [PATCH 9/9] [FLINK] Implement prepareSnapshotPreBarrier for Gluten operators Inject barrier into Velox pipeline via injectBarrier(), then synchronously drain task output until the barrier emerges, ensuring all operators have snapshot their state and sinks have flushed before Flink checkpoint proceeds. --- .../operators/GlutenOneInputOperator.java | 44 ++++++++++- .../operators/GlutenTwoInputOperator.java | 41 +++++++++- .../GlutenCheckpointBarrierTest.java | 75 +++++++++++++++++++ 3 files changed, 158 insertions(+), 2 deletions(-) create mode 100644 gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenCheckpointBarrierTest.java diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 94ea05f5017..c803f0af6f5 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -204,6 +204,10 @@ private void drainTaskOutput() { if (state == UpIterator.State.AVAILABLE) { final StatefulElement statefulElement = task.statefulGet(); try { + if (statefulElement.isBarrier()) { + // Barriers should not appear during normal draining; skip. + continue; + } if (statefulElement.isWatermark()) { StatefulWatermark watermark = statefulElement.asWatermark(); output.emitWatermark(new Watermark(watermark.getTimestamp())); @@ -283,10 +287,48 @@ public String getId() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - // TODO: notify velox + // Inject barrier into Velox operator chain, then drain output until the + // barrier emerges, indicating all operators have snapshot and sinks flushed. + task.injectBarrier(checkpointId); + drainUntilBarrier(checkpointId); super.prepareSnapshotPreBarrier(checkpointId); } + private void drainUntilBarrier(long checkpointId) { + while (true) { + UpIterator.State state = task.advance(); + if (state == UpIterator.State.AVAILABLE) { + final StatefulElement statefulElement = task.statefulGet(); + try { + if (statefulElement.isBarrier()) { + if (statefulElement.asBarrier().getCheckpointId() == checkpointId) { + return; + } + // Stale barrier from a previous checkpoint, ignore. + continue; + } + if (statefulElement.isWatermark()) { + StatefulWatermark watermark = statefulElement.asWatermark(); + output.emitWatermark(new Watermark(watermark.getTimestamp())); + } else { + long emittedRecords = + outputBridge.collect( + output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + if (taskNumRecordsOut != null) { + taskNumRecordsOut.inc(emittedRecords); + } + } + } finally { + statefulElement.close(); + } + } else { + // BLOCKED or FINISHED - should not happen during barrier drain since + // processBarrier is synchronous. Spin to retry. + Thread.yield(); + } + } + } + @Override public void snapshotState(StateSnapshotContext context) throws Exception { // TODO: implement it diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index f18dd8cf9c0..69eb99c1b77 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -184,6 +184,10 @@ private void drainTaskOutput() { if (state == UpIterator.State.AVAILABLE) { final StatefulElement element = task.statefulGet(); try { + if (element.isBarrier()) { + // Barriers should not appear during normal draining; skip. + continue; + } if (element.isWatermark()) { StatefulWatermark watermark = element.asWatermark(); output.emitWatermark(new Watermark(watermark.getTimestamp())); @@ -271,10 +275,45 @@ public String getRightId() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - // TODO: notify velox + // Inject barrier into Velox operator chain, then drain output until the + // barrier emerges, indicating all operators have snapshot and sinks flushed. + task.injectBarrier(checkpointId); + drainUntilBarrier(checkpointId); super.prepareSnapshotPreBarrier(checkpointId); } + private void drainUntilBarrier(long checkpointId) { + while (true) { + UpIterator.State state = task.advance(); + if (state == UpIterator.State.AVAILABLE) { + final StatefulElement statefulElement = task.statefulGet(); + try { + if (statefulElement.isBarrier()) { + if (statefulElement.asBarrier().getCheckpointId() == checkpointId) { + return; + } + continue; + } + if (statefulElement.isWatermark()) { + StatefulWatermark watermark = statefulElement.asWatermark(); + output.emitWatermark(new Watermark(watermark.getTimestamp())); + } else { + long emittedRecords = + outputBridge.collect( + output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + if (taskNumRecordsOut != null) { + taskNumRecordsOut.inc(emittedRecords); + } + } + } finally { + statefulElement.close(); + } + } else { + Thread.yield(); + } + } + } + @Override public void snapshotState(StateSnapshotContext context) throws Exception { // TODO: implement it diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenCheckpointBarrierTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenCheckpointBarrierTest.java new file mode 100644 index 00000000000..bd09ffc91e1 --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenCheckpointBarrierTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.streaming.api.operators; + +import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator; +import org.apache.gluten.util.PlanNodeIdGenerator; + +import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle; +import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; +import io.github.zhztheplayer.velox4j.plan.TableScanNode; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Tests that {@link GlutenOneInputOperator#prepareSnapshotPreBarrier} correctly injects a barrier + * into the Velox pipeline and that the barrier propagates through the operator chain. + */ +public class GlutenCheckpointBarrierTest extends GlutenStreamOperatorTestBase { + + @Test + public void testPrepareSnapshotPreBarrierInjectsBarrier() throws Exception { + RowType flinkType = RowType.of(new LogicalType[] {new IntType()}, new String[] {"v"}); + io.github.zhztheplayer.velox4j.type.RowType veloxType = convertToVeloxType(flinkType); + + String scanId = "scan-1"; + TableScanNode scanNode = + new TableScanNode( + scanId, + veloxType, + new ExternalStreamTableHandle("connector-external-stream"), + java.util.List.of()); + + GlutenOneInputOperator operator = + new GlutenOneInputOperator( + new StatefulPlanNode(scanNode.getId(), scanNode), + PlanNodeIdGenerator.newId(), + veloxType, + java.util.Map.of(scanNode.getId(), veloxType), + RowData.class, + RowData.class); + + TypeInformation typeInfo = InternalTypeInfo.of(rowType); + org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness harness = + createTestHarness(operator, typeInfo, typeInfo); + + processTestData(harness, testData); + + assertDoesNotThrow(() -> operator.prepareSnapshotPreBarrier(1L)); + + harness.close(); + } +}