Skip to content

Commit e89acca

Browse files
authored
Add optional data validation to Storage LoadActor (#27453)
1 parent 19643b6 commit e89acca

File tree

2 files changed

+77
-16
lines changed

2 files changed

+77
-16
lines changed

ydb/core/load_test/group_write.cpp

Lines changed: 70 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,50 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
9090
NKikimrBlobStorage::EPutHandleClass PutHandleClass;
9191
};
9292

93+
using TContentType = NKikimr::TEvLoadTestRequest::TStorageLoad::TContentType::E;
94+
using EContentType = NKikimr::TEvLoadTestRequest::TStorageLoad::TContentType;
95+
96+
static TSharedData GenerateBuffer(const TLogoBlobID& blobId, TContentType contentType,
97+
TSharedData* pregenerated = nullptr) {
98+
switch (contentType) {
99+
case EContentType::Random: {
100+
if (!pregenerated || blobId.BlobSize() > pregenerated->size()) {
101+
return FastGenDataForLZ4<TSharedData>(blobId.BlobSize());
102+
}
103+
TSharedData buffer(*pregenerated);
104+
buffer.TrimBack(blobId.BlobSize());
105+
return buffer;
106+
}
107+
case EContentType::Validated: {
108+
TSharedData buffer = TSharedData::Uninitialized(blobId.BlobSize());
109+
char* data = buffer.Detach();
110+
const char* src = reinterpret_cast<const char*>(&blobId);
111+
for (ui32 pos = 0; pos < blobId.BlobSize(); pos += sizeof(TLogoBlobID)) {
112+
memcpy(data + pos, src, std::min(static_cast<ui32>(sizeof(TLogoBlobID)),
113+
blobId.BlobSize() - pos));
114+
}
115+
return buffer;
116+
}
117+
}
118+
}
119+
120+
static bool ValidateBuffer(const TLogoBlobID& blobId, const char* buffer, TContentType contentType) {
121+
switch (contentType) {
122+
case EContentType::Random: {
123+
return true;
124+
}
125+
case EContentType::Validated: {
126+
const char* reference = reinterpret_cast<const char*>(&blobId);
127+
for (ui32 i : xrange(blobId.BlobSize())) {
128+
if (buffer[i] != reference[i % sizeof(TLogoBlobID)]) {
129+
return false;
130+
}
131+
}
132+
return true;
133+
}
134+
}
135+
}
136+
93137
struct TInFlightTracker {
94138
public:
95139
TInFlightTracker(ui32 maxRequestsInFlight = 0, ui64 maxBytesInFlight = 0)
@@ -183,11 +227,13 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
183227
}
184228
}
185229

186-
std::unique_ptr<TEvBlobStorage::TEvPut> MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel) {
230+
std::unique_ptr<TEvBlobStorage::TEvPut> MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel,
231+
TContentType contentType) {
187232
Y_DEBUG_ABORT_UNLESS(CanSendRequest());
188233
ui32 blobSize = SizeGenerator.Generate();
189234
const TLogoBlobID id(tabletId, gen, step, channel, blobSize, BlobCookie++);
190-
const TSharedData buffer = FastGenDataForLZ4<TSharedData>(id.BlobSize());
235+
const TSharedData buffer = GenerateBuffer(id, contentType);
236+
191237
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), PutHandleClass);
192238
InFlightTracker.Request(blobSize);
193239
return std::move(ev);
@@ -395,6 +441,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
395441
ui32 Cookie;
396442
ui32 GroupBlockRetries;
397443
const ui32 GroupId;
444+
TContentType ContentType;
398445

399446
// Writes
400447
const NKikimrBlobStorage::EPutHandleClass PutHandleClass;
@@ -466,7 +513,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
466513
public:
467514
TTabletWriter(TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
468515
TLogWriterLoadTestActor& self, ui64 tabletId, ui32 channel,
469-
TMaybe<ui32> generation, ui32 groupId,
516+
TMaybe<ui32> generation, ui32 groupId, TContentType contentType,
470517
NKikimrBlobStorage::EPutHandleClass putHandleClass, const TRequestDispatchingSettings& writeSettings,
471518
NKikimrBlobStorage::EGetHandleClass getHandleClass, const TRequestDispatchingSettings& readSettings,
472519
TIntervalGenerator garbageCollectIntervalGen,
@@ -484,6 +531,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
484531
, Cookie(1)
485532
, GroupBlockRetries(3)
486533
, GroupId(groupId)
534+
, ContentType(contentType)
487535
, PutHandleClass(putHandleClass)
488536
, WriteSettings(writeSettings)
489537
, MegabytesPerSecondST(TDuration::Seconds(3)) // average speed at last 3 seconds
@@ -591,7 +639,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
591639
const ui32 size = 1;
592640
const ui32 lastStep = Max<ui32>();
593641
const TLogoBlobID id(TabletId, Generation, lastStep, Channel, size, 0);
594-
const TSharedData buffer = Self.GenerateBuffer(id);
642+
const TSharedData buffer = GenerateBuffer(id, ContentType, &Self.BlobData);
595643
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), PutHandleClass);
596644

