Skip to content

[GLUTEN-12306][FLINK][VL] Route print sink options to Velox via reflection (no file path)#12320

Open
ggjh-159 wants to merge 1 commit into
apache:mainfrom
ggjh-159:fix/print-sink-multi-parallelism
Open

[GLUTEN-12306][FLINK][VL] Route print sink options to Velox via reflection (no file path)#12320
ggjh-159 wants to merge 1 commit into
apache:mainfrom
ggjh-159:fix/print-sink-multi-parallelism

Conversation

@ggjh-159

Copy link
Copy Markdown

fix: #12306
related prs: bigo-sg/velox#49 bigo-sg/velox4j#39

What changes are proposed in this pull request?

This PR consumes the companion velox PR (which now writes to std::cout / std::cerr):

  • Drop the file-path resolution and the Configuration / CoreOptions / ConfigConstants imports.
  • PrintSinkFactory.extractPrintOptions reflects into the RowDataPrintFunctionPrintSinkOutputWriter to read the user-supplied sinkIdentifier (print-identifier) and target (true = stderr) fields, and passes them through to PrintTableHandle(tableName, inputColumns, printIdentifier, isStdErr).
  • The velox C++ side computes the Flink-style N> prefix from parallelism / task_index session properties and serializes concurrent subtask writes with a process-wide mutex.

How was this patch tested?

  • UTs
  • Manual run on a standalone Flink cluster with parallelism.default = 2, nexmark events.num = 10000, tps = 2000, query q0: every bid row reaches taskmanager.out, each line carries the 1> / 2> subtask prefix, no truncation across subtasks.

@ggjh-159 ggjh-159 force-pushed the fix/print-sink-multi-parallelism branch from 914fab7 to bc074f6 Compare June 18, 2026 06:12
@github-actions github-actions Bot added the INFRA label Jun 18, 2026
Field idField = writer.getClass().getDeclaredField("sinkIdentifier");
idField.setAccessible(true);
Field stdErrField = writer.getClass().getDeclaredField("target");
stdErrField.setAccessible(true);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ReflectUtils.getObjectField() should be simpler

// Flink 1.19.x field names: sinkIdentifier (print-identifier), target (standard-error, true =
// stderr).
// Package-private for direct unit testing.
static String[] extractPrintOptions(Transformation<RowData> transformation) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the readability of String[] is bad, may we can use class PringOptions instead?

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class PrintSinkFactoryTest {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GlutenStreamingTestBase.runAndCheck() still read taskmanager.out, please check whether it should be fixed together?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FLINK][VL] PrintSink Crashes Under Multi-Parallelism

2 participants