Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
export fmt_SOURCE=BUNDLED
export folly_SOURCE=BUNDLED
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca
cd velox4j && git reset --hard 97fc1edafebd0f505e613d260f77f92f5252d048
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
$GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.gluten.util.ReflectUtils;

import io.github.zhztheplayer.velox4j.connector.NexmarkConnectorSplit;
import io.github.zhztheplayer.velox4j.connector.NexmarkGeneratorConfig;
import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode;
Expand All @@ -32,17 +33,25 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

public class NexmarkSourceFactory implements VeloxSourceSinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(NexmarkSourceFactory.class);
private static final ObjectMapper MAPPER =
new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE);

@SuppressWarnings("rawtypes")
@Override
Expand Down Expand Up @@ -76,9 +85,6 @@ public Transformation<RowData> buildVeloxSource(
Object generatorConfig =
ReflectUtils.getObjectField(
nexmarkSourceSplit.getClass(), nexmarkSourceSplit, "generatorConfig");
Long maxEvents =
(Long)
ReflectUtils.getObjectField(generatorConfig.getClass(), generatorConfig, "maxEvents");
PlanNode tableScan =
new TableScanNode(id, outputType, new NexmarkTableHandle("connector-nexmark"), List.of());
GlutenStreamSource sourceOp =
Expand All @@ -88,8 +94,7 @@ public Transformation<RowData> buildVeloxSource(
Map.of(id, outputType),
id,
new NexmarkConnectorSplit(
"connector-nexmark",
maxEvents > Integer.MAX_VALUE ? Integer.MAX_VALUE : maxEvents.intValue()),
"connector-nexmark", toVeloxNexmarkGeneratorConfig(generatorConfig)),
RowData.class));

