@@ -274,13 +274,17 @@ bool TExecutor::Execute(const NYdb::NTable::TTableClient::TOperationFunc& func)
274274            }
275275        }
276276
277-         TStatUnit stat = Stats.CreateStatUnit ();
277+         auto  stat = Stats.StartRequest ();
278+ 
279+         auto  future = InsistentClient.ExecuteWithRetry ([func, stat](NYdb::NTable::TSession session) {
280+             auto  result = func (session);
281+             return  result;
282+         });
278283
279-         auto  future = InsistentClient.ExecuteWithRetry (func);
280284        future.Subscribe ([this , stat, SemaphoreWrapper](const  TAsyncFinalStatus& future) mutable  {
281285            Y_ABORT_UNLESS (future.HasValue ());
282286            TFinalStatus resultStatus = future.GetValue ();
283-             Stats.Report (stat, resultStatus);
287+             Stats.FinishRequest (stat, resultStatus);
284288            if  (resultStatus) {
285289                CheckForError (*resultStatus);
286290            }
@@ -351,7 +355,6 @@ bool TExecutor::IsStopped() {
351355}
352356
353357void  TExecutor::Finish () {
354-     //  Stats.UpdateSessionStats(InsistentClient.GetSessionStats());
355358    with_lock (Lock) {
356359        if  (!AllJobsLaunched) {
357360            AllJobsLaunched = true ;
@@ -361,9 +364,6 @@ void TExecutor::Finish() {
361364}
362365
363366void  TExecutor::UpdateStats () {
364-     if  (Infly > MaxSecInfly) {
365-         MaxSecInfly = Infly;
366-     }
367367    std::uint64_t  activeSessions = InsistentClient.GetActiveSessions ();
368368    if  (activeSessions > MaxSecSessions) {
369369        MaxSecSessions = activeSessions;
@@ -382,10 +382,10 @@ void TExecutor::UpdateStats() {
382382}
383383
384384void  TExecutor::ReportStats () {
385+     Stats.ReportStats (MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
385386    TInstant now = TInstant::Now ();
386387    if  (now.Seconds () > LastReportSec) {
387-         Stats.ReportStats (MaxSecInfly, MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
388-         MaxSecInfly = 0 ;
388+         Stats.ReportStats (MaxSecSessions, MaxSecReadPromises, MaxSecExecutorPromises);
389389        MaxSecSessions = 0 ;
390390        MaxSecReadPromises = 0 ;
391391        MaxSecExecutorPromises = 0 ;
@@ -442,93 +442,3 @@ void TExecutor::Report(TStringBuilder& out) const {
442442        }
443443    }
444444}
445- 
446- 
447- TExecutorWithRetry::TExecutorWithRetry (const  TCommonOptions& opts, TStat& stats)
448-     : TExecutor(opts, stats)
449- {}
450- 
451- bool  TExecutorWithRetry::Execute (const  NYdb::NTable::TTableClient::TOperationFunc& func) {
452-     auto  threadFunc = [this , func]() {
453-         if  (IsStopped ()) {
454-             DecrementWaiting ();
455-             return ;
456-         }
457- 
458-         with_lock (Lock) {
459-             --Waiting;
460-             if  (Infly < Opts.MaxInfly ) {
461-                 ++Infly;
462-                 if  (Infly > MaxInfly) {
463-                     MaxInfly = Infly;
464-                 }
465-                 UpdateStats ();
466-             } else  {
467-                 Stats.ReportMaxInfly ();
468-                 UpdateStats ();
469-                 return ;
470-             }
471-         }
472- 
473-         std::shared_ptr<TRetryContext> context = std::make_shared<TRetryContext>(Stats);
474- 
475-         auto  executeOperation = [this , func]() {
476-             return  InsistentClient.ExecuteWithRetry (func);
477-         };
478- 
479-         context->HandleStatusFunc  = std::make_unique<std::function<void (const  TAsyncFinalStatus& resultFuture)>>(
480-             [this , executeOperation, context](const  TAsyncFinalStatus& future) mutable  {
481-             Y_ABORT_UNLESS (future.HasValue ());
482-             TFinalStatus resultStatus = future.GetValue ();
483-             if  (resultStatus) {
484-                 //  Reply received
485-                 CheckForError (*resultStatus);
486-                 if  (resultStatus->IsSuccess ()) {
487-                     // Ok received
488-                     Stats.Report (context->LifeTimeStat , resultStatus->GetStatus ()); 
489-                     DecrementInfly ();
490-                     context->HandleStatusFunc .reset ();
491-                     return ;
492-                 }
493-             }
494-             if  (IsStopped () || TInstant::Now () - context->LifeTimeStat .Start  > GlobalTimeout) {
495-                 //  Application stopped working or global timeout reached. Ok reply hasn't received yet
496-                 Stats.Report (context->LifeTimeStat , TInnerStatus::StatusNotFinished);
497-                 DecrementInfly ();
498-                 context->HandleStatusFunc .reset ();
499-                 return ;
500-             }
501-             Stats.Report (context->PerRequestStat , resultStatus);
502-             context->PerRequestStat  = Stats.CreateStatUnit ();
503-             //  Retrying:
504-             executeOperation ().Subscribe (*context->HandleStatusFunc );
505-         });
506- 
507-         context->Retries .fetch_add (1 );
508-         Y_ABORT_UNLESS (context->Retries .load () < 500 , " Too much retries" 
509- 
510-         executeOperation ().Subscribe (*context->HandleStatusFunc );
511-     };
512- 
513-     if  (IsStopped ()) {
514-         return  false ;
515-     }
516- 
517-     bool  CanLaunchJob = false ;
518- 
519-     with_lock (Lock) {
520-         if  (!AllJobsLaunched) {
521-             CanLaunchJob = true ;
522-             ++Waiting;
523-         }
524-     }
525- 
526-     if  (CanLaunchJob) {
527-         if  (!InputQueue->AddFunc (threadFunc)) {
528-             DecrementWaiting ();
529-         }
530-     }
531-     ++InProgressCount;
532-     InProgressSum += InputQueue->Size ();
533-     return  true ;
534- }
0 commit comments