Skip to content

Commit 78e8100

Browse files
committed
[C++ SDK] Added metrics for QueryClient (#15213)
1 parent 55d3e8d commit 78e8100

File tree

6 files changed

+89
-48
lines changed

6 files changed

+89
-48
lines changed

src/client/impl/ydb_stats/stats.h

Lines changed: 52 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ struct TStatCollector {
160160
using TMetricRegistry = ::NMonitoring::TMetricRegistry;
161161

162162
public:
163-
164163
struct TEndpointElectorStatCollector {
165164

166165
TEndpointElectorStatCollector(::NMonitoring::TIntGauge* endpointCount = nullptr
@@ -197,29 +196,34 @@ struct TStatCollector {
197196

198197
TClientRetryOperationStatCollector() : MetricRegistry_(), Database_() {}
199198

200-
TClientRetryOperationStatCollector(::NMonitoring::TMetricRegistry* registry, const std::string& database)
201-
: MetricRegistry_(registry), Database_(database)
199+
TClientRetryOperationStatCollector(::NMonitoring::TMetricRegistry* registry,
200+
const std::string& database,
201+
const std::string& clientType)
202+
: MetricRegistry_(registry)
203+
, Database_(database)
204+
, ClientType_(clientType)
202205
{ }
203206

204207
void IncSyncRetryOperation(const EStatus& status) {
205208
if (auto registry = MetricRegistry_.Get()) {
206209
std::string statusName = TStringBuilder() << status;
207210
std::string sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName);
208-
registry->Rate({ {"database", Database_}, {"sensor", sensor} })->Inc();
211+
registry->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, {"sensor", sensor} })->Inc();
209212
}
210213
}
211214

212215
void IncAsyncRetryOperation(const EStatus& status) {
213216
if (auto registry = MetricRegistry_.Get()) {
214217
std::string statusName = TStringBuilder() << status;
215218
std::string sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName);
216-
registry->Rate({ {"database", Database_}, {"sensor", sensor} })->Inc();
219+
registry->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, {"sensor", sensor} })->Inc();
217220
}
218221
}
219222

220223
private:
221224
TAtomicPointer<::NMonitoring::TMetricRegistry> MetricRegistry_;
222225
std::string Database_;
226+
std::string ClientType_;
223227
};
224228

225229
struct TClientStatCollector {
@@ -264,22 +268,11 @@ struct TStatCollector {
264268
RequestFailDueQueueOverflow_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/FailedDiscoveryQueueOverflow"} }));
265269
RequestFailDueNoEndpoint_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/FailedNoEndpoint"} }));
266270
RequestFailDueTransportError_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/FailedTransportError"} }));
267-
CacheMiss_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Request/ClientQueryCacheMiss"} }));
268-
ActiveSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InUse"} }));
269-
InPoolSessions_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/InPool"} }));
270-
Waiters_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Sessions/WaitForReturn"} }));
271271
SessionCV_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "SessionBalancer/Variation"} }));
272-
SessionRemovedDueBalancing_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/SessionsRemoved"} }));
273-
RequestMigrated_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "SessionBalancer/RequestsMigrated"} }));
274-
FakeSessions_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Sessions/SessionsLimitExceeded"} }));
275272
GRpcInFlight_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Grpc/InFlight"} }));
276273

