Skip to content

Commit

Permalink
Merge Deduplicated messages sensors: LOGBROKER-8733 (#3147) (#5328)
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar authored Jun 14, 2024
1 parent 7487d61 commit 309333d
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 22 deletions.
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
NKikimr::NPQ::TMultiCounter MsgsWrittenTotal;
NKikimr::NPQ::TMultiCounter MsgsWrittenGrpc;;

NKikimr::NPQ::TMultiCounter MsgsDiscarded;
NKikimr::NPQ::TMultiCounter BytesDiscarded;

// Writing blob with topic quota variables
ui64 TopicQuotaRequestCookie = 0;

Expand Down
12 changes: 11 additions & 1 deletion ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,10 @@ void TPartition::SetupTopicCounters(const TActorContext& ctx) {
BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"UncompressedBytesWritten" + suffix}, true);
BytesWrittenComp = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"CompactedBytesWritten" + suffix}, true);
MsgsWrittenTotal = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"MessagesWritten" + suffix}, true);
if (IsLocalDC) {
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedMessages"}, true);
BytesDiscarded = NKikimr::NPQ::TMultiCounter(subGroup, labels, {}, {"DiscardedBytes"}, true);
}

TVector<NPersQueue::TPQLabelsInfo> aggr = {{{{"Account", TopicConverter->GetAccount()}}, {"total"}}};
ui32 border = AppData(ctx)->PQConfig.GetWriteLatencyBigMs();
Expand Down Expand Up @@ -872,8 +876,14 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
{"topic.write.messages"}, true, "name");


BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
MsgsDiscarded = NKikimr::NPQ::TMultiCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"topic.write.discarded_messages"}, true, "name");
BytesDiscarded = NKikimr::NPQ::TMultiCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"topic.write.discarded_bytes"} , true, "name");

BytesWrittenUncompressed = NKikimr::NPQ::TMultiCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups,
{"topic.write.uncompressed_bytes"}, true, "name");

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,9 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
);

TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ALREADY].Increment(1);
MsgsDiscarded.Inc();
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ALREADY].Increment(p.Msg.Data.size());
BytesDiscarded.Inc(p.Msg.Data.size());
} else {
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_SMALL_OFFSET].Increment(1);
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_SMALL_OFFSET].Increment(p.Msg.Data.size());
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/persqueue/ut/counters_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ Y_UNIT_TEST(Partition) {
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
CmdWrite(0, "sourceid1", TestData(), tc, false);
CmdWrite(0, "sourceid2", TestData(), tc, false);
CmdWrite(0, "sourceid1", TestData(), tc, false);
CmdWrite(0, "sourceid2", TestData(), tc, false);
PQGetPartInfo(0, 30, tc);


Expand All @@ -93,15 +95,15 @@ Y_UNIT_TEST(Partition) {
dbGroup->OutputHtml(countersStr);
TString referenceCounters = NResource::Find(TStringBuf("counters_pqproxy.html"));

UNIT_ASSERT_EQUAL(countersStr.Str() + "\n", referenceCounters);
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str() + "\n", referenceCounters);
}

{
auto counters = tc.Runtime->GetAppData(0).Counters;
auto dbGroup = GetServiceCounters(counters, "datastreams");
TStringStream countersStr;
dbGroup->OutputHtml(countersStr);
UNIT_ASSERT_EQUAL(countersStr.Str(), "<pre></pre>");
UNIT_ASSERT_VALUES_EQUAL(countersStr.Str(), "<pre></pre>");
}
}

Expand All @@ -116,6 +118,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
CmdWrite(0, "sourceid0", TestData(), tc, false, {}, true);
CmdWrite(0, "sourceid1", TestData(), tc, false);
CmdWrite(0, "sourceid2", TestData(), tc, false);
CmdWrite(0, "sourceid0", TestData(), tc, false);
PQGetPartInfo(0, 30, tc);

