diff --git a/gradle.properties b/gradle.properties index bdf09354..426e0ee5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -40,7 +40,7 @@ testcontainersVersion=1.15.3 # Version and base tags can be overridden at build time. connectorVersion=0.13.0-SNAPSHOT -pravegaVersion=0.13.0-3151.28c485f-SNAPSHOT +pravegaVersion=0.13.0-3184.23d8ef6-SNAPSHOT schemaRegistryVersion=0.6.0-88.65039cd-SNAPSHOT apacheCommonsVersion=3.7 diff --git a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java b/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java index 91a0883f..d1b449c1 100644 --- a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java +++ b/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java @@ -21,6 +21,7 @@ import io.pravega.client.stream.ReaderGroup; import io.pravega.client.stream.ReaderGroupConfig; import io.pravega.client.stream.ReaderGroupNotFoundException; +import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException; import io.pravega.connectors.flink.serialization.CheckpointSerializer; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.io.SimpleVersionedSerializer; @@ -113,7 +114,13 @@ public CompletableFuture triggerCheckpoint( final String checkpointName = createCheckpointName(checkpointId); final CompletableFuture checkpointResult = - this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService); + this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService) + .exceptionally(e -> { + if (e instanceof MaxNumberOfCheckpointsExceededException) { + readerGroup.cancelOutstandingCheckpoints(); + } + return null; + }); // Add a timeout to the future, to prevent long blocking calls scheduledExecutorService.schedule(() -> checkpointResult.cancel(false), triggerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); diff --git a/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java b/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java index 21e194fd..77a878e9 100644 --- a/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java +++ b/src/test/java/io/pravega/connectors/flink/ReaderCheckpointHookTest.java @@ -24,6 +24,7 @@ import io.pravega.client.stream.Stream; import io.pravega.client.stream.StreamCut; import io.pravega.client.stream.impl.CheckpointImpl; +import io.pravega.client.stream.impl.MaxNumberOfCheckpointsExceededException; import io.pravega.client.stream.impl.StreamCutImpl; import io.pravega.connectors.flink.serialization.CheckpointSerializer; import org.apache.flink.api.common.time.Time; @@ -95,6 +96,24 @@ public void testTriggerCheckpointTimeout() throws Exception { assertThat(checkpointFuture.isCancelled()).isTrue(); } + @Test + public void testCancelWhenExceedingMaxOutstandingCheckpoints() throws Exception { + ReaderGroupConfig readerGroupConfig = mock(ReaderGroupConfig.class); + ClientConfig clientConfig = mock(ClientConfig.class); + CompletableFuture checkpointPromise = new CompletableFuture<>(); + checkpointPromise.completeExceptionally(new MaxNumberOfCheckpointsExceededException("test")); + + TestableReaderCheckpointHook hook = new TestableReaderCheckpointHook(HOOK_UID, READER_GROUP_NAME, SCOPE, Time.minutes(1), clientConfig, readerGroupConfig); + when(hook.readerGroup.initiateCheckpoint(anyString(), any())).thenReturn(checkpointPromise); + + CompletableFuture checkpointFuture = hook.triggerCheckpoint(1L, 1L, Executors.directExecutor()); + assertThat(checkpointFuture).isNotNull(); + verify(hook.readerGroup).initiateCheckpoint(anyString(), any()); + + // invoke the cancelOutstandingCheckpoints + verify(hook.readerGroup).cancelOutstandingCheckpoints(); + } + @Test public void testReset() { ReaderGroupConfig readerGroupConfig = mock(ReaderGroupConfig.class);