597645
auto callback = [this] (IEventBase *event, const TActorContext& ctx) {
@@ -652,7 +700,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
652700
}
653701

654702
void IssueInitialPut(const TActorContext& ctx) {
655-
auto ev = InitialAllocation.MakePutMessage(TabletId, Generation, GarbageCollectStep, Channel);
703+
auto ev = InitialAllocation.MakePutMessage(TabletId, Generation, GarbageCollectStep, Channel, ContentType);
656704

657705
auto callback = [this](IEventBase *event, const TActorContext& ctx) {
658706
auto *res = dynamic_cast<TEvBlobStorage::TEvPutResult*>(event);
@@ -914,7 +962,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
914962
putHandleClass = PutHandleClass;
915963
}
916964
const TLogoBlobID id(TabletId, Generation, WriteStep, Channel, size, Cookie);
917-
const TSharedData buffer = Self.GenerateBuffer(id);
965+
const TSharedData buffer = GenerateBuffer(id, ContentType, &Self.BlobData);
918966
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), putHandleClass);
919967
const ui64 writeQueryId = ++WriteQueryId;
920968

@@ -1104,6 +1152,17 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
11041152
auto *res = dynamic_cast<TEvBlobStorage::TEvGetResult*>(event);
11051153
Y_ABORT_UNLESS(res);
11061154

1155+
if (ContentType != EContentType::Random) {
1156+
for (ui32 i : xrange(res->ResponseSz)) {
1157+
TEvBlobStorage::TEvGetResult::TResponse response = res->Responses[i];
1158+
TString buffer = response.Buffer.ConvertToString();
1159+
if (!ValidateBuffer(response.Id, buffer.data(), ContentType)) {
1160+
LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST,
1161+
"Data corruption detected, BlobId# " << response.Id.ToString());
1162+
}
1163+
}
1164+
}
1165+
11071166
ReadSettings.DelayManager->CountResponse();
11081167
if (!CheckStatus(ctx, res, {NKikimrProto::EReplyStatus::OK})) {
11091168
return;
@@ -1201,6 +1260,10 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
12011260
bool enableWrites = profile.WriteSizesSize() && profile.GetPutHandleClass() &&
12021261
(profile.WriteIntervalsSize() || profile.HasWriteHardRateDispatcher());
12031262

1263+
TContentType contentType = profile.HasContentType()
1264+
? profile.GetContentType()
1265+
: EContentType::Random;
1266+
12041267
TInitialAllocation initialAllocation;
12051268
if (profile.HasInitialAllocation()) {
12061269
auto initialAllocationProto = profile.GetInitialAllocation();
@@ -1316,7 +1379,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
13161379

13171380
TabletWriters.emplace_back(std::make_unique<TTabletWriter>(counters, *this, tabletId,
13181381
tablet.GetChannel(), tablet.HasGeneration() ? TMaybe<ui32>(tablet.GetGeneration()) : TMaybe<ui32>(),
1319-
tablet.GetGroupId(), putHandleClass, writeSettings,
1382+
tablet.GetGroupId(), contentType, putHandleClass, writeSettings,
13201383
getHandleClass, readSettings,
13211384
garbageCollectIntervalGen,
13221385
scriptedRoundDuration, std::move(scriptedRequests),
@@ -1527,15 +1590,6 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
15271590
Y_ABORT("TEvUndelivered# 0x%08" PRIx32 " ActorId# %s", ev->Get()->SourceType, ev->Sender.ToString().data());
15281591
}
15291592

1530-
TSharedData GenerateBuffer(const TLogoBlobID& id) const {
1531-
if (id.BlobSize() > BlobData.size())
1532-
return FastGenDataForLZ4<TSharedData>(id.BlobSize());
1533-
Y_ABORT_UNLESS(id.BlobSize() <= BlobData.size());
1534-
TSharedData data(BlobData);
1535-
data.TrimBack(id.BlobSize());
1536-
return data;
1537-
}
1538-
15391593
STRICT_STFUNC(StateFunc,
15401594
CFunc(EvStopTest, HandleStopTest);
15411595
CFunc(EvUpdateQuantile, HandleUpdateQuantile);

ydb/core/protos/load_test.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ message TEvLoadTestRequest {
3131
optional double Weight = 3;
3232
}
3333
message TStorageLoad {
34+
message TContentType {
35+
enum E {
36+
Random = 1;
37+
Validated = 2;
38+
}
39+
}
3440
message TInitialBlobAllocation {
3541
oneof DataSize {
3642
uint64 TotalSize = 1;
@@ -91,6 +97,7 @@ message TEvLoadTestRequest {
9197
optional uint32 TracingThrottlerBurst = 19 [default = 0];
9298

9399
optional uint32 NumberOfRandomGroupsToPick = 20 [default = 0];
100+
optional TContentType.E ContentType = 21;
94101
};
95102
optional uint64 Tag = 1;
96103
optional uint32 DurationSeconds = 2;

0 commit comments

Comments
 (0)