From 05cd065d25a2289a00433f2a3b9d7e8cb7a76f44 Mon Sep 17 00:00:00 2001 From: FloatingCrowbar <103565628+FloatingCrowbar@users.noreply.github.com> Date: Mon, 1 Apr 2024 09:48:14 +0300 Subject: [PATCH] Deduplicated messages sensors: LOGBROKER-8733 (#3147) --- ydb/core/persqueue/partition.h | 3 ++ ydb/core/persqueue/partition_init.cpp | 11 ++++++++ ydb/core/persqueue/partition_write.cpp | 2 ++ ydb/core/persqueue/ut/counters_ut.cpp | 7 +++-- .../ut/resources/counters_datastreams.html | 4 ++- .../ut/resources/counters_pqproxy.html | 28 +++++++++++++------ .../persqueue_new_schemecache_ut.cpp | 20 +++++++------ ydb/services/persqueue_v1/persqueue_ut.cpp | 2 ++ 8 files changed, 56 insertions(+), 21 deletions(-) diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 55258f939ecb..62d6c1a836fd 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -780,6 +780,9 @@ class TPartition : public TActorBootstrapped { TPartitionCounterWrapper MsgsWrittenTotal; TPartitionCounterWrapper MsgsWrittenGrpc; + NKikimr::NPQ::TMultiCounter MsgsDiscarded; + NKikimr::NPQ::TMultiCounter BytesDiscarded; + // Writing blob with topic quota variables ui64 TopicQuotaRequestCookie = 0; ui64 NextTopicWriteQuotaRequestCookie = 1; diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index a33ba35f7cdd..321fd5efa174 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -802,6 +802,10 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) { MsgsWrittenTotal.Setup( IsSupportive(), true, NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + txSuffix}, true)); + if (IsLocalDC) { + MsgsDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedMessages"}, true); + BytesDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedBytes"}, true); + } TVector aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}}; ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs(); @@ -904,6 +908,13 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) { NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, {"topic.write." + messagesSuffix}, true, "name")); + MsgsDiscarded = NKikimr::NPQ::TMultiCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, + {"topic.write.discarded_messages"}, true, "name"); + BytesDiscarded = NKikimr::NPQ::TMultiCounter( + NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, + {"topic.write.discarded_bytes"} , true, "name"); + BytesWrittenUncompressed.Setup( IsSupportive(), false, NKikimr::NPQ::TMultiCounter( diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index bbb00bb6ec50..835ce73ba1a5 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -927,7 +927,9 @@ TPartition::EProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParam ); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1); + MsgsDiscarded.Inc(); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size()); + BytesDiscarded.Inc(p.Msg.Data.size()); } else { TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1); TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size()); diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index b1cff036ae8d..6cfdf273af88 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -84,6 +84,8 @@ Y_UNIT_TEST(Partition) { CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true); CmdWrite(0, "sourceid1", TestData(), tc, false); CmdWrite(0, "sourceid2", TestData(), tc, false); + CmdWrite(0, "sourceid1", TestData(), tc, false); + CmdWrite(0, "sourceid2", TestData(), tc, false); PQGetPartInfo(0, 30, tc); @@ -94,7 +96,7 @@ Y_UNIT_TEST(Partition) { dbGroup->OutputHtml(countersStr); TString referenceCounters = NResource::Find(TStringBuf("counters_pqproxy.html")); - UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters); + UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters); } { @@ -102,7 +104,7 @@ Y_UNIT_TEST(Partition) { auto dbGroup = GetServiceCounters(counters, "datastreams"); TStringStream countersStr; dbGroup->OutputHtml(countersStr); - UNIT_ASSERT_EQUAL(countersStr.Str(), "
");
+        UNIT_ASSERT_VALUES_EQUAL(countersStr.Str(), "
");
     }
 }
 
@@ -173,6 +175,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
     CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
     CmdWrite(0, "sourceid1", TestData(), tc, false);
     CmdWrite(0, "sourceid2", TestData(), tc, false);
