8989import java .util .Queue ;
9090import java .util .Set ;
9191import java .util .concurrent .TimeUnit ;
92- import java .util .concurrent .atomic .AtomicBoolean ;
9392import java .util .stream .IntStream ;
9493import javax .annotation .Nullable ;
9594import org .apache .twill .internal .CompositeService ;
101100 * events topic
102101 */
103102public class ProgramNotificationSubscriberService extends AbstractIdleService {
103+
104+ private static final Logger LOG =
105+ LoggerFactory .getLogger (ProgramNotificationSubscriberService .class );
104106 private final MessagingService messagingService ;
105107 private final CConfiguration cConf ;
106108 private final MetricsCollectionService metricsCollectionService ;
@@ -145,6 +147,8 @@ protected void startUp() throws Exception {
145147 List <Service > children = new ArrayList <>();
146148 String topicPrefix = cConf .get (Constants .AppFabric .PROGRAM_STATUS_EVENT_TOPIC );
147149 int numPartitions = cConf .getInt (Constants .AppFabric .PROGRAM_STATUS_EVENT_NUM_PARTITIONS );
150+ // Active runs should be restored only once, not once for every shard that is created.
151+ restoreActiveRuns ();
148152 // Add bare one - we always listen to it
149153 children .add (createChildService ("program.status" , topicPrefix ));
150154 // If number of partitions is more than 1 - create partitioned services
@@ -153,8 +157,60 @@ protected void startUp() throws Exception {
153157 .forEach (i -> children .add (createChildService ("program.status." + i , topicPrefix + i )));
154158 }
155159 delegate = new CompositeService (children );
156-
157160 delegate .startAndWait ();
161+ // Explicitly emit both launching and running counts on startup.
162+ emitFlowControlMetrics ();
163+ }
164+
// Emits the launching metrics and then the running metrics via runRecordMonitorService.
165+ private void emitFlowControlMetrics () {
166+ runRecordMonitorService .emitLaunchingMetrics ();
167+ runRecordMonitorService .emitRunningMetrics ();
168+ }
169+
// Scans the store for active runs and hands them back to runRecordMonitorService.
// PENDING and STARTING runs are both passed to addRequest; STARTING runs additionally get
// a new start call on programStateWriter. Runs in any other status are only logged.
// The scan batch size and the retry strategy are both read from cConf.
170+ private void restoreActiveRuns () {
171+ LOG .info ("Restoring active runs" );
172+ int batchSize = cConf .getInt (Constants .RuntimeMonitor .INIT_BATCH_SIZE );
173+ RetryStrategy retryStrategy =
174+ RetryStrategies .fromConfiguration (cConf , Constants .Service .RUNTIME_MONITOR_RETRY_PREFIX );
// Captured before the scan starts; used below to ignore runs that started afterwards.
175+ long startTs = System .currentTimeMillis ();
176+
// NOTE(review): the `e -> true` predicate passed at the end presumably marks every
// exception from the scan as retryable - confirm against Retries.runWithRetries.
177+ Retries .runWithRetries (
178+ () ->
179+ store .scanActiveRuns (
180+ batchSize ,
181+ (runRecordDetail ) -> {
// Skip runs whose start timestamp is later than the moment this restore began.
// NOTE(review): startTs is in milliseconds; confirm getStartTs() uses the same unit,
// otherwise this guard never (or always) triggers.
182+ if (runRecordDetail .getStartTs () > startTs ) {
183+ return ;
184+ }
185+ try {
186+ LOG .info ("Found active run: {}" , runRecordDetail .getProgramRunId ());
187+ if (runRecordDetail .getStatus () == ProgramRunStatus .PENDING ) {
188+ runRecordMonitorService .addRequest (runRecordDetail .getProgramRunId ());
189+ } else if (runRecordDetail .getStatus () == ProgramRunStatus .STARTING ) {
190+ runRecordMonitorService .addRequest (runRecordDetail .getProgramRunId ());
191+ // It is unknown what is the state of program runs in STARTING state.
192+ // A STARTING message is published again to retry STARTING logic.
// The program options are rebuilt from the system and user arguments stored on the run record.
193+ ProgramOptions programOptions =
194+ new SimpleProgramOptions (
195+ runRecordDetail .getProgramRunId ().getParent (),
196+ new BasicArguments (runRecordDetail .getSystemArgs ()),
197+ new BasicArguments (runRecordDetail .getUserArgs ()));
198+ LOG .debug ("Retrying to start run {}." , runRecordDetail .getProgramRunId ());
199+ programStateWriter .start (
200+ runRecordDetail .getProgramRunId (),
201+ programOptions ,
202+ null ,
203+ this .store .loadProgram (runRecordDetail .getProgramRunId ().getParent ()));
204+ }
205+ } catch (Exception e ) {
// The try block covers both branches, so a failure in the PENDING branch is also
// logged with this "Retrying to start run" message and reported as an error.
206+ ProgramRunId programRunId = runRecordDetail .getProgramRunId ();
207+ LOG .warn (
208+ "Retrying to start run {} failed. Marking it as failed." , programRunId , e );
209+ programStateWriter .error (programRunId , e );
210+ }
211+ }),
212+ retryStrategy ,
213+ e -> true );
158214 }
159215
160216 @ Override
@@ -178,7 +234,6 @@ private ProgramNotificationSingleTopicSubscriberService createChildService(
178234 provisioningService ,
179235 programStateWriter ,
180236 transactionRunner ,
181- store ,
182237 runRecordMonitorService ,
183238 name ,
184239 topicName ,
@@ -195,7 +250,7 @@ class ProgramNotificationSingleTopicSubscriberService
195250 extends AbstractNotificationSubscriberService {
196251
197252 private static final Logger LOG =
198- LoggerFactory .getLogger (ProgramNotificationSubscriberService .class );
253+ LoggerFactory .getLogger (ProgramNotificationSingleTopicSubscriberService .class );
199254
200255 private static final Gson GSON =
201256 ApplicationSpecificationAdapter .addTypeAdapters (new GsonBuilder ()).create ();
@@ -220,8 +275,6 @@ class ProgramNotificationSingleTopicSubscriberService
220275 private final Queue <Runnable > tasks ;
221276 private final MetricsCollectionService metricsCollectionService ;
222277 private Set <ProgramCompletionNotifier > programCompletionNotifiers ;
223- private final CConfiguration cConf ;
224- private final Store store ;
225278 private final RunRecordMonitorService runRecordMonitorService ;
226279 private final boolean checkTxSeparation ;
227280
@@ -234,7 +287,6 @@ class ProgramNotificationSingleTopicSubscriberService
234287 ProvisioningService provisioningService ,
235288 ProgramStateWriter programStateWriter ,
236289 TransactionRunner transactionRunner ,
237- Store store ,
238290 RunRecordMonitorService runRecordMonitorService ,
239291 String name ,
240292 String topicName ,
@@ -259,8 +311,6 @@ class ProgramNotificationSingleTopicSubscriberService
259311 this .metricsCollectionService = metricsCollectionService ;
260312 this .programCompletionNotifiers = programCompletionNotifiers ;
261313 this .runRecordMonitorService = runRecordMonitorService ;
262- this .cConf = cConf ;
263- this .store = store ;
264314
265315 // If number of partitions equals 1, DB deadlock cannot happen as a result of concurrent
266316 // modifications to
@@ -273,55 +323,6 @@ class ProgramNotificationSingleTopicSubscriberService
// Start-up hook of the single-topic subscriber. In this diff view the lines marked '-'
// are the removed per-subscriber scan of active runs; the only statement that is not
// marked as removed is the call to super.doStartUp().
273323 @ Override
274324 protected void doStartUp () throws Exception {
275325 super .doStartUp ();
276-
277- int batchSize = cConf .getInt (Constants .RuntimeMonitor .INIT_BATCH_SIZE );
278- RetryStrategy retryStrategy =
279- RetryStrategies .fromConfiguration (cConf , Constants .Service .RUNTIME_MONITOR_RETRY_PREFIX );
280- long startTs = System .currentTimeMillis ();
281-
282- AtomicBoolean launching = new AtomicBoolean (false );
283- Retries .runWithRetries (
284- () ->
285- store .scanActiveRuns (
286- batchSize ,
287- (runRecordDetail ) -> {
288- if (runRecordDetail .getStartTs () > startTs ) {
289- return ;
290- }
291- try {
292- if (runRecordDetail .getStatus () == ProgramRunStatus .PENDING ) {
293- launching .set (true );
294- runRecordMonitorService .addRequest (runRecordDetail .getProgramRunId ());
295- } else if (runRecordDetail .getStatus () == ProgramRunStatus .STARTING ) {
296- launching .set (true );
297- runRecordMonitorService .addRequest (runRecordDetail .getProgramRunId ());
298- // It is unknown what is the state of program runs in STARTING state.
299- // A STARTING message is published again to retry STARTING logic.
300- ProgramOptions programOptions =
301- new SimpleProgramOptions (
302- runRecordDetail .getProgramRunId ().getParent (),
303- new BasicArguments (runRecordDetail .getSystemArgs ()),
304- new BasicArguments (runRecordDetail .getUserArgs ()));
305- LOG .debug ("Retrying to start run {}." , runRecordDetail .getProgramRunId ());
306- programStateWriter .start (
307- runRecordDetail .getProgramRunId (),
308- programOptions ,
309- null ,
310- this .store .loadProgram (runRecordDetail .getProgramRunId ().getParent ()));
311- }
312- } catch (Exception e ) {
313- ProgramRunId programRunId = runRecordDetail .getProgramRunId ();
314- LOG .warn (
315- "Retrying to start run {} failed. Marking it as failed." , programRunId , e );
316- programStateWriter .error (programRunId , e );
317- }
318- }),
319- retryStrategy ,
320- e -> true );
321- if (!launching .get ()) {
322- // there is no launching pipeline
323- runRecordMonitorService .emitLaunchingMetrics (0 );
324- }
325326 }
326327
327328 @ Nullable
0 commit comments