diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index cf607f5f304..7404b4cc308 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca + cd velox4j && git reset --hard 8ceb4e5e108588d92b89498bb3f1a3c0c4b029b5 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java index 5bbbb028b6a..9bf775729b9 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetrics.java @@ -17,13 +17,18 @@ package org.apache.gluten.table.runtime.metrics; import io.github.zhztheplayer.velox4j.query.SerialTask; +import io.github.zhztheplayer.velox4j.query.SerialTaskStats; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.List; + public class SourceTaskMetrics { private final String keyOperatorType = "operatorType"; @@ -33,26 +38,64 @@ public class SourceTaskMetrics { private final long metricUpdateInterval = 2000; private Counter sourceNumRecordsOut; private Counter sourceNumBytesOut; - private long lastUpdateTime = System.currentTimeMillis(); + private Counter taskNumRecordsIn; + private Counter taskNumRecordsOut; + private Counter taskNumBytesIn; + private Counter taskNumBytesOut; + private long lastUpdateTime = 0; public SourceTaskMetrics(OperatorMetricGroup metricGroup) { sourceNumRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); sourceNumBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + if (metricGroup instanceof InternalOperatorMetricGroup) { + TaskIOMetricGroup taskIOMetricGroup = + ((InternalOperatorMetricGroup) metricGroup).getTaskIOMetricGroup(); + taskNumRecordsIn = taskIOMetricGroup.getNumRecordsInCounter(); + taskNumRecordsOut = taskIOMetricGroup.getNumRecordsOutCounter(); + taskNumBytesIn = taskIOMetricGroup.getNumBytesInCounter(); + taskNumBytesOut = taskIOMetricGroup.getNumBytesOutCounter(); + } + } + + SourceTaskMetrics(Counter sourceNumRecordsOut, Counter sourceNumBytesOut) { + this(sourceNumRecordsOut, sourceNumBytesOut, null, null, null, null); + } + + SourceTaskMetrics( + Counter sourceNumRecordsOut, + Counter sourceNumBytesOut, + Counter taskNumRecordsIn, + Counter taskNumRecordsOut, + Counter taskNumBytesIn, + Counter taskNumBytesOut) { + this.sourceNumRecordsOut = sourceNumRecordsOut; + this.sourceNumBytesOut = sourceNumBytesOut; + this.taskNumRecordsIn = taskNumRecordsIn; + this.taskNumRecordsOut = taskNumRecordsOut; + this.taskNumBytesIn = taskNumBytesIn; + this.taskNumBytesOut = taskNumBytesOut; } public boolean updateMetrics(SerialTask task, String planId) { + return updateMetrics(task.collectStats(), planId); + } + + boolean updateMetrics(SerialTaskStats taskStats, String planId) { long currentTime = System.currentTimeMillis(); if (currentTime - lastUpdateTime < metricUpdateInterval) { return false; } try { - ObjectNode planStats = task.collectStats().planStats(planId); - JsonNode jsonNode = planStats.get(keyOperatorType); - if (jsonNode.asText().equals(sourceOperatorName)) { - long numRecordsOut = planStats.get(keyInputRows).asInt(); - long numBytesOut = planStats.get(keyInputBytes).asInt(); - sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount()); - sourceNumBytesOut.inc(numBytesOut - sourceNumBytesOut.getCount()); + ObjectNode planStats = findSourceStats(taskStats, planId); + if (planStats != null) { + long inputRows = planStats.get(keyInputRows).asLong(); + long inputBytes = planStats.get(keyInputBytes).asLong(); + syncCounter(sourceNumRecordsOut, inputRows); + syncCounter(sourceNumBytesOut, inputBytes); + syncCounter(taskNumRecordsIn, inputRows); + syncCounter(taskNumRecordsOut, inputRows); + syncCounter(taskNumBytesIn, inputBytes); + syncCounter(taskNumBytesOut, inputBytes); } } catch (Exception e) { return false; @@ -60,4 +103,48 @@ public boolean updateMetrics(SerialTask task, String planId) { lastUpdateTime = currentTime; return true; } + + private ObjectNode findSourceStats(SerialTaskStats taskStats, String planId) { + try { + ObjectNode planStats = taskStats.planStats(planId); + if (isSourceStats(planStats)) { + return planStats; + } + } catch (Exception ignored) { + // Fall back to the unique TableScan below. + } + + ObjectNode sourceStats = null; + List allPlanStats = taskStats.planStats(); + for (ObjectNode planStats : allPlanStats) { + if (!isSourceStats(planStats)) { + continue; + } + if (sourceStats != null) { + return null; + } + sourceStats = planStats; + } + return sourceStats; + } + + private boolean isSourceStats(ObjectNode planStats) { + JsonNode operatorType = planStats.get(keyOperatorType); + return operatorType != null + && sourceOperatorName.equals(operatorType.asText()) + && planStats.has(keyInputRows) + && planStats.has(keyInputBytes); + } + + private void syncCounter(Counter counter, long value) { + if (counter == null) { + return; + } + long delta = value - counter.getCount(); + if (delta > 0) { + counter.inc(delta); + } else if (delta < 0) { + counter.dec(-delta); + } + } } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java index 09b059f3bc2..061f5d1706b 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java @@ -37,6 +37,8 @@ import io.github.zhztheplayer.velox4j.type.RowType; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -66,6 +68,7 @@ public class GlutenOneInputOperator extends TableStreamOperator private final Class outClass; private transient VectorInputBridge inputBridge; private transient VectorOutputBridge outputBridge; + private transient Counter taskNumRecordsOut; private final GlutenMailboxHolder mailboxHolder = new GlutenMailboxHolder(); public GlutenOneInputOperator( @@ -151,6 +154,10 @@ void initSession() { @Override public void open() throws Exception { super.open(); + if (metrics instanceof InternalOperatorMetricGroup) { + taskNumRecordsOut = + ((InternalOperatorMetricGroup) metrics).getTaskIOMetricGroup().getNumRecordsOutCounter(); + } if (!mailboxHolder().get().isMailboxBound()) { ensureMailboxInitialized(getContainingTask()); } @@ -201,8 +208,12 @@ private void drainTaskOutput() { StatefulWatermark watermark = statefulElement.asWatermark(); output.emitWatermark(new Watermark(watermark.getTimestamp())); } else { - outputBridge.collect( - output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + long emittedRecords = + outputBridge.collect( + output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + if (taskNumRecordsOut != null) { + taskNumRecordsOut.inc(emittedRecords); + } } } finally { statefulElement.close(); @@ -272,14 +283,16 @@ public String getId() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - // TODO: notify velox + // Drain any in-flight data from the Velox pipeline before snapshot. + // Flink has already aligned barriers, so no new input will arrive. + drainTaskOutput(); super.prepareSnapshotPreBarrier(checkpointId); } @Override public void snapshotState(StateSnapshotContext context) throws Exception { // TODO: implement it - task.snapshotState(0); + task.snapshotState(context.getCheckpointId()); super.snapshotState(context); } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java index 53f36fcf67c..ed77629b2f9 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java @@ -32,6 +32,8 @@ import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark; import io.github.zhztheplayer.velox4j.type.RowType; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -43,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -65,6 +68,8 @@ public class GlutenSourceFunction extends RichParallelSourceFunction private SerialTask task; private SourceTaskMetrics taskMetrics; private final Class outClass; + private transient ListState checkpointState; + private transient String[] restoredCheckpointRecords = new String[0]; public GlutenSourceFunction( StatefulPlanNode planNode, @@ -205,15 +210,29 @@ public void close() throws Exception { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { - // TODO: implement it - this.task.snapshotState(0); + checkpointState.clear(); + String[] checkpointRecords = this.task.snapshotState(context.getCheckpointId()); + for (String checkpointRecord : checkpointRecords) { + checkpointState.add(checkpointRecord); + } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { + checkpointState = + context + .getOperatorStateStore() + .getListState( + new ListStateDescriptor<>("gluten-native-source-checkpoint", String.class)); + if (context.isRestored()) { + List records = new ArrayList<>(); + for (String checkpointRecord : checkpointState.get()) { + records.add(checkpointRecord); + } + restoredCheckpointRecords = records.toArray(new String[0]); + } initSession(); - // TODO: implement it - this.task.initializeState(0, null); + this.task.initializeState(0, null, restoredCheckpointRecords); } public String[] notifyCheckpointComplete(long checkpointId) throws Exception { diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 3f73ad4bbd8..a602578018d 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -33,6 +33,8 @@ import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark; import io.github.zhztheplayer.velox4j.type.RowType; +import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -71,6 +73,7 @@ public class GlutenTwoInputOperator extends AbstractStreamOperator private final Class outClass; private VectorInputBridge inputBridge; private VectorOutputBridge outputBridge; + private transient Counter taskNumRecordsOut; private String description; private final GlutenMailboxHolder mailboxHolder = new GlutenMailboxHolder(); @@ -118,6 +121,10 @@ public String getDescription() { @Override public void open() throws Exception { super.open(); + if (metrics instanceof InternalOperatorMetricGroup) { + taskNumRecordsOut = + ((InternalOperatorMetricGroup) metrics).getTaskIOMetricGroup().getNumRecordsOutCounter(); + } if (!mailboxHolder().get().isMailboxBound()) { ensureMailboxInitialized(getContainingTask()); } @@ -181,8 +188,12 @@ private void drainTaskOutput() { StatefulWatermark watermark = element.asWatermark(); output.emitWatermark(new Watermark(watermark.getTimestamp())); } else { - outputBridge.collect( - output, element.asRecord(), sessionResource.getAllocator(), outputType); + long emittedRecords = + outputBridge.collect( + output, element.asRecord(), sessionResource.getAllocator(), outputType); + if (taskNumRecordsOut != null) { + taskNumRecordsOut.inc(emittedRecords); + } } } finally { element.close(); @@ -260,14 +271,16 @@ public String getRightId() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { - // TODO: notify velox + // Drain any in-flight data from the Velox pipeline before snapshot. + // Flink has already aligned barriers, so no new input will arrive. + drainTaskOutput(); super.prepareSnapshotPreBarrier(checkpointId); } @Override public void snapshotState(StateSnapshotContext context) throws Exception { // TODO: implement it - task.snapshotState(0); + task.snapshotState(context.getCheckpointId()); super.snapshotState(context); } diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java index 4849f90e946..8d44a18ca2d 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/VectorOutputBridge.java @@ -40,7 +40,7 @@ public interface VectorOutputBridge extends Serializable { * @param allocator buffer allocator for converting RowVector * @param outputType the RowType schema of the output */ - void collect( + long collect( Output> collector, StatefulRecord record, BufferAllocator allocator, @@ -85,7 +85,7 @@ public RowDataOutputBridge() { } @Override - public void collect( + public long collect( Output> collector, StatefulRecord record, BufferAllocator allocator, @@ -96,6 +96,7 @@ public void collect( for (org.apache.flink.table.data.RowData row : rows) { collector.collect(getOrCreateOutputElement().replace(row)); } + return rows.size(); } private StreamRecord getOrCreateOutputElement() { @@ -119,12 +120,13 @@ public StatefulRecordOutputBridge() { } @Override - public void collect( + public long collect( Output> collector, StatefulRecord record, BufferAllocator allocator, RowType outputType) { collector.collect(getOrCreateOutputElement().replace(record)); + return 1; } private StreamRecord getOrCreateOutputElement() { diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java index 1955ece25ae..11c5c623965 100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/streaming/api/operators/GlutenStreamFilterTest.java @@ -64,6 +64,30 @@ public void testGreaterThanFilter() throws Exception { expectedOutput); } + @Test + public void testTaskOutputMetricsCountEmittedRows() throws Exception { + RexNode filterCondition = + createFilterCondition(SqlTypeName.INTEGER, 2, 18, SqlStdOperatorTable.GREATER_THAN); + PlanNode veloxPlan = createFilterPlan(filterCondition, rowType); + GlutenOneInputOperator operator = createTestOperator(veloxPlan, typeInfo, typeInfo); + + OneInputStreamOperatorTestHarness harness = + createTestHarness(operator, typeInfo, typeInfo); + + processTestData(harness, testData); + + assertThat( + harness + .getEnvironment() + .getMetricGroup() + .getIOMetricGroup() + .getNumRecordsOutCounter() + .getCount()) + .isEqualTo(4); + + harness.close(); + } + @Test public void testLessThanFilter() throws Exception { List expectedOutput = diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetricsTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetricsTest.java new file mode 100644 index 00000000000..2ab5ebf74e4 --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/metrics/SourceTaskMetricsTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.metrics; + +import io.github.zhztheplayer.velox4j.query.SerialTaskStats; + +import org.apache.flink.metrics.SimpleCounter; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class SourceTaskMetricsTest { + + @Test + void updateMetricsUsesExactTableScanPlanId() { + SimpleCounter rows = new SimpleCounter(); + SimpleCounter bytes = new SimpleCounter(); + SourceTaskMetrics metrics = new SourceTaskMetrics(rows, bytes); + + assertThat(metrics.updateMetrics(statsWithSingleTableScan(), "scan-1")).isTrue(); + + assertThat(rows.getCount()).isEqualTo(5); + assertThat(bytes.getCount()).isEqualTo(123); + } + + @Test + void updateMetricsSyncsTaskIoCounters() { + SimpleCounter sourceRows = new SimpleCounter(); + SimpleCounter sourceBytes = new SimpleCounter(); + SimpleCounter taskRowsIn = new SimpleCounter(); + SimpleCounter taskRowsOut = new SimpleCounter(); + SimpleCounter taskBytesIn = new SimpleCounter(); + SimpleCounter taskBytesOut = new SimpleCounter(); + SourceTaskMetrics metrics = + new SourceTaskMetrics( + sourceRows, sourceBytes, taskRowsIn, taskRowsOut, taskBytesIn, taskBytesOut); + + assertThat(metrics.updateMetrics(statsWithSingleTableScan(), "scan-1")).isTrue(); + + assertThat(sourceRows.getCount()).isEqualTo(5); + assertThat(sourceBytes.getCount()).isEqualTo(123); + assertThat(taskRowsIn.getCount()).isEqualTo(5); + assertThat(taskRowsOut.getCount()).isEqualTo(5); + assertThat(taskBytesIn.getCount()).isEqualTo(123); + assertThat(taskBytesOut.getCount()).isEqualTo(123); + } + + @Test + void updateMetricsFallsBackToUniqueTableScanWhenPlanIdDiffers() { + SimpleCounter rows = new SimpleCounter(); + SimpleCounter bytes = new SimpleCounter(); + SourceTaskMetrics metrics = new SourceTaskMetrics(rows, bytes); + + assertThat(metrics.updateMetrics(statsWithSingleTableScan(), "flink-source-id")).isTrue(); + + assertThat(rows.getCount()).isEqualTo(5); + assertThat(bytes.getCount()).isEqualTo(123); + } + + @Test + void updateMetricsDoesNotGuessWhenMultipleTableScansExist() { + SimpleCounter rows = new SimpleCounter(); + SimpleCounter bytes = new SimpleCounter(); + SourceTaskMetrics metrics = new SourceTaskMetrics(rows, bytes); + + assertThat(metrics.updateMetrics(statsWithMultipleTableScans(), "flink-source-id")).isTrue(); + + assertThat(rows.getCount()).isZero(); + assertThat(bytes.getCount()).isZero(); + } + + private static SerialTaskStats statsWithSingleTableScan() { + return SerialTaskStats.fromJson( + "{" + + "\"planStats\":[" + + "{" + + "\"planNodeId\":\"scan-1\"," + + "\"operatorType\":\"TableScan\"," + + "\"rawInputRows\":5," + + "\"rawInputBytes\":123" + + "}," + + "{" + + "\"planNodeId\":\"project-1\"," + + "\"operatorType\":\"FilterProject\"," + + "\"rawInputRows\":0," + + "\"rawInputBytes\":0" + + "}" + + "]" + + "}"); + } + + private static SerialTaskStats statsWithMultipleTableScans() { + return SerialTaskStats.fromJson( + "{" + + "\"planStats\":[" + + "{" + + "\"planNodeId\":\"scan-1\"," + + "\"operatorType\":\"TableScan\"," + + "\"rawInputRows\":5," + + "\"rawInputBytes\":123" + + "}," + + "{" + + "\"planNodeId\":\"scan-2\"," + + "\"operatorType\":\"TableScan\"," + + "\"rawInputRows\":7," + + "\"rawInputBytes\":456" + + "}" + + "]" + + "}"); + } +}