Backend
VL (Velox)
Bug description
The Velox-backed 'connector' = 'print' sink opens a fixed file
(log/taskmanager.out) directly from C++. Under multi-parallelism the second
subtask's open races with the first and crashes the TaskManager with a
"File exists" error, failing the job. The sink also diverges from Flink native in
where it writes and which options it honors.
Actual
With parallelism.default: 2, any query whose sink is 'connector' = 'print'
(e.g. nexmark q0) fails within ~5 seconds. The TaskManager dies with:
E20260616 21:02:08.319926 1643895 Exceptions.h:66]
Line: ${WORKSPACE}/velox/common/file/File.cpp:338,
Function: LocalWriteFile,
Expression: fd_ >= 0 (-1 vs 0)
Cannot open or create ${FLINK_HOME}/log/taskmanager.out.
Error: File exists, Source: RUNTIME, ErrorCode: INVALID_STATE
terminate called after throwing an instance of 'facebook::velox::VeloxRuntimeError'
Job state: FAILED. log/taskmanager.out: 0 bytes.
With parallelism.default: 1 the job runs but output diverges from Flink native
(no subtask prefix; written to a fixed taskmanager.out path instead of process
stdout/stderr; print-identifier / standard-error options ignored).
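The "File exists" message is what open() reports for EEXIST, which suggests the file is being opened with exclusive-create semantics. The issue does not show the flags Velox passes, so that is an assumption. Under it, the failure mode can be sketched in plain Java: the second exclusive create of the same path is rejected. The class and method names here are hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration only; not Gluten or Velox code.
final class ExclusiveCreateDemo {

    /** Returns true if a second exclusive create of the same path is rejected. */
    static boolean secondOpenFails() {
        try {
            Path dir = Files.createTempDirectory("print-sink-demo");
            Path out = dir.resolve("taskmanager.out");
            try {
                // First "subtask": CREATE_NEW fails if the file already exists.
                Files.newOutputStream(out, StandardOpenOption.CREATE_NEW,
                        StandardOpenOption.WRITE).close();
                try {
                    // Second "subtask": same fixed path, same exclusive create.
                    Files.newOutputStream(out, StandardOpenOption.CREATE_NEW,
                            StandardOpenOption.WRITE).close();
                    return false;
                } catch (FileAlreadyExistsException e) {
                    return true;
                }
            } finally {
                // Clean up the temporary file and directory.
                Files.deleteIfExists(out);
                Files.deleteIfExists(dir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If this is the cause, any fixed per-TaskManager path opened once per subtask would hit the same error, whichever subtask gets there first.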
Expected
Match Flink native 'connector' = 'print' behavior
(flink-core/.../PrintSinkOutputWriter.java):
- Write to the TaskManager JVM's System.out / System.err, not a file.
(flink-daemon.sh redirects stdout to log/taskmanager.out in standalone
mode; the sink itself should not open that file directly.)
- Stay correct under multi-parallelism (native sink relies on a single
synchronized PrintStream instance shared across subtasks).
- Honor the print-identifier and standard-error table options and apply the
same prefix logic the native sink uses.
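A minimal sketch of the expected behavior, assuming the prefix rules are the ones described in the Flink print connector documentation (identifier:taskId> when parallelism is greater than 1, identifier> or no prefix otherwise, with taskId taken as subtask index + 1). PrintSinkSketch and buildPrefix are hypothetical names, not the actual Flink or Gluten classes, so the details should be checked against PrintSinkOutputWriter.java.

```java
import java.io.PrintStream;

// Hypothetical sketch; names are illustrative, not Flink's or Gluten's API.
final class PrintSinkSketch {
    private final PrintStream stream;
    private final String prefix;

    PrintSinkSketch(String printIdentifier, boolean standardError,
                    int subtaskIndex, int numParallelSubtasks) {
        // Use the JVM-wide stream shared by all subtasks instead of opening a file.
        this.stream = standardError ? System.err : System.out;
        this.prefix = buildPrefix(printIdentifier, subtaskIndex, numParallelSubtasks);
    }

    /** Builds the per-subtask prefix (assumed rules, see the lead-in). */
    static String buildPrefix(String printIdentifier, int subtaskIndex,
                              int numParallelSubtasks) {
        StringBuilder sb =
                new StringBuilder(printIdentifier == null ? "" : printIdentifier);
        if (numParallelSubtasks > 1) {
            if (sb.length() > 0) {
                sb.append(':');
            }
            // Subtask numbers are shown 1-based.
            sb.append(subtaskIndex + 1);
        }
        if (sb.length() > 0) {
            sb.append("> ");
        }
        return sb.toString();
    }

    void write(Object record) {
        // One println call per record, so each record is emitted as a whole line.
        stream.println(prefix + record);
    }
}
```

With parallelism 2 and no print-identifier, the second subtask would emit lines starting with "2> ". With print-identifier set to q0, they would start with "q0:2> ".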
Program Info
- JDK: OpenJDK 17
- Flink: 1.19.2, standalone — 1 JobManager + 1 TaskManager, 2 task slots
- Flink config: parallelism.default: 2, taskmanager.numberOfTaskSlots: 2
- Query: nexmark q0 with 'connector' = 'print' sink
Reproduction
Set parallelism.default: 2, then run any query with a 'connector' = 'print' sink.
Gluten version
main branch
System information
- CMake Version: 3.30.4
- System: Linux-6.6.0
- Arch: aarch64
- C++ Compiler: /usr/lib64/ccache/c++
- C++ Compiler Version: 12.3.1