diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml index cf607f5f304..45b4fd40f90 100644 --- a/.github/workflows/flink.yml +++ b/.github/workflows/flink.yml @@ -70,7 +70,7 @@ jobs: export fmt_SOURCE=BUNDLED export folly_SOURCE=BUNDLED git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git - cd velox4j && git reset --hard 115edf79d265a61c30d45dfcc6ce932ad92378ca + cd velox4j && git reset --hard f6ea2d7f9c79a3476827dd7fd4c16a2b67a17cc3 git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch $GITHUB_WORKSPACE/build/mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true cd .. diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java new file mode 100644 index 00000000000..1fe982ba457 --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCloseables.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Runs a primary action followed by a fixed list of cleanup steps, attempting every cleanup no
 * matter which earlier step failed.
 *
 * <p>Failure handling mirrors a {@code try}-with-resources statement: the first failure is the
 * one that is rethrown, and every later failure is attached to it as a suppressed throwable.
 */
final class GlutenCloseables {
  private GlutenCloseables() {}

  /**
   * Runs {@code action}, then every entry of {@code cleanups} in the given order.
   *
   * <p>A step that throws an {@link Exception} or an {@link Error} never prevents the following
   * steps from running. After the last cleanup the first recorded failure is rethrown with its
   * original type; all later failures are available through {@link Throwable#getSuppressed()}.
   *
   * @param action the primary step, executed first
   * @param cleanups the cleanup steps, executed in order after {@code action}
   * @throws Exception the first {@link Exception} raised by any step; an {@link Error} raised
   *     first is rethrown as that same {@link Error}
   */
  static void runWithCleanup(ThrowingRunnable action, ThrowingRunnable... cleanups)
      throws Exception {
    Throwable failure = runRecording(null, action);
    for (ThrowingRunnable cleanup : cleanups) {
      failure = runRecording(failure, cleanup);
    }

    if (failure instanceof Error) {
      throw (Error) failure;
    }
    if (failure != null) {
      // Only Exception and Error are ever recorded, so this cast cannot fail.
      throw (Exception) failure;
    }
  }

  /**
   * Runs one step and folds its failure, if any, into the failure recorded so far.
   *
   * @param failure the first failure seen so far, or {@code null} if every step succeeded
   * @param step the step to run
   * @return the failure to carry forward: {@code failure} when it is non-null, otherwise the
   *     throwable raised by {@code step}, otherwise {@code null}
   */
  private static Throwable runRecording(Throwable failure, ThrowingRunnable step) {
    try {
      step.run();
    } catch (Exception | Error e) {
      if (failure == null) {
        return e;
      }
      // Throwable.addSuppressed rejects self-suppression with an IllegalArgumentException, which
      // would escape from here and skip the remaining cleanups.
      if (failure != e) {
        failure.addSuppressed(e);
      }
    }
    return failure;
  }

