diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
index 7a93c1c7549c3..27e6f871f4edf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderInfo.java
@@ -21,9 +21,16 @@
import org.apache.flink.annotation.Public;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
-/** A container class hosting the information of a {@link SourceReader}. */
+/**
+ * A container class hosting the information of a {@link SourceReader}.
+ *
+ * <p>The {@code reportedSplitsOnRegistration} can only be provided when the source implements
+ * {@link SupportsSplitReassignmentOnRecovery}.
+ */
@Public
public final class ReaderInfo implements Serializable {
@@ -31,10 +38,27 @@ public final class ReaderInfo implements Serializable {
private final int subtaskId;
private final String location;
+ private final List<SourceSplit> reportedSplitsOnRegistration;
public ReaderInfo(int subtaskId, String location) {
+ this(subtaskId, location, Collections.emptyList());
+ }
+
+ ReaderInfo(int subtaskId, String location, List<SourceSplit> splits) {
this.subtaskId = subtaskId;
this.location = location;
+ this.reportedSplitsOnRegistration = splits;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <SplitT extends SourceSplit> ReaderInfo createReaderInfo(
+ int subtaskId, String location, List<SplitT> splits) {
+ return new ReaderInfo(subtaskId, location, (List<SourceSplit>) splits);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <SplitT extends SourceSplit> List<SplitT> getReportedSplitsOnRegistration() {
+ return (List<SplitT>) reportedSplitsOnRegistration;
}
/**
@@ -52,16 +76,18 @@ public String getLocation() {
}
@Override
- public int hashCode() {
- return Objects.hash(subtaskId, location);
+ public boolean equals(Object o) {
+ if (!(o instanceof ReaderInfo)) {
+ return false;
+ }
+ ReaderInfo that = (ReaderInfo) o;
+ return subtaskId == that.subtaskId
+ && Objects.equals(location, that.location)
+ && Objects.equals(reportedSplitsOnRegistration, that.reportedSplitsOnRegistration);
}
@Override
- public boolean equals(Object obj) {
- if (!(obj instanceof ReaderInfo)) {
- return false;
- }
- ReaderInfo other = (ReaderInfo) obj;
- return subtaskId == other.subtaskId && location.equals(other.location);
+ public int hashCode() {
+ return Objects.hash(subtaskId, location, reportedSplitsOnRegistration);
}
}
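
Reviewer note: a minimal, self-contained sketch of how the new factory and accessor fit together, assuming the patched `ReaderInfo` above. `DemoSplit` is a hypothetical split type defined only to make the example compile; it is not part of this patch.

```java
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceSplit;

import java.util.Collections;
import java.util.List;

public class ReaderInfoExample {
    /** A trivial SourceSplit, defined here only to keep the example self-contained. */
    static final class DemoSplit implements SourceSplit {
        private final String id;

        DemoSplit(String id) {
            this.id = id;
        }

        @Override
        public String splitId() {
            return id;
        }
    }

    public static void main(String[] args) {
        // The static factory erases the concrete split type into List<SourceSplit>;
        // the accessor casts it back (unchecked) for the caller.
        ReaderInfo info =
                ReaderInfo.createReaderInfo(
                        0, "host-0", Collections.singletonList(new DemoSplit("42")));
        List<DemoSplit> splits = info.getReportedSplitsOnRegistration();
        System.out.println(splits.get(0).splitId()); // prints 42
    }
}
```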
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
new file mode 100644
index 0000000000000..225eb32dc66e0
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsSplitReassignmentOnRecovery.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A decorative interface for {@link Source}. Implementing this interface indicates that the source
+ * operator needs to report splits to the enumerator on startup and receive reassignment on
+ * recovery.
+ */
+@PublicEvolving
+public interface SupportsSplitReassignmentOnRecovery {}
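
Reviewer note: the interface carries no methods, so implementing it is the whole contract. A small sketch mirroring the `instanceof` check that `SourceOperatorFactory` performs further down in this patch:

```java
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;

final class CapabilityCheck {
    // When this returns true, the source operator reports restored splits in its
    // ReaderRegistrationEvent instead of adding them back to the reader itself.
    static boolean reassignsSplitsOnRecovery(Source<?, ?, ?> source) {
        return source instanceof SupportsSplitReassignmentOnRecovery;
    }
}
```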
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 7a3763bc5f978..2adc79360f33e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.connector.source.mocks;
+import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -28,20 +29,20 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
/** A mock {@link SplitEnumerator} for unit tests. */
public class MockSplitEnumerator
implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>, SupportsBatchSnapshot {
- private final SortedSet<MockSourceSplit> unassignedSplits;
+ private final Map<Integer, Set<MockSourceSplit>> pendingSplitAssignment;
+ private final Map<String, Integer> globalSplitAssignment;
private final SplitEnumeratorContext<MockSourceSplit> enumContext;
private final List<SourceEvent> handledSourceEvent;
private final List<Long> successfulCheckpoints;
@@ -50,22 +51,24 @@ public class MockSplitEnumerator
public MockSplitEnumerator(int numSplits, SplitEnumeratorContext<MockSourceSplit> enumContext) {
this(new HashSet<>(), enumContext);
+ List<MockSourceSplit> unassignedSplits = new ArrayList<>();
for (int i = 0; i < numSplits; i++) {
unassignedSplits.add(new MockSourceSplit(i));
}
+ recalculateAssignments(unassignedSplits);
}
public MockSplitEnumerator(
Set<MockSourceSplit> unassignedSplits,
SplitEnumeratorContext<MockSourceSplit> enumContext) {
- this.unassignedSplits =
- new TreeSet<>(Comparator.comparingInt(o -> Integer.parseInt(o.splitId())));
- this.unassignedSplits.addAll(unassignedSplits);
+ this.pendingSplitAssignment = new HashMap<>();
+ this.globalSplitAssignment = new HashMap<>();
this.enumContext = enumContext;
this.handledSourceEvent = new ArrayList<>();
this.successfulCheckpoints = new ArrayList<>();
this.started = false;
this.closed = false;
+ recalculateAssignments(unassignedSplits);
}
@Override
@@ -83,25 +86,36 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
@Override
public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
- unassignedSplits.addAll(splits);
+ // add back to same subtaskId.
+ putPendingAssignments(subtaskId, splits);
}
@Override
public void addReader(int subtaskId) {
- List<MockSourceSplit> assignment = new ArrayList<>();
- for (MockSourceSplit split : unassignedSplits) {
- if (Integer.parseInt(split.splitId()) % enumContext.currentParallelism() == subtaskId) {
- assignment.add(split);
+ ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId);
+ List<MockSourceSplit> splitsOnRecovery = readerInfo.getReportedSplitsOnRegistration();
+
+ List<MockSourceSplit> redistributedSplits = new ArrayList<>();
+ List<MockSourceSplit> addBackSplits = new ArrayList<>();
+ for (MockSourceSplit split : splitsOnRecovery) {
+ if (!globalSplitAssignment.containsKey(split.splitId())) {
+ // if the split is not present in globalSplitAssignment, this split is being
+ // registered for the first time and is eligible for redistribution.
+ redistributedSplits.add(split);
+ } else if (globalSplitAssignment.get(split.splitId()) == subtaskId) {
+ // if the split is already assigned to another sub-task, ignore it; otherwise, add
+ // it back to this sub-task again.
+ addBackSplits.add(split);
}
}
- enumContext.assignSplits(
- new SplitsAssignment<>(Collections.singletonMap(subtaskId, assignment)));
- unassignedSplits.removeAll(assignment);
+ recalculateAssignments(redistributedSplits);
+ putPendingAssignments(subtaskId, addBackSplits);
+ assignAllSplits();
}
@Override
public Set<MockSourceSplit> snapshotState(long checkpointId) {
- return unassignedSplits;
+ return getUnassignedSplits();
}
@Override
@@ -114,11 +128,6 @@ public void close() throws IOException {
this.closed = true;
}
- public void addNewSplits(List<MockSourceSplit> newSplits) {
- unassignedSplits.addAll(newSplits);
- assignAllSplits();
- }
-
// --------------------
public boolean started() {
@@ -130,7 +139,9 @@ public boolean closed() {
}
public Set<MockSourceSplit> getUnassignedSplits() {
- return unassignedSplits;
+ return pendingSplitAssignment.values().stream()
+ .flatMap(Set::stream)
+ .collect(Collectors.toSet());
}
public List<SourceEvent> getHandledSourceEvent() {
@@ -145,17 +156,27 @@ public List getSuccessfulCheckpoints() {
private void assignAllSplits() {
Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
- unassignedSplits.forEach(
- split -> {
- int subtaskId =
- Integer.parseInt(split.splitId()) % enumContext.currentParallelism();
- if (enumContext.registeredReaders().containsKey(subtaskId)) {
- assignment
- .computeIfAbsent(subtaskId, ignored -> new ArrayList<>())
- .add(split);
- }
- });
+ for (Map.Entry<Integer, Set<MockSourceSplit>> entry : pendingSplitAssignment.entrySet()) {
+ Integer subtaskId = entry.getKey();
+ if (enumContext.registeredReaders().containsKey(subtaskId)) {
+ assignment.put(subtaskId, new ArrayList<>(entry.getValue()));
+ }
+ }
enumContext.assignSplits(new SplitsAssignment<>(assignment));
- assignment.values().forEach(l -> unassignedSplits.removeAll(l));
+ assignment.keySet().forEach(pendingSplitAssignment::remove);
+ }
+
+ private void recalculateAssignments(Collection<MockSourceSplit> newSplits) {
+ for (MockSourceSplit split : newSplits) {
+ int subtaskId = Integer.parseInt(split.splitId()) % enumContext.currentParallelism();
+ putPendingAssignments(subtaskId, Collections.singletonList(split));
+ }
+ }
+
+ private void putPendingAssignments(int subtaskId, Collection<MockSourceSplit> splits) {
+ Set<MockSourceSplit> pendingSplits =
+ pendingSplitAssignment.computeIfAbsent(subtaskId, ignored -> new HashSet<>());
+ pendingSplits.addAll(splits);
+ splits.forEach(split -> globalSplitAssignment.put(split.splitId(), subtaskId));
}
}
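
Reviewer note: the fixed addReader logic above reduces to a three-way classification of each reported split against `globalSplitAssignment`. A standalone sketch of just that rule, using plain strings instead of Flink types (illustrative only, not part of the patch):

```java
import java.util.HashMap;
import java.util.Map;

final class ReassignmentRule {
    // splitId -> owning subtask, mirroring globalSplitAssignment in the mock.
    private final Map<String, Integer> globalAssignment = new HashMap<>();

    /** Classifies a split reported by a registering reader, as addReader does. */
    String classify(String splitId, int subtaskId) {
        Integer owner = globalAssignment.get(splitId);
        if (owner == null) {
            return "redistribute"; // first registration: eligible for redistribution
        } else if (owner == subtaskId) {
            return "add back"; // same owner re-registers after a restart
        } else {
            return "ignore"; // already owned by another subtask
        }
    }

    public static void main(String[] args) {
        ReassignmentRule rule = new ReassignmentRule();
        System.out.println(rule.classify("0", 0)); // redistribute
        rule.globalAssignment.put("0", 0);
        System.out.println(rule.classify("0", 0)); // add back
        System.out.println(rule.classify("0", 1)); // ignore
    }
}
```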
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 680013b88627c..f75c9be0cea46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -113,6 +113,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
/** The Source that is associated with this SourceCoordinator. */
private final Source<?, SplitT, EnumChkT> source;
+ /** The serializer that handles the serde of the split. */
+ private final SimpleVersionedSerializer<SplitT> splitSerializer;
+
/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
@@ -163,6 +166,7 @@ public SourceCoordinator(
this.operatorName = operatorName;
this.source = source;
this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+ this.splitSerializer = source.getSplitSerializer();
this.context = context;
this.coordinatorStore = coordinatorStore;
this.watermarkAlignmentParams = watermarkAlignmentParams;
@@ -427,7 +431,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
// assignments
byte[] assignmentData =
context.getAssignmentTracker()
- .snapshotState(source.getSplitSerializer());
+ .snapshotState(splitSerializer);
out.writeInt(assignmentData.length);
out.write(assignmentData);
@@ -680,7 +684,7 @@ private void handleSourceEvent(int subtask, int attemptNumber, SourceEvent event
}
private void handleReaderRegistrationEvent(
- int subtask, int attemptNumber, ReaderRegistrationEvent event) {
+ int subtask, int attemptNumber, ReaderRegistrationEvent event) throws Exception {
checkArgument(subtask == event.subtaskId());
LOG.info(
@@ -692,7 +696,8 @@ private void handleReaderRegistrationEvent(
final boolean subtaskReaderExisted =
context.registeredReadersOfAttempts().containsKey(subtask);
- context.registerSourceReader(subtask, attemptNumber, event.location());
+ context.registerSourceReader(
+ subtask, attemptNumber, event.location(), event.splits(splitSerializer));
if (!subtaskReaderExisted) {
enumerator.addReader(event.subtaskId());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index b469d6ecfd7e4..3f9d78b58643a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -465,8 +465,10 @@ void onCheckpoint(long checkpointId) throws Exception {
* @param subtaskId the subtask id of the source reader.
* @param attemptNumber the attempt number of the source reader.
* @param location the location of the source reader.
+ * @param splits the splits restored from the source reader's state.
*/
- void registerSourceReader(int subtaskId, int attemptNumber, String location) {
+ void registerSourceReader(
+ int subtaskId, int attemptNumber, String location, List<SplitT> splits) {
final Map<Integer, ReaderInfo> attemptReaders =
registeredReaders.computeIfAbsent(subtaskId, k -> new ConcurrentHashMap<>());
checkState(
@@ -474,7 +476,7 @@ void registerSourceReader(int subtaskId, int attemptNumber, String location) {
"ReaderInfo of subtask %s (#%s) already exists.",
subtaskId,
attemptNumber);
- attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
+ attemptReaders.put(attemptNumber, ReaderInfo.createReaderInfo(subtaskId, location, splits));
sendCachedSplitsToNewlyRegisteredReader(subtaskId, attemptNumber);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
index 2f707cd550f02..6248321574091 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/ReaderRegistrationEvent.java
@@ -18,11 +18,20 @@ Licensed to the Apache Software Foundation (ASF) under one
package org.apache.flink.runtime.source.event;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
/**
- * An {@link OperatorEvent} that registers a {@link
- * org.apache.flink.api.connector.source.SourceReader SourceReader} to the SourceCoordinator.
+ * The SourceOperator always sends a ReaderRegistrationEvent carrying the
+ * {@code reportedSplitsOnRegistration} list. If the source implements {@link
+ * SupportsSplitReassignmentOnRecovery}, the restored splits are not added back to the reader but
+ * are reported through this event so that the enumerator can reassign them.
*/
public class ReaderRegistrationEvent implements OperatorEvent {
@@ -30,10 +39,44 @@ public class ReaderRegistrationEvent implements OperatorEvent {
private final int subtaskId;
private final String location;
+ private final ArrayList<byte[]> splits;
public ReaderRegistrationEvent(int subtaskId, String location) {
this.subtaskId = subtaskId;
this.location = location;
+ this.splits = new ArrayList<>();
+ }
+
+ ReaderRegistrationEvent(int subtaskId, String location, ArrayList<byte[]> splits) {
+ this.subtaskId = subtaskId;
+ this.location = location;
+ this.splits = splits;
+ }
+
+ public static <SplitT extends SourceSplit>
+ ReaderRegistrationEvent createReaderRegistrationEvent(
+ int subtaskId,
+ String location,
+ List<SplitT> splits,
+ SimpleVersionedSerializer<SplitT> splitSerializer)
+ throws IOException {
+ ArrayList<byte[]> result = new ArrayList<>();
+ for (SplitT split : splits) {
+ result.add(splitSerializer.serialize(split));
+ }
+ return new ReaderRegistrationEvent(subtaskId, location, result);
+ }
+
+ public <SplitT extends SourceSplit> List<SplitT> splits(
+ SimpleVersionedSerializer<SplitT> splitSerializer) throws IOException {
+ if (splits.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<SplitT> result = new ArrayList<>(splits.size());
+ for (byte[] serializedSplit : splits) {
+ result.add(splitSerializer.deserialize(splitSerializer.getVersion(), serializedSplit));
+ }
+ return result;
}
public int subtaskId() {
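
Reviewer note: a minimal round trip through the new factory and accessor, with a throwaway split type and serializer. Both `DemoSplit` and `DemoSplitSerializer` are illustrative names, not part of this patch; the real callers are SourceOperator (serialize on send) and SourceCoordinator (deserialize on receive).

```java
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class RegistrationRoundTrip {
    static final class DemoSplit implements SourceSplit {
        final String id;

        DemoSplit(String id) {
            this.id = id;
        }

        @Override
        public String splitId() {
            return id;
        }
    }

    static final class DemoSplitSerializer implements SimpleVersionedSerializer<DemoSplit> {
        @Override
        public int getVersion() {
            return 0;
        }

        @Override
        public byte[] serialize(DemoSplit split) {
            return split.id.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public DemoSplit deserialize(int version, byte[] serialized) {
            return new DemoSplit(new String(serialized, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        DemoSplitSerializer serializer = new DemoSplitSerializer();
        // Splits are serialized to byte[] once on the operator side...
        ReaderRegistrationEvent event =
                ReaderRegistrationEvent.createReaderRegistrationEvent(
                        0, "host-0", Collections.singletonList(new DemoSplit("s-0")), serializer);
        // ...and deserialized lazily on the coordinator side.
        List<DemoSplit> restored = event.splits(serializer);
        System.out.println(restored.get(0).splitId()); // prints s-0
    }
}
```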
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index e6cb4b995189f..09b12007d3349 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -215,6 +215,8 @@ private enum OperatingMode {
/** Watermark identifier to whether the watermark are aligned. */
private final Map<String, Boolean> watermarkIsAlignedMap;
+ private final boolean supportsSplitReassignmentOnRecovery;
+
public SourceOperator(
StreamOperatorParameters<OUT> parameters,
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
@@ -227,7 +229,8 @@ public SourceOperator(
String localHostname,
boolean emitProgressiveWatermarks,
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
- Map<String, Boolean> watermarkIsAlignedMap) {
+ Map<String, Boolean> watermarkIsAlignedMap,
+ boolean supportsSplitReassignmentOnRecovery) {
super(parameters);
this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
@@ -242,6 +245,7 @@ public SourceOperator(
this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
this.watermarkIsAlignedMap = watermarkIsAlignedMap;
+ this.supportsSplitReassignmentOnRecovery = supportsSplitReassignmentOnRecovery;
}
@Override
@@ -411,7 +415,7 @@ public void open() throws Exception {
// restore the state if necessary.
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
- if (!splits.isEmpty()) {
+ if (!splits.isEmpty() && !supportsSplitReassignmentOnRecovery) {
LOG.info("Restoring state for {} split(s) to reader.", splits.size());
for (SplitT s : splits) {
getOrCreateSplitMetricGroup(s.splitId());
@@ -421,7 +425,7 @@ public void open() throws Exception {
}
// Register the reader to the coordinator.
- registerReader();
+ registerReader(supportsSplitReassignmentOnRecovery ? splits : Collections.emptyList());
sourceMetricGroup.idlingStarted();
// Start the reader after registration, sending messages in start is allowed.
@@ -811,10 +815,13 @@ private boolean shouldWaitForAlignment() {
return currentMaxDesiredWatermark < latestWatermark;
}
- private void registerReader() {
+ private void registerReader(List<SplitT> splits) throws Exception {
operatorEventGateway.sendEventToCoordinator(
- new ReaderRegistrationEvent(
- getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), localHostname));
+ ReaderRegistrationEvent.createReaderRegistrationEvent(
+ getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+ localHostname,
+ splits,
+ splitSerializer));
}
// --------------- methods for unit tests ------------
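
Reviewer note: the change to open() and registerReader() amounts to a single either/or — restored splits are either added to the local reader (legacy path) or shipped with the registration event, never both. A reduced model of that branch, with simplified names (a sketch, not a drop-in excerpt of SourceOperator):

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

final class RecoveryBranch {
    /** Returns the splits to report on registration; otherwise hands them to the reader. */
    static <S> List<S> splitsToReport(
            List<S> restored, boolean reassignOnRecovery, Consumer<List<S>> addToReader) {
        if (!restored.isEmpty() && !reassignOnRecovery) {
            addToReader.accept(restored); // legacy path: the reader keeps its splits
        }
        return reassignOnRecovery ? restored : Collections.emptyList();
    }

    public static void main(String[] args) {
        List<String> restored = Collections.singletonList("split-0");
        System.out.println(splitsToReport(restored, true, r -> {}));  // [split-0]
        System.out.println(splitsToReport(restored, false, r -> {})); // []
    }
}
```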
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index caa78b022ce04..6d1cbc8889730 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -27,6 +27,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SupportsSplitReassignmentOnRecovery;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -128,7 +129,8 @@ public <T extends StreamOperator<OUT>> T createStreamOperator(
.getTaskManagerExternalAddress(),
emitProgressiveWatermarks,
parameters.getContainingTask().getCanEmitBatchOfRecords(),
- getSourceWatermarkDeclarations());
+ getSourceWatermarkDeclarations(),
+ source instanceof SupportsSplitReassignmentOnRecovery);
parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -199,7 +201,8 @@ SourceOperator<T, SplitT> instantiateSourceOperator(
String localHostName,
boolean emitProgressiveWatermarks,
CanEmitBatchOfRecordsChecker canEmitBatchOfRecords,
- Collection<? extends WatermarkDeclaration> watermarkDeclarations) {
+ Collection<? extends WatermarkDeclaration> watermarkDeclarations,
+ boolean supportsSplitReassignmentOnRecovery) {
// jumping through generics hoops: cast the generics away to then cast them back more
// strictly typed
@@ -231,6 +234,7 @@ SourceOperator instantiateSourceOperator(
localHostName,
emitProgressiveWatermarks,
canEmitBatchOfRecords,
- watermarkIsAlignedMap);
+ watermarkIsAlignedMap,
+ supportsSplitReassignmentOnRecovery);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 4b92191d45b58..c4fb85bedce26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -57,6 +57,21 @@ void testRegisterReader() throws Exception {
final TestingSplitEnumerator<MockSourceSplit> enumerator = getEnumerator();
assertThat(enumerator.getRegisteredReaders()).containsExactlyInAnyOrder(0, 1, 2);
+
+ ReaderInfo readerInfoOfSubtask1 =
+ ReaderInfo.createReaderInfo(
+ 1, "subtask_1_location", Collections.singletonList(new MockSourceSplit(1)));
+ sourceCoordinator.subtaskReset(1, 1);
+ sourceCoordinator.handleEventFromOperator(
+ 1,
+ 1,
+ ReaderRegistrationEvent.createReaderRegistrationEvent(
+ readerInfoOfSubtask1.getSubtaskId(),
+ readerInfoOfSubtask1.getLocation(),
+ readerInfoOfSubtask1.getReportedSplitsOnRegistration(),
+ new MockSourceSplitSerializer()));
+ waitForCoordinatorToProcessActions();
+ assertThat(context.registeredReaders().get(1)).isEqualTo(readerInfoOfSubtask1);
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 0bf8e526f3625..b52403c8eae90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -42,9 +42,11 @@
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.jupiter.api.Test;
@@ -598,6 +600,125 @@ class TestDynamicFilteringEvent implements SourceEvent, DynamicFilteringInfo {}
assertThat(coordinator.inferSourceParallelismAsync(2, 1).get()).isEqualTo(2);
}
+ @Test
+ void testDuplicateRedistribution() throws Exception {
+ final List<MockSourceSplit> splits =
+ Arrays.asList(
+ new MockSourceSplit(0), new MockSourceSplit(1), new MockSourceSplit(2));
+
+ testRedistribution(
+ (coordinator) -> {
+ registerReader(coordinator, 0, 0, splits);
+ registerReader(coordinator, 1, 0, Collections.emptyList());
+ registerReader(coordinator, 2, 0, Collections.emptyList());
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}});
+
+ // duplicate registration
+ registerReader(coordinator, 0, 0, splits);
+ waitForCoordinatorToProcessActions();
+ // splits 0, 1 and 2 won't be sent again.
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}});
+ });
+ }
+
+ @Test
+ void testRedistributionInPartialRestartBeforeAnyCheckpoint() throws Exception {
+ final List<MockSourceSplit> splits =
+ Arrays.asList(
+ new MockSourceSplit(0), new MockSourceSplit(1), new MockSourceSplit(2));
+
+ testRedistribution(
+ (coordinator) -> {
+ registerReader(coordinator, 0, 0, splits);
+ registerReader(coordinator, 1, 0, Collections.emptyList());
+ registerReader(coordinator, 2, 0, Collections.emptyList());
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}});
+
+ coordinator.subtaskReset(0, 0);
+ setReaderTaskReady(coordinator, 0, 1);
+ registerReader(coordinator, 0, 1, splits);
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(
+ new int[][] {new int[] {1, 1}, new int[] {1}, new int[] {1}});
+ });
+ }
+
+ @Test
+ void testRedistributionInPartialRestartAfterCheckpoint() throws Exception {
+ final List<MockSourceSplit> splits =
+ Arrays.asList(
+ new MockSourceSplit(0), new MockSourceSplit(1), new MockSourceSplit(2));
+
+ testRedistribution(
+ (coordinator) -> {
+ registerReader(coordinator, 0, 0, splits);
+ registerReader(coordinator, 1, 0, Collections.emptyList());
+ registerReader(coordinator, 2, 0, Collections.emptyList());
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(new int[][] {new int[] {1}, new int[] {1}, new int[] {1}});
+
+ CompletableFuture<byte[]> future = new CompletableFuture<>();
+ coordinator.checkpointCoordinator(1, future);
+ future.get();
+
+ coordinator.subtaskReset(0, 0);
+ setReaderTaskReady(coordinator, 0, 1);
+ registerReader(
+ coordinator, 0, 1, Collections.singletonList(new MockSourceSplit(0)));
+ waitForCoordinatorToProcessActions();
+ checkAddSplitEvents(
+ new int[][] {new int[] {1, 1}, new int[] {1}, new int[] {1}});
+ });
+ }
+
+ void testRedistribution(
+ FutureConsumerWithException<
+ SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>>, Exception>
+ consumer)
+ throws Exception {
+ try (final SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>> splitEnumerator =
+ new MockSplitEnumerator(0, context);
+ final SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> coordinator =
+ new SourceCoordinator<>(
+ new JobID(),
+ OPERATOR_NAME,
+ new EnumeratorCreatingSource<>(() -> splitEnumerator),
+ context,
+ new CoordinatorStoreImpl(),
+ WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+ null)) {
+
+ coordinator.start();
+ setAllReaderTasksReady(coordinator);
+
+ consumer.accept(coordinator);
+ }
+ }
+
+ private void checkAddSplitEvents(int[][] expectedAssignedSplitNum) {
+ MockSourceSplitSerializer mockSourceSplitSerializer = new MockSourceSplitSerializer();
+ assertThat(expectedAssignedSplitNum.length).isEqualTo(NUM_SUBTASKS);
+ for (int i = 0; i < NUM_SUBTASKS; i++) {
+ List<OperatorEvent> sentEventsForSubtask = receivingTasks.getSentEventsForSubtask(i);
+ assertThat(sentEventsForSubtask).hasSize(expectedAssignedSplitNum[i].length);
+ for (int j = 0; j < sentEventsForSubtask.size(); j++) {
+ assertThat(sentEventsForSubtask.get(j)).isExactlyInstanceOf(AddSplitEvent.class);
+ List<MockSourceSplit> splits;
+ try {
+ splits =
+ ((AddSplitEvent<MockSourceSplit>) sentEventsForSubtask.get(j))
+ .splits(mockSourceSplitSerializer);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ assertThat(splits).hasSize(expectedAssignedSplitNum[i][j]);
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// test helpers
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index 61ece5c357fe9..0fdb564070a26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -39,6 +39,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -141,6 +142,22 @@ protected void registerReader(int subtask, int attemptNumber) {
new ReaderRegistrationEvent(subtask, createLocationFor(subtask, attemptNumber)));
}
+ protected void registerReader(
SourceCoordinator<MockSourceSplit, Set<MockSourceSplit>> coordinator,
+ int subtask,
+ int attemptNumber,
List<MockSourceSplit> splits)
+ throws IOException {
+ coordinator.handleEventFromOperator(
+ subtask,
+ attemptNumber,
+ ReaderRegistrationEvent.createReaderRegistrationEvent(
+ subtask,
+ createLocationFor(subtask, attemptNumber),
+ splits,
+ new MockSourceSplitSerializer()));
+ }
+
static String createLocationFor(int subtask, int attemptNumber) {
return String.format("location_%d_%d", subtask, attemptNumber);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
index 8cc98109fdbc6..35621fbf5bcc8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorSplitWatermarkAlignmentTest.java
@@ -99,7 +99,8 @@ void testSplitWatermarkAlignment() throws Exception {
new MockOperatorEventGateway(),
1,
5,
- true);
+ true,
+ false);
operator.initializeState(
new StreamTaskStateInitializerImpl(env, new HashMapStateBackend()));
@@ -547,7 +548,8 @@ private SourceOperator<Integer, MockSourceSplit> createAndOpenSourceOperatorWith
new MockOperatorEventGateway(),
1,
5,
- true);
+ true,
+ false);
operator.initializeState(
new StreamTaskStateInitializerImpl(env, new HashMapStateBackend()));
operator.open();
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index 25f54a632c55b..aa4d2cf70814b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -43,11 +43,14 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
@@ -99,24 +102,46 @@ void testInitializeState() throws Exception {
.isNotNull();
}
- @Test
- void testOpen() throws Exception {
- // Initialize the operator.
- operator.initializeState(context.createStateContext());
- // Open the operator.
- operator.open();
- // The source reader should have been assigned a split.
- assertThat(mockSourceReader.getAssignedSplits())
- .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
- // The source reader should have started.
- assertThat(mockSourceReader.isStarted()).isTrue();
-
- // A ReaderRegistrationRequest should have been sent.
- assertThat(mockGateway.getEventsSent()).hasSize(1);
- OperatorEvent operatorEvent = mockGateway.getEventsSent().get(0);
- assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
- assertThat(((ReaderRegistrationEvent) operatorEvent).subtaskId())
- .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testOpen(boolean supportsSplitReassignmentOnRecovery) throws Exception {
+ try (SourceOperatorTestContext context =
+ new SourceOperatorTestContext(
+ false,
+ false,
+ WatermarkStrategy.noWatermarks(),
+ new MockOutput<>(new ArrayList<>()),
+ supportsSplitReassignmentOnRecovery)) {
+ SourceOperator<Integer, MockSourceSplit> operator = context.getOperator();
+ // Initialize the operator.
+ operator.initializeState(context.createStateContext());
+ // Open the operator.
+ operator.open();
+ // The source reader should have been assigned a split.
+ if (supportsSplitReassignmentOnRecovery) {
+ assertThat(context.getSourceReader().getAssignedSplits()).isEmpty();
+ } else {
+ assertThat(context.getSourceReader().getAssignedSplits())
+ .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+ }
+
+ // The source reader should have started.
+ assertThat(context.getSourceReader().isStarted()).isTrue();
+
+ // A ReaderRegistrationRequest should have been sent.
+ assertThat(context.getGateway().getEventsSent()).hasSize(1);
+ OperatorEvent operatorEvent = context.getGateway().getEventsSent().get(0);
+ assertThat(operatorEvent).isInstanceOf(ReaderRegistrationEvent.class);
+ ReaderRegistrationEvent registrationEvent = (ReaderRegistrationEvent) operatorEvent;
+ assertThat(registrationEvent.subtaskId())
+ .isEqualTo(SourceOperatorTestContext.SUBTASK_INDEX);
+ if (supportsSplitReassignmentOnRecovery) {
+ assertThat(registrationEvent.splits(new MockSourceSplitSerializer()))
+ .containsExactly(SourceOperatorTestContext.MOCK_SPLIT);
+ } else {
+ assertThat(registrationEvent.splits(new MockSourceSplitSerializer())).isEmpty();
+ }
+ }
}
@Test
@@ -210,7 +235,8 @@ void testHandleBacklogEvent() throws Exception {
false,
WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element),
- new CollectorOutput<>(outputStreamElements));
+ new CollectorOutput<>(outputStreamElements),
+ false);
operator = context.getOperator();
operator.initializeState(context.createStateContext());
operator.open();
@@ -251,7 +277,7 @@ public void testMetricGroupIsCreatedForNewSplit() throws Exception {
@Test
public void testMetricGroupIsCreatedForRestoredSplit() throws Exception {
- MockSourceSplit restoredSplit = new MockSourceSplit((2));
+ MockSourceSplit restoredSplit = new MockSourceSplit((1));
StateInitializationContext stateContext =
context.createStateContext(Collections.singletonList(restoredSplit));
operator.initializeState(stateContext);
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
index f7e75788eb3d4..6e1985124451f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTestContext.java
@@ -75,14 +75,15 @@ public SourceOperatorTestContext(boolean idle) throws Exception {
public SourceOperatorTestContext(boolean idle, WatermarkStrategy<Integer> watermarkStrategy)
throws Exception {
- this(idle, false, watermarkStrategy, new MockOutput<>(new ArrayList<>()));
+ this(idle, false, watermarkStrategy, new MockOutput<>(new ArrayList<>()), false);
}
public SourceOperatorTestContext(
boolean idle,
boolean usePerSplitOutputs,
WatermarkStrategy<Integer> watermarkStrategy,
- Output<StreamRecord<Integer>> output)
+ Output<StreamRecord<Integer>> output,
+ boolean supportsSplitReassignmentOnRecovery)
throws Exception {
mockSourceReader =
new MockSourceReader(
@@ -109,7 +110,8 @@ public SourceOperatorTestContext(
mockGateway,
SUBTASK_INDEX,
5,
- true);
+ true,
+ supportsSplitReassignmentOnRecovery);
operator.initializeState(
new StreamTaskStateInitializerImpl(env, new HashMapStateBackend()));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
index 63b7b8ff3712b..7bef5d80c6cae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorWatermarksTest.java
@@ -54,7 +54,8 @@ void setup() throws Exception {
new SourceOperatorAlignmentTest
.PunctuatedGenerator())
.withTimestampAssigner((r, t) -> r),
- new MockOutput<>(new ArrayList<>()));
+ new MockOutput<>(new ArrayList<>()),
+ false);
operator = context.getOperator();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 40a9bfe948677..dc8ebc9806e52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -64,7 +64,8 @@ public TestingSourceOperator(
SourceReader<T, MockSourceSplit> reader,
WatermarkStrategy<T> watermarkStrategy,
ProcessingTimeService timeService,
- boolean emitProgressiveWatermarks) {
+ boolean emitProgressiveWatermarks,
+ boolean supportsSplitReassignmentOnRecovery) {
this(
parameters,
@@ -74,7 +75,8 @@ public TestingSourceOperator(
new MockOperatorEventGateway(),
1,
5,
- emitProgressiveWatermarks);
+ emitProgressiveWatermarks,
+ supportsSplitReassignmentOnRecovery);
}
public TestingSourceOperator(
@@ -85,7 +87,8 @@ public TestingSourceOperator(
OperatorEventGateway eventGateway,
int subtaskIndex,
int parallelism,
- boolean emitProgressiveWatermarks) {
+ boolean emitProgressiveWatermarks,
+ boolean supportsSplitReassignmentOnRecovery) {
super(
parameters,
@@ -98,7 +101,8 @@ public TestingSourceOperator(
"localhost",
emitProgressiveWatermarks,
() -> false,
- Collections.emptyMap());
+ Collections.emptyMap(),
+ supportsSplitReassignmentOnRecovery);
this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
@@ -130,6 +134,15 @@ public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
WatermarkStrategy<T> watermarkStrategy,
boolean emitProgressiveWatermarks)
throws Exception {
+ return createTestOperator(reader, watermarkStrategy, emitProgressiveWatermarks, false);
+ }
+
+ public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
+ SourceReader<T, MockSourceSplit> reader,
+ WatermarkStrategy<T> watermarkStrategy,
+ boolean emitProgressiveWatermarks,
+ boolean supportsSplitReassignmentOnRecovery)
+ throws Exception {
AbstractStateBackend abstractStateBackend = new HashMapStateBackend();
Environment env = new MockEnvironmentBuilder().build();
@@ -168,7 +181,8 @@ public static <T> SourceOperator<T, MockSourceSplit> createTestOperator(
reader,
watermarkStrategy,
timeService,
- emitProgressiveWatermarks);
+ emitProgressiveWatermarks,
+ supportsSplitReassignmentOnRecovery);
sourceOperator.initializeState(stateContext);
sourceOperator.open();