Conversation

@liuml07
Member

@liuml07 liuml07 commented Oct 9, 2025

@flinkbot
Collaborator

flinkbot commented Oct 9, 2025

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@liuml07
Member Author

liuml07 commented Oct 9, 2025

I don't think the e2e test failure is related:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x16596ae]
...
Oct 09 05:08:29 [FAIL] 'Run kubernetes session test (default input)' failed after 1 minutes and 52 seconds! Test exited with exit code 1

.map(Event::getAttributes)
.map(x -> x.get("newJobStatus")))
.containsExactly(
.containsSubsequence(
Contributor

I had a quick look at this. My concern is that the CREATED event might not be coming out, and that this is a Flink bug. I would feel more confident if the test checked that the CREATED event is coming out and the code were arranged so that the race condition does not occur. WDYT?

Member Author

While this test cares more about the RUNNING → FAILING → FAILED event sequence when planned failures happen, I agree it's possible this hides a real problem in the Flink non-testing code. However, I could not find an easy fix that only touches the testing code to avoid the race condition. As the JobStatusChangeEvent is only for new job statuses (code), the initial CREATED is not expected to be captured here.

Reporting the JobStatusChange event seems pretty new. @pnowojski, do you have any ideas?

Member Author

If we'd like to change the Flink non-testing code, I have updated the PR to demonstrate the idea of notifying the jobStatus listeners at registration time. I presume that's a larger code change, so I'm just sharing it for discussion. I'll look into the code more carefully.
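A minimal toy sketch of the notify-at-registration idea described above, with invented names rather than Flink's actual listener API: the point is that a listener registered after the job has already reached a status (for example CREATED) gets that current status replayed, so it is not silently missed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy model only; these names are invented and do not match Flink's actual
// job-status listener API.
class JobStatusNotifierSketch {

    interface Listener {
        void onNewJobStatus(String newJobStatus);
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private volatile String currentStatus = "CREATED";

    void register(Listener listener) {
        listeners.add(listener);
        // Notify at registration time, so a listener that registers after the job
        // has already reached a status (e.g. CREATED) still observes it.
        listener.onNewJobStatus(currentStatus);
    }

    void transitionTo(String newStatus) {
        currentStatus = newStatus;
        for (Listener listener : listeners) {
            listener.onNewJobStatus(newStatus);
        }
    }
}
```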

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Oct 9, 2025
@noorall
Contributor

noorall commented Oct 27, 2025

From my observation, when this test was introduced, it was only validated on the DefaultScheduler, not on the AdaptiveScheduler, which has a different initial job state.
In DefaultScheduler, a job starts in the CREATED state, so the state transition is:
CREATED → RUNNING → ...
In contrast, in AdaptiveScheduler, a job starts in the INITIALIZING state (see link), resulting in the transition:
INITIALIZING → CREATED → RUNNING → ...
Unfortunately, the Flink CI bot that runs during PR submission doesn't verify whether this test passes under the AdaptiveScheduler; that validation only happens during the daily runs, which is why the issue wasn't caught when the PR was merged.
As the AdaptiveScheduler code has been around for a considerable time and this state transition behavior is both intentional and reasonable, I suggest we simply update the test code to fix this issue, without breaking the original assumptions for DefaultScheduler. WDYT? @liuml07 @davidradl
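A minimal sketch of what the above means for the recorded "newJobStatus" values, assuming (as in the test snippet earlier in this thread) that statuses are compared as plain strings and that the failure scenario goes RUNNING → FAILING → FAILED; the helper name here is made up and is not the PR's actual code:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;

class JobStatusSequenceSketch {

    // "newJobStatus" attributes are extracted as strings in the test snippet above,
    // so plain string sequences are asserted here.
    static void assertStatusSequence(List<String> observed, boolean adaptiveScheduler) {
        if (adaptiveScheduler) {
            // AdaptiveScheduler starts in INITIALIZING, so CREATED is reported as a transition.
            assertThat(observed).containsSubsequence("CREATED", "RUNNING", "FAILING", "FAILED");
        } else {
            // DefaultScheduler starts in CREATED, so the first reported transition is RUNNING.
            assertThat(observed).containsSubsequence("RUNNING", "FAILING", "FAILED");
        }
    }
}
```

Using containsSubsequence (as in the diff context shown earlier) rather than containsExactly keeps the assertion tolerant of additional intermediate transitions.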

@liuml07
Member Author

liuml07 commented Oct 28, 2025

@noorall This is indeed very helpful context. I can revert the new commit and make improvements on the first commit that fixes the test here.

One quick question though: why does the test fail intermittently? Is the scheduler used in tests non-deterministic? Can we make it fixed in the test and/or make the test code aware of which scheduler is being used? Thanks!

@noorall
Contributor

noorall commented Oct 28, 2025

> @noorall This is indeed very helpful context. I can revert the new commit and make improvements on the first commit that fixes the test here.
>
> One quick question though: why does the test fail intermittently? Is the scheduler used in tests non-deterministic? Can we make it fixed in the test and/or make the test code aware of which scheduler is being used? Thanks!

  1. This isn't actually an intermittently failing test: when I locally configure the scheduler type to adaptive, the test fails 100% of the time.
  2. The scheduler used in tests is not deterministic. It's controlled by JobManagerOptions.SCHEDULER, which defaults to DEFAULT, but certain CI pipelines inject extra environment variables to force a specific scheduler type (see the logic in ClusterOptions.isAdaptiveSchedulerEnabled() and DefaultSlotPoolServiceSchedulerFactory.getSchedulerType()).
  3. It's better not to hardcode a particular scheduler type in tests. Instead, we can explicitly pass a slotPoolServiceSchedulerFactory to the JobMasterBuilder and let the test code determine the active scheduler type via slotPoolServiceSchedulerFactory.getSchedulerType() (see the sketch below).
    This way, the test remains flexible while correctly handling both scheduler modes. @liuml07
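A rough sketch of point 3 above, using local stand-ins instead of Flink's real JobManagerOptions.SchedulerType and slot pool service scheduler factory types; the real test would query the factory it passes to the JobMasterBuilder rather than these placeholders:

```java
import java.util.List;

// Local stand-ins for the Flink types named above (JobManagerOptions.SchedulerType
// and the slotPoolServiceSchedulerFactory passed to the JobMasterBuilder).
enum SchedulerType { Default, Adaptive, AdaptiveBatch }

interface SlotPoolServiceSchedulerFactory {
    SchedulerType getSchedulerType();
}

class SchedulerAwareExpectationSketch {

    // Derive the expected "newJobStatus" transitions from whichever scheduler the
    // factory (built from the test configuration) will create, so the same test
    // works under both the default and the adaptive CI profiles.
    static List<String> expectedTransitions(SlotPoolServiceSchedulerFactory factory) {
        return factory.getSchedulerType() == SchedulerType.Default
                ? List.of("RUNNING", "FAILING", "FAILED")
                : List.of("CREATED", "RUNNING", "FAILING", "FAILED");
    }
}
```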

@liuml07
Member Author

liuml07 commented Oct 29, 2025

  1. Thanks for pointing that out. I can now reproduce the test failure as you suggested by specifying the adaptive scheduler via -Dflink.tests.enable-adaptive-scheduler=true.
  2. Glad to know that. Without that context, I was only chasing possible data races.
  3. Totally makes sense. I have updated the PR to implement the idea. Could you help review it, @noorall? Thanks!

Contributor

@noorall noorall left a comment


The work overall looks good. I have a comment; PTAL. @liuml07

.map(x -> (String) x.get("newJobStatus"))
.collect(Collectors.toList());

if (schedulerType == SchedulerType.Default) {
Contributor

I prefer using if (schedulerType == SchedulerType.Adaptive), as Default is the more general scheduler.

@liuml07 liuml07 changed the title [FLINK-38404][core] Fix JobMasterTest due to data race [FLINK-38404][core] Fix JobMasterTest by making it scheduler-type aware Oct 29, 2025
Contributor

@noorall noorall left a comment

LGTM.

@ruanhang1993
Contributor

@flinkbot run azure

@ruanhang1993
Contributor

@liuml07 The Flink bot is not available now. Could you please append an empty commit to trigger a new CI run?

@liuml07
Member Author

liuml07 commented Oct 30, 2025

Sounds good. I rebased from master again.


"Test disconnectTaskManager exception.")),
jobEvents);

assertThat(
Contributor

Sorry, I missed something—this part seems to have been omitted? @liuml07

Member Author

Oh, this is part of the new helper function assertJobStatusTransitions(schedulerType, jobEvents): this assertion is also about status changes and was duplicated between the two tests, so I just reused it.

Contributor

> Oh, this is part of the new helper function assertJobStatusTransitions(schedulerType, jobEvents): this assertion is also about status changes and was duplicated between the two tests, so I just reused it.

Got it, LGTM! 😊

* DefaultScheduler does not emit CREATED state, while AdaptiveScheduler and
* AdaptiveBatchScheduler do.
*/
private static void assertJobStatusTransitions(
Contributor

Sorry for the delay in responding. It looks like we have got to the root cause of this: we have two different types of scheduler that go through different job statuses.

I see that this method checks the schedulerType and performs different assertions. I cannot see from the test that this method is driven for each scheduler type. If the two callers of this method cover the two scheduler types, it would be good if this were more explicit in the code.

Member Author

The slotPoolServiceSchedulerFactory is created from the configuration via fromConfiguration(), which honors the dynamic scheduler config when running the tests. The default scheduler is used if no config overrides it. According to @noorall, we have CI pipelines that inject a different scheduler for all tests. This test is more about the JobMaster per se, and explicitly iterating over all schedulers seems unnecessary.

Contributor

@davidradl the Maven profile 'enable-adaptive-scheduler' was introduced so that lots of existing tests can run against the AdaptiveScheduler, which was added later.

Contributor

@liuml07 @zhuzhurk Just to confirm: are you sure that both the if and the else paths below will be driven in this test? If so, LGTM; if not, we should change the test so it definitively targets both the if and the else.

Member Author

Yes, I agree with @noorall and @zhuzhurk that the scheduler used in tests is not static, but can be set when running them. Specifically, it's set via the flink.tests.enable-adaptive-scheduler JVM property (or by running the tests with the Maven profile enable-adaptive-scheduler). The slotPoolServiceSchedulerFactory creates schedulers dynamically by checking this property. With this PR, this test makes different assertions accordingly.

By default, when this test runs, the DEFAULT scheduler is used, so the else clause of this assertion method is exercised. When the test runs with either flink.tests.enable-adaptive-scheduler=true or the Maven profile enable-adaptive-scheduler, the ADAPTIVE scheduler is used and the if clause is exercised. One can enable it locally for debugging, and in the nightly CI one workflow enables it, see here.

I guess the purpose of adding this dynamic switch through the Maven profile 'enable-adaptive-scheduler' is to eliminate the need for existing tests to explicitly enumerate the various scheduler types, as long as they are either agnostic of or aware of the scheduler type.

Contributor

@davidradl Please help to review again. Thanks.

Contributor

@zhuzhurk zhuzhurk left a comment

LGTM.

@zhuzhurk
Contributor

Thanks for fixing it! @liuml07
And thanks for the reviews. @davidradl @noorall

@ruanhang1993
Contributor

@liuml07 @noorall @davidradl @zhuzhurk Thanks for the fix and the reviews.
If there are no more comments, I will merge this PR in about 4 hours.

@ruanhang1993
Contributor

Thanks for all comments. Merged.

@ruanhang1993 ruanhang1993 merged commit b7acd9f into apache:master Nov 3, 2025
@liuml07 liuml07 deleted the FLINK-38404 branch November 3, 2025 19:35