@@ -274,13 +274,17 @@ bool TExecutor::Execute(const NYdb::NTable::TTableClient::TOperationFunc& func)
274274 }
275275 }
276276
277- TStatUnit stat = Stats.CreateStatUnit ();
277+ auto stat = Stats.StartRequest ();
278+
279+ auto future = InsistentClient.ExecuteWithRetry ([func, stat](NYdb::NTable::TSession session) {
280+ auto result = func (session);
281+ return result;
282+ });
278283
279- auto future = InsistentClient.ExecuteWithRetry (func);
280284 future.Subscribe ([this , stat, SemaphoreWrapper](const TAsyncFinalStatus& future) mutable {
281285 Y_ABORT_UNLESS (future.HasValue ());
282286 TFinalStatus resultStatus = future.GetValue ();
283- Stats.Report (stat, resultStatus);
287+ Stats.FinishRequest (stat, resultStatus);
284288 if (resultStatus) {
285289 CheckForError (*resultStatus);
286290 }
@@ -351,7 +355,6 @@ bool TExecutor::IsStopped() {
351355}
352356
353357void TExecutor::Finish () {
354- // Stats.UpdateSessionStats(InsistentClient.GetSessionStats());
355358 with_lock (Lock) {
356359 if (!AllJobsLaunched) {
357360 AllJobsLaunched = true ;
@@ -361,9 +364,6 @@ void TExecutor::Finish() {
361364}
362365
363366void TExecutor::UpdateStats () {
364- if (Infly > MaxSecInfly) {
365- MaxSecInfly = Infly;
366- }
367367 std::uint64_t activeSessions = InsistentClient.GetActiveSessions ();
368368 if (activeSessions > MaxSecSessions) {
369369 MaxSecSessions = activeSessions;
@@ -382,10 +382,10 @@ void TExecutor::UpdateStats() {
382382}
383383
384384void TExecutor::ReportStats () {
385+ Stats.ReportStats (MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
385386 TInstant now = TInstant::Now ();
386387 if (now.Seconds () > LastReportSec) {
387- Stats.ReportStats (MaxSecInfly, MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
388- MaxSecInfly = 0 ;
388+ Stats.ReportStats (MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
389389 MaxSecSessions = 0 ;
390390 MaxSecReadPromises = 0 ;
391391 MaxSecExecutorPromises = 0 ;
@@ -442,93 +442,3 @@ void TExecutor::Report(TStringBuilder& out) const {
442442 }
443443 }
444444}
445-
446-
447- TExecutorWithRetry::TExecutorWithRetry (const TCommonOptions& opts, TStat& stats)
448- : TExecutor(opts, stats)
449- {}
450-
451- bool TExecutorWithRetry::Execute (const NYdb::NTable::TTableClient::TOperationFunc& func) {
452- auto threadFunc = [this , func]() {
453- if (IsStopped ()) {
454- DecrementWaiting ();
455- return ;
456- }
457-
458- with_lock (Lock) {
459- --Waiting;
460- if (Infly < Opts.MaxInfly ) {
461- ++Infly;
462- if (Infly > MaxInfly) {
463- MaxInfly = Infly;
464- }
465- UpdateStats ();
466- } else {
467- Stats.ReportMaxInfly ();
468- UpdateStats ();
469- return ;
470- }
471- }
472-
473- std::shared_ptr<TRetryContext> context = std::make_shared<TRetryContext>(Stats);
474-
475- auto executeOperation = [this , func]() {
476- return InsistentClient.ExecuteWithRetry (func);
477- };
478-
479- context->HandleStatusFunc = std::make_unique<std::function<void (const TAsyncFinalStatus& resultFuture)>>(
480- [this , executeOperation, context](const TAsyncFinalStatus& future) mutable {
481- Y_ABORT_UNLESS (future.HasValue ());
482- TFinalStatus resultStatus = future.GetValue ();
483- if (resultStatus) {
484- // Reply received
485- CheckForError (*resultStatus);
486- if (resultStatus->IsSuccess ()) {
487- // Ok received
488- Stats.Report (context->LifeTimeStat , resultStatus->GetStatus ());
489- DecrementInfly ();
490- context->HandleStatusFunc .reset ();
491- return ;
492- }
493- }
494- if (IsStopped () || TInstant::Now () - context->LifeTimeStat .Start > GlobalTimeout) {
495- // Application stopped working or global timeout reached. Ok reply hasn't received yet
496- Stats.Report (context->LifeTimeStat , TInnerStatus::StatusNotFinished);
497- DecrementInfly ();
498- context->HandleStatusFunc .reset ();
499- return ;
500- }
501- Stats.Report (context->PerRequestStat , resultStatus);
502- context->PerRequestStat = Stats.CreateStatUnit ();
503- // Retrying:
504- executeOperation ().Subscribe (*context->HandleStatusFunc );
505- });
506-
507- context->Retries .fetch_add (1 );
508- Y_ABORT_UNLESS (context->Retries .load () < 500 , " Too much retries" );
509-
510- executeOperation ().Subscribe (*context->HandleStatusFunc );
511- };
512-
513- if (IsStopped ()) {
514- return false ;
515- }
516-
517- bool CanLaunchJob = false ;
518-
519- with_lock (Lock) {
520- if (!AllJobsLaunched) {
521- CanLaunchJob = true ;
522- ++Waiting;
523- }
524- }
525-
526- if (CanLaunchJob) {
527- if (!InputQueue->AddFunc (threadFunc)) {
528- DecrementWaiting ();
529- }
530- }
531- ++InProgressCount;
532- InProgressSum += InputQueue->Size ();
533- return true ;
534- }
0 commit comments