{
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/persqueue/ut/resources/counters_datastreams.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
name=api.grpc.topic.stream_write.bytes: 540
name=api.grpc.topic.stream_write.messages: 30
name=topic.write.bytes: 540
name=topic.write.discarded_bytes: 90
name=topic.write.discarded_messages: 10
name=topic.write.messages: 30
name=topic.write.uncompressed_bytes: 270

Expand Down Expand Up @@ -55,7 +57,7 @@
bin=99999999: 0

name=topic.write.partition_throttled_milliseconds:
bin=0: 30
bin=0: 40
bin=1: 0
bin=10: 0
bin=100: 0
Expand Down
28 changes: 19 additions & 9 deletions ydb/core/persqueue/ut/resources/counters_pqproxy.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
OriginDC=Dc1:

sensor=PartitionWriteQuotaWaitOriginal:
Interval=0ms: 30
Interval=0ms: 50
Interval=10000ms: 0
Interval=1000ms: 0
Interval=100ms: 0
Expand All @@ -67,7 +67,7 @@
OriginDC=cluster:

sensor=PartitionWriteQuotaWaitOriginal:
Interval=0ms: 30
Interval=0ms: 50
Interval=10000ms: 0
Interval=1000ms: 0
Interval=100ms: 0
Expand All @@ -88,7 +88,7 @@
OriginDC=cluster:

sensor=PartitionWriteQuotaWaitOriginal:
Interval=0ms: 30
Interval=0ms: 50
Interval=10000ms: 0
Interval=1000ms: 0
Interval=100ms: 0
Expand All @@ -111,7 +111,7 @@
OriginDC=cluster:

sensor=PartitionWriteQuotaWaitOriginal:
Interval=0ms: 30
Interval=0ms: 50
Interval=10000ms: 0
Interval=1000ms: 0
Interval=100ms: 0
Expand All @@ -136,7 +136,7 @@
OriginDC=cluster:

sensor=PartitionWriteQuotaWaitOriginal:
Interval=0ms: 30
Interval=0ms: 50
Interval=10000ms: 0
Interval=1000ms: 0
Interval=100ms: 0
Expand Down Expand Up @@ -470,17 +470,21 @@
TopicPath=asdfgs/topic:

ClientDC=Unknown:
sensor=BytesWrittenFromDC: 1560
sensor=BytesWrittenFromDC: 2600

OriginDC=Dc1:
sensor=BytesWrittenOriginal: 540
sensor=CompactedBytesWrittenOriginal: 747
sensor=DiscardedBytes: 180
sensor=DiscardedMessages: 20
sensor=MessagesWrittenOriginal: 30
sensor=UncompressedBytesWrittenOriginal: 270

OriginDC=cluster:
sensor=BytesWrittenOriginal: 540
sensor=CompactedBytesWrittenOriginal: 747
sensor=DiscardedBytes: 180
sensor=DiscardedMessages: 20
sensor=MessagesWrittenOriginal: 30
sensor=UncompressedBytesWrittenOriginal: 270

Expand All @@ -489,11 +493,13 @@
TopicPath=total:

ClientDC=Unknown:
sensor=BytesWrittenFromDC: 1560
sensor=BytesWrittenFromDC: 2600

OriginDC=cluster:
sensor=BytesWrittenOriginal: 540
sensor=CompactedBytesWrittenOriginal: 747
sensor=DiscardedBytes: 180
sensor=DiscardedMessages: 20
sensor=MessagesWrittenOriginal: 30
sensor=UncompressedBytesWrittenOriginal: 270

Expand All @@ -504,11 +510,13 @@
TopicPath=total:

ClientDC=Unknown:
sensor=BytesWrittenFromDC: 1560
sensor=BytesWrittenFromDC: 2600

OriginDC=cluster:
sensor=BytesWrittenOriginal: 540
sensor=CompactedBytesWrittenOriginal: 747
sensor=DiscardedBytes: 180
sensor=DiscardedMessages: 20
sensor=MessagesWrittenOriginal: 30
sensor=UncompressedBytesWrittenOriginal: 270

Expand All @@ -521,11 +529,13 @@
TopicPath=total:

ClientDC=Unknown:
sensor=BytesWrittenFromDC: 1560
sensor=BytesWrittenFromDC: 2600

OriginDC=cluster:
sensor=BytesWrittenOriginal: 540
sensor=CompactedBytesWrittenOriginal: 747
sensor=DiscardedBytes: 180
sensor=DiscardedMessages: 20
sensor=MessagesWrittenOriginal: 30
sensor=UncompressedBytesWrittenOriginal: 270

Expand Down
20 changes: 11 additions & 9 deletions ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ namespace NKikimr::NPersQueueTests {
Sleep(TDuration::MilliSeconds(10));
}

// Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
// Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
// Each will contains shifts from the message: before, equals and after.
// It allow check reading from different shift. First iteration read from zero.
TVector<TInstant> ts { TInstant::Zero() };
Expand Down Expand Up @@ -254,10 +254,10 @@ namespace NKikimr::NPersQueueTests {
ui32 lastOffset = 0;

settings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false)
Cerr << ">>>>> Iteration: " << i << " TDataReceivedEvent: " << event.DebugString(false)
<< " size=" << event.GetMessages().size() << Endl << Flush;
for (const auto& msg : event.GetMessages()) {
Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16)
Cerr << ">>>>> Iteration: " << i << " Got message: " << msg.GetData().substr(0, 16)
<< " :: " << msg.DebugString(false) << Endl << Flush;

auto count = ++map[msg.GetData()];
Expand All @@ -281,12 +281,12 @@ namespace NKikimr::NPersQueueTests {
} else {
if (map.size() == 1) {
auto expectedOffset = firstOffset[i];
UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i
<< " Expected first message offset " << expectedOffset
UNIT_ASSERT_EQUAL_C(msg.GetOffset(), expectedOffset, "Iteration: " << i
<< " Expected first message offset " << expectedOffset
<< " but got " << msg.GetOffset());
} else {
UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i
<< " unexpected offset order. Last offset " << lastOffset
UNIT_ASSERT_C(lastOffset < msg.GetOffset(), "Iteration: " << i
<< " unexpected offset order. Last offset " << lastOffset
<< " Message offset " << msg.GetOffset());
}

Expand All @@ -310,8 +310,8 @@ namespace NKikimr::NPersQueueTests {

if (i == 0) {
for (ui32 j = 1; j < ts.size(); ++j) {
Cerr << ">>>>> Planed iteration: " << j
<< ". Start reading from time: " << ts[j]
Cerr << ">>>>> Planed iteration: " << j
<< ". Start reading from time: " << ts[j]
<< ". Expected first message offset: " << firstOffset[j]
<< ". Expected message quantity: " << expectingQuantities[j] << Endl;
}
Expand Down Expand Up @@ -462,6 +462,8 @@ namespace NKikimr::NPersQueueTests {
"topic.read.lag_milliseconds",
"topic.write.bytes",
"topic.write.messages",
"topic.write.discarded_bytes",
"topic.write.discarded_messages",
"api.grpc.topic.stream_write.bytes",
"topic.write.partition_throttled_milliseconds",
"topic.write.message_size_bytes",
Expand Down
2 changes: 2 additions & 0 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3676,6 +3676,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
{
"BytesWrittenOriginal",
"CompactedBytesWrittenOriginal",
"DiscardedBytes",
"DiscardedMessages",
"MessagesWrittenOriginal",
"UncompressedBytesWrittenOriginal"
},
Expand Down

0 comments on commit 309333d

Please sign in to comment.