diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index d3eac05e732c..6b062b3fc699 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -90,6 +90,50 @@ class TLogWriterLoadTestActor : public TActorBootstrapped pregenerated->size()) { + return FastGenDataForLZ4(blobId.BlobSize()); + } + TSharedData buffer(*pregenerated); + buffer.TrimBack(blobId.BlobSize()); + return buffer; + } + case EContentType::Validated: { + TSharedData buffer = TSharedData::Uninitialized(blobId.BlobSize()); + char* data = buffer.Detach(); + const char* src = reinterpret_cast(&blobId); + for (ui32 pos = 0; pos < blobId.BlobSize(); pos += sizeof(TLogoBlobID)) { + memcpy(data + pos, src, std::min(static_cast(sizeof(TLogoBlobID)), + blobId.BlobSize() - pos)); + } + return buffer; + } + } + } + + static bool ValidateBuffer(const TLogoBlobID& blobId, const char* buffer, TContentType contentType) { + switch (contentType) { + case EContentType::Random: { + return true; + } + case EContentType::Validated: { + const char* reference = reinterpret_cast(&blobId); + for (ui32 i : xrange(blobId.BlobSize())) { + if (buffer[i] != reference[i % sizeof(TLogoBlobID)]) { + return false; + } + } + return true; + } + } + } + struct TInFlightTracker { public: TInFlightTracker(ui32 maxRequestsInFlight = 0, ui64 maxBytesInFlight = 0) @@ -183,11 +227,13 @@ class TLogWriterLoadTestActor : public TActorBootstrapped MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel) { + std::unique_ptr MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel, + TContentType contentType) { Y_DEBUG_ABORT_UNLESS(CanSendRequest()); ui32 blobSize = SizeGenerator.Generate(); const TLogoBlobID id(tabletId, gen, step, channel, blobSize, BlobCookie++); - const TSharedData buffer = FastGenDataForLZ4(id.BlobSize()); + const TSharedData buffer = GenerateBuffer(id, contentType); + auto ev = std::make_unique(id, buffer, TInstant::Max(), PutHandleClass); InFlightTracker.Request(blobSize); return std::move(ev); @@ -395,6 +441,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped counters, TLogWriterLoadTestActor& self, ui64 tabletId, ui32 channel, - TMaybe generation, ui32 groupId, + TMaybe generation, ui32 groupId, TContentType contentType, NKikimrBlobStorage::EPutHandleClass putHandleClass, const TRequestDispatchingSettings& writeSettings, NKikimrBlobStorage::EGetHandleClass getHandleClass, const TRequestDispatchingSettings& readSettings, TIntervalGenerator garbageCollectIntervalGen, @@ -484,6 +531,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(); const TLogoBlobID id(TabletId, Generation, lastStep, Channel, size, 0); - const TSharedData buffer = Self.GenerateBuffer(id); + const TSharedData buffer = GenerateBuffer(id, ContentType, &Self.BlobData); auto ev = std::make_unique(id, buffer, TInstant::Max(), PutHandleClass); auto callback = [this] (IEventBase *event, const TActorContext& ctx) { @@ -652,7 +700,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(event); @@ -914,7 +962,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(id, buffer, TInstant::Max(), putHandleClass); const ui64 writeQueryId = ++WriteQueryId; @@ -1104,6 +1152,17 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(event); Y_ABORT_UNLESS(res); + if (ContentType != EContentType::Random) { + for (ui32 i : xrange(res->ResponseSz)) { + TEvBlobStorage::TEvGetResult::TResponse response = res->Responses[i]; + TString buffer = response.Buffer.ConvertToString(); + if (!ValidateBuffer(response.Id, buffer.data(), ContentType)) { + LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, + "Data corruption detected, BlobId# " << response.Id.ToString()); + } + } + } + ReadSettings.DelayManager->CountResponse(); if (!CheckStatus(ctx, res, {NKikimrProto::EReplyStatus::OK})) { return; @@ -1201,6 +1260,10 @@ class TLogWriterLoadTestActor : public TActorBootstrapped(counters, *this, tabletId, tablet.GetChannel(), tablet.HasGeneration() ? TMaybe(tablet.GetGeneration()) : TMaybe(), - tablet.GetGroupId(), putHandleClass, writeSettings, + tablet.GetGroupId(), contentType, putHandleClass, writeSettings, getHandleClass, readSettings, garbageCollectIntervalGen, scriptedRoundDuration, std::move(scriptedRequests), @@ -1527,15 +1590,6 @@ class TLogWriterLoadTestActor : public TActorBootstrappedGet()->SourceType, ev->Sender.ToString().data()); } - TSharedData GenerateBuffer(const TLogoBlobID& id) const { - if (id.BlobSize() > BlobData.size()) - return FastGenDataForLZ4(id.BlobSize()); - Y_ABORT_UNLESS(id.BlobSize() <= BlobData.size()); - TSharedData data(BlobData); - data.TrimBack(id.BlobSize()); - return data; - } - STRICT_STFUNC(StateFunc, CFunc(EvStopTest, HandleStopTest); CFunc(EvUpdateQuantile, HandleUpdateQuantile); diff --git a/ydb/core/protos/load_test.proto b/ydb/core/protos/load_test.proto index 07e523ce0546..a9d2c1b83671 100644 --- a/ydb/core/protos/load_test.proto +++ b/ydb/core/protos/load_test.proto @@ -31,6 +31,12 @@ message TEvLoadTestRequest { optional double Weight = 3; } message TStorageLoad { + message TContentType { + enum E { + Random = 1; + Validated = 2; + } + } message TInitialBlobAllocation { oneof DataSize { uint64 TotalSize = 1; @@ -91,6 +97,7 @@ message TEvLoadTestRequest { optional uint32 TracingThrottlerBurst = 19 [default = 0]; optional uint32 NumberOfRandomGroupsToPick = 20 [default = 0]; + optional TContentType.E ContentType = 21; }; optional uint64 Tag = 1; optional uint32 DurationSeconds = 2;