Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.configuration.JobManagerOptions.SchedulerType;
import static org.apache.flink.configuration.RestartStrategyOptions.RestartStrategyType.FIXED_DELAY;
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
Expand Down Expand Up @@ -1691,40 +1692,17 @@ private TestingResourceManagerGateway createAndRegisterTestingResourceManagerGat
@Test
void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
final List<Event> jobEvents = new ArrayList<>();
runJobFailureWhenTaskExecutorTerminatesTest(
heartbeatServices,
(localTaskManagerLocation, jobMasterGateway) ->
jobMasterGateway.disconnectTaskManager(
localTaskManagerLocation.getResourceID(),
new FlinkException("Test disconnectTaskManager exception.")),
jobEvents);
assertThat(
jobEvents.stream()
.filter(
event ->
Events.JobStatusChangeEvent.name()
.equals(event.getName()))
.map(Event::getAttributes)
.map(x -> x.get("newJobStatus")))
.containsExactly(
JobStatus.RUNNING.toString(),
JobStatus.FAILING.toString(),
JobStatus.FAILED.toString());
final SchedulerType schedulerType =
runJobFailureWhenTaskExecutorTerminatesTest(
heartbeatServices,
(localTaskManagerLocation, jobMasterGateway) ->
jobMasterGateway.disconnectTaskManager(
localTaskManagerLocation.getResourceID(),
new FlinkException(
"Test disconnectTaskManager exception.")),
jobEvents);

assertThat(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed something—this part seems to have been omitted? @liuml07

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is part of the new helper function assertJobStatusTransitions(schedulerType, jobEvents). This assertion is also about status changes and was duplicated between the two tests, so I just reused it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is part of the new helper function assertJobStatusTransitions(schedulerType, jobEvents) as this assertion is also about status change and it's duplicate between the two tests, so just reused it.

Got it, LGTM! 😊

jobEvents.stream()
.filter(
event ->
Events.AllSubtasksStatusChangeEvent.name()
.equals(event.getName()))
.map(Event::getAttributes)
.map(
x ->
x.get(
AllSubTasksRunningOrFinishedStateTimeMetrics
.STATUS_ATTRIBUTE)))
.containsExactly(
ALL_RUNNING_OR_FINISHED.toString(), NOT_ALL_RUNNING_OR_FINISHED.toString());
assertJobStatusTransitions(schedulerType, jobEvents);
}

@Test
Expand All @@ -1733,24 +1711,47 @@ void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
final TestingHeartbeatServices testingHeartbeatService =
new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout);

runJobFailureWhenTaskExecutorTerminatesTest(
testingHeartbeatService,
(localTaskManagerLocation, jobMasterGateway) ->
testingHeartbeatService.triggerHeartbeatTimeout(
jmResourceId, localTaskManagerLocation.getResourceID()),
jobEvents);
assertThat(
jobEvents.stream()
.filter(
event ->
Events.JobStatusChangeEvent.name()
.equals(event.getName()))
.map(Event::getAttributes)
.map(x -> x.get("newJobStatus")))
.containsExactly(
JobStatus.RUNNING.toString(),
JobStatus.FAILING.toString(),
JobStatus.FAILED.toString());
final SchedulerType schedulerType =
runJobFailureWhenTaskExecutorTerminatesTest(
testingHeartbeatService,
(localTaskManagerLocation, jobMasterGateway) ->
testingHeartbeatService.triggerHeartbeatTimeout(
jmResourceId, localTaskManagerLocation.getResourceID()),
jobEvents);

assertJobStatusTransitions(schedulerType, jobEvents);
}