  /** A {@link Runnable}-like step that is allowed to throw checked exceptions. */
  @FunctionalInterface
  interface ThrowingRunnable {
    /**
     * Executes the step.
     *
     * @throws Exception if the step fails
     */
    void run() throws Exception;
  }
}
*/ public class GlutenOneInputOperator extends TableStreamOperator - implements OneInputStreamOperator, GlutenOperator { + implements OneInputStreamOperator, BoundedOneInput, GlutenOperator { private final StatefulPlanNode glutenPlan; private final String id; @@ -195,24 +196,46 @@ private void drainTaskOutput() { while (true) { UpIterator.State state = task.advance(); if (state == UpIterator.State.AVAILABLE) { - final StatefulElement statefulElement = task.statefulGet(); - try { - if (statefulElement.isWatermark()) { - StatefulWatermark watermark = statefulElement.asWatermark(); - output.emitWatermark(new Watermark(watermark.getTimestamp())); - } else { - outputBridge.collect( - output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); - } - } finally { - statefulElement.close(); - } + processAvailableElement(); } else { break; } } } + private void processAvailableElement() { + final StatefulElement statefulElement = task.statefulGet(); + try { + if (statefulElement.isWatermark()) { + StatefulWatermark watermark = statefulElement.asWatermark(); + output.emitWatermark(new Watermark(watermark.getTimestamp())); + } else { + outputBridge.collect( + output, statefulElement.asRecord(), sessionResource.getAllocator(), outputType); + } + } finally { + statefulElement.close(); + } + } + + private void finishTask() { + while (true) { + UpIterator.State state = task.advance(); + switch (state) { + case AVAILABLE: + processAvailableElement(); + break; + case BLOCKED: + task.waitFor(); + break; + case FINISHED: + return; + default: + throw new IllegalStateException("Unknown Velox task state: " + state); + } + } + } + public GlutenOneInputOperator cloneWithInputOutputClasses( StatefulPlanNode plan, Class newInClass, Class newOutClass) { return new GlutenOneInputOperator<>( @@ -237,19 +260,37 @@ public void processWatermark2(Watermark mark) throws Exception { } @Override - public void close() throws Exception { - if (task != null) { - task.close(); - } + 
public void endInput() throws Exception { if (inputQueue != null) { inputQueue.noMoreInput(); - inputQueue.close(); } - if (sessionResource != null) { - sessionResource.close(); + if (task != null) { + finishTask(); } } + @Override + public void close() throws Exception { + GlutenCloseables.runWithCleanup( + () -> {}, + () -> { + if (inputQueue != null) { + inputQueue.close(); + } + }, + () -> { + if (task != null) { + task.close(); + } + }, + () -> { + if (sessionResource != null) { + sessionResource.close(); + } + }, + super::close); + } + @Override public StatefulPlanNode getPlanNode() { return glutenPlan; diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java index 3f73ad4bbd8..824a442c17f 100644 --- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperator.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -50,7 +51,7 @@ * instead of flink RowData. 
*/ public class GlutenTwoInputOperator extends AbstractStreamOperator - implements TwoInputStreamOperator, GlutenOperator { + implements TwoInputStreamOperator, BoundedMultiInput, GlutenOperator { private static final Logger LOG = LoggerFactory.getLogger(GlutenTwoInputOperator.class); @@ -73,6 +74,8 @@ public class GlutenTwoInputOperator extends AbstractStreamOperator private VectorOutputBridge outputBridge; private String description; private final GlutenMailboxHolder mailboxHolder = new GlutenMailboxHolder(); + private boolean leftInputEnded; + private boolean rightInputEnded; public GlutenTwoInputOperator( StatefulPlanNode plan, @@ -175,24 +178,46 @@ private void drainTaskOutput() { while (true) { UpIterator.State state = task.advance(); if (state == UpIterator.State.AVAILABLE) { - final StatefulElement element = task.statefulGet(); - try { - if (element.isWatermark()) { - StatefulWatermark watermark = element.asWatermark(); - output.emitWatermark(new Watermark(watermark.getTimestamp())); - } else { - outputBridge.collect( - output, element.asRecord(), sessionResource.getAllocator(), outputType); - } - } finally { - element.close(); - } + processAvailableElement(); } else { break; } } } + private void processAvailableElement() { + final StatefulElement element = task.statefulGet(); + try { + if (element.isWatermark()) { + StatefulWatermark watermark = element.asWatermark(); + output.emitWatermark(new Watermark(watermark.getTimestamp())); + } else { + outputBridge.collect( + output, element.asRecord(), sessionResource.getAllocator(), outputType); + } + } finally { + element.close(); + } + } + + private void finishTask() { + while (true) { + UpIterator.State state = task.advance(); + switch (state) { + case AVAILABLE: + processAvailableElement(); + break; + case BLOCKED: + task.waitFor(); + break; + case FINISHED: + return; + default: + throw new IllegalStateException("Unknown Velox task state: " + state); + } + } + } + @Override public void 
processWatermark(Watermark mark) throws Exception { task.notifyWatermark(mark.getTimestamp()); @@ -212,21 +237,55 @@ public void processWatermark2(Watermark mark) throws Exception { } @Override - public void close() throws Exception { - if (leftInputQueue != null) { - leftInputQueue.close(); - } - if (rightInputQueue != null) { - rightInputQueue.close(); - } - if (task != null) { - task.close(); + public void endInput(int inputId) throws Exception { + switch (inputId) { + case 1: + leftInputEnded = true; + if (leftInputQueue != null) { + leftInputQueue.noMoreInput(); + } + break; + case 2: + rightInputEnded = true; + if (rightInputQueue != null) { + rightInputQueue.noMoreInput(); + } + break; + default: + throw new IllegalArgumentException("Unknown input id: " + inputId); } - if (sessionResource != null) { - sessionResource.close(); + if (leftInputEnded && rightInputEnded && task != null) { + finishTask(); } } + @Override + public void close() throws Exception { + GlutenCloseables.runWithCleanup( + () -> {}, + () -> { + if (leftInputQueue != null) { + leftInputQueue.close(); + } + }, + () -> { + if (rightInputQueue != null) { + rightInputQueue.close(); + } + }, + () -> { + if (task != null) { + task.close(); + } + }, + () -> { + if (sessionResource != null) { + sessionResource.close(); + } + }, + super::close); + } + @Override public StatefulPlanNode getPlanNode() { return glutenPlan; diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml index 03bda070346..b33a0b2c232 100644 --- a/gluten-flink/ut/pom.xml +++ b/gluten-flink/ut/pom.xml @@ -194,6 +194,36 @@ ${flink.version} test + + org.apache.flink + flink-orc + ${flink.version} + test + + + org.apache.orc + orc-core + 1.9.2 + test + + + org.apache.hive + hive-storage-api + 2.8.1 + test + + + com.google.protobuf + protobuf-java + 3.25.5 + test + + + org.apache.hadoop + hadoop-client + 2.7.4 + test + diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperatorCloseTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperatorCloseTest.java new file mode 100644 index 00000000000..d1e59ba5365 --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperatorCloseTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gluten.table.runtime.operators; + +import io.github.zhztheplayer.velox4j.connector.ExternalStreams; +import io.github.zhztheplayer.velox4j.data.RowVector; +import io.github.zhztheplayer.velox4j.iterator.UpIterator; +import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; +import io.github.zhztheplayer.velox4j.query.SerialTask; +import io.github.zhztheplayer.velox4j.stateful.StatefulElement; +import io.github.zhztheplayer.velox4j.type.BigIntType; +import io.github.zhztheplayer.velox4j.type.RowType; + +import org.apache.flink.table.data.RowData; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public class GlutenOneInputOperatorCloseTest { + private static final RowType ROW_TYPE = new RowType(List.of("c0"), List.of(new BigIntType())); + + @Test + public void testEndInputFinishesTask() throws Exception { + List calls = new ArrayList<>(); + GlutenOneInputOperator operator = newOperator(); + FakeBlockingQueue inputQueue = new FakeBlockingQueue(1, calls); + FakeSerialTask task = new FakeSerialTask(calls); + setField(operator, "inputQueue", inputQueue); + setField(operator, "task", task); + + operator.endInput(); + + assertThat(calls).containsExactly("input.noMoreInput", "task.advance"); + } + + @Test + public void testCloseOnlyCleansResources() throws Exception { + List calls = new ArrayList<>(); + GlutenOneInputOperator operator = newOperator(); + FakeBlockingQueue inputQueue = new FakeBlockingQueue(1, calls); + FakeSerialTask task = new FakeSerialTask(calls); + setField(operator, "inputQueue", inputQueue); + setField(operator, "task", task); + + operator.close(); + + assertThat(calls).containsExactly("input.close", "task.close"); + } + + private static GlutenOneInputOperator newOperator() { + StatefulPlanNode plan = new StatefulPlanNode("project", null); + return new 
GlutenOneInputOperator<>( + plan, "input", ROW_TYPE, Map.of("project", ROW_TYPE), RowData.class, RowData.class); + } + + private static void setField(Object target, String name, Object value) throws Exception { + Field field = GlutenOneInputOperator.class.getDeclaredField(name); + field.setAccessible(true); + field.set(target, value); + } + + private static class FakeBlockingQueue extends ExternalStreams.BlockingQueue { + private final List calls; + + private FakeBlockingQueue(long id, List calls) { + super(id); + this.calls = calls; + } + + @Override + public void put(RowVector vector) { + throw new UnsupportedOperationException(); + } + + @Override + public void noMoreInput() { + calls.add("input.noMoreInput"); + } + + @Override + public void close() { + calls.add("input.close"); + } + } + + private static class FakeSerialTask extends SerialTask { + private final List calls; + + private FakeSerialTask(List calls) { + super(null, 2); + this.calls = calls; + } + + @Override + public UpIterator.State advance() { + calls.add("task.advance"); + return UpIterator.State.FINISHED; + } + + @Override + public void waitFor() { + throw new UnsupportedOperationException(); + } + + @Override + public StatefulElement statefulGet() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + calls.add("task.close"); + } + } +} diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java new file mode 100644 index 00000000000..167473b5cb2 --- /dev/null +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/operators/GlutenTwoInputOperatorCloseTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.table.runtime.operators; + +import io.github.zhztheplayer.velox4j.connector.ExternalStreams; +import io.github.zhztheplayer.velox4j.data.RowVector; +import io.github.zhztheplayer.velox4j.iterator.UpIterator; +import io.github.zhztheplayer.velox4j.plan.StatefulPlanNode; +import io.github.zhztheplayer.velox4j.query.SerialTask; +import io.github.zhztheplayer.velox4j.stateful.StatefulElement; +import io.github.zhztheplayer.velox4j.type.BigIntType; +import io.github.zhztheplayer.velox4j.type.RowType; + +import org.apache.flink.table.data.RowData; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class GlutenTwoInputOperatorCloseTest { + private static final RowType ROW_TYPE = new RowType(List.of("c0"), List.of(new BigIntType())); + + @Test + public void testEndInputFinishesAfterBothInputsEnd() throws Exception { + List calls = new ArrayList<>(); + GlutenTwoInputOperator operator = newOperator(); + FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls); + FakeBlockingQueue rightQueue = new FakeBlockingQueue(2, 
"right", calls); + FakeSerialTask task = new FakeSerialTask(calls, false); + setField(operator, "leftInputQueue", leftQueue); + setField(operator, "rightInputQueue", rightQueue); + setField(operator, "task", task); + + operator.endInput(1); + + assertThat(calls).containsExactly("left.noMoreInput"); + + operator.endInput(2); + + assertThat(calls).containsExactly("left.noMoreInput", "right.noMoreInput", "task.advance"); + } + + @Test + public void testCloseOnlyCleansResources() throws Exception { + List calls = new ArrayList<>(); + GlutenTwoInputOperator operator = newOperator(); + FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls); + FakeBlockingQueue rightQueue = new FakeBlockingQueue(2, "right", calls); + FakeSerialTask task = new FakeSerialTask(calls, false); + setField(operator, "leftInputQueue", leftQueue); + setField(operator, "rightInputQueue", rightQueue); + setField(operator, "task", task); + + operator.close(); + + assertThat(calls).containsExactly("left.close", "right.close", "task.close"); + } + + @Test + public void testCloseCleansResourcesWhenCloseFails() throws Exception { + List calls = new ArrayList<>(); + GlutenTwoInputOperator operator = newOperator(); + FakeBlockingQueue leftQueue = new FakeBlockingQueue(1, "left", calls, true); + FakeBlockingQueue rightQueue = new FakeBlockingQueue(2, "right", calls); + FakeSerialTask task = new FakeSerialTask(calls, false); + setField(operator, "leftInputQueue", leftQueue); + setField(operator, "rightInputQueue", rightQueue); + setField(operator, "task", task); + + assertThatThrownBy(operator::close) + .isInstanceOf(RuntimeException.class) + .hasMessage("left.close"); + + assertThat(calls).containsExactly("left.close", "right.close", "task.close"); + } + + private static GlutenTwoInputOperator newOperator() { + StatefulPlanNode plan = new StatefulPlanNode("join", null); + return new GlutenTwoInputOperator<>( + plan, + "left", + "right", + ROW_TYPE, + ROW_TYPE, + Map.of("join", ROW_TYPE), + 
RowData.class, + RowData.class); + } + + private static void setField(Object target, String name, Object value) throws Exception { + Field field = GlutenTwoInputOperator.class.getDeclaredField(name); + field.setAccessible(true); + field.set(target, value); + } + + private static class FakeBlockingQueue extends ExternalStreams.BlockingQueue { + private final String name; + private final List calls; + private final boolean failClose; + + private FakeBlockingQueue(long id, String name, List calls) { + this(id, name, calls, false); + } + + private FakeBlockingQueue(long id, String name, List calls, boolean failClose) { + super(id); + this.name = name; + this.calls = calls; + this.failClose = failClose; + } + + @Override + public void put(RowVector vector) { + throw new UnsupportedOperationException(); + } + + @Override + public void noMoreInput() { + calls.add(name + ".noMoreInput"); + } + + @Override + public void close() { + calls.add(name + ".close"); + if (failClose) { + throw new RuntimeException(name + ".close"); + } + } + } + + private static class FakeSerialTask extends SerialTask { + private final List calls; + private final boolean failAdvance; + + private FakeSerialTask(List calls, boolean failAdvance) { + super(null, 3); + this.calls = calls; + this.failAdvance = failAdvance; + } + + @Override + public UpIterator.State advance() { + calls.add("task.advance"); + if (failAdvance) { + throw new RuntimeException("finish"); + } + return UpIterator.State.FINISHED; + } + + @Override + public void waitFor() { + throw new UnsupportedOperationException(); + } + + @Override + public StatefulElement statefulGet() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + calls.add("task.close"); + } + } +} diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java index 476c4cba4d0..cffb8b1809b 
100644 --- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java +++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java @@ -27,6 +27,9 @@ import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; import com.salesforce.kafka.test.listeners.PlainListener; +import org.apache.hadoop.conf.Configuration; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -40,6 +43,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.attribute.FileTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -70,7 +74,7 @@ public class NexmarkTest { } }; - private static final int KAFKA_PORT = 9092; + private static final int KAFKA_PORT = Integer.getInteger("gluten.flink.ut.kafka.port", 9092); private static String topicName = "nexmark"; @RegisterExtension @@ -82,7 +86,7 @@ public class NexmarkTest { private static final Map KAFKA_VARIABLES = new HashMap<>() { { - put("BOOTSTRAP_SERVERS", "localhost:9092"); + put("BOOTSTRAP_SERVERS", "localhost:" + KAFKA_PORT); put("NEXMARK_TABLE", "kafka"); } }; @@ -166,6 +170,7 @@ private static void clearEnvironment(StreamTableEnvironment tEnv) { String sql = String.format("drop table if exists %s", tableName); tEnv.executeSql(sql); } + tEnv.executeSql("drop table if exists nexmark_q10_orc"); for (String view : VIEWS) { String sql = String.format("drop view if exists %s", view); tEnv.executeSql(sql); @@ -179,6 +184,7 @@ private static void clearEnvironment(StreamTableEnvironment tEnv) { private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boolean kafkaSource) throws ExecutionException, InterruptedException, TimeoutException { String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + queryFileName); + long 
queryStartMillis = System.currentTimeMillis(); String[] sqlStatements = queryContent.split(";"); assertThat(sqlStatements.length).isGreaterThanOrEqualTo(2); @@ -200,11 +206,53 @@ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName, boo assertThat(checkJobRunningStatus(insertResult, 30000) == true); } else { waitForJobCompletion(insertResult, 30000); + if ("q10_orc.sql".equals(queryFileName)) { + verifyQ10OrcOutput(queryStartMillis); + } } } assertTrue(sqlStatements[sqlStatements.length - 1].trim().isEmpty()); } + private void verifyQ10OrcOutput(long queryStartMillis) { + Path outputDir = Paths.get("/tmp/data/output/bid"); + assertTrue("Q10 ORC output directory should exist", Files.exists(outputDir)); + try { + List outputFiles; + try (java.util.stream.Stream files = Files.walk(outputDir)) { + outputFiles = + files + .filter(Files::isRegularFile) + .filter(path -> isModifiedAfter(path, queryStartMillis)) + .sorted() + .collect(Collectors.toList()); + } + assertThat(outputFiles).isNotEmpty(); + long rowCount = 0; + for (Path outputFile : outputFiles) { + Reader reader = + OrcFile.createReader( + new org.apache.hadoop.fs.Path(outputFile.toUri()), + OrcFile.readerOptions(new Configuration())); + assertThat(reader.getSchema().getFieldNames()) + .containsExactly("auction", "bidder", "price", "dateTime", "extra"); + rowCount += reader.getNumberOfRows(); + } + assertThat(rowCount).isGreaterThan(0); + } catch (IOException e) { + throw new RuntimeException("Failed to read back Q10 ORC output", e); + } + } + + private boolean isModifiedAfter(Path path, long queryStartMillis) { + try { + FileTime lastModifiedTime = Files.getLastModifiedTime(path); + return lastModifiedTime.toMillis() >= queryStartMillis; + } catch (IOException e) { + throw new RuntimeException("Failed to inspect output file " + path, e); + } + } + private void waitForJobCompletion(TableResult result, long timeoutMs) throws InterruptedException, ExecutionException, TimeoutException { 
assertTrue(result.getJobClient().isPresent()); @@ -242,6 +290,10 @@ private List getQueries() { } } + String query = System.getProperty("gluten.flink.ut.nexmark.query"); + if (query != null && !query.isEmpty()) { + return queryFiles.stream().filter(query::equals).sorted().collect(Collectors.toList()); + } return queryFiles.stream().sorted().collect(Collectors.toList()); } catch (URISyntaxException | IOException e) { diff --git a/gluten-flink/ut/src/test/resources/nexmark/q10_orc.sql b/gluten-flink/ut/src/test/resources/nexmark/q10_orc.sql new file mode 100644 index 00000000000..6b00f387868 --- /dev/null +++ b/gluten-flink/ut/src/test/resources/nexmark/q10_orc.sql @@ -0,0 +1,23 @@ +CREATE TABLE nexmark_q10_orc ( + auction BIGINT, + bidder BIGINT, + price BIGINT, + `dateTime` TIMESTAMP(3), + extra VARCHAR, + dt STRING, + hm STRING +) PARTITIONED BY (dt, hm) WITH ( + 'connector' = 'filesystem', + 'path' = 'file:///tmp/data/output/bid/', + 'format' = 'orc', + 'sink.partition-commit.trigger' = 'partition-time', + 'sink.partition-commit.delay' = '1 min', + 'sink.partition-commit.policy.kind' = 'success-file', + 'partition.time-extractor.timestamp-pattern' = '$dt $hm:00', + 'sink.rolling-policy.rollover-interval' = '1min', + 'sink.rolling-policy.check-interval' = '1min' +); + +INSERT INTO nexmark_q10_orc +SELECT auction, bidder, price, `dateTime`, extra, DATE_FORMAT(`dateTime`, 'yyyy-MM-dd'), DATE_FORMAT(`dateTime`, 'HH:mm') +FROM bid;