return new LegacySourceTransformation<RowData>(
Expand All @@ -106,4 +111,13 @@ public Transformation<RowData> buildVeloxSink(
Transformation<RowData> transformation, Map<String, Object> parameters) {
throw new UnsupportedOperationException("Unimplemented method 'buildSink'");
}

private static NexmarkGeneratorConfig toVeloxNexmarkGeneratorConfig(Object javaConfig) {
try {
String json = MAPPER.writeValueAsString(javaConfig);
return MAPPER.readValue(json, NexmarkGeneratorConfig.class);
} catch (JsonProcessingException e) {
throw new TableException("Failed to convert nexmark NexmarkGeneratorConfig to velox4j", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.gluten.table.runtime.operators.GlutenOneInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
import org.apache.gluten.util.ReflectUtils;

import io.github.zhztheplayer.velox4j.connector.CommitStrategy;
import io.github.zhztheplayer.velox4j.connector.PrintTableHandle;
Expand All @@ -30,9 +31,6 @@
import io.github.zhztheplayer.velox4j.type.RowType;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
Expand Down Expand Up @@ -71,33 +69,54 @@ public Transformation<RowData> buildVeloxSource(
throw new FlinkRuntimeException("Unimplemented method 'buildSource'");
}

// Pulls print-identifier/standard-error from RowDataPrintFunction via reflection.
// Flink 1.19.x field names: sinkIdentifier (print-identifier), target (standard-error, true =
// stderr).
static PrintOptions extractPrintOptions(Transformation<RowData> transformation) {
SimpleOperatorFactory operatorFactory =
(SimpleOperatorFactory) ((LegacySinkTransformation) transformation).getOperatorFactory();
SinkOperator sinkOp = (SinkOperator) operatorFactory.getOperator();
Object rowDataPrintFn = sinkOp.getUserFunction();
Object writer =
ReflectUtils.getObjectField(rowDataPrintFn.getClass(), rowDataPrintFn, "writer");
String printIdentifier =
(String) ReflectUtils.getObjectField(writer.getClass(), writer, "sinkIdentifier");
boolean isStdErr = (boolean) ReflectUtils.getObjectField(writer.getClass(), writer, "target");
return new PrintOptions(printIdentifier == null ? "" : printIdentifier, isStdErr);
}

static final class PrintOptions {
private final String printIdentifier;
private final boolean stdErr;

PrintOptions(String printIdentifier, boolean stdErr) {
this.printIdentifier = printIdentifier;
this.stdErr = stdErr;
}

public String getPrintIdentifier() {
return printIdentifier;
}

public boolean isStdErr() {
return stdErr;
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public Transformation buildVeloxSink(
Transformation<RowData> transformation, Map<String, Object> parameters) {
Transformation inputTrans = (Transformation) transformation.getInputs().get(0);
InternalTypeInfo inputTypeInfo = (InternalTypeInfo) inputTrans.getOutputType();
Configuration config = (Configuration) parameters.get(Configuration.class.getName());
String logDir = config.get(CoreOptions.FLINK_LOG_DIR);
String printPath;
if (logDir != null) {
printPath = String.format("file://%s/%s", logDir, "taskmanager.out");
} else {
String flinkHomeDir = System.getenv(ConfigConstants.ENV_FLINK_HOME_DIR);
if (flinkHomeDir == null) {
String flinkConfDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
if (flinkConfDir == null) {
throw new FlinkRuntimeException(
"Can not get flink home directory, please set FLINK_HOME.");
}
printPath = String.format("file://%s/../log/%s", flinkConfDir, "taskmanager.out");
} else {
printPath = String.format("file://%s/log/%s", flinkHomeDir, "taskmanager.out");
}
}

PrintOptions printOpts = extractPrintOptions(transformation);

RowType inputColumns = (RowType) LogicalTypeConverter.toVLType(inputTypeInfo.toLogicalType());
RowType ignore = new RowType(List.of("num"), List.of(new BigIntType()));
PrintTableHandle tableHandle = new PrintTableHandle("print-table", inputColumns, printPath);
PrintTableHandle tableHandle =
new PrintTableHandle(
"print-table", inputColumns, printOpts.getPrintIdentifier(), printOpts.isStdErr());
TableWriteNode tableWriteNode =
new TableWriteNode(
PlanNodeIdGenerator.newId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ public class VeloxConnectorConfig {
"connector-from-elements",
"connector-print");
private static final String keyTaskIndex = "task_index";
private static final String keyParallelism = "parallelism";
private static final String keyQueryUUId = "query_uuid";

public static ConnectorConfig getConfig(RuntimeContext context) {
Map<String, String> configMap = new HashMap<>();
TaskInfo taskInfo = context.getTaskInfo();
configMap.put(keyTaskIndex, String.valueOf(taskInfo.getIndexOfThisSubtask()));
configMap.put(keyParallelism, String.valueOf(taskInfo.getNumberOfParallelSubtasks()));
configMap.put(
keyQueryUUId,
UUID.nameUUIDFromBytes(context.getJobInfo().getJobId().toHexString().getBytes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.gluten.table.runtime.stream.common;

import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
Expand All @@ -29,6 +28,8 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.FlinkRuntimeException;

import com.sun.jna.Library;
import com.sun.jna.Native;
import org.junit.jupiter.api.BeforeAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,6 +48,23 @@ public class GlutenStreamingTestBase extends StreamingTestBase {
private static final String EXECUTION_PLAN_PREIFX = "== Physical Execution Plan ==";
private static final long timeoutMS = 30000;

// dup2 fd=1 onto a file: Velox print sink writes to std::cout, which bypasses System.setOut and
// goes straight to the process's fd=1.
private interface CLibrary extends Library {
int dup(int oldfd);

int dup2(int oldfd, int newfd);

int open(String path, int flags, int mode);

int close(int fd);
}

private static final CLibrary C_LIB = Native.load("c", CLibrary.class);
private static final int O_WRONLY = 1;
private static final int O_CREAT = 0100;
private static final int O_TRUNC = 01000;

@BeforeAll
public static void setup() throws Exception {
LOG.info("GlutenStreamingTestBase setup");
Expand Down Expand Up @@ -114,42 +132,57 @@ protected String explainExecutionPlan(String query) {

protected void runAndCheck(String query, List<String> expected) {
String printResultDirPath = System.getProperty("user.dir") + "/log/";
tEnv().getConfig().set(CoreOptions.FLINK_LOG_DIR, printResultDirPath);
String printResultFilePath = String.format("%s%s", printResultDirPath, "taskmanager.out");
new File(printResultDirPath).mkdirs();
String printResultFilePath = printResultDirPath + "taskmanager.out";
File printResultFile = new File(printResultFilePath);
boolean deleteResultFile = true;
if (printResultFile.exists()) {
deleteResultFile = printResultFile.delete();
printResultFile.delete();
}
Table table = tEnv().sqlQuery(query);
createPrintSinkTable("printT", table.getResolvedSchema());
String newQuery = String.format("insert into %s %s", "printT", query);
TableResult tableResult = tEnv().executeSql(newQuery);
assertTrue(tableResult.getJobClient().isPresent());

int savedStdout = C_LIB.dup(1);
int fileFd = C_LIB.open(printResultFilePath, O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fileFd < 0) {
C_LIB.close(savedStdout);
throw new FlinkRuntimeException("Failed to open " + printResultFilePath);
}
C_LIB.dup2(fileFd, 1);
try {
Table table = tEnv().sqlQuery(query);
createPrintSinkTable("printT", table.getResolvedSchema());
String newQuery = String.format("insert into %s %s", "printT", query);
TableResult tableResult = tEnv().executeSql(newQuery);
assertTrue(tableResult.getJobClient().isPresent());
JobClient jobClient = tableResult.getJobClient().get();
if (deleteResultFile) {
try {
long startTime = System.currentTimeMillis();
while (!printResultFile.exists()) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
Thread.sleep(10);
try {
long startTime = System.currentTimeMillis();
while (printResultFile.length() == 0) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
long fileSize = -1L;
startTime = System.currentTimeMillis();
while (printResultFile.length() > fileSize) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
fileSize = printResultFile.length();
Thread.sleep(3000);
Thread.sleep(10);
}
long fileSize = -1L;
startTime = System.currentTimeMillis();
while (printResultFile.length() > fileSize) {
if (System.currentTimeMillis() - startTime > timeoutMS) {
break;
}
} finally {
jobClient.cancel();
fileSize = printResultFile.length();
Thread.sleep(3000);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new FlinkRuntimeException(ie);
} finally {
jobClient.cancel();
}
} finally {
C_LIB.dup2(savedStdout, 1);
C_LIB.close(fileFd);
C_LIB.close(savedStdout);
}

try {
List<String> result = new ArrayList<>();
try (FileReader fr = new FileReader(printResultFile);
BufferedReader br = new BufferedReader(fr)) {
Expand Down
Loading
Loading