Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
export fmt_SOURCE=BUNDLED
export folly_SOURCE=BUNDLED
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
cd velox4j && git reset --hard 1ee4116de355cb618acf4c45e857ced16acb990f
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
2 changes: 1 addition & 1 deletion gluten-flink/docs/Flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
git reset --hard f6ea2d7f9c79a3476827dd7fd4c16a2b67a17cc3
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,30 @@
*/
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.gluten.rexnode.RexConversionContext;
import org.apache.gluten.rexnode.RexNodeConverter;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
import org.apache.gluten.velox.VeloxSourceSinkFactory;

import io.github.zhztheplayer.velox4j.expression.TypedExpr;
import io.github.zhztheplayer.velox4j.plan.EmptyNode;
import io.github.zhztheplayer.velox4j.plan.ProjectNode;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
Expand All @@ -34,13 +48,20 @@
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.RowType.RowField;
import org.apache.flink.table.types.logical.TimestampType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rex.RexNode;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Stream {@link ExecNode} to read data from an external source defined by a {@link
Expand Down Expand Up @@ -98,6 +119,51 @@ public Transformation<RowData> createInputFormatTransformation(
return env.createInput(inputFormat, outputTypeInfo).name(operatorName).getTransformation();
}

private ProjectNode translateWatermarkExpr(
LogicalType inputType, LogicalType outputType, RexNode watermarkExpr) {
List<String> inNames = Utils.getNamesFromRowType(inputType);
RexConversionContext conversionContext = new RexConversionContext(inNames);
TypedExpr watermarkExprs = RexNodeConverter.toTypedExpr(watermarkExpr, conversionContext);
io.github.zhztheplayer.velox4j.type.RowType outputRowType =
(io.github.zhztheplayer.velox4j.type.RowType) LogicalTypeConverter.toVLType(outputType);
return new ProjectNode(
PlanNodeIdGenerator.newId(),
List.of(new EmptyNode(outputRowType)),
List.of("TIMESTAMP"),
List.of(watermarkExprs));
}

private Optional<io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec>
getWatermarkPushDownSpec(Transformation<RowData> transformation, ExecNodeConfig config) {
io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec watermarkPushDownSpecNode = null;
if (transformation instanceof SourceTransformation) {
List<SourceAbilitySpec> sourceAbilities = getTableSourceSpec().getSourceAbilities();
if (sourceAbilities != null) {
for (SourceAbilitySpec sourceAbility : sourceAbilities) {
if (sourceAbility instanceof WatermarkPushDownSpec) {
final long idleTimeout =
config.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT).toMillis();
final long watermarkInterval =
config.get(PipelineOptions.AUTO_WATERMARK_INTERVAL).toMillis();
WatermarkPushDownSpec watermarkPushDownSpec = (WatermarkPushDownSpec) sourceAbility;
RowField watermarkField = new RowField("watermark", new TimestampType(3));
ProjectNode project =
translateWatermarkExpr(
getOutputType(),
new RowType(List.of(watermarkField)),
watermarkPushDownSpec.getWatermarkExpr());
watermarkPushDownSpecNode =
new io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec(
project, idleTimeout, watermarkInterval, -1);
}
}
}
}
return watermarkPushDownSpecNode != null
? Optional.of(watermarkPushDownSpecNode)
: Optional.empty();
}

