Skip to content

Commit b7acd9f

Browse files
authored
[FLINK-38404][test] Fix JobMasterTest by making it scheduler-type aware (#27091)
1 parent c3bf061 commit b7acd9f

File tree

1 file changed

+60
-52
lines changed

1 file changed

+60
-52
lines changed

flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java

Lines changed: 60 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@
184184
import java.util.stream.Collectors;
185185
import java.util.stream.IntStream;
186186

187+
import static org.apache.flink.configuration.JobManagerOptions.SchedulerType;
187188
import static org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.FIXED_DELAY;
188189
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
189190
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
@@ -1691,40 +1692,17 @@ private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGat
16911692
@Test
16921693
void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
16931694
final List<Event> jobEvents = new ArrayList<>();
1694-
runJobFailureWhenTaskExecutorTerminatesTest(
1695-
heartbeatServices,
1696-
(localTaskManagerLocation, jobMasterGateway) ->
1697-
jobMasterGateway.disconnectTaskManager(
1698-
localTaskManagerLocation.getResourceID(),
1699-
new FlinkException("Test disconnectTaskManager exception.")),
1700-
jobEvents);
1701-
assertThat(
1702-
jobEvents.stream()
1703-
.filter(
1704-
event ->
1705-
Events.JobStatusChangeEvent.name()
1706-
.equals(event.getName()))
1707-
.map(Event::getAttributes)
1708-
.map(x -> x.get("newJobStatus")))
1709-
.containsExactly(
1710-
JobStatus.RUNNING.toString(),
1711-
JobStatus.FAILING.toString(),
1712-
JobStatus.FAILED.toString());
1695+
final SchedulerType schedulerType =
1696+
runJobFailureWhenTaskExecutorTerminatesTest(
1697+
heartbeatServices,
1698+
(localTaskManagerLocation, jobMasterGateway) ->
1699+
jobMasterGateway.disconnectTaskManager(
1700+
localTaskManagerLocation.getResourceID(),
1701+
new FlinkException(
1702+
"Test disconnectTaskManager exception.")),
1703+
jobEvents);
17131704

1714-
assertThat(
1715-
jobEvents.stream()
1716-
.filter(
1717-
event ->
1718-
Events.AllSubtasksStatusChangeEvent.name()
1719-
.equals(event.getName()))
1720-
.map(Event::getAttributes)
1721-
.map(
1722-
x ->
1723-
x.get(
1724-
AllSubTasksRunningOrFinishedStateTimeMetrics
1725-
.STATUS_ATTRIBUTE)))
1726-
.containsExactly(
1727-
ALL_RUNNING_OR_FINISHED.toString(), NOT_ALL_RUNNING_OR_FINISHED.toString());
1705+
assertJobStatusTransitions(schedulerType, jobEvents);
17281706
}
17291707

17301708
@Test
@@ -1733,24 +1711,47 @@ void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
17331711
final TestingHeartbeatServices testingHeartbeatService =
17341712
new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
17351713

1736-
runJobFailureWhenTaskExecutorTerminatesTest(
1737-
testingHeartbeatService,
1738-
(localTaskManagerLocation, jobMasterGateway) ->
1739-
testingHeartbeatService.triggerHeartbeatTimeout(
1740-
jmResourceId, localTaskManagerLocation.getResourceID()),
1741-
jobEvents);
1742-
assertThat(
1743-
jobEvents.stream()
1744-
.filter(
1745-
event ->
1746-
Events.JobStatusChangeEvent.name()
1747-
.equals(event.getName()))
1748-
.map(Event::getAttributes)
1749-
.map(x -> x.get("newJobStatus")))
1750-
.containsExactly(
1751-
JobStatus.RUNNING.toString(),
1752-
JobStatus.FAILING.toString(),
1753-
JobStatus.FAILED.toString());
1714+
final SchedulerType schedulerType =
1715+
runJobFailureWhenTaskExecutorTerminatesTest(
1716+
testingHeartbeatService,
1717+
(localTaskManagerLocation, jobMasterGateway) ->
1718+
testingHeartbeatService.triggerHeartbeatTimeout(
1719+
jmResourceId, localTaskManagerLocation.getResourceID()),
1720+
jobEvents);
1721+
1722+
assertJobStatusTransitions(schedulerType, jobEvents);
1723+
}
1724+
1725+
/**
1726+
* Asserts that job status transitions are as expected based on the scheduler type.
1727+
* DefaultScheduler does not emit CREATED state, while AdaptiveScheduler and
1728+
* AdaptiveBatchScheduler do.
1729+
*/
1730+
private static void assertJobStatusTransitions(
1731+
SchedulerType schedulerType, List<Event> jobEvents) {
1732+
final List<String> jobStatusTransitions =
1733+
jobEvents.stream()
1734+
.filter(event -> Events.JobStatusChangeEvent.name().equals(event.getName()))
1735+
.map(Event::getAttributes)
1736+
.map(x -> (String) x.get("newJobStatus"))
1737+
.collect(Collectors.toList());
1738+
1739+
if (schedulerType == SchedulerType.Adaptive) {
1740+
// Adaptive schedulers emit CREATED: CREATED → RUNNING → FAILING → FAILED
1741+
assertThat(jobStatusTransitions)
1742+
.containsExactly(
1743+
JobStatus.CREATED.toString(),
1744+
JobStatus.RUNNING.toString(),
1745+
JobStatus.FAILING.toString(),
1746+
JobStatus.FAILED.toString());
1747+
} else {
1748+
// Default scheduler does not emit CREATED: RUNNING → FAILING → FAILED
1749+
assertThat(jobStatusTransitions)
1750+
.containsExactly(
1751+
JobStatus.RUNNING.toString(),
1752+
JobStatus.FAILING.toString(),
1753+
JobStatus.FAILED.toString());
1754+
}
17541755

17551756
assertThat(
17561757
jobEvents.stream()
@@ -2555,7 +2556,7 @@ private TestingResourceManagerGateway createResourceManagerGateway(
25552556
return resourceManagerGateway;
25562557
}
25572558

2558-
private void runJobFailureWhenTaskExecutorTerminatesTest(
2559+
private SchedulerType runJobFailureWhenTaskExecutorTerminatesTest(
25592560
HeartbeatServices heartbeatServices,
25602561
BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> jobReachedRunningState,
25612562
List<Event> jobEventsOut)
@@ -2564,12 +2565,17 @@ private void runJobFailureWhenTaskExecutorTerminatesTest(
25642565
final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
25652566
new JobMasterBuilder.TestingOnCompletionActions();
25662567

2568+
final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
2569+
DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
2570+
configuration, jobGraph.getJobType(), jobGraph.isDynamic());
2571+
25672572
try (final JobMaster jobMaster =
25682573
new JobMasterBuilder(jobGraph, rpcService)
25692574
.withResourceId(jmResourceId)
25702575
.withHighAvailabilityServices(haServices)
25712576
.withHeartbeatServices(heartbeatServices)
25722577
.withOnCompletionActions(onCompletionActions)
2578+
.withSlotPoolServiceSchedulerFactory(slotPoolServiceSchedulerFactory)
25732579
.withMetricsGroupFactory(
25742580
new JobManagerJobMetricGroupFactory() {
25752581
@Override
@@ -2636,6 +2642,8 @@ public void addEvent(EventBuilder eventBuilder) {
26362642

26372643
assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
26382644
}
2645+
2646+
return slotPoolServiceSchedulerFactory.getSchedulerType();
26392647
}
26402648

26412649
private Collection<SlotOffer> registerSlotsAtJobMaster(

0 commit comments

Comments
 (0)