Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 70 additions & 16 deletions ydb/core/load_test/group_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,50 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
NKikimrBlobStorage::EPutHandleClass PutHandleClass;
};

using TContentType = NKikimr::TEvLoadTestRequest::TStorageLoad::TContentType::E;
using EContentType = NKikimr::TEvLoadTestRequest::TStorageLoad::TContentType;

static TSharedData GenerateBuffer(const TLogoBlobID& blobId, TContentType contentType,
TSharedData* pregenerated = nullptr) {
switch (contentType) {
case EContentType::Random: {
if (!pregenerated || blobId.BlobSize() > pregenerated->size()) {
return FastGenDataForLZ4<TSharedData>(blobId.BlobSize());
}
TSharedData buffer(*pregenerated);
buffer.TrimBack(blobId.BlobSize());
return buffer;
}
case EContentType::Validated: {
TSharedData buffer = TSharedData::Uninitialized(blobId.BlobSize());
char* data = buffer.Detach();
const char* src = reinterpret_cast<const char*>(&blobId);
for (ui32 pos = 0; pos < blobId.BlobSize(); pos += sizeof(TLogoBlobID)) {
memcpy(data + pos, src, std::min(static_cast<ui32>(sizeof(TLogoBlobID)),
blobId.BlobSize() - pos));
}
return buffer;
}
}
}

static bool ValidateBuffer(const TLogoBlobID& blobId, const char* buffer, TContentType contentType) {
switch (contentType) {
case EContentType::Random: {
return true;
}
case EContentType::Validated: {
const char* reference = reinterpret_cast<const char*>(&blobId);
for (ui32 i : xrange(blobId.BlobSize())) {
if (buffer[i] != reference[i % sizeof(TLogoBlobID)]) {
return false;
}
}
return true;
}
}
}

struct TInFlightTracker {
public:
TInFlightTracker(ui32 maxRequestsInFlight = 0, ui64 maxBytesInFlight = 0)
Expand Down Expand Up @@ -183,11 +227,13 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
}
}

std::unique_ptr<TEvBlobStorage::TEvPut> MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel) {
std::unique_ptr<TEvBlobStorage::TEvPut> MakePutMessage(ui64 tabletId, ui32 gen, ui32 step, ui32 channel,
TContentType contentType) {
Y_DEBUG_ABORT_UNLESS(CanSendRequest());
ui32 blobSize = SizeGenerator.Generate();
const TLogoBlobID id(tabletId, gen, step, channel, blobSize, BlobCookie++);
const TSharedData buffer = FastGenDataForLZ4<TSharedData>(id.BlobSize());
const TSharedData buffer = GenerateBuffer(id, contentType);

auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), PutHandleClass);
InFlightTracker.Request(blobSize);
return std::move(ev);
Expand Down Expand Up @@ -395,6 +441,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
ui32 Cookie;
ui32 GroupBlockRetries;
const ui32 GroupId;
TContentType ContentType;

// Writes
const NKikimrBlobStorage::EPutHandleClass PutHandleClass;
Expand Down Expand Up @@ -466,7 +513,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
public:
TTabletWriter(TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TLogWriterLoadTestActor& self, ui64 tabletId, ui32 channel,
TMaybe<ui32> generation, ui32 groupId,
TMaybe<ui32> generation, ui32 groupId, TContentType contentType,
NKikimrBlobStorage::EPutHandleClass putHandleClass, const TRequestDispatchingSettings& writeSettings,
NKikimrBlobStorage::EGetHandleClass getHandleClass, const TRequestDispatchingSettings& readSettings,
TIntervalGenerator garbageCollectIntervalGen,
Expand All @@ -484,6 +531,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
, Cookie(1)
, GroupBlockRetries(3)
, GroupId(groupId)
, ContentType(contentType)
, PutHandleClass(putHandleClass)
, WriteSettings(writeSettings)
, MegabytesPerSecondST(TDuration::Seconds(3)) // average speed at last 3 seconds
Expand Down Expand Up @@ -591,7 +639,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
const ui32 size = 1;
const ui32 lastStep = Max<ui32>();
const TLogoBlobID id(TabletId, Generation, lastStep, Channel, size, 0);
const TSharedData buffer = Self.GenerateBuffer(id);
const TSharedData buffer = GenerateBuffer(id, ContentType, &Self.BlobData);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), PutHandleClass);