@Override
protected Transformation<RowData> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
Expand All @@ -106,14 +172,18 @@ protected Transformation<RowData> translateToPlanInternal(
getTableSourceSpec()
.getScanTableSource(
planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner));
Transformation<RowData> sourceTransformation = super.translateToPlanInternal(planner, config);
Transformation<RowData> transformation = super.translateToPlanInternal(planner, config);
Optional<io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec> watermarkPushDownSpec =
getWatermarkPushDownSpec(transformation, config);
return VeloxSourceSinkFactory.buildSource(
sourceTransformation,
transformation,
Map.of(
ScanTableSource.class.getName(),
tableSource,
"checkpoint.enabled",
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled()));
planner.getExecEnv().getCheckpointConfig().isCheckpointingEnabled(),
"watermarkPushDownSpec",
watermarkPushDownSpec));
// --- End Gluten-specific code changes ---
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) {
&& params.get(1).getReturnType() instanceof BigIntType) {

Type bigIntType = new BigIntType();
TypedExpr castExpr = new CallTypedExpr(bigIntType, List.of(params.get(0)), "cast");
TypedExpr castExpr = new CallTypedExpr(bigIntType, List.of(params.get(0)), "unix_millis");

List<TypedExpr> newParams = List.of(castExpr, params.get(1));
return new CallTypedExpr(bigIntType, newParams, functionName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@

import io.github.zhztheplayer.velox4j.connector.KafkaConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.KafkaTableHandle;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
import io.github.zhztheplayer.velox4j.plan.TableScanNode;
import io.github.zhztheplayer.velox4j.plan.TableScanWithWatermarkNode;
import io.github.zhztheplayer.velox4j.plan.WatermarkPushDownSpec;
import io.github.zhztheplayer.velox4j.type.RowType;

import org.apache.flink.api.connector.source.Source;
Expand All @@ -41,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;

Expand Down Expand Up @@ -68,6 +72,8 @@ public Transformation<RowData> buildVeloxSource(
ScanTableSource tableSource =
(ScanTableSource) parameters.get(ScanTableSource.class.getName());
boolean checkpointEnabled = (Boolean) parameters.get("checkpoint.enabled");
Optional<WatermarkPushDownSpec> watermarkPushDownSpec =
(Optional<WatermarkPushDownSpec>) parameters.get("watermarkPushDownSpec");
Class<?> tableSourceClazz =
Class.forName("org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource");
Properties properties =
Expand Down Expand Up @@ -112,7 +118,12 @@ public Transformation<RowData> buildVeloxSource(
Boolean.valueOf(kafkaTableParameters.getOrDefault("enable.auto.commit", "false")),
"latest",
List.of());
TableScanNode kafkaScan = new TableScanNode(planId, outputType, kafkaTableHandle, List.of());

PlanNode kafkaScan =
watermarkPushDownSpec.isPresent()
? new TableScanWithWatermarkNode(
planId, outputType, kafkaTableHandle, List.of(), watermarkPushDownSpec.get())
: new TableScanNode(planId, outputType, kafkaTableHandle, List.of());
GlutenStreamSource sourceOp =
new GlutenStreamSource(
new GlutenSourceFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -104,6 +105,45 @@ static void setup() {
tEnv = StreamTableEnvironment.create(env, settings);
}

@Test
void testNexmarkSourceSqlDoesNotPushDownWatermark() {
String createNexmarkSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_gen.sql");
createNexmarkSource = replaceVariables(createNexmarkSource, NEXMARK_VARIABLES);
try {
tEnv.executeSql(createNexmarkSource);
String explain = tEnv.explainSql("SELECT * FROM datagen");

assertThat(explain).contains("WatermarkAssigner");
List<String> tableSourceScanLines =
Arrays.stream(explain.split("\\R"))
.filter(line -> line.contains("TableSourceScan"))
.collect(Collectors.toList());
assertThat(tableSourceScanLines).isNotEmpty();
assertThat(tableSourceScanLines).noneMatch(line -> line.contains("watermark=["));
} finally {
tEnv.executeSql("drop table if exists datagen");
}
}

@Test
void testKafkaSourceSqlPushesDownWatermark() {
String createKafkaSource = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/ddl_kafka.sql");
createKafkaSource = replaceVariables(createKafkaSource, KAFKA_VARIABLES);
try {
tEnv.executeSql(createKafkaSource);
String explain = tEnv.explainSql("SELECT * FROM kafka");

List<String> tableSourceScanLines =
Arrays.stream(explain.split("\\R"))
.filter(line -> line.contains("TableSourceScan"))
.collect(Collectors.toList());
assertThat(tableSourceScanLines).isNotEmpty();
assertThat(tableSourceScanLines).anyMatch(line -> line.contains("watermark=["));
} finally {
tEnv.executeSql("drop table if exists kafka");
}
}

@Test
void testAllNexmarkSourceQueries()
throws ExecutionException, InterruptedException, TimeoutException {
Expand Down Expand Up @@ -174,6 +214,8 @@ private static void clearEnvironment(StreamTableEnvironment tEnv) {
String sql = String.format("drop function if exists %s", func);
tEnv.executeSql(sql);
}
tEnv.executeSql("drop table if exists datagen");
tEnv.executeSql("drop table if exists kafka");
}

private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boolean kafkaSource)
Expand Down
Loading
Loading