Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
export fmt_SOURCE=BUNDLED
export folly_SOURCE=BUNDLED
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
cd velox4j && git reset --hard 8ceb4e5e108588d92b89498bb3f1a3c0c4b029b5
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
package org.apache.gluten.table.runtime.metrics;

import io.github.zhztheplayer.velox4j.query.SerialTask;
import io.github.zhztheplayer.velox4j.query.SerialTaskStats;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.List;

public class SourceTaskMetrics {

private final String keyOperatorType = "operatorType";
Expand All @@ -33,31 +38,113 @@ public class SourceTaskMetrics {
private final long metricUpdateInterval = 2000;
private Counter sourceNumRecordsOut;
private Counter sourceNumBytesOut;
private long lastUpdateTime = System.currentTimeMillis();
private Counter taskNumRecordsIn;
private Counter taskNumRecordsOut;
private Counter taskNumBytesIn;
private Counter taskNumBytesOut;
private long lastUpdateTime = 0;

public SourceTaskMetrics(OperatorMetricGroup metricGroup) {
sourceNumRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
sourceNumBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
if (metricGroup instanceof InternalOperatorMetricGroup) {
TaskIOMetricGroup taskIOMetricGroup =
((InternalOperatorMetricGroup) metricGroup).getTaskIOMetricGroup();
taskNumRecordsIn = taskIOMetricGroup.getNumRecordsInCounter();
taskNumRecordsOut = taskIOMetricGroup.getNumRecordsOutCounter();
taskNumBytesIn = taskIOMetricGroup.getNumBytesInCounter();
taskNumBytesOut = taskIOMetricGroup.getNumBytesOutCounter();
}
}

/**
 * Package-private constructor that takes the two operator-level counters directly and passes
 * {@code null} for all four task-level counters.
 *
 * <p>NOTE(review): presumably intended for tests that inject their own counters; confirm.
 *
 * @param sourceNumRecordsOut counter for records emitted by the source operator
 * @param sourceNumBytesOut counter for bytes emitted by the source operator
 */
SourceTaskMetrics(Counter sourceNumRecordsOut, Counter sourceNumBytesOut) {
this(sourceNumRecordsOut, sourceNumBytesOut, null, null, null, null);
}

/**
 * Package-private constructor that stores every counter exactly as supplied.
 *
 * <p>Any argument may be {@code null}; {@code syncCounter} skips null counters.
 *
 * @param sourceNumRecordsOut operator-level records-out counter
 * @param sourceNumBytesOut operator-level bytes-out counter
 * @param taskNumRecordsIn task-level records-in counter
 * @param taskNumRecordsOut task-level records-out counter
 * @param taskNumBytesIn task-level bytes-in counter
 * @param taskNumBytesOut task-level bytes-out counter
 */
SourceTaskMetrics(
    Counter sourceNumRecordsOut,
    Counter sourceNumBytesOut,
    Counter taskNumRecordsIn,
    Counter taskNumRecordsOut,
    Counter taskNumBytesIn,
    Counter taskNumBytesOut) {
  // Task-level counters.
  this.taskNumBytesOut = taskNumBytesOut;
  this.taskNumBytesIn = taskNumBytesIn;
  this.taskNumRecordsOut = taskNumRecordsOut;
  this.taskNumRecordsIn = taskNumRecordsIn;

  // Operator-level counters.
  this.sourceNumBytesOut = sourceNumBytesOut;
  this.sourceNumRecordsOut = sourceNumRecordsOut;
}

/**
 * Refreshes the counters from the current statistics of {@code task}.
 *
 * <p>The update interval is checked here, before {@code task.collectStats()} is invoked, so
 * that statistics are not collected on calls that would be rejected as too early anyway.
 *
 * @param task task whose statistics are collected
 * @param planId id of the plan node expected to be the source
 * @return {@code true} if the counters were refreshed, {@code false} if the call was throttled
 *     or the statistics could not be read
 */
public boolean updateMetrics(SerialTask task, String planId) {
  if (System.currentTimeMillis() - lastUpdateTime < metricUpdateInterval) {
    return false;
  }
  return updateMetrics(task.collectStats(), planId);
}

boolean updateMetrics(SerialTaskStats taskStats, String planId) {
long currentTime = System.currentTimeMillis();
if (currentTime - lastUpdateTime < metricUpdateInterval) {
return false;
}
try {
ObjectNode planStats = task.collectStats().planStats(planId);
JsonNode jsonNode = planStats.get(keyOperatorType);
if (jsonNode.asText().equals(sourceOperatorName)) {
long numRecordsOut = planStats.get(keyInputRows).asInt();
long numBytesOut = planStats.get(keyInputBytes).asInt();
sourceNumRecordsOut.inc(numRecordsOut - sourceNumRecordsOut.getCount());
sourceNumBytesOut.inc(numBytesOut - sourceNumBytesOut.getCount());
ObjectNode planStats = findSourceStats(taskStats, planId);
if (planStats != null) {
long inputRows = planStats.get(keyInputRows).asLong();
long inputBytes = planStats.get(keyInputBytes).asLong();
syncCounter(sourceNumRecordsOut, inputRows);
syncCounter(sourceNumBytesOut, inputBytes);
syncCounter(taskNumRecordsIn, inputRows);
syncCounter(taskNumRecordsOut, inputRows);
syncCounter(taskNumBytesIn, inputBytes);
syncCounter(taskNumBytesOut, inputBytes);
}
} catch (Exception e) {
return false;
}
lastUpdateTime = currentTime;
return true;
}

/**
 * Locates the statistics node of the source operator.
 *
 * <p>The node registered under {@code planId} is tried first. If that lookup throws or the node
 * does not qualify as source statistics, all plan statistics are scanned and the match is
 * returned only when it is unique.
 *
 * @param taskStats statistics collected from the task
 * @param planId id of the plan node expected to be the source
 * @return the source statistics node, or {@code null} if none or more than one node qualifies
 */
private ObjectNode findSourceStats(SerialTaskStats taskStats, String planId) {
  try {
    ObjectNode direct = taskStats.planStats(planId);
    if (isSourceStats(direct)) {
      return direct;
    }
  } catch (Exception ignored) {
    // Fall back to the unique TableScan below.
  }

  ObjectNode match = null;
  for (ObjectNode candidate : taskStats.planStats()) {
    if (isSourceStats(candidate)) {
      if (match != null) {
        // A second qualifying node makes the choice ambiguous.
        return null;
      }
      match = candidate;
    }
  }
  return match;
}

/**
 * Tells whether {@code planStats} describes the source operator and carries both the input-row
 * and input-byte fields.
 *
 * @param planStats statistics node of a single plan node
 * @return {@code true} if the operator type equals {@code sourceOperatorName} and both input
 *     fields are present
 */
private boolean isSourceStats(ObjectNode planStats) {
  JsonNode typeNode = planStats.get(keyOperatorType);
  if (typeNode == null) {
    return false;
  }
  if (!sourceOperatorName.equals(typeNode.asText())) {
    return false;
  }
  return planStats.has(keyInputRows) && planStats.has(keyInputBytes);
}

/**
 * Moves {@code counter} to the absolute {@code value} by applying the difference to its current
 * count. A {@code null} counter is ignored.
 *
 * @param counter counter to adjust, may be {@code null}
 * @param value absolute value the counter should report afterwards
 */
private void syncCounter(Counter counter, long value) {
  if (counter != null) {
    long diff = value - counter.getCount();
    if (diff < 0) {
      // The reported total went down: lower the counter by the same amount.
      counter.dec(-diff);
    } else if (diff > 0) {
      counter.inc(diff);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.github.zhztheplayer.velox4j.type.RowType;

import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand Down Expand Up @@ -66,6 +68,7 @@ public class GlutenOneInputOperator<IN, OUT> extends TableStreamOperator<OUT>
private final Class<OUT> outClass;
private transient VectorInputBridge<IN> inputBridge;
private transient VectorOutputBridge<OUT> outputBridge;
private transient Counter taskNumRecordsOut;
private final GlutenMailboxHolder mailboxHolder = new GlutenMailboxHolder();

public GlutenOneInputOperator(
Expand Down Expand Up @@ -151,6 +154,10 @@ void initSession() {
@Override
public void open() throws Exception {
super.open();
if (metrics instanceof InternalOperatorMetricGroup) {
taskNumRecordsOut =
((InternalOperatorMetricGroup) metrics).getTaskIOMetricGroup().getNumRecordsOutCounter();
}
if (!mailboxHolder().get().isMailboxBound()) {
ensureMailboxInitialized(getContainingTask());
}
Expand Down Expand Up @@ -197,12 +204,20 @@ private void drainTaskOutput() {
if (state == UpIterator.State.AVAILABLE) {
final StatefulElement statefulElement = task.statefulGet();
try {
if (statefulElement.isBarrier()) {
// Barriers should not appear during normal draining; skip.
continue;
}
if (statefulElement.isWatermark()) {
StatefulWatermark watermark = statefulElement.asWatermark();
output.emitWatermark(new Watermark(watermark.getTimestamp()));
} else {
outputBridge.collect(
output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType);
long emittedRecords =
outputBridge.collect(
output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType);
if (taskNumRecordsOut != null) {
taskNumRecordsOut.inc(emittedRecords);
}
}
} finally {
statefulElement.close();
Expand Down Expand Up @@ -272,14 +287,52 @@ public String getId() {

/**
 * Injects a barrier for {@code checkpointId} into the native task and drains its output until
 * that barrier comes back out, then delegates to the superclass.
 *
 * @param checkpointId id of the checkpoint about to be taken
 * @throws Exception if the superclass hook fails
 */
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// Inject barrier into Velox operator chain, then drain output until the
// barrier emerges, indicating all operators have snapshot and sinks flushed.
task.injectBarrier(checkpointId);
drainUntilBarrier(checkpointId);
super.prepareSnapshotPreBarrier(checkpointId);
}

/**
 * Drains the native task until the barrier for {@code checkpointId} is produced.
 *
 * <p>Watermarks and records seen on the way are forwarded downstream, and emitted records are
 * added to the task-level records-out counter when it is available. Barriers carrying a
 * different checkpoint id are skipped.
 *
 * @param checkpointId id of the checkpoint whose barrier ends the drain
 * @throws IllegalStateException if the task reports FINISHED before the barrier is produced;
 *     the barrier can then never arrive and waiting would spin forever
 */
private void drainUntilBarrier(long checkpointId) {
  while (true) {
    UpIterator.State state = task.advance();
    if (state == UpIterator.State.FINISHED) {
      // Terminal state: fail loudly instead of busy-waiting for a barrier that cannot come.
      throw new IllegalStateException(
          "Native task finished before emitting the barrier for checkpoint " + checkpointId);
    }
    if (state != UpIterator.State.AVAILABLE) {
      // BLOCKED - should not happen during barrier drain since
      // processBarrier is synchronous. Spin to retry.
      Thread.yield();
      continue;
    }
    final StatefulElement statefulElement = task.statefulGet();
    try {
      if (statefulElement.isBarrier()) {
        if (statefulElement.asBarrier().getCheckpointId() == checkpointId) {
          return;
        }
        // Stale barrier from a previous checkpoint, ignore.
        continue;
      }
      if (statefulElement.isWatermark()) {
        StatefulWatermark watermark = statefulElement.asWatermark();
        output.emitWatermark(new Watermark(watermark.getTimestamp()));
      } else {
        long emittedRecords =
            outputBridge.collect(
                output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType);
        if (taskNumRecordsOut != null) {
          taskNumRecordsOut.inc(emittedRecords);
        }
      }
    } finally {
      // Runs on return and continue as well, so every element is released.
      statefulElement.close();
    }
  }
}

/**
 * Snapshots the native task once, under the id of the checkpoint being taken, then delegates
 * to the superclass.
 *
 * @param context snapshot context supplying the checkpoint id
 * @throws Exception if the superclass snapshot fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
  task.snapshotState(context.getCheckpointId());
  super.snapshotState(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.github.zhztheplayer.velox4j.stateful.StatefulWatermark;
import io.github.zhztheplayer.velox4j.type.RowType;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
Expand All @@ -43,6 +45,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -65,6 +68,8 @@ public class GlutenSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
private SerialTask task;
private SourceTaskMetrics taskMetrics;
private final Class<OUT> outClass;
private transient ListState<String> checkpointState;
private transient String[] restoredCheckpointRecords = new String[0];

public GlutenSourceFunction(
StatefulPlanNode planNode,
Expand Down Expand Up @@ -205,15 +210,29 @@ public void close() throws Exception {

/**
 * Snapshots the native task once, under the id of the checkpoint being taken, and replaces the
 * contents of the operator list state with the returned records.
 *
 * @param context snapshot context supplying the checkpoint id
 * @throws Exception if writing the list state fails
 */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
  // Take the native snapshot first so the previous state is only cleared once the new
  // records are in hand.
  String[] checkpointRecords = this.task.snapshotState(context.getCheckpointId());
  checkpointState.clear();
  for (String checkpointRecord : checkpointRecords) {
    checkpointState.add(checkpointRecord);
  }
}

/**
 * Registers the operator list state, loads any restored checkpoint records, creates the
 * session and hands the records to the native task.
 *
 * @param context initialization context giving access to the operator state store
 * @throws Exception if the list state cannot be obtained or read
 */
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  checkpointState =
      context
          .getOperatorStateStore()
          .getListState(
              new ListStateDescriptor<>("gluten-native-source-checkpoint", String.class));

  List<String> records = new ArrayList<>();
  if (context.isRestored()) {
    for (String checkpointRecord : checkpointState.get()) {
      records.add(checkpointRecord);
    }
  }
  // Assign unconditionally: the field is transient, so its declaration initializer does not
  // run on a deserialized instance and the field would otherwise be null on a fresh start.
  restoredCheckpointRecords = records.toArray(new String[0]);

  initSession();
  this.task.initializeState(0, null, restoredCheckpointRecords);
}

public String[] notifyCheckpointComplete(long checkpointId) throws Exception {
Expand Down
Loading
Loading