277274
RequestLatency_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/Latency"} },
278275
::NMonitoring::ExponentialHistogram(20, 2, 1)));
279-
QuerySize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/QuerySize"} },
280-
::NMonitoring::ExponentialHistogram(20, 2, 32)));
281-
ParamsSize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/ParamsSize"} },
282-
::NMonitoring::ExponentialHistogram(10, 2, 32)));
283276
ResultSize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/ResultSize"} },
284277
::NMonitoring::ExponentialHistogram(20, 2, 32)));
285278
}
@@ -336,31 +329,57 @@ struct TStatCollector {
336329

337330
TEndpointElectorStatCollector GetEndpointElectorStatCollector() {
338331
if (auto registry = MetricRegistryPtr_.Get()) {
339-
auto endpointCoint = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/Total"} });
340-
auto pessimizationRatio = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/BadRatio"} });
341-
auto activeEndpoints = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/Good"} });
332+
auto endpointCoint = registry->IntGauge({ DatabaseLabel_, {"sensor", "Endpoints/Total"} });
333+
auto pessimizationRatio = registry->IntGauge({ DatabaseLabel_, {"sensor", "Endpoints/BadRatio"} });
334+
auto activeEndpoints = registry->IntGauge({ DatabaseLabel_, {"sensor", "Endpoints/Good"} });
342335
return TEndpointElectorStatCollector(endpointCoint, pessimizationRatio, activeEndpoints);
343-
} else {
344-
return TEndpointElectorStatCollector();
345336
}
337+
338+
return TEndpointElectorStatCollector();
346339
}
347340

348-
TSessionPoolStatCollector GetSessionPoolStatCollector() {
349-
if (!IsCollecting()) {
350-
return TSessionPoolStatCollector();
341+
TSessionPoolStatCollector GetSessionPoolStatCollector(const std::string& clientType) {
342+
if (auto registry = MetricRegistryPtr_.Get()) {
343+
auto activeSessions = registry->IntGauge({ DatabaseLabel_, {"ydb_client", clientType},
344+
{"sensor", "Sessions/InUse"} });
345+
auto inPoolSessions = registry->IntGauge({ DatabaseLabel_, {"ydb_client", clientType},
346+
{"sensor", "Sessions/InPool"} });
347+
auto fakeSessions = registry->Rate({ DatabaseLabel_, {"ydb_client", clientType},
348+
{"sensor", "Sessions/SessionsLimitExceeded"} });
349+
auto waiters = registry->IntGauge({ DatabaseLabel_, {"ydb_client", clientType},
350+
{"sensor", "Sessions/WaitForReturn"} });
351+
352+
return TSessionPoolStatCollector(activeSessions, inPoolSessions, fakeSessions, waiters);
351353
}
352354

353-
return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get(), Waiters_.Get());
355+
return TSessionPoolStatCollector();
354356
}
355357

356-
TClientStatCollector GetClientStatCollector() {
357-
if (IsCollecting()) {
358-
return TClientStatCollector(CacheMiss_.Get(), QuerySize_.Get(), ParamsSize_.Get(),
359-
SessionRemovedDueBalancing_.Get(), RequestMigrated_.Get(),
360-
TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_));
361-
} else {
362-
return TClientStatCollector();
358+
TClientStatCollector GetClientStatCollector(const std::string& clientType) {
359+
if (auto registry = MetricRegistryPtr_.Get()) {
360+
::NMonitoring::TRate* cacheMiss = nullptr;
361+
::NMonitoring::TRate* sessionRemovedDueBalancing = nullptr;
362+
::NMonitoring::TRate* requestMigrated = nullptr;
363+
364+
if (clientType == "Table") {
365+
cacheMiss = registry->Rate({ DatabaseLabel_, {"ydb_client", clientType},
366+
{"sensor", "Request/ClientQueryCacheMiss"} });
367+
sessionRemovedDueBalancing = registry->Rate({ DatabaseLabel_, {"ydb_client", clientType},
368+
{"sensor", "SessionBalancer/SessionsRemoved"} });
369+
requestMigrated = registry->Rate({ DatabaseLabel_, {"ydb_client", clientType},
370+
{"sensor", "SessionBalancer/RequestsMigrated"} });
371+
}
372+
373+
auto querySize = registry->HistogramRate({ DatabaseLabel_, {"ydb_client", clientType},
374+
{"sensor", "Request/QuerySize"} }, ::NMonitoring::ExponentialHistogram(20, 2, 32));
375+
auto paramsSize = registry->HistogramRate({ DatabaseLabel_, {"ydb_client", clientType},
376+
{"sensor", "Request/ParamsSize"} }, ::NMonitoring::ExponentialHistogram(10, 2, 32));
377+
378+
return TClientStatCollector(cacheMiss, querySize, paramsSize, sessionRemovedDueBalancing, requestMigrated,
379+
TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType));
363380
}
381+
382+
return TClientStatCollector();
364383
}
365384

366385
bool IsCollecting() {
@@ -374,6 +393,7 @@ struct TStatCollector {
374393

375394
void IncGRpcInFlightByHost(const std::string& host);
376395
void DecGRpcInFlightByHost(const std::string& host);
396+
377397
private:
378398
const std::string Database_;
379399
const ::NMonitoring::TLabel DatabaseLabel_;
@@ -384,18 +404,9 @@ struct TStatCollector {
384404
TAtomicCounter<::NMonitoring::TRate> RequestFailDueNoEndpoint_;
385405
TAtomicCounter<::NMonitoring::TRate> RequestFailDueTransportError_;
386406
TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_;
387-
TAtomicPointer<::NMonitoring::TIntGauge> ActiveSessions_;
388-
TAtomicPointer<::NMonitoring::TIntGauge> InPoolSessions_;
389-
TAtomicPointer<::NMonitoring::TIntGauge> Waiters_;
390407
TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_;
391-
TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing_;
392-
TAtomicCounter<::NMonitoring::TRate> RequestMigrated_;
393-
TAtomicCounter<::NMonitoring::TRate> FakeSessions_;
394-
TAtomicCounter<::NMonitoring::TRate> CacheMiss_;
395408
TAtomicCounter<::NMonitoring::TIntGauge> GRpcInFlight_;
396409
TAtomicHistogram<::NMonitoring::THistogram> RequestLatency_;
397-
TAtomicHistogram<::NMonitoring::THistogram> QuerySize_;
398-
TAtomicHistogram<::NMonitoring::THistogram> ParamsSize_;
399410
TAtomicHistogram<::NMonitoring::THistogram> ResultSize_;
400411
};
401412

src/client/query/client.cpp

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,25 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
6767
, Settings_(settings)
6868
, SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_)
6969
{
70+
SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector("Query"));
71+
SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Query"));
7072
}
7173

7274
~TImpl() {
7375
// TODO: Drain sessions.
7476
}
7577

78+
void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) {
79+
QuerySizeHistogram_.Set(collector.QuerySize);
80+
ParamsSizeHistogram_.Set(collector.ParamsSize);
81+
RetryOperationStatCollector_ = collector.RetryOperationStatCollector;
82+
}
83+
7684
TAsyncExecuteQueryIterator StreamExecuteQuery(const std::string& query, const TTxControl& txControl,
7785
const std::optional<TParams>& params, const TExecuteQuerySettings& settings, const std::optional<TSession>& session = {})
7886
{
87+
CollectQuerySize(query);
88+
CollectParamsSize(params ? &params->GetProtoMap() : nullptr);
7989
return TExecQueryImpl::StreamExecuteQuery(
8090
Connections_, DbDriverState_, query, txControl, params, settings, session);
8191
}
@@ -84,6 +94,8 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
8494
const std::optional<TParams>& params, const TExecuteQuerySettings& settings,
8595
const std::optional<TSession>& session = {})
8696
{
97+
CollectQuerySize(query);
98+
CollectParamsSize(params ? &params->GetProtoMap() : nullptr);
8799
return TExecQueryImpl::ExecuteQuery(
88100
Connections_, DbDriverState_, query, txControl, params, settings, session);
89101
}
@@ -500,14 +512,34 @@ class TQueryClient::TImpl: public TClientImplCommon<TQueryClient::TImpl>, public
500512
}
501513