/**
* Asserts that job status transitions are as expected based on the scheduler type.
* The Adaptive scheduler is expected to emit the CREATED state first, while every other
* scheduler type is expected to start at RUNNING.
*/
private static void assertJobStatusTransitions(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay in responding. It looks like we have got to the root cause of this, in that we have 2 different types of scheduler that contain different job statuses.

I see that this method checks for the schedulerType and performs different asserts. I cannot see from the test that this method is driven for each scheduler type. If the 2 callers of this method cover the 2 scheduler types - it would be good if this was more explicit in the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The slotPoolServiceSchedulerFactory is created from the configuration via fromConfiguration(), which honors the dynamic scheduler config when running the tests. The default scheduler is used if no config overrides it. According to @noorall we have CI tests that inject a different scheduler for all tests. This test is more about JobMaster per se, and explicitly iterating over all schedulers seems unnecessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidradl the maven profile 'enable-adaptive-scheduler' was introduced so that lots of existing tests can run against the AdaptiveScheduler which was added later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@liuml07 @zhuzhurk Just to confirm: are you sure that both the if and the else paths below will be driven in this test? If so, LGTM; if not, then we should change the test so it definitively targets the if and the else.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with @noorall and @zhuzhurk that the scheduler used in tests is not static, but can be set when running them. Specifically, it's set via the flink.tests.enable-adaptive-scheduler JVM arg (or by running tests with the Maven profile enable-adaptive-scheduler). The slotPoolServiceSchedulerFactory creates schedulers dynamically by checking this property. With this PR, in this test we make different assertions accordingly.

By default, when this test is running, the DEFAULT scheduler is used so the else clause of this assertion method is called. When the test is running with either flink.tests.enable-adaptive-scheduler=true or with the Maven profile enable-adaptive-scheduler, the ADAPTIVE scheduler is used and the if clause of this assertion method is called. One can enable it locally for debugging, and in the nightly CI, one workflow is enabling it, see here.

I guess the purpose of adding the dynamic feature through the Maven profile 'enable-adaptive-scheduler' is likely to eliminate the need for existing tests to explicitly enumerate various scheduler types, as long as they are either agnostic or aware of scheduler type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidradl Please help to review again. Thanks.

SchedulerType schedulerType, List<Event> jobEvents) {
final List<String> jobStatusTransitions =
jobEvents.stream()
.filter(event -> Events.JobStatusChangeEvent.name().equals(event.getName()))
.map(Event::getAttributes)
.map(x -> (String) x.get("newJobStatus"))
.collect(Collectors.toList());

if (schedulerType == SchedulerType.Adaptive) {
// Adaptive schedulers emit CREATED: CREATED → RUNNING → FAILING → FAILED
assertThat(jobStatusTransitions)
.containsExactly(
JobStatus.CREATED.toString(),
JobStatus.RUNNING.toString(),
JobStatus.FAILING.toString(),
JobStatus.FAILED.toString());
} else {
// Default scheduler does not emit CREATED: RUNNING → FAILING → FAILED
assertThat(jobStatusTransitions)
.containsExactly(
JobStatus.RUNNING.toString(),
JobStatus.FAILING.toString(),
JobStatus.FAILED.toString());
}

assertThat(
jobEvents.stream()
Expand Down Expand Up @@ -2555,7 +2556,7 @@ private TestingResourceManagerGateway createResourceManagerGateway(
return resourceManagerGateway;
}

private void runJobFailureWhenTaskExecutorTerminatesTest(
private SchedulerType runJobFailureWhenTaskExecutorTerminatesTest(
HeartbeatServices heartbeatServices,
BiConsumer<LocalUnresolvedTaskManagerLocation, JobMasterGateway> jobReachedRunningState,
List<Event> jobEventsOut)
Expand All @@ -2564,12 +2565,17 @@ private void runJobFailureWhenTaskExecutorTerminatesTest(
final JobMasterBuilder.TestingOnCompletionActions onCompletionActions =
new JobMasterBuilder.TestingOnCompletionActions();

final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
configuration, jobGraph.getJobType(), jobGraph.isDynamic());

try (final JobMaster jobMaster =
new JobMasterBuilder(jobGraph, rpcService)
.withResourceId(jmResourceId)
.withHighAvailabilityServices(haServices)
.withHeartbeatServices(heartbeatServices)
.withOnCompletionActions(onCompletionActions)
.withSlotPoolServiceSchedulerFactory(slotPoolServiceSchedulerFactory)
.withMetricsGroupFactory(
new JobManagerJobMetricGroupFactory() {
@Override
Expand Down Expand Up @@ -2636,6 +2642,8 @@ public void addEvent(EventBuilder eventBuilder) {

assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
}

return slotPoolServiceSchedulerFactory.getSchedulerType();
}

private Collection<SlotOffer> registerSlotsAtJobMaster(
Expand Down