[FLINK] Fix Nexmark: support multi-parallelism via ParallelSplit #36
lgbo-ustc
left a comment
One concern about the new testNexmarkConnectorSplitWithSubtasks: it looks like it is intended to verify the parallel split round-trip, but subtaskSplits is not currently serialized by velox4j Serde because auto-detect getters/fields are disabled and NexmarkConnectorSplit only exposes config via @JsonGetter. As a result, this test can pass while the deserialized split has subtaskSplits == null.
If subtaskSplits is intended to survive Java/C++ serde, please add an explicit getter and assert getSubtaskSplit() still works after round-trip. If it is intentionally Java-runtime-only and should not be sent to C++, please make the test/name/assertions explicit about that behavior so it does not look like the subtask list is being validated.
lgbo-ustc
left a comment
Another concern is that NexmarkConnectorSplit#getSubtaskSplit(int index, int parallelism) ignores the parallelism argument and directly indexes subtaskSplits. This assumes the planned split count always equals the runtime parallelism.
If runtime parallelism is larger than subtaskSplits.size(), this fails with IndexOutOfBoundsException. If runtime parallelism is smaller, some generated subtask ranges are silently unused, which can drop events. It would be safer to validate subtaskSplits != null and subtaskSplits.size() == parallelism, then fail with a clear exception if they do not match.
```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
```
Naming question: GeneratorConfig looks generic, but this class is Nexmark-specific: it contains NexmarkConfiguration and Nexmark event range fields such as firstEventId, maxEvents, and firstEventNumber. The corresponding C++ type also lives under the nexmark connector.
Could we make this Nexmark-specific in Java as well, e.g. NexmarkGeneratorConfig or a nexmark-specific package/name? If keeping the C++ name is intentional for serde parity, a short class comment saying this is Nexmark-only would help.
Done, using NexmarkGeneratorConfig.
@lgbo-ustc
See |
@lgbo-ustc Fixed.

```java
@Override
public ConnectorSplit getSubtaskSplit(int index, int parallelism) {
  Objects.requireNonNull(subtaskSplits, "subtaskSplits is null");
  if (parallelism != subtaskSplits.size()) {
    throw new IllegalStateException(
        String.format(
            "Runtime parallelism (%d) does not match planned subtask count (%d). "
                + "Nexmark multi-parallelism requires the same parallelism at plan and runtime.",
            parallelism, subtaskSplits.size()));
  }
  if (index < 0 || index >= subtaskSplits.size()) {
    throw new IndexOutOfBoundsException(
        "Subtask index " + index + " out of range [0, " + subtaskSplits.size() + ")");
  }
  return subtaskSplits.get(index);
}
```
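For readers who want to see how the three checks behave, here is a minimal standalone sketch. `DemoSplit` and `outcome` are hypothetical stand-ins, not classes from this PR, and the subtask payloads are simplified to strings. Only the guard logic mirrors the snippet above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for NexmarkConnectorSplit; only the guard logic is modelled. */
final class DemoSplit {
  // Subtask payloads are simplified to strings for illustration.
  private final List<String> subtaskSplits;

  DemoSplit(List<String> subtaskSplits) {
    this.subtaskSplits = subtaskSplits;
  }

  String getSubtaskSplit(int index, int parallelism) {
    Objects.requireNonNull(subtaskSplits, "subtaskSplits is null");
    if (parallelism != subtaskSplits.size()) {
      throw new IllegalStateException(
          String.format(
              "Runtime parallelism (%d) does not match planned subtask count (%d).",
              parallelism, subtaskSplits.size()));
    }
    if (index < 0 || index >= subtaskSplits.size()) {
      throw new IndexOutOfBoundsException(
          "Subtask index " + index + " out of range [0, " + subtaskSplits.size() + ")");
    }
    return subtaskSplits.get(index);
  }

  /** Returns "ok", or the simple class name of the exception that was thrown. */
  static String outcome(DemoSplit split, int index, int parallelism) {
    try {
      split.getSubtaskSplit(index, parallelism);
      return "ok";
    } catch (RuntimeException e) {
      return e.getClass().getSimpleName();
    }
  }
}
```

With two planned subtasks, a call with a matching parallelism and a valid index succeeds. A mismatched parallelism fails with `IllegalStateException` before the index is even looked at, which is the case the review asked to make explicit.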
Summary
Introduce an abstraction for "one logical split that expands into per-subtask splits at runtime", so the planner can express a disjoint event range per subtask.
- `ParallelSplit` (subclass of `ConnectorSplit`) declaring `getSubtaskSplit(int index, int parallelism)`.
- `GeneratorConfig` and `NexmarkConfiguration` POJOs mirroring the C++ side. `GeneratorConfig` serializes `maxEventsOrZero` as JSON key `maxEvents` to match the C++ serde. `NexmarkConfiguration` annotates `isRateLimited()` / `isUseWallclockEventTime()` with explicit `@JsonGetter` names; otherwise Jackson strips the `is` prefix and the C++ deserializer silently drops both fields.
- `NexmarkConnectorSplit` changed to extend `ParallelSplit`, replacing `int numRows` with `GeneratorConfig config` + `List<NexmarkConnectorSplit> subtaskSplits`.

fix: gluten#12298
depends: bigo-sg/velox#45
related: apache/gluten#12304
Test
- `NexmarkConnectorSplitSerdeTest`: JSON serde round-trip for `NexmarkConfiguration` / `GeneratorConfig` and `ISerializable` Java↔C++ round-trip for `NexmarkConnectorSplit` (single and parallel with two subtasks).
- With `parallelism.default = 2`, nexmark `events.num = 10000`, `tps = 2000`, q0: bid input rows 9200 (no duplicates), `dateTime` span ~5s, subtask splits `firstEventId=1, maxEvents=5000` and `firstEventId=5001, maxEvents=5000`.
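
The two subtask ranges reported above are consistent with dividing the event range into contiguous, disjoint slices. This conversation does not show how the planner actually computes them, so the following is only a sketch under assumptions: `EventRange` and `RangePlanner` are hypothetical names, and giving any remainder to the lowest-indexed subtasks is an illustrative choice, not something the PR confirms.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for one subtask's event range; not a class from this PR. */
final class EventRange {
  final long firstEventId;
  final long maxEvents;

  EventRange(long firstEventId, long maxEvents) {
    this.firstEventId = firstEventId;
    this.maxEvents = maxEvents;
  }
}

/** Hypothetical planner: splits one logical range into per-subtask ranges. */
final class RangePlanner {
  /**
   * Splits totalEvents events starting at firstEventId into `parallelism`
   * contiguous, disjoint ranges. Assumption: the first (totalEvents % parallelism)
   * subtasks each take one extra event.
   */
  static List<EventRange> plan(long firstEventId, long totalEvents, int parallelism) {
    if (parallelism <= 0) {
      throw new IllegalArgumentException("parallelism must be positive: " + parallelism);
    }
    if (totalEvents < 0) {
      throw new IllegalArgumentException("totalEvents must be non-negative: " + totalEvents);
    }
    List<EventRange> ranges = new ArrayList<>(parallelism);
    long base = totalEvents / parallelism;
    long remainder = totalEvents % parallelism;
    long next = firstEventId;
    for (int i = 0; i < parallelism; i++) {
      long count = base + (i < remainder ? 1 : 0);
      ranges.add(new EventRange(next, count));
      next += count; // the next subtask starts right after this one ends
    }
    return ranges;
  }
}
```

Under these assumptions, `RangePlanner.plan(1, 10000, 2)` produces ranges starting at event 1 and event 5001 with 5000 events each, the same values as the subtask splits listed in the test run.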