diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 64e2bc6c69661..ebacc4847d92c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -184,6 +184,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.flink.configuration.JobManagerOptions.SchedulerType; import static org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.FIXED_DELAY; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; @@ -1691,40 +1692,17 @@ private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGat @Test void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception { final List jobEvents = new ArrayList<>(); - runJobFailureWhenTaskExecutorTerminatesTest( - heartbeatServices, - (localTaskManagerLocation, jobMasterGateway) -> - jobMasterGateway.disconnectTaskManager( - localTaskManagerLocation.getResourceID(), - new FlinkException("Test disconnectTaskManager exception.")), - jobEvents); - assertThat( - jobEvents.stream() - .filter( - event -> - Events.JobStatusChangeEvent.name() - .equals(event.getName())) - .map(Event::getAttributes) - .map(x -> x.get("newJobStatus"))) - .containsExactly( - JobStatus.RUNNING.toString(), - JobStatus.FAILING.toString(), - JobStatus.FAILED.toString()); + final SchedulerType schedulerType = + runJobFailureWhenTaskExecutorTerminatesTest( + heartbeatServices, + (localTaskManagerLocation, jobMasterGateway) -> + jobMasterGateway.disconnectTaskManager( + localTaskManagerLocation.getResourceID(), + new FlinkException( + "Test disconnectTaskManager exception.")), + jobEvents); - assertThat( - jobEvents.stream() - .filter( - event -> - Events.AllSubtasksStatusChangeEvent.name() - .equals(event.getName())) - .map(Event::getAttributes) - .map( - x -> - x.get( - AllSubTasksRunningOrFinishedStateTimeMetrics - .STATUS_ATTRIBUTE))) - .containsExactly( - ALL_RUNNING_OR_FINISHED.toString(), NOT_ALL_RUNNING_OR_FINISHED.toString()); + assertJobStatusTransitions(schedulerType, jobEvents); } @Test @@ -1733,24 +1711,47 @@ void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception { final TestingHeartbeatServices testingHeartbeatService = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout); - runJobFailureWhenTaskExecutorTerminatesTest( - testingHeartbeatService, - (localTaskManagerLocation, jobMasterGateway) -> - testingHeartbeatService.triggerHeartbeatTimeout( - jmResourceId, localTaskManagerLocation.getResourceID()), - jobEvents); - assertThat( - jobEvents.stream() - .filter( - event -> - Events.JobStatusChangeEvent.name() - .equals(event.getName())) - .map(Event::getAttributes) - .map(x -> x.get("newJobStatus"))) - .containsExactly( - JobStatus.RUNNING.toString(), - JobStatus.FAILING.toString(), - JobStatus.FAILED.toString()); + final SchedulerType schedulerType = + runJobFailureWhenTaskExecutorTerminatesTest( + testingHeartbeatService, + (localTaskManagerLocation, jobMasterGateway) -> + testingHeartbeatService.triggerHeartbeatTimeout( + jmResourceId, localTaskManagerLocation.getResourceID()), + jobEvents); + + assertJobStatusTransitions(schedulerType, jobEvents); + } + + /** + * Asserts that job status transitions are as expected based on the scheduler type. + * DefaultScheduler does not emit CREATED state, while AdaptiveScheduler and + * AdaptiveBatchScheduler do. + */ + private static void assertJobStatusTransitions( + SchedulerType schedulerType, List jobEvents) { + final List jobStatusTransitions = + jobEvents.stream() + .filter(event -> Events.JobStatusChangeEvent.name().equals(event.getName())) + .map(Event::getAttributes) + .map(x -> (String) x.get("newJobStatus")) + .collect(Collectors.toList()); + + if (schedulerType == SchedulerType.Adaptive) { + // Adaptive schedulers emit CREATED: CREATED → RUNNING → FAILING → FAILED + assertThat(jobStatusTransitions) + .containsExactly( + JobStatus.CREATED.toString(), + JobStatus.RUNNING.toString(), + JobStatus.FAILING.toString(), + JobStatus.FAILED.toString()); + } else { + // Default scheduler does not emit CREATED: RUNNING → FAILING → FAILED + assertThat(jobStatusTransitions) + .containsExactly( + JobStatus.RUNNING.toString(), + JobStatus.FAILING.toString(), + JobStatus.FAILED.toString()); + } assertThat( jobEvents.stream() @@ -2555,7 +2556,7 @@ private TestingResourceManagerGateway createResourceManagerGateway( return resourceManagerGateway; } - private void runJobFailureWhenTaskExecutorTerminatesTest( + private SchedulerType runJobFailureWhenTaskExecutorTerminatesTest( HeartbeatServices heartbeatServices, BiConsumer jobReachedRunningState, List jobEventsOut) @@ -2564,12 +2565,17 @@ private void runJobFailureWhenTaskExecutorTerminatesTest( final JobMasterBuilder.TestingOnCompletionActions onCompletionActions = new JobMasterBuilder.TestingOnCompletionActions(); + final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory = + DefaultSlotPoolServiceSchedulerFactory.fromConfiguration( + configuration, jobGraph.getJobType(), jobGraph.isDynamic()); + try (final JobMaster jobMaster = new JobMasterBuilder(jobGraph, rpcService) .withResourceId(jmResourceId) .withHighAvailabilityServices(haServices) .withHeartbeatServices(heartbeatServices) .withOnCompletionActions(onCompletionActions) + .withSlotPoolServiceSchedulerFactory(slotPoolServiceSchedulerFactory) .withMetricsGroupFactory( new JobManagerJobMetricGroupFactory() { @Override @@ -2636,6 +2642,8 @@ public void addEvent(EventBuilder eventBuilder) { assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED); } + + return slotPoolServiceSchedulerFactory.getSchedulerType(); } private Collection registerSlotsAtJobMaster(