502514
void CollectRetryStatAsync(EStatus status) {
503-
Y_UNUSED(status);
515+
RetryOperationStatCollector_.IncAsyncRetryOperation(status);
504516
}
505517

506518
void CollectRetryStatSync(EStatus status) {
507-
Y_UNUSED(status);
519+
RetryOperationStatCollector_.IncSyncRetryOperation(status);
520+
}
521+
522+
void CollectQuerySize(const std::string& query) {
523+
if (QuerySizeHistogram_.IsCollecting()) {
524+
QuerySizeHistogram_.Record(query.size());
525+
}
526+
}
527+
528+
void CollectParamsSize(const ::google::protobuf::Map<TStringType, Ydb::TypedValue>* params) {
529+
if (params && ParamsSizeHistogram_.IsCollecting()) {
530+
size_t size = 0;
531+
for (auto& keyvalue: *params) {
532+
size += keyvalue.second.ByteSizeLong();
533+
}
534+
ParamsSizeHistogram_.Record(size);
535+
}
508536
}
509537

510538
private:
539+
NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector_;
540+
NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram_;
541+
NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram_;
542+
511543
TClientSettings Settings_;
512544
NSessionPool::TSessionPool SessionPool_;
513545
};

src/client/query/impl/exec_query.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ TFuture<std::pair<TPlainStatus, TExecuteQueryProcessorPtr>> StreamExecuteQueryIm
247247
}
248248

249249
if (settings.StatsCollectPeriod_) {
250-
request.set_stats_period_ms((*settings.StatsCollectPeriod_).count());
250+
request.set_stats_period_ms(settings.StatsCollectPeriod_->count());
251251
}
252252

253253
if (txControl.HasTx()) {

src/client/table/impl/table_client.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ TTableClient::TImpl::TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections,
2626
return;
2727
}
2828

29-
SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector());
30-
SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector());
29+
SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector("Table"));
30+
SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector("Table"));
3131
}
3232

3333
TTableClient::TImpl::~TImpl() {
@@ -987,7 +987,6 @@ void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TCli
987987
ParamsSizeHistogram.Set(collector.ParamsSize);
988988
RetryOperationStatCollector = collector.RetryOperationStatCollector;
989989
SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing);
990-
RequestMigrated.Set(collector.RequestMigrated);
991990
}
992991

993992
TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) {

src/client/table/impl/table_client.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,6 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
315315
NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram;
316316
NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram;
317317
NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing;
318-
NSdkStats::TAtomicCounter<::NMonitoring::TRate> RequestMigrated;
319318

320319
private:
321320
NSessionPool::TSessionPool SessionPool_;

0 commit comments

Comments
 (0)