auto callback = [this] (IEventBase *event, const TActorContext& ctx) {
Expand Down Expand Up @@ -652,7 +700,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
}

void IssueInitialPut(const TActorContext& ctx) {
auto ev = InitialAllocation.MakePutMessage(TabletId, Generation, GarbageCollectStep, Channel);
auto ev = InitialAllocation.MakePutMessage(TabletId, Generation, GarbageCollectStep, Channel, ContentType);

auto callback = [this](IEventBase *event, const TActorContext& ctx) {
auto *res = dynamic_cast<TEvBlobStorage::TEvPutResult*>(event);
Expand Down Expand Up @@ -914,7 +962,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
putHandleClass = PutHandleClass;
}
const TLogoBlobID id(TabletId, Generation, WriteStep, Channel, size, Cookie);
const TSharedData buffer = Self.GenerateBuffer(id);
const TSharedData buffer = GenerateBuffer(id, ContentType, &Self.BlobData);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, buffer, TInstant::Max(), putHandleClass);
const ui64 writeQueryId = ++WriteQueryId;

Expand Down Expand Up @@ -1104,6 +1152,17 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
auto *res = dynamic_cast<TEvBlobStorage::TEvGetResult*>(event);
Y_ABORT_UNLESS(res);

if (ContentType != EContentType::Random) {
for (ui32 i : xrange(res->ResponseSz)) {
TEvBlobStorage::TEvGetResult::TResponse response = res->Responses[i];
TString buffer = response.Buffer.ConvertToString();
if (!ValidateBuffer(response.Id, buffer.data(), ContentType)) {
LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST,
"Data corruption detected, BlobId# " << response.Id.ToString());
}
}
}

ReadSettings.DelayManager->CountResponse();
if (!CheckStatus(ctx, res, {NKikimrProto::EReplyStatus::OK})) {
return;
Expand Down Expand Up @@ -1201,6 +1260,10 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
bool enableWrites = profile.WriteSizesSize() && profile.GetPutHandleClass() &&
(profile.WriteIntervalsSize() || profile.HasWriteHardRateDispatcher());

TContentType contentType = profile.HasContentType()
? profile.GetContentType()
: EContentType::Random;

TInitialAllocation initialAllocation;
if (profile.HasInitialAllocation()) {
auto initialAllocationProto = profile.GetInitialAllocation();
Expand Down Expand Up @@ -1316,7 +1379,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo

TabletWriters.emplace_back(std::make_unique<TTabletWriter>(counters, *this, tabletId,
tablet.GetChannel(), tablet.HasGeneration() ? TMaybe<ui32>(tablet.GetGeneration()) : TMaybe<ui32>(),
tablet.GetGroupId(), putHandleClass, writeSettings,
tablet.GetGroupId(), contentType, putHandleClass, writeSettings,
getHandleClass, readSettings,
garbageCollectIntervalGen,
scriptedRoundDuration, std::move(scriptedRequests),
Expand Down Expand Up @@ -1527,15 +1590,6 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
Y_ABORT("TEvUndelivered# 0x%08" PRIx32 " ActorId# %s", ev->Get()->SourceType, ev->Sender.ToString().data());
}

TSharedData GenerateBuffer(const TLogoBlobID& id) const {
if (id.BlobSize() > BlobData.size())
return FastGenDataForLZ4<TSharedData>(id.BlobSize());
Y_ABORT_UNLESS(id.BlobSize() <= BlobData.size());
TSharedData data(BlobData);
data.TrimBack(id.BlobSize());
return data;
}

STRICT_STFUNC(StateFunc,
CFunc(EvStopTest, HandleStopTest);
CFunc(EvUpdateQuantile, HandleUpdateQuantile);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/protos/load_test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ message TEvLoadTestRequest {
optional double Weight = 3;
}
message TStorageLoad {
message TContentType {
enum E {
Random = 1;
Validated = 2;
}
}
message TInitialBlobAllocation {
oneof DataSize {
uint64 TotalSize = 1;
Expand Down Expand Up @@ -91,6 +97,7 @@ message TEvLoadTestRequest {
optional uint32 TracingThrottlerBurst = 19 [default = 0];

optional uint32 NumberOfRandomGroupsToPick = 20 [default = 0];
optional TContentType.E ContentType = 21;
};
optional uint64 Tag = 1;
optional uint32 DurationSeconds = 2;
Expand Down
Loading