+    CmdWrite(0, "sourceid0", TestData(), tc, false);
     PQGetPartInfo(0, 30, tc);
 
     {
diff --git a/ydb/core/persqueue/ut/resources/counters_datastreams.html b/ydb/core/persqueue/ut/resources/counters_datastreams.html
index 9de064cfaf99..ca5239ace27e 100644
--- a/ydb/core/persqueue/ut/resources/counters_datastreams.html
+++ b/ydb/core/persqueue/ut/resources/counters_datastreams.html
@@ -3,6 +3,8 @@
     name=api.grpc.topic.stream_write.bytes: 540
     name=api.grpc.topic.stream_write.messages: 30
     name=topic.write.bytes: 540
+    name=topic.write.discarded_bytes: 90
+    name=topic.write.discarded_messages: 10
     name=topic.write.messages: 30
     name=topic.write.uncompressed_bytes: 270
 
@@ -55,7 +57,7 @@
         bin=99999999: 0
 
     name=topic.write.partition_throttled_milliseconds:
-        bin=0: 30
+        bin=0: 40
         bin=1: 0
         bin=10: 0
         bin=100: 0
diff --git a/ydb/core/persqueue/ut/resources/counters_pqproxy.html b/ydb/core/persqueue/ut/resources/counters_pqproxy.html
index 530620620daa..1b24056d1bde 100644
--- a/ydb/core/persqueue/ut/resources/counters_pqproxy.html
+++ b/ydb/core/persqueue/ut/resources/counters_pqproxy.html
@@ -50,7 +50,7 @@
                     OriginDC=Dc1:
 
                         sensor=PartitionWriteQuotaWaitOriginal:
-                            Interval=0ms: 30
+                            Interval=0ms: 50
                             Interval=10000ms: 0
                             Interval=1000ms: 0
                             Interval=100ms: 0
@@ -67,7 +67,7 @@
                     OriginDC=cluster:
 
                         sensor=PartitionWriteQuotaWaitOriginal:
-                            Interval=0ms: 30
+                            Interval=0ms: 50
                             Interval=10000ms: 0
                             Interval=1000ms: 0
                             Interval=100ms: 0
@@ -88,7 +88,7 @@
                     OriginDC=cluster:
 
                         sensor=PartitionWriteQuotaWaitOriginal:
-                            Interval=0ms: 30
+                            Interval=0ms: 50
                             Interval=10000ms: 0
                             Interval=1000ms: 0
                             Interval=100ms: 0
@@ -111,7 +111,7 @@
                     OriginDC=cluster:
 
                         sensor=PartitionWriteQuotaWaitOriginal:
-                            Interval=0ms: 30
+                            Interval=0ms: 50
                             Interval=10000ms: 0
                             Interval=1000ms: 0
                             Interval=100ms: 0
@@ -136,7 +136,7 @@
                     OriginDC=cluster:
 
                         sensor=PartitionWriteQuotaWaitOriginal:
-                            Interval=0ms: 30
+                            Interval=0ms: 50
                             Interval=10000ms: 0
                             Interval=1000ms: 0
                             Interval=100ms: 0
@@ -470,17 +470,21 @@
                 TopicPath=asdfgs/topic:
 
                     ClientDC=Unknown:
-                        sensor=BytesWrittenFromDC: 1560
+                        sensor=BytesWrittenFromDC: 2600
 
                     OriginDC=Dc1:
                         sensor=BytesWrittenOriginal: 540
                         sensor=CompactedBytesWrittenOriginal: 747
+                        sensor=DiscardedBytes: 180
+                        sensor=DiscardedMessages: 20
                         sensor=MessagesWrittenOriginal: 30
                         sensor=UncompressedBytesWrittenOriginal: 270
 
                     OriginDC=cluster:
                         sensor=BytesWrittenOriginal: 540
                         sensor=CompactedBytesWrittenOriginal: 747
+                        sensor=DiscardedBytes: 180
+                        sensor=DiscardedMessages: 20
                         sensor=MessagesWrittenOriginal: 30
                         sensor=UncompressedBytesWrittenOriginal: 270
 
@@ -489,11 +493,13 @@
                 TopicPath=total:
 
                     ClientDC=Unknown:
-                        sensor=BytesWrittenFromDC: 1560
+                        sensor=BytesWrittenFromDC: 2600
 
                     OriginDC=cluster:
                         sensor=BytesWrittenOriginal: 540
                         sensor=CompactedBytesWrittenOriginal: 747
+                        sensor=DiscardedBytes: 180
+                        sensor=DiscardedMessages: 20
                         sensor=MessagesWrittenOriginal: 30
                         sensor=UncompressedBytesWrittenOriginal: 270
 
@@ -504,11 +510,13 @@
                 TopicPath=total:
 
                     ClientDC=Unknown:
-                        sensor=BytesWrittenFromDC: 1560
+                        sensor=BytesWrittenFromDC: 2600
 
                     OriginDC=cluster:
                         sensor=BytesWrittenOriginal: 540
                         sensor=CompactedBytesWrittenOriginal: 747
+                        sensor=DiscardedBytes: 180
+                        sensor=DiscardedMessages: 20
                         sensor=MessagesWrittenOriginal: 30
                         sensor=UncompressedBytesWrittenOriginal: 270
 
@@ -521,11 +529,13 @@
                 TopicPath=total:
 
                     ClientDC=Unknown:
-                        sensor=BytesWrittenFromDC: 1560
+                        sensor=BytesWrittenFromDC: 2600
 
                     OriginDC=cluster:
                         sensor=BytesWrittenOriginal: 540
                         sensor=CompactedBytesWrittenOriginal: 747
+                        sensor=DiscardedBytes: 180
+                        sensor=DiscardedMessages: 20
                         sensor=MessagesWrittenOriginal: 30
                         sensor=UncompressedBytesWrittenOriginal: 270
 
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index 8e617380ae17..7d4a33371514 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -223,7 +223,7 @@ namespace NKikimr::NPersQueueTests {
                 Sleep(TDuration::MilliSeconds(10));
             }
 
-            // Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages. 
+            // Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
             // Each will contains shifts from the message: before, equals and after.
             // It allow check reading from different shift. First iteration read from zero.
             TVector ts { TInstant::Zero() };
@@ -254,10 +254,10 @@ namespace NKikimr::NPersQueueTests {
                 ui32 lastOffset = 0;
 
                 settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
-                        Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false) 
+                        Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false)
                              << " size=" << event.GetMessages().size() << Endl << Flush;
                         for (const auto& msg : event.GetMessages()) {
-                            Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16) 
+                            Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16)
                                                         << " :: " << msg.DebugString(false) << Endl << Flush;
 
                             auto count = ++map[msg.GetData()];
@@ -281,12 +281,12 @@ namespace NKikimr::NPersQueueTests {
                             } else {
                                 if (map.size() == 1) {
                                     auto expectedOffset = firstOffset[i];
-                                    UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i 
-                                                                << " Expected first message offset " << expectedOffset 
+                                    UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i
+                                                                << " Expected first message offset " << expectedOffset
                                                                 << " but got " << msg.GetOffset());
                                 } else {
-                                    UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i 
-                                                                << " unexpected offset order. Last offset " << lastOffset 
+                                    UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i
+                                                                << " unexpected offset order. Last offset " << lastOffset
                                                                 << " Message offset " << msg.GetOffset());
                                 }
 
@@ -310,8 +310,8 @@ namespace NKikimr::NPersQueueTests {
 
                 if (i == 0) {
                     for (ui32 j = 1; j < ts.size(); ++j) {
-                        Cerr << ">>>>> Planed iteration: " << j 
-                             << ". Start reading from time: " << ts[j]  
+                        Cerr << ">>>>> Planed iteration: " << j
+                             << ". Start reading from time: " << ts[j]
                              << ". Expected first message offset: " << firstOffset[j]
                              << ". Expected message quantity: " << expectingQuantities[j] << Endl;
                     }
@@ -462,6 +462,8 @@ namespace NKikimr::NPersQueueTests {
                                       "topic.read.lag_milliseconds",
                                       "topic.write.bytes",
                                       "topic.write.messages",
+                                      "topic.write.discarded_bytes",
+                                      "topic.write.discarded_messages",
                                       "api.grpc.topic.stream_write.bytes",
                                       "topic.write.partition_throttled_milliseconds",
                                       "topic.write.message_size_bytes",
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 7770a5d41436..cefeba6bacdd 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3681,6 +3681,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
                           {
                               "BytesWrittenOriginal",
                               "CompactedBytesWrittenOriginal",
+                              "DiscardedBytes",
+                              "DiscardedMessages",
                               "MessagesWrittenOriginal",
                               "UncompressedBytesWrittenOriginal"
                           },