diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index dc323f019b1fd..12a3355840127 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -85,7 +85,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -109,7 +108,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -1985,53 +1983,31 @@ public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); } - @Test - public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throws Exception { + @ParameterizedTest + @EnumSource(value = State.class, names = {"CREATED", "CLOSED"}) + public void shouldComputeOffsetSumFromCheckpointFileForCreatedAndClosedTasks(final State state) throws Exception { final Map changelogOffsets = mkMap( mkEntry(new TopicPartition("changelog", 0), 5L), mkEntry(new TopicPartition("changelog", 1), 10L) ); final Map expectedOffsetSums = mkMap(mkEntry(taskId00, 15L)); - expectLockObtainedFor(taskId00); - makeTaskFolders(taskId00.toString()); - writeCheckpointFile(taskId00, changelogOffsets); - - taskManager.handleRebalanceStart(singleton("topic")); - final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(uninitializedTask)); - - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - - assertThat(uninitializedTask.state(), is(State.CREATED)); + final StreamTask task = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(state) + .withInputPartitions(taskId00Partitions) + .build(); - assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); - } + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task))); - @Test - public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Exception { - final Map changelogOffsets = mkMap( - mkEntry(new TopicPartition("changelog", 0), 5L), - mkEntry(new TopicPartition("changelog", 1), 10L) - ); - final Map expectedOffsetSums = mkMap(mkEntry(taskId00, 15L)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); expectLockObtainedFor(taskId00); makeTaskFolders(taskId00.toString()); writeCheckpointFile(taskId00, changelogOffsets); - final StateMachineTask closedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - taskManager.handleRebalanceStart(singleton("topic")); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singleton(closedTask)); - - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - - closedTask.suspend(); - closedTask.closeClean(); - assertThat(closedTask.state(), is(State.CLOSED)); - assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); } @@ -4051,21 +4027,22 @@ public void shouldNotSendPurgeDataIfPreviousNotDone() { @Test public void shouldIgnorePurgeDataErrors() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - - when(consumer.assignment()).thenReturn(assignment); - final KafkaFutureImpl futureDeletedRecords = new KafkaFutureImpl<>(); final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords)); futureDeletedRecords.completeExceptionally(new Exception("KABOOM!")); when(adminClient.deleteRecords(any())).thenReturn(deleteRecordsResult); - taskManager.addTask(task00); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - assertThat(task00.state(), is(Task.State.RUNNING)); + when(task00.purgeableOffsets()).thenReturn(singletonMap(t1p1, 5L)); - task00.setPurgeableOffsets(singletonMap(t1p1, 5L)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task00)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.maybePurgeCommittedRecords(); taskManager.maybePurgeCommittedRecords(); @@ -4151,44 +4128,38 @@ public void shouldMaybeCommitAllActiveTasksThatNeedCommit() { @Test public void shouldProcessActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); - - final Map> firstAssignment = new HashMap<>(); - firstAssignment.put(taskId00, taskId00Partitions); - firstAssignment.put(taskId01, taskId01Partitions); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(firstAssignment))) - .thenReturn(Arrays.asList(task00, task01)); + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions) + .build(); - taskManager.handleAssignment(firstAssignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + // simulate processing records from the queue + when(task00.process(anyLong())) + .thenReturn(true) // record 1 + .thenReturn(true) // record 2 + .thenReturn(true) // record 3 + .thenReturn(true) // record 4 + .thenReturn(true) // record 5 + .thenReturn(true) // record 6 + .thenReturn(false); // no more records + + when(task01.process(anyLong())) + .thenReturn(true) // record 1 + .thenReturn(true) // record 2 + .thenReturn(true) // record 3 + .thenReturn(true) // record 4 + .thenReturn(true) // record 5 + .thenReturn(false); // no more records - assertThat(task00.state(), is(Task.State.RUNNING)); - assertThat(task01.state(), is(Task.State.RUNNING)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.activeTasks()).thenReturn(Set.of(task00, task01)); - task00.addRecords( - t1p0, - Arrays.asList( - getConsumerRecord(t1p0, 0L), - getConsumerRecord(t1p0, 1L), - getConsumerRecord(t1p0, 2L), - getConsumerRecord(t1p0, 3L), - getConsumerRecord(t1p0, 4L), - getConsumerRecord(t1p0, 5L) - ) - ); - task01.addRecords( - t1p1, - Arrays.asList( - getConsumerRecord(t1p1, 0L), - getConsumerRecord(t1p1, 1L), - getConsumerRecord(t1p1, 2L), - getConsumerRecord(t1p1, 3L), - getConsumerRecord(t1p1, 4L) - ) - ); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); // check that we should be processing at most max num records assertThat(taskManager.process(3, time), is(6)); @@ -4200,62 +4171,48 @@ public void shouldProcessActiveTasks() { @Test public void shouldNotFailOnTimeoutException() { - final AtomicReference timeoutException = new AtomicReference<>(); - timeoutException.set(new TimeoutException("Skip me!")); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); + // throws TimeoutException on first call, then processes 2 records + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId01Partitions) + .build(); + final StreamTask task02 = statefulTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId02Partitions) + .build(); - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - task00.transitionTo(State.RESTORING); - task00.transitionTo(State.RUNNING); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { - @Override - public boolean process(final long wallClockTime) { - final TimeoutException exception = timeoutException.get(); - if (exception != null) { - throw exception; - } - return true; - } - }; - task01.transitionTo(State.RESTORING); - task01.transitionTo(State.RUNNING); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true, stateManager); - task02.transitionTo(State.RESTORING); - task02.transitionTo(State.RUNNING); + when(task00.process(anyLong())) + .thenReturn(true) + .thenReturn(true) + .thenReturn(false); - taskManager.addTask(task00); - taskManager.addTask(task01); - taskManager.addTask(task02); + when(task01.process(anyLong())) + .thenThrow(new TimeoutException("Skip me!")) // throws TimeoutException + .thenReturn(true) + .thenReturn(true) + .thenReturn(false); - task00.addRecords( - t1p0, - Arrays.asList( - getConsumerRecord(t1p0, 0L), - getConsumerRecord(t1p0, 1L) - ) - ); - task01.addRecords( - t1p1, - Arrays.asList( - getConsumerRecord(t1p1, 0L), - getConsumerRecord(t1p1, 1L) - ) - ); - task02.addRecords( - t1p2, - Arrays.asList( - getConsumerRecord(t1p2, 0L), - getConsumerRecord(t1p2, 1L) - ) - ); + when(task02.process(anyLong())) + .thenReturn(true) + .thenReturn(true) + .thenReturn(false); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); // should only process 2 records, because task01 throws TimeoutException assertThat(taskManager.process(1, time), is(2)); - assertThat(task01.timeout, equalTo(time.milliseconds())); + verify(task01).maybeInitTaskTimeoutOrThrow(anyLong(), any(TimeoutException.class)); // retry without error - timeoutException.set(null); assertThat(taskManager.process(1, time), is(3)); - assertThat(task01.timeout, equalTo(null)); + verify(task01).clearTaskTimeout(); // there should still be one record for task01 to be processed assertThat(taskManager.process(1, time), is(1)); @@ -4263,47 +4220,35 @@ public boolean process(final long wallClockTime) { @Test public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public boolean process(final long wallClockTime) { - throw new TaskMigratedException("migrated", new RuntimeException("cause")); - } - }; - - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + when(task00.process(anyLong())) + .thenThrow(new TaskMigratedException("migrated", new RuntimeException("cause"))); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.activeTasks()).thenReturn(Set.of(task00)); - final TopicPartition partition = taskId00Partitions.iterator().next(); - task00.addRecords(partition, singletonList(getConsumerRecord(partition, 0L))); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); assertThrows(TaskMigratedException.class, () -> taskManager.process(1, time)); } @Test public void shouldWrapRuntimeExceptionsInProcessActiveTasksAndSetTaskId() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public boolean process(final long wallClockTime) { - throw new RuntimeException("oops"); - } - }; + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))) - .thenReturn(singletonList(task00)); + when(task00.process(anyLong())).thenThrow(new RuntimeException("oops")); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - - assertThat(task00.state(), is(Task.State.RUNNING)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.activeTasks()).thenReturn(Set.of(task00)); - final TopicPartition partition = taskId00Partitions.iterator().next(); - task00.addRecords(partition, singletonList(getConsumerRecord(partition, 0L))); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final StreamsException exception = assertThrows(StreamsException.class, () -> taskManager.process(1, time)); assertThat(exception.taskId().isPresent(), is(true)); @@ -4313,42 +4258,37 @@ public boolean process(final long wallClockTime) { @Test public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public boolean maybePunctuateStreamTime() { - throw new TaskMigratedException("migrated", new RuntimeException("cause")); - } - }; + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + when(task00.maybePunctuateStreamTime()) + .thenThrow(new TaskMigratedException("migrated", new RuntimeException("cause"))); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.activeTasks()).thenReturn(Set.of(task00)); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - assertThrows(TaskMigratedException.class, () -> taskManager.punctuate()); + assertThrows(TaskMigratedException.class, taskManager::punctuate); } @Test public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public boolean maybePunctuateStreamTime() { - throw new KafkaException("oops"); - } - }; + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + when(task00.maybePunctuateStreamTime()).thenThrow(new KafkaException("oops")); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.activeTasks()).thenReturn(Set.of(task00)); - assertThat(task00.state(), is(Task.State.RUNNING)); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - assertThrows(KafkaException.class, () -> taskManager.punctuate()); + assertThrows(KafkaException.class, taskManager::punctuate); } @Test @@ -4375,38 +4315,37 @@ public void shouldPunctuateActiveTasks() { @Test public void shouldReturnFalseWhenThereAreStillNonRunningTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public Set changelogPartitions() { - return singleton(new TopicPartition("fake", 0)); - } - }; + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + // mock that the state updater is still restoring active tasks + when(stateUpdater.restoresActiveTasks()).thenReturn(true); + + assertThat(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter), is(false)); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); - assertThat(task00.state(), is(Task.State.RESTORING)); verifyNoInteractions(consumer); } @Test public void shouldHaveRemainingPartitionsUncleared() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task00.setCommittableOffsetsAndMetadata(offsets); + when(task00.prepareCommit(false)).thenReturn(offsets); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + when(tasks.allTasks()).thenReturn(Set.of(task00)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(TaskManager.class)) { appender.setClassLogger(TaskManager.class, Level.DEBUG); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); - assertThat(task00.state(), is(Task.State.RUNNING)); taskManager.handleRevocation(Set.of(t1p0, new TopicPartition("unknown", 0))); - assertThat(task00.state(), is(Task.State.SUSPENDED)); + + verify(task00).suspend(); final List messages = appender.getMessages(); assertThat( @@ -4596,17 +4535,26 @@ private void expectDirectoryNotEmpty(final TaskId... tasks) { @Test public void shouldThrowTaskMigratedExceptionOnCommitFailed() { - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); + final Map offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); - task01.setCommittableOffsetsAndMetadata(offsets); - task01.setCommitNeeded(); - taskManager.addTask(task01); + + when(task01.commitNeeded()).thenReturn(true); + when(task01.prepareCommit(true)).thenReturn(offsets); + + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.allTasks()).thenReturn(Set.of(task01)); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); doThrow(new CommitFailedException()).when(consumer).commitSync(offsets); final TaskMigratedException thrown = assertThrows( TaskMigratedException.class, - () -> taskManager.commitAll() + taskManager::commitAll ); assertThat(thrown.getCause(), instanceOf(CommitFailedException.class)); @@ -4615,29 +4563,38 @@ public void shouldThrowTaskMigratedExceptionOnCommitFailed() { equalTo("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group;" + " it means all tasks belonging to this thread should be migrated.") ); - assertThat(task01.state(), is(Task.State.CREATED)); } @SuppressWarnings("unchecked") @Test public void shouldNotFailForTimeoutExceptionOnConsumerCommit() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - task00.setCommittableOffsetsAndMetadata(taskId00Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0)))); - task01.setCommittableOffsetsAndMetadata(taskId00Partitions.stream().collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0)))); + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); - doThrow(new TimeoutException("KABOOM!")).doNothing().when(consumer).commitSync(any(Map.class)); + final Map offsets = taskId00Partitions.stream() + .collect(Collectors.toMap(p -> p, p -> new OffsetAndMetadata(0))); - task00.setCommitNeeded(); + when(task00.commitNeeded()).thenReturn(true); + when(task00.prepareCommit(true)).thenReturn(offsets); + when(task01.commitNeeded()).thenReturn(false); + + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); + + doThrow(new TimeoutException("KABOOM!")).doNothing().when(consumer).commitSync(any(Map.class)); assertThat(taskManager.commit(Set.of(task00, task01)), equalTo(0)); - assertThat(task00.timeout, equalTo(time.milliseconds())); - assertNull(task01.timeout); + verify(task00).maybeInitTaskTimeoutOrThrow(anyLong(), any(TimeoutException.class)); assertThat(taskManager.commit(Set.of(task00, task01)), equalTo(1)); - assertNull(task00.timeout); - assertNull(task01.timeout); + verify(task00).clearTaskTimeout(); verify(consumer, times(2)).commitSync(any(Map.class)); }