File tree Expand file tree Collapse file tree 2 files changed +13
-2
lines changed
main/java/org/apache/flink/runtime/executiongraph
test/java/org/apache/flink/runtime/jobmaster Expand file tree Collapse file tree 2 files changed +13
-2
lines changed Original file line number Diff line number Diff line change @@ -1648,6 +1648,15 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
16481648 public void registerJobStatusListener (JobStatusListener listener ) {
16491649 if (listener != null ) {
16501650 jobStatusListeners .add (listener );
1651+ // Emit current state to the newly registered listener
1652+ // This ensures listeners don't miss the initial state
1653+ try {
1654+ listener .jobStatusChanges (getJobID (), state , stateTimestamps [state .ordinal ()]);
1655+ } catch (Throwable t ) {
1656+ LOG .warn (
1657+ "Error while notifying newly registered JobStatusListener of current state" ,
1658+ t );
1659+ }
16511660 }
16521661 }
16531662
Original file line number Diff line number Diff line change @@ -1706,7 +1706,8 @@ void testJobFailureWhenGracefulTaskExecutorTermination() throws Exception {
17061706 .equals (event .getName ()))
17071707 .map (Event ::getAttributes )
17081708 .map (x -> x .get ("newJobStatus" )))
1709- .containsSubsequence (
1709+ .containsExactly (
1710+ JobStatus .CREATED .toString (),
17101711 JobStatus .RUNNING .toString (),
17111712 JobStatus .FAILING .toString (),
17121713 JobStatus .FAILED .toString ());
@@ -1747,7 +1748,8 @@ void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws Exception {
17471748 .equals (event .getName ()))
17481749 .map (Event ::getAttributes )
17491750 .map (x -> x .get ("newJobStatus" )))
1750- .containsSubsequence (
1751+ .containsExactly (
1752+ JobStatus .CREATED .toString (),
17511753 JobStatus .RUNNING .toString (),
17521754 JobStatus .FAILING .toString (),
17531755 JobStatus .FAILED .toString ());
You can’t perform that action at this time.
0 commit comments