From 878ac0c4538c7613077b429cd388521718d83367 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Wed, 15 Oct 2025 11:17:53 +0800 Subject: [PATCH 1/3] [FLINK-38564][connector] FLIP-537: Enumerator Maintains Global Splits Distribution for Balanced assignment. --- .../api/connector/source/ReaderInfo.java | 44 +++++-- .../SupportSplitReassignmentOnRecovery.java | 28 ++++ .../source/mocks/MockSplitEnumerator.java | 88 ++++++++----- .../source/coordinator/SourceCoordinator.java | 11 +- .../coordinator/SourceCoordinatorContext.java | 6 +- .../source/event/ReaderRegistrationEvent.java | 46 ++++++- .../api/operators/SourceOperator.java | 20 ++- .../api/operators/SourceOperatorFactory.java | 10 +- .../SourceCoordinatorContextTest.java | 15 +++ .../coordinator/SourceCoordinatorTest.java | 121 ++++++++++++++++++ .../SourceCoordinatorTestBase.java | 17 +++ ...ceOperatorSplitWatermarkAlignmentTest.java | 6 +- .../api/operators/SourceOperatorTest.java | 66 +++++++--- .../operators/SourceOperatorTestContext.java | 8 +- .../SourceOperatorWatermarksTest.java | 3 +- .../source/TestingSourceOperator.java | 24 +++- pom.xml | 2 +- 17 files changed, 425 insertions(+), 90 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java index 7a93c1c7549c3..bef8e51193b6b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java @@ -21,9 +21,16 @@ import org.apache.flink.annotation.Public; import java.io.Serializable; +import java.util.Collections; +import java.util.List; import java.util.Objects; -/** A container class hosting the information of a {@link SourceReader}. */ +/** + * A container class hosting the information of a {@link SourceReader}. + * + *

The {@code reportedSplitsOnRegistration} can only be provided when the source implements + * {@link SupportSplitReassignmentOnRecovery}. + */ @Public public final class ReaderInfo implements Serializable { @@ -31,10 +38,27 @@ public final class ReaderInfo implements Serializable { private final int subtaskId; private final String location; + private final List reportedSplitsOnRegistration; public ReaderInfo(int subtaskId, String location) { + this(subtaskId, location, Collections.emptyList()); + } + + ReaderInfo(int subtaskId, String location, List splits) { this.subtaskId = subtaskId; this.location = location; + this.reportedSplitsOnRegistration = splits; + } + + @SuppressWarnings("unchecked") + public static ReaderInfo createReaderInfo( + int subtaskId, String location, List splits) { + return new ReaderInfo(subtaskId, location, (List) splits); + } + + @SuppressWarnings("unchecked") + public List getReportedSplitsOnRegistration() { + return (List) reportedSplitsOnRegistration; } /** @@ -52,16 +76,18 @@ public String getLocation() { } @Override - public int hashCode() { - return Objects.hash(subtaskId, location); + public boolean equals(Object o) { + if (!(o instanceof ReaderInfo)) { + return false; + } + ReaderInfo that = (ReaderInfo) o; + return subtaskId == that.subtaskId + && Objects.equals(location, that.location) + && Objects.equals(reportedSplitsOnRegistration, that.reportedSplitsOnRegistration); } @Override - public boolean equals(Object obj) { - if (!(obj instanceof ReaderInfo)) { - return false; - } - ReaderInfo other = (ReaderInfo) obj; - return subtaskId == other.subtaskId && location.equals(other.location); + public int hashCode() { + return Objects.hash(subtaskId, location, reportedSplitsOnRegistration); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java new file mode 100644 index 0000000000000..0f4453b149d9f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A decorative interface {@link Source}. Implementing this interface indicates that the source + * operator needs to report splits to the enumerator and receive reassignment. + */ +@PublicEvolving +public interface SupportSplitReassignmentOnRecovery {} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java index 7a3763bc5f978..bfa4c6834fea2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java @@ -18,6 +18,7 @@ package org.apache.flink.api.connector.source.mocks; +import org.apache.flink.api.connector.source.ReaderInfo; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; @@ -28,20 +29,21 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.stream.Collectors; /** A mock {@link SplitEnumerator} for unit tests. */ public class MockSplitEnumerator implements SplitEnumerator>, SupportsBatchSnapshot { - private final SortedSet unassignedSplits; + // 扩成16个partition, unas + private final Map> pendingSplitAssignment; + private final Map globalSplitAssignment; private final SplitEnumeratorContext enumContext; private final List handledSourceEvent; private final List successfulCheckpoints; @@ -50,22 +52,24 @@ public class MockSplitEnumerator public MockSplitEnumerator(int numSplits, SplitEnumeratorContext enumContext) { this(new HashSet<>(), enumContext); + List unassignedSplits = new ArrayList<>(); for (int i = 0; i < numSplits; i++) { unassignedSplits.add(new MockSourceSplit(i)); } + calculateAndPutPendingAssignments(unassignedSplits); } public MockSplitEnumerator( Set unassignedSplits, SplitEnumeratorContext enumContext) { - this.unassignedSplits = - new TreeSet<>(Comparator.comparingInt(o -> Integer.parseInt(o.splitId()))); - this.unassignedSplits.addAll(unassignedSplits); + this.pendingSplitAssignment = new HashMap<>(); + this.globalSplitAssignment = new HashMap<>(); this.enumContext = enumContext; this.handledSourceEvent = new ArrayList<>(); this.successfulCheckpoints = new ArrayList<>(); this.started = false; this.closed = false; + calculateAndPutPendingAssignments(unassignedSplits); } @Override @@ -83,25 +87,36 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { @Override public void addSplitsBack(List splits, int subtaskId) { - unassignedSplits.addAll(splits); + // add back to same subtaskId. + putPendingAssignments(subtaskId, splits); } @Override public void addReader(int subtaskId) { - List assignment = new ArrayList<>(); - for (MockSourceSplit split : unassignedSplits) { - if (Integer.parseInt(split.splitId()) % enumContext.currentParallelism() == subtaskId) { - assignment.add(split); + ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId); + List splitsOnRecovery = readerInfo.getReportedSplitsOnRegistration(); + + List redistributedSplits = new ArrayList<>(); + List addBackSplits = new ArrayList<>(); + for (MockSourceSplit split : splitsOnRecovery) { + if (!globalSplitAssignment.containsKey(split.splitId())) { + // if split not existed in globalSplitAssignment, mean that it's registered first + // time, can be redistibuted. + redistributedSplits.add(split); + } else if (!globalSplitAssignment.containsKey(split.splitId())) { + // if split already is assigned to other substaskId, just ignore it. Otherwise, + // addback to this subtaskId again. + addBackSplits.add(split); } } - enumContext.assignSplits( - new SplitsAssignment<>(Collections.singletonMap(subtaskId, assignment))); - unassignedSplits.removeAll(assignment); + calculateAndPutPendingAssignments(redistributedSplits); + putPendingAssignments(subtaskId, addBackSplits); + assignAllSplits(); } @Override public Set snapshotState(long checkpointId) { - return unassignedSplits; + return getUnassignedSplits(); } @Override @@ -114,11 +129,6 @@ public void close() throws IOException { this.closed = true; } - public void addNewSplits(List newSplits) { - unassignedSplits.addAll(newSplits); - assignAllSplits(); - } - // -------------------- public boolean started() { @@ -130,7 +140,9 @@ public boolean closed() { } public Set getUnassignedSplits() { - return unassignedSplits; + return pendingSplitAssignment.values().stream() + .flatMap(Set::stream) + .collect(Collectors.toSet()); } public List getHandledSourceEvent() { @@ -145,17 +157,27 @@ public List getSuccessfulCheckpoints() { private void assignAllSplits() { Map> assignment = new HashMap<>(); - unassignedSplits.forEach( - split -> { - int subtaskId = - Integer.parseInt(split.splitId()) % enumContext.currentParallelism(); - if (enumContext.registeredReaders().containsKey(subtaskId)) { - assignment - .computeIfAbsent(subtaskId, ignored -> new ArrayList<>()) - .add(split); - } - }); + for (Map.Entry> iter : pendingSplitAssignment.entrySet()) { + Integer subtaskId = iter.getKey(); + if (enumContext.registeredReaders().containsKey(subtaskId)) { + assignment.put(subtaskId, new ArrayList<>(iter.getValue())); + } + } enumContext.assignSplits(new SplitsAssignment<>(assignment)); - assignment.values().forEach(l -> unassignedSplits.removeAll(l)); + assignment.keySet().forEach(pendingSplitAssignment::remove); + } + + private void calculateAndPutPendingAssignments(Collection newSplits) { + for (MockSourceSplit split : newSplits) { + int subtaskId = Integer.parseInt(split.splitId()) % enumContext.currentParallelism(); + putPendingAssignments(subtaskId, Collections.singletonList(split)); + } + } + + private void putPendingAssignments(int subtaskId, Collection splits) { + Set pendingSplits = + pendingSplitAssignment.computeIfAbsent(subtaskId, HashSet::new); + pendingSplits.addAll(splits); + splits.forEach(split -> globalSplitAssignment.put(split.splitId(), subtaskId)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 680013b88627c..f75c9be0cea46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -113,6 +113,9 @@ public class SourceCoordinator /** The Source that is associated with this SourceCoordinator. */ private final Source source; + /** The serializer that handles the serde of the split. */ + private final SimpleVersionedSerializer splitSerializer; + /** The serializer that handles the serde of the SplitEnumerator checkpoints. */ private final SimpleVersionedSerializer enumCheckpointSerializer; @@ -163,6 +166,7 @@ public SourceCoordinator( this.operatorName = operatorName; this.source = source; this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer(); + this.splitSerializer = source.getSplitSerializer(); this.context = context; this.coordinatorStore = coordinatorStore; this.watermarkAlignmentParams = watermarkAlignmentParams; @@ -427,7 +431,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r // assignments byte[] assignmentData = context.getAssignmentTracker() - .snapshotState(source.getSplitSerializer()); + .snapshotState(splitSerializer); out.writeInt(assignmentData.length); out.write(assignmentData); @@ -680,7 +684,7 @@ private void handleSourceEvent(int subtask, int attemptNumber, SourceEvent event } private void handleReaderRegistrationEvent( - int subtask, int attemptNumber, ReaderRegistrationEvent event) { + int subtask, int attemptNumber, ReaderRegistrationEvent event) throws Exception { checkArgument(subtask == event.subtaskId()); LOG.info( @@ -692,7 +696,8 @@ private void handleReaderRegistrationEvent( final boolean subtaskReaderExisted = context.registeredReadersOfAttempts().containsKey(subtask); - context.registerSourceReader(subtask, attemptNumber, event.location()); + context.registerSourceReader( + subtask, attemptNumber, event.location(), event.splits(splitSerializer)); if (!subtaskReaderExisted) { enumerator.addReader(event.subtaskId()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index b469d6ecfd7e4..3f9d78b58643a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -465,8 +465,10 @@ void onCheckpoint(long checkpointId) throws Exception { * @param subtaskId the subtask id of the source reader. * @param attemptNumber the attempt number of the source reader. * @param location the location of the source reader. + * @param splits the split restored from source reader's state. */ - void registerSourceReader(int subtaskId, int attemptNumber, String location) { + void registerSourceReader( + int subtaskId, int attemptNumber, String location, List splits) { final Map attemptReaders = registeredReaders.computeIfAbsent(subtaskId, k -> new ConcurrentHashMap<>()); checkState( @@ -474,7 +476,7 @@ void registerSourceReader(int subtaskId, int attemptNumber, String location) { "ReaderInfo of subtask %s (#%s) already exists.", subtaskId, attemptNumber); - attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location)); + attemptReaders.put(attemptNumber, ReaderInfo.createReaderInfo(subtaskId, location, splits)); sendCachedSplitsToNewlyRegisteredReader(subtaskId, attemptNumber); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java index 2f707cd550f02..528c1b3e19bff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java @@ -18,11 +18,19 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.flink.runtime.source.event; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** - * An {@link OperatorEvent} that registers a {@link - * org.apache.flink.api.connector.source.SourceReader SourceReader} to the SourceCoordinator. + * The SourceOperator should always send the ReaderRegistrationEvent with the + * `reportedSplitsOnRegistration` list. But it will not add the splits to readers if {@link + * org.apache.flink.api.connector.source.SupportSplitReassignmentOnRecovery} is implemented. */ public class ReaderRegistrationEvent implements OperatorEvent { @@ -30,10 +38,44 @@ public class ReaderRegistrationEvent implements OperatorEvent { private final int subtaskId; private final String location; + private final ArrayList splits; public ReaderRegistrationEvent(int subtaskId, String location) { this.subtaskId = subtaskId; this.location = location; + this.splits = new ArrayList<>(); + } + + ReaderRegistrationEvent(int subtaskId, String location, ArrayList splits) { + this.subtaskId = subtaskId; + this.location = location; + this.splits = splits; + } + + public static + ReaderRegistrationEvent createReaderRegistrationEvent( + int subtaskId, + String location, + List splits, + SimpleVersionedSerializer splitSerializer) + throws IOException { + ArrayList result = new ArrayList<>(); + for (SplitT split : splits) { + result.add(splitSerializer.serialize(split)); + } + return new ReaderRegistrationEvent(subtaskId, location, result); + } + + public List splits( + SimpleVersionedSerializer splitSerializer) throws IOException { + if (splits.isEmpty()) { + return Collections.emptyList(); + } + List result = new ArrayList<>(splits.size()); + for (byte[] serializedSplit : splits) { + result.add(splitSerializer.deserialize(splitSerializer.getVersion(), serializedSplit)); + } + return result; } public int subtaskId() { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index e6cb4b995189f..30e1250c2671f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -215,6 +215,8 @@ private enum OperatingMode { /** Watermark identifier to whether the watermark are aligned. */ private final Map watermarkIsAlignedMap; + private final boolean supportSupportSplitReassignmentOnRecovery; + public SourceOperator( StreamOperatorParameters parameters, FunctionWithException, Exception> @@ -227,7 +229,8 @@ public SourceOperator( String localHostname, boolean emitProgressiveWatermarks, CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, - Map watermarkIsAlignedMap) { + Map watermarkIsAlignedMap, + boolean supportSupportSplitReassignmentOnRecovery) { super(parameters); this.readerFactory = checkNotNull(readerFactory); this.operatorEventGateway = checkNotNull(operatorEventGateway); @@ -242,6 +245,7 @@ public SourceOperator( this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS); this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords); this.watermarkIsAlignedMap = watermarkIsAlignedMap; + this.supportSupportSplitReassignmentOnRecovery = supportSupportSplitReassignmentOnRecovery; } @Override @@ -411,7 +415,7 @@ public void open() throws Exception { // restore the state if necessary. final List splits = CollectionUtil.iterableToList(readerState.get()); - if (!splits.isEmpty()) { + if (!splits.isEmpty() && !supportSupportSplitReassignmentOnRecovery) { LOG.info("Restoring state for {} split(s) to reader.", splits.size()); for (SplitT s : splits) { getOrCreateSplitMetricGroup(s.splitId()); @@ -421,7 +425,8 @@ public void open() throws Exception { } // Register the reader to the coordinator. - registerReader(); + registerReader( + supportSupportSplitReassignmentOnRecovery ? splits : Collections.emptyList()); sourceMetricGroup.idlingStarted(); // Start the reader after registration, sending messages in start is allowed. @@ -811,10 +816,13 @@ private boolean shouldWaitForAlignment() { return currentMaxDesiredWatermark < latestWatermark; } - private void registerReader() { + private void registerReader(List splits) throws Exception { operatorEventGateway.sendEventToCoordinator( - new ReaderRegistrationEvent( - getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), localHostname)); + ReaderRegistrationEvent.createReaderRegistrationEvent( + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), + localHostname, + splits, + splitSerializer)); } // --------------- methods for unit tests ------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index caa78b022ce04..1bf3e6102fbaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -27,6 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SupportSplitReassignmentOnRecovery; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -128,7 +129,8 @@ public > T createStreamOperator( .getTaskManagerExternalAddress(), emitProgressiveWatermarks, parameters.getContainingTask().getCanEmitBatchOfRecords(), - getSourceWatermarkDeclarations()); + getSourceWatermarkDeclarations(), + source instanceof SupportSplitReassignmentOnRecovery); parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator); @@ -199,7 +201,8 @@ SourceOperator instantiateSourceOperator( String localHostName, boolean emitProgressiveWatermarks, CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, - Collection watermarkDeclarations) { + Collection watermarkDeclarations, + boolean supportSupportSplitReassignmentOnRecovery) { // jumping through generics hoops: cast the generics away to then cast them back more // strictly typed @@ -231,6 +234,7 @@ SourceOperator instantiateSourceOperator( localHostName, emitProgressiveWatermarks, canEmitBatchOfRecords, - watermarkIsAlignedMap); + watermarkIsAlignedMap, + supportSupportSplitReassignmentOnRecovery); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java index 4b92191d45b58..c4fb85bedce26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java @@ -57,6 +57,21 @@ void testRegisterReader() throws Exception { final TestingSplitEnumerator enumerator = getEnumerator(); assertThat(enumerator.getRegisteredReaders()).containsExactlyInAnyOrder(0, 1, 2); + + ReaderInfo readerInfoOfSubtask1 = + ReaderInfo.createReaderInfo( + 1, "subtask_1_location", Collections.singletonList(new MockSourceSplit(1))); + sourceCoordinator.subtaskReset(1, 1); + sourceCoordinator.handleEventFromOperator( + 1, + 1, + ReaderRegistrationEvent.createReaderRegistrationEvent( + readerInfoOfSubtask1.getSubtaskId(), + readerInfoOfSubtask1.getLocation(), + readerInfoOfSubtask1.getReportedSplitsOnRegistration(), + new MockSourceSplitSerializer())); + waitForCoordinatorToProcessActions(); + assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfoOfSubtask1); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index 0bf8e526f3625..b52403c8eae90 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -42,9 +42,11 @@ import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.RequestSplitEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.util.concurrent.FutureConsumerWithException; import org.apache.flink.util.function.ThrowingRunnable; import org.junit.jupiter.api.Test; @@ -598,6 +600,125 @@ class TestDynamicFilteringEvent implements SourceEvent, DynamicFilteringInfo {} assertThat(coordinator.inferSourceParallelismAsync(2, 1).get()).isEqualTo(2); } + @Test + void testDuplicateRedistribution() throws Exception { + final List splits = + Arrays.asList( + new MockSourceSplit(0), new MockSourceSplit(1), new MockSourceSplit(2)); + + testRedistribution( + (coordinator) -> { + registerReader(coordinator, 0, 0, splits); + registerReader(coordinator, 1, 0, Collections.emptyList()); + registerReader(coordinator, 2, 0, Collections.emptyList()); + waitForCoordinatorToProcessActions(); + checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}}); + + // duplicate registration + registerReader(coordinator, 0, 0, splits); + waitForCoordinatorToProcessActions(); + // split 1,2,3 won't be sent again. + checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}}); + }); + } + + @Test + void testRedistributionInPartialRestartBeforeAnyCheckpoint() throws Exception { + final List splits = + Arrays.asList( + new MockSourceSplit(0), new MockSourceSplit(1), new MockSourceSplit(2)); + + testRedistribution( + (coordinator) -> { + registerReader(coordinator, 0, 0, splits); + registerReader(coordinator, 1, 0, Collections.emptyList()); + registerReader(coordinator, 2, 0, Collections.emptyList()); + waitForCoordinatorToProcessActions(); + checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}}); + + coordinator.subtaskReset(0, 0); + setReaderTaskReady(coordinator, 0, 1); + registerReader(coordinator, 0, 1, splits); + waitForCoordinatorToProcessActions(); + checkAddSplitEvents( + new int[][] {new int[] {1, 1}, new int[] {1}, new int[] {1}}); + }); + } + + @Test + void testRedistributionInPartialRestartAfterCheckpoint() throws Exception { + final List splits = + Arrays.asList( + new MockSourceSplit(0), new MockSourceSplit(1), new MockSourceSplit(2)); + + testRedistribution( + (coordinator) -> { + registerReader(coordinator, 0, 0, splits); + registerReader(coordinator, 1, 0, Collections.emptyList()); + registerReader(coordinator, 2, 0, Collections.emptyList()); + waitForCoordinatorToProcessActions(); + checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}}); + + CompletableFuture fulture = new CompletableFuture<>(); + coordinator.checkpointCoordinator(1, fulture); + fulture.get(); + + coordinator.subtaskReset(0, 0); + setReaderTaskReady(coordinator, 0, 1); + registerReader( + coordinator, 0, 1, Collections.singletonList(new MockSourceSplit(0))); + waitForCoordinatorToProcessActions(); + checkAddSplitEvents( + new int[][] {new int[] {1, 1}, new int[] {1}, new int[] {1}}); + }); + } + + void testRedistribution( + FutureConsumerWithException< + SourceCoordinator>, Exception> + consumer) + throws Exception { + try (final SplitEnumerator> splitEnumerator = + new MockSplitEnumerator(0, context); + final SourceCoordinator> coordinator = + new SourceCoordinator<>( + new JobID(), + OPERATOR_NAME, + new EnumeratorCreatingSource<>(() -> splitEnumerator), + context, + new CoordinatorStoreImpl(), + WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED, + null)) { + + coordinator.start(); + setAllReaderTasksReady(coordinator); + + consumer.accept(coordinator); + } + } + + private void checkAddSplitEvents(int[][] expectedAssignedSplitNum) { + MockSourceSplitSerializer mockSourceSplitSerializer = new MockSourceSplitSerializer(); + assertThat(expectedAssignedSplitNum.length).isEqualTo(NUM_SUBTASKS); + for (int i = 0; i < NUM_SUBTASKS; i++) { + List sentEventsForSubtask = receivingTasks.getSentEventsForSubtask(i); + assertThat(sentEventsForSubtask).hasSize(expectedAssignedSplitNum[i].length); + for (int j = 0; j < sentEventsForSubtask.size(); j++) { + assertThat(sentEventsForSubtask.get(j)).isExactlyInstanceOf(AddSplitEvent.class); + List splits; + try { + splits = + ((AddSplitEvent) sentEventsForSubtask.get(j)) + .splits(mockSourceSplitSerializer); + } catch (Exception e) { + throw new RuntimeException(); + } + + assertThat(splits).hasSize(expectedAssignedSplitNum[i][j]); + } + } + } + // ------------------------------------------------------------------------ // test helpers // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java index 61ece5c357fe9..0fdb564070a26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -141,6 +142,22 @@ protected void registerReader(int subtask, int attemptNumber) { new ReaderRegistrationEvent(subtask, createLocationFor(subtask, attemptNumber))); } + protected void registerReader( + SourceCoordinator> coordinator, + int subtask, + int attemptNumber, + List splits) + throws IOException { + coordinator.handleEventFromOperator( + subtask, + attemptNumber, + ReaderRegistrationEvent.createReaderRegistrationEvent( + subtask, + createLocationFor(subtask, attemptNumber), + splits, + new MockSourceSplitSerializer())); + } + static String createLocationFor(int subtask, int attemptNumber) { return String.format("location_%d_%d", subtask, attemptNumber); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java index 8cc98109fdbc6..35621fbf5bcc8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java @@ -99,7 +99,8 @@ void testSplitWatermarkAlignment() throws Exception { new MockOperatorEventGateway(), 1, 5, - true); + true, + false); operator.initializeState( new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); @@ -547,7 +548,8 @@ private SourceOperator createAndOpenSourceOperatorWith new MockOperatorEventGateway(), 1, 5, - true); + true, + false); operator.initializeState( new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); operator.open(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 25f54a632c55b..59e0dd97e6c82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -43,11 +43,14 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.streaming.util.CollectorOutput; +import org.apache.flink.streaming.util.MockOutput; import org.apache.flink.util.CollectionUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; @@ -99,24 +102,46 @@ void testInitializeState() throws Exception { .isNotNull(); } - @Test - void testOpen() throws Exception { - // Initialize the operator. - operator.initializeState(context.createStateContext()); - // Open the operator. - operator.open(); - // The source reader should have been assigned a split. - assertThat(mockSourceReader.getAssignedSplits()) - .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); - // The source reader should have started. - assertThat(mockSourceReader.isStarted()).isTrue(); - - // A ReaderRegistrationRequest should have been sent. - assertThat(mockGateway.getEventsSent()).hasSize(1); - OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0); - assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class); - assertThat(((ReaderRegistrationEvent) operatorEvent).subtaskId()) - .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testOpen(boolean supportSupportSplitReassignmentOnRecovery) throws Exception { + try (SourceOperatorTestContext context = + new SourceOperatorTestContext( + false, + false, + WatermarkStrategy.noWatermarks(), + new MockOutput<>(new ArrayList<>()), + supportSupportSplitReassignmentOnRecovery)) { + SourceOperator operator = context.getOperator(); + // Initialize the operator. + operator.initializeState(context.createStateContext()); + // Open the operator. + operator.open(); + // The source reader should have been assigned a split. + if (supportSupportSplitReassignmentOnRecovery) { + assertThat(context.getSourceReader().getAssignedSplits()).isEmpty(); + } else { + assertThat(context.getSourceReader().getAssignedSplits()) + .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); + } + + // The source reader should have started. + assertThat(context.getSourceReader().isStarted()).isTrue(); + + // A ReaderRegistrationRequest should have been sent. + assertThat(context.getGateway().getEventsSent()).hasSize(1); + OperatorEvent operatorEvent = context.getGateway().getEventsSent().get(0); + assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class); + ReaderRegistrationEvent registrationEvent = (ReaderRegistrationEvent) operatorEvent; + assertThat(registrationEvent.subtaskId()) + .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX); + if (supportSupportSplitReassignmentOnRecovery) { + assertThat(registrationEvent.splits(new MockSourceSplitSerializer())) + .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); + } else { + assertThat(registrationEvent.splits(new MockSourceSplitSerializer())).isEmpty(); + } + } } @Test @@ -210,7 +235,8 @@ void testHandleBacklogEvent() throws Exception { false, WatermarkStrategy.forMonotonousTimestamps() .withTimestampAssigner((element, recordTimestamp) -> element), - new CollectorOutput<>(outputStreamElements)); + new CollectorOutput<>(outputStreamElements), + false); operator = context.getOperator(); operator.initializeState(context.createStateContext()); operator.open(); @@ -251,7 +277,7 @@ public void testMetricGroupIsCreatedForNewSplit() throws Exception { @Test public void testMetricGroupIsCreatedForRestoredSplit() throws Exception { - MockSourceSplit restoredSplit = new MockSourceSplit((2)); + MockSourceSplit restoredSplit = new MockSourceSplit((1)); StateInitializationContext stateContext = context.createStateContext(Collections.singletonList(restoredSplit)); operator.initializeState(stateContext); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java index f7e75788eb3d4..e483ea333871f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -75,14 +75,15 @@ public SourceOperatorTestContext(boolean idle) throws Exception { public SourceOperatorTestContext(boolean idle, WatermarkStrategy watermarkStrategy) throws Exception { - this(idle, false, watermarkStrategy, new MockOutput<>(new ArrayList<>())); + this(idle, false, watermarkStrategy, new MockOutput<>(new ArrayList<>()), false); } public SourceOperatorTestContext( boolean idle, boolean usePerSplitOutputs, WatermarkStrategy watermarkStrategy, - Output> output) + Output> output, + boolean supportSupportSplitReassignmentOnRecovery) throws Exception { mockSourceReader = new MockSourceReader( @@ -109,7 +110,8 @@ public SourceOperatorTestContext( mockGateway, SUBTASK_INDEX, 5, - true); + true, + supportSupportSplitReassignmentOnRecovery); operator.initializeState( new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java index 63b7b8ff3712b..7bef5d80c6cae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java @@ -54,7 +54,8 @@ void setup() throws Exception { new SourceOperatorAlignmentTest .PunctuatedGenerator()) .withTimestampAssigner((r, t) -> r), - new MockOutput<>(new ArrayList<>())); + new MockOutput<>(new ArrayList<>()), + false); operator = context.getOperator(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index 40a9bfe948677..5ea7b49fc5072 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -64,7 +64,8 @@ public TestingSourceOperator( SourceReader reader, WatermarkStrategy watermarkStrategy, ProcessingTimeService timeService, - boolean emitProgressiveWatermarks) { + boolean emitProgressiveWatermarks, + boolean supportSupportSplitReassignmentOnRecovery) { this( parameters, @@ -74,7 +75,8 @@ public TestingSourceOperator( new MockOperatorEventGateway(), 1, 5, - emitProgressiveWatermarks); + emitProgressiveWatermarks, + supportSupportSplitReassignmentOnRecovery); } public TestingSourceOperator( @@ -85,7 +87,8 @@ public TestingSourceOperator( OperatorEventGateway eventGateway, int subtaskIndex, int parallelism, - boolean emitProgressiveWatermarks) { + boolean emitProgressiveWatermarks, + boolean supportSupportSplitReassignmentOnRecovery) { super( parameters, @@ -98,7 +101,8 @@ public TestingSourceOperator( "localhost", emitProgressiveWatermarks, () -> false, - Collections.emptyMap()); + Collections.emptyMap(), + supportSupportSplitReassignmentOnRecovery); this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; @@ -130,6 +134,15 @@ public static SourceOperator createTestOperator( WatermarkStrategy watermarkStrategy, boolean emitProgressiveWatermarks) throws Exception { + return createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks, false); + } + + public static SourceOperator createTestOperator( + SourceReader reader, + WatermarkStrategy watermarkStrategy, + boolean emitProgressiveWatermarks, + boolean supportSupportSplitReassignmentOnRecovery) + throws Exception { AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); Environment env = new MockEnvironmentBuilder().build(); @@ -168,7 +181,8 @@ public static SourceOperator createTestOperator( reader, watermarkStrategy, timeService, - emitProgressiveWatermarks); + emitProgressiveWatermarks, + supportSupportSplitReassignmentOnRecovery); sourceOperator.initializeState(stateContext); sourceOperator.open(); diff --git a/pom.xml b/pom.xml index b9cc0ab6c8e24..d93d695a0c42b 100644 --- a/pom.xml +++ b/pom.xml @@ -1910,7 +1910,7 @@ under the License. - [3.8.6] + 3.8.6 ${target.java.version} From ce9a85af40e2dc0d6069c02e3b4a98e066ca9afe Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 30 Oct 2025 10:23:44 +0800 Subject: [PATCH 2/3] modified based on CR --- .../flink/api/connector/source/ReaderInfo.java | 2 +- ...=> SupportsSplitReassignmentOnRecovery.java} | 4 ++-- .../source/mocks/MockSplitEnumerator.java | 17 ++++++++--------- .../source/event/ReaderRegistrationEvent.java | 3 ++- .../streaming/api/operators/SourceOperator.java | 11 +++++------ .../api/operators/SourceOperatorFactory.java | 8 ++++---- .../api/operators/SourceOperatorTest.java | 8 ++++---- .../operators/SourceOperatorTestContext.java | 4 ++-- .../operators/source/TestingSourceOperator.java | 12 ++++++------ 9 files changed, 34 insertions(+), 35 deletions(-) rename flink-core/src/main/java/org/apache/flink/api/connector/source/{SupportSplitReassignmentOnRecovery.java => SupportsSplitReassignmentOnRecovery.java} (86%) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java index bef8e51193b6b..27e6f871f4edf 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java @@ -29,7 +29,7 @@ * A container class hosting the information of a {@link SourceReader}. * *

The {@code reportedSplitsOnRegistration} can only be provided when the source implements - * {@link SupportSplitReassignmentOnRecovery}. + * {@link SupportsSplitReassignmentOnRecovery}. */ @Public public final class ReaderInfo implements Serializable { diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java similarity index 86% rename from flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java rename to flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java index 0f4453b149d9f..0d06b4994eb90 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java @@ -21,8 +21,8 @@ import org.apache.flink.annotation.PublicEvolving; /** - * A decorative interface {@link Source}. Implementing this interface indicates that the source + * A decorative interface for {@link Source}. Implementing this interface indicates that the source * operator needs to report splits to the enumerator and receive reassignment. */ @PublicEvolving -public interface SupportSplitReassignmentOnRecovery {} +public interface SupportsSplitReassignmentOnRecovery {} diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java index bfa4c6834fea2..2adc79360f33e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java @@ -41,7 +41,6 @@ /** A mock {@link SplitEnumerator} for unit tests. */ public class MockSplitEnumerator implements SplitEnumerator>, SupportsBatchSnapshot { - // 扩成16个partition, unas private final Map> pendingSplitAssignment; private final Map globalSplitAssignment; private final SplitEnumeratorContext enumContext; @@ -56,7 +55,7 @@ public MockSplitEnumerator(int numSplits, SplitEnumeratorContext(); this.started = false; this.closed = false; - calculateAndPutPendingAssignments(unassignedSplits); + recalculateAssignments(unassignedSplits); } @Override @@ -100,16 +99,16 @@ public void addReader(int subtaskId) { List addBackSplits = new ArrayList<>(); for (MockSourceSplit split : splitsOnRecovery) { if (!globalSplitAssignment.containsKey(split.splitId())) { - // if split not existed in globalSplitAssignment, mean that it's registered first - // time, can be redistibuted. + // if the split is not present in globalSplitAssignment, it means that this split is + // being registered for the first time and is eligible for redistribution. redistributedSplits.add(split); } else if (!globalSplitAssignment.containsKey(split.splitId())) { - // if split already is assigned to other substaskId, just ignore it. Otherwise, - // addback to this subtaskId again. + // if split is already assigned to other sub-task, just ignore it. Otherwise, add + // back to this sub-task again. addBackSplits.add(split); } } - calculateAndPutPendingAssignments(redistributedSplits); + recalculateAssignments(redistributedSplits); putPendingAssignments(subtaskId, addBackSplits); assignAllSplits(); } @@ -167,7 +166,7 @@ private void assignAllSplits() { assignment.keySet().forEach(pendingSplitAssignment::remove); } - private void calculateAndPutPendingAssignments(Collection newSplits) { + private void recalculateAssignments(Collection newSplits) { for (MockSourceSplit split : newSplits) { int subtaskId = Integer.parseInt(split.splitId()) % enumContext.currentParallelism(); putPendingAssignments(subtaskId, Collections.singletonList(split)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java index 528c1b3e19bff..6248321574091 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java @@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one package org.apache.flink.runtime.source.event; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -30,7 +31,7 @@ Licensed to the Apache Software Foundation (ASF) under one /** * The SourceOperator should always send the ReaderRegistrationEvent with the * `reportedSplitsOnRegistration` list. But it will not add the splits to readers if {@link - * org.apache.flink.api.connector.source.SupportSplitReassignmentOnRecovery} is implemented. + * SupportsSplitReassignmentOnRecovery} is implemented. */ public class ReaderRegistrationEvent implements OperatorEvent { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 30e1250c2671f..09b12007d3349 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -215,7 +215,7 @@ private enum OperatingMode { /** Watermark identifier to whether the watermark are aligned. */ private final Map watermarkIsAlignedMap; - private final boolean supportSupportSplitReassignmentOnRecovery; + private final boolean supportsSplitReassignmentOnRecovery; public SourceOperator( StreamOperatorParameters parameters, @@ -230,7 +230,7 @@ public SourceOperator( boolean emitProgressiveWatermarks, CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, Map watermarkIsAlignedMap, - boolean supportSupportSplitReassignmentOnRecovery) { + boolean supportsSplitReassignmentOnRecovery) { super(parameters); this.readerFactory = checkNotNull(readerFactory); this.operatorEventGateway = checkNotNull(operatorEventGateway); @@ -245,7 +245,7 @@ public SourceOperator( this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS); this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords); this.watermarkIsAlignedMap = watermarkIsAlignedMap; - this.supportSupportSplitReassignmentOnRecovery = supportSupportSplitReassignmentOnRecovery; + this.supportsSplitReassignmentOnRecovery = supportsSplitReassignmentOnRecovery; } @Override @@ -415,7 +415,7 @@ public void open() throws Exception { // restore the state if necessary. final List splits = CollectionUtil.iterableToList(readerState.get()); - if (!splits.isEmpty() && !supportSupportSplitReassignmentOnRecovery) { + if (!splits.isEmpty() && !supportsSplitReassignmentOnRecovery) { LOG.info("Restoring state for {} split(s) to reader.", splits.size()); for (SplitT s : splits) { getOrCreateSplitMetricGroup(s.splitId()); @@ -425,8 +425,7 @@ public void open() throws Exception { } // Register the reader to the coordinator. - registerReader( - supportSupportSplitReassignmentOnRecovery ? splits : Collections.emptyList()); + registerReader(supportsSplitReassignmentOnRecovery ? splits : Collections.emptyList()); sourceMetricGroup.idlingStarted(); // Start the reader after registration, sending messages in start is allowed. diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java index 1bf3e6102fbaa..6d1cbc8889730 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java @@ -27,7 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.SupportSplitReassignmentOnRecovery; +import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -130,7 +130,7 @@ public > T createStreamOperator( emitProgressiveWatermarks, parameters.getContainingTask().getCanEmitBatchOfRecords(), getSourceWatermarkDeclarations(), - source instanceof SupportSplitReassignmentOnRecovery); + source instanceof SupportsSplitReassignmentOnRecovery); parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator); @@ -202,7 +202,7 @@ SourceOperator instantiateSourceOperator( boolean emitProgressiveWatermarks, CanEmitBatchOfRecordsChecker canEmitBatchOfRecords, Collection watermarkDeclarations, - boolean supportSupportSplitReassignmentOnRecovery) { + boolean supportsSplitReassignmentOnRecovery) { // jumping through generics hoops: cast the generics away to then cast them back more // strictly typed @@ -235,6 +235,6 @@ SourceOperator instantiateSourceOperator( emitProgressiveWatermarks, canEmitBatchOfRecords, watermarkIsAlignedMap, - supportSupportSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index 59e0dd97e6c82..aa4d2cf70814b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -104,21 +104,21 @@ void testInitializeState() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) - void testOpen(boolean supportSupportSplitReassignmentOnRecovery) throws Exception { + void testOpen(boolean supportsSplitReassignmentOnRecovery) throws Exception { try (SourceOperatorTestContext context = new SourceOperatorTestContext( false, false, WatermarkStrategy.noWatermarks(), new MockOutput<>(new ArrayList<>()), - supportSupportSplitReassignmentOnRecovery)) { + supportsSplitReassignmentOnRecovery)) { SourceOperator operator = context.getOperator(); // Initialize the operator. operator.initializeState(context.createStateContext()); // Open the operator. operator.open(); // The source reader should have been assigned a split. - if (supportSupportSplitReassignmentOnRecovery) { + if (supportsSplitReassignmentOnRecovery) { assertThat(context.getSourceReader().getAssignedSplits()).isEmpty(); } else { assertThat(context.getSourceReader().getAssignedSplits()) @@ -135,7 +135,7 @@ void testOpen(boolean supportSupportSplitReassignmentOnRecovery) throws Exceptio ReaderRegistrationEvent registrationEvent = (ReaderRegistrationEvent) operatorEvent; assertThat(registrationEvent.subtaskId()) .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX); - if (supportSupportSplitReassignmentOnRecovery) { + if (supportsSplitReassignmentOnRecovery) { assertThat(registrationEvent.splits(new MockSourceSplitSerializer())) .containsExactly(SourceOperatorTestContext.MOCK_SPLIT); } else { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java index e483ea333871f..6e1985124451f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java @@ -83,7 +83,7 @@ public SourceOperatorTestContext( boolean usePerSplitOutputs, WatermarkStrategy watermarkStrategy, Output> output, - boolean supportSupportSplitReassignmentOnRecovery) + boolean supportsSplitReassignmentOnRecovery) throws Exception { mockSourceReader = new MockSourceReader( @@ -111,7 +111,7 @@ public SourceOperatorTestContext( SUBTASK_INDEX, 5, true, - supportSupportSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery); operator.initializeState( new StreamTaskStateInitializerImpl(env, new HashMapStateBackend())); } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java index 5ea7b49fc5072..dc8ebc9806e52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java @@ -65,7 +65,7 @@ public TestingSourceOperator( WatermarkStrategy watermarkStrategy, ProcessingTimeService timeService, boolean emitProgressiveWatermarks, - boolean supportSupportSplitReassignmentOnRecovery) { + boolean supportsSplitReassignmentOnRecovery) { this( parameters, @@ -76,7 +76,7 @@ public TestingSourceOperator( 1, 5, emitProgressiveWatermarks, - supportSupportSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery); } public TestingSourceOperator( @@ -88,7 +88,7 @@ public TestingSourceOperator( int subtaskIndex, int parallelism, boolean emitProgressiveWatermarks, - boolean supportSupportSplitReassignmentOnRecovery) { + boolean supportsSplitReassignmentOnRecovery) { super( parameters, @@ -102,7 +102,7 @@ public TestingSourceOperator( emitProgressiveWatermarks, () -> false, Collections.emptyMap(), - supportSupportSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery); this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; @@ -141,7 +141,7 @@ public static SourceOperator createTestOperator( SourceReader reader, WatermarkStrategy watermarkStrategy, boolean emitProgressiveWatermarks, - boolean supportSupportSplitReassignmentOnRecovery) + boolean supportsSplitReassignmentOnRecovery) throws Exception { AbstractStateBackend abstractStateBackend = new HashMapStateBackend(); @@ -182,7 +182,7 @@ public static SourceOperator createTestOperator( watermarkStrategy, timeService, emitProgressiveWatermarks, - supportSupportSplitReassignmentOnRecovery); + supportsSplitReassignmentOnRecovery); sourceOperator.initializeState(stateContext); sourceOperator.open(); From a67c2146eb0eb5ca953815aef6d7e8aefbd5092a Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 30 Oct 2025 15:04:23 +0800 Subject: [PATCH 3/3] modified based on CR --- .../connector/source/SupportsSplitReassignmentOnRecovery.java | 3 ++- pom.xml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java index 0d06b4994eb90..225eb32dc66e0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java @@ -22,7 +22,8 @@ /** * A decorative interface for {@link Source}. Implementing this interface indicates that the source - * operator needs to report splits to the enumerator and receive reassignment. + * operator needs to report splits to the enumerator on start up and receive reassignment on + * recovery. */ @PublicEvolving public interface SupportsSplitReassignmentOnRecovery {} diff --git a/pom.xml b/pom.xml index d93d695a0c42b..b9cc0ab6c8e24 100644 --- a/pom.xml +++ b/pom.xml @@ -1910,7 +1910,7 @@ under the License. - 3.8.6 + [3.8.6] ${target.java.version}