diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index a30b8ddfa376..53771041766a 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -123,7 +123,7 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl { , RequestType(requestType) , InternalCall(internalCall) { - if (token) { + if (token && !token->empty()) { InternalToken = new NACLib::TUserToken(*token); } } diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp index 9a2c6d2f70c4..80027c17577b 100644 --- a/ydb/core/persqueue/blob.cpp +++ b/ydb/core/persqueue/blob.cpp @@ -24,7 +24,7 @@ TBlobIterator::TBlobIterator(const TKey& key, const TString& blob) void TBlobIterator::ParseBatch() { Y_ABORT_UNLESS(Data < End); Header = ExtractHeader(Data, End - Data); - Y_ABORT_UNLESS(Header.GetOffset() == Offset); + //Y_ABORT_UNLESS(Header.GetOffset() == Offset); Count += Header.GetCount(); Offset += Header.GetCount(); InternalPartsCount += Header.GetInternalPartsCount(); @@ -686,6 +686,13 @@ ui32 THead::FindPos(const ui64 offset, const ui16 partNo) const { return i - 1; } +TPartitionedBlob::TRenameFormedBlobInfo::TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size) : + OldKey(oldKey), + NewKey(newKey), + Size(size) +{ +} + TPartitionedBlob& TPartitionedBlob::operator=(const TPartitionedBlob& x) { Partition = x.Partition; @@ -737,7 +744,8 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x) {} TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts, - const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize) + const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize, + const ui16 nextPartNo) : Partition(partition) , Offset(offset) , InternalPartsCount(0) @@ -747,7 +755,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off , SeqNo(seqNo) , TotalParts(totalParts) , TotalSize(totalSize) - , NextPartNo(0) + , NextPartNo(nextPartNo) , HeadPartNo(0) , BlobsSize(0) , Head(head) @@ -773,7 +781,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off if (HeadSize == 0) { StartOffset = offset; NewHead.Offset = offset; - Y_ABORT_UNLESS(StartPartNo == 0); + //Y_ABORT_UNLESS(StartPartNo == 0); } } @@ -804,58 +812,93 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe return valueD; } -std::optional> TPartitionedBlob::Add(TClientBlob&& blob) +auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optional +{ + HeadPartNo = NextPartNo; + ui32 count = (GlueHead ? Head.GetCount() : 0) + (GlueNewHead ? NewHead.GetCount() : 0); + + Y_ABORT_UNLESS(Offset >= (GlueHead ? Head.Offset : NewHead.Offset)); + + Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset)); + + TKey tmpKey(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false); + TKey dataKey(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false); + + StartOffset = Offset; + StartPartNo = NextPartNo; + InternalPartsCount = 0; + + TString valueD = CompactHead(GlueHead, Head, GlueNewHead, NewHead, HeadSize + BlobsSize + (BlobsSize > 0 ? GetMaxHeaderSize() : 0)); + + GlueHead = GlueNewHead = false; + if (!Blobs.empty()) { + TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)}; + Blobs.clear(); + batch.Pack(); + Y_ABORT_UNLESS(batch.Packed); + batch.SerializeTo(valueD); + } + + Y_ABORT_UNLESS(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize)); + HeadSize = 0; + BlobsSize = 0; + TClientBlob::CheckBlob(tmpKey, valueD); + if (useRename) { + FormedBlobs.emplace_back(tmpKey, dataKey, valueD.size()); + } + Blobs.clear(); + + return {{useRename ? tmpKey : dataKey, valueD}}; +} + +auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional { Y_ABORT_UNLESS(NewHead.Offset >= Head.Offset); ui32 size = blob.GetBlobSize(); Y_ABORT_UNLESS(InternalPartsCount < 1000); //just check for future packing - if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize) + if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize) { NeedCompactHead = true; + } if (HeadSize + BlobsSize == 0) { //if nothing to compact at all NeedCompactHead = false; } - std::optional> res; + std::optional res; if (NeedCompactHead) { // need form blob without last chunk, on start or in case of big head NeedCompactHead = false; - HeadPartNo = NextPartNo; - ui32 count = (GlueHead ? Head.GetCount() : 0) + (GlueNewHead ? NewHead.GetCount() : 0); - - Y_ABORT_UNLESS(Offset >= (GlueHead ? Head.Offset : NewHead.Offset)); - - Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset)); - - TKey key(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false); - - StartOffset = Offset; - StartPartNo = NextPartNo; - InternalPartsCount = 0; - - TString valueD = CompactHead(GlueHead, Head, GlueNewHead, NewHead, HeadSize + BlobsSize + (BlobsSize > 0 ? GetMaxHeaderSize() : 0)); - - GlueHead = GlueNewHead = false; - if (!Blobs.empty()) { - TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)}; - Blobs.clear(); - batch.Pack(); - Y_ABORT_UNLESS(batch.Packed); - batch.SerializeTo(valueD); - } - - Y_ABORT_UNLESS(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize)); - HeadSize = 0; - BlobsSize = 0; - TClientBlob::CheckBlob(key, valueD); - FormedBlobs.emplace_back(key, valueD.size()); - Blobs.clear(); - - res = {key, valueD}; + res = CreateFormedBlob(size, true); } BlobsSize += size + GetMaxHeaderSize(); ++NextPartNo; Blobs.push_back(blob); - if (!IsComplete()) + if (!IsComplete()) { ++InternalPartsCount; + } + return res; +} + +auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional +{ + std::optional res; + if (NeedCompactHead) { + NeedCompactHead = false; + GlueNewHead = false; + res = CreateFormedBlob(0, false); + } + + TKey newKey(TKeyPrefix::TypeData, + Partition, + NewHead.Offset + oldKey.GetOffset(), + oldKey.GetPartNo(), + oldKey.GetCount(), + oldKey.GetInternalPartsCount(), + oldKey.IsHead()); + + FormedBlobs.emplace_back(oldKey, newKey, size); + + StartOffset += oldKey.GetCount(); + //NewHead.Offset += oldKey.GetOffset() + oldKey.GetCount(); + return res; } diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h index 7a4b836a8994..24aa479a2eaa 100644 --- a/ydb/core/persqueue/blob.h +++ b/ydb/core/persqueue/blob.h @@ -84,6 +84,14 @@ struct TClientBlob { return PartData ? PartData->PartNo : 0; } + ui16 GetTotalParts() const { + return PartData ? PartData->TotalParts : 1; + } + + ui16 GetTotalSize() const { + return PartData ? PartData->TotalSize : UncompressedSize; + } + bool IsLastPart() const { return !PartData || PartData->PartNo + 1 == PartData->TotalParts; } @@ -184,7 +192,8 @@ struct TBatch { : Packed(true) , Header(header) , PackedData(data, header.GetPayloadSize()) - {} + { + } ui32 GetPackedSize() const { Y_ABORT_UNLESS(Packed); return sizeof(ui16) + PackedData.size() + Header.ByteSize(); } void Pack(); @@ -265,9 +274,16 @@ class TPartitionedBlob { TPartitionedBlob(const TPartitionedBlob& x); TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, - const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize); + const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize, + ui16 nextPartNo = 0); + + struct TFormedBlobInfo { + TKey Key; + TString Value; + }; - std::optional> Add(TClientBlob&& blob); + std::optional Add(TClientBlob&& blob); + std::optional Add(const TKey& key, ui32 size); bool IsInited() const { return !SourceId.empty(); } @@ -280,11 +296,21 @@ class TPartitionedBlob { bool IsNextPart(const TString& sourceId, const ui64 seqNo, const ui16 partNo, TString *reason) const; + struct TRenameFormedBlobInfo { + TRenameFormedBlobInfo() = default; + TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size); + + TKey OldKey; + TKey NewKey; + ui32 Size; + }; + const std::deque& GetClientBlobs() const { return Blobs; } - const std::deque> GetFormedBlobs() const { return FormedBlobs; } + const std::deque& GetFormedBlobs() const { return FormedBlobs; } private: TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize); + std::optional CreateFormedBlob(ui32 size, bool useRename); private: TPartitionId Partition; @@ -300,7 +326,7 @@ class TPartitionedBlob { ui16 HeadPartNo; std::deque Blobs; ui32 BlobsSize; - std::deque> FormedBlobs; + std::deque FormedBlobs; THead &Head; THead &NewHead; ui32 HeadSize; diff --git a/ydb/core/persqueue/key.cpp b/ydb/core/persqueue/key.cpp index c16449d90fcd..e4ad6f71d76d 100644 --- a/ydb/core/persqueue/key.cpp +++ b/ydb/core/persqueue/key.cpp @@ -22,4 +22,18 @@ TKey MakeKeyFromString(const TString& s, const TPartitionId& partition) t.IsHead()); } +bool TKeyPrefix::HasServiceType() const +{ + switch (*PtrType()) { + case ServiceTypeInfo: + case ServiceTypeData: + case ServiceTypeTmpData: + case ServiceTypeMeta: + case ServiceTypeTxMeta: + return true; + default: + return false; + } +} + } diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h index 3cbf84c92b7e..291ec57ffe0e 100644 --- a/ydb/core/persqueue/key.h +++ b/ydb/core/persqueue/key.h @@ -53,6 +53,10 @@ class TKeyPrefix : public TBuffer virtual ~TKeyPrefix() {} + TString ToString() const { + return TString(Data(), Size()); + } + bool Marked(EMark mark) { if (Size() >= MarkedSize()) return *PtrMark() == mark; @@ -64,7 +68,7 @@ class TKeyPrefix : public TBuffer void SetType(EType type) { - if (!IsServicePartition()) { + if (!IsServicePartition() && !HasServiceType()) { *PtrType() = type; return; } @@ -92,7 +96,6 @@ class TKeyPrefix : public TBuffer } } - EType GetType() const { switch (*PtrType()) { case TypeNone: @@ -130,6 +133,7 @@ class TKeyPrefix : public TBuffer Partition.InternalPartitionId = Partition.OriginalPartitionId; } + bool HasServiceType() const; private: enum EServiceType : char { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 66a380463b81..8141f7635d53 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1888,7 +1888,7 @@ void TPartition::RunPersist() { OnProcessTxsAndUserActsWriteComplete(ActorContext()); AnswerCurrentWrites(ctx); AnswerCurrentReplies(ctx); - HaveWriteMsg = false; + HaveWriteMsg = false; } PersistRequest = nullptr; } @@ -2078,16 +2078,85 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) void TPartition::CommitWriteOperations(TTransaction& t) { + Y_ABORT_UNLESS(PersistRequest); + Y_ABORT_UNLESS(!PartitionedBlob.IsInited()); + if (!t.WriteInfo) { return; } const auto& ctx = ActorContext(); - for (auto i = t.WriteInfo->BlobsFromHead.rbegin(); i != t.WriteInfo->BlobsFromHead.rend(); ++i) { - auto& blob = *i; + if (!HaveWriteMsg) { + BeginHandleRequests(PersistRequest.Get(), ctx); + if (!DiskIsFull) { + BeginProcessWrites(ctx); + BeginAppendHeadWithNewWrites(ctx); + } + HaveWriteMsg = true; + } + + if (!t.WriteInfo->BodyKeys.empty()) { + PartitionedBlob = TPartitionedBlob(Partition, + NewHead.Offset, + "", // SourceId + 0, // SeqNo + 1, // TotalParts + 0, // TotalSize + Head, + NewHead, + Parameters->HeadCleared, // headCleared + Head.PackedSize != 0, // needCompactHead + MaxBlobSize); + + for (auto& k : t.WriteInfo->BodyKeys) { + auto write = PartitionedBlob.Add(k.Key, k.Size); + if (write && !write->Value.empty()) { + AddCmdWrite(write, PersistRequest.Get(), ctx); + CompactedKeys.emplace_back(write->Key, write->Value.size()); + ClearOldHead(write->Key.GetOffset(), write->Key.GetPartNo(), PersistRequest.Get()); + } + } + + } + + if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) { + ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get()); + RenameFormedBlobs(formedBlobs, + *Parameters, + curWrites, + PersistRequest.Get(), + ctx); + } + + if (!t.WriteInfo->BodyKeys.empty()) { + const auto& last = t.WriteInfo->BodyKeys.back(); + + NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount()); + } - TWriteMsg msg{Max(), Nothing(), TEvPQ::TEvWrite::TMsg{ - .SourceId = blob.SourceId, + if (!t.WriteInfo->BlobsFromHead.empty()) { + auto& first = t.WriteInfo->BlobsFromHead.front(); + NewHead.PartNo = first.GetPartNo(); + + Parameters->CurOffset = NewHead.Offset; + Parameters->HeadCleared = !t.WriteInfo->BodyKeys.empty(); + + PartitionedBlob = TPartitionedBlob(Partition, + NewHead.Offset, + first.SourceId, + first.SeqNo, + first.GetTotalParts(), + first.GetTotalSize(), + Head, + NewHead, + Parameters->HeadCleared, // headCleared + false, // needCompactHead + MaxBlobSize, + first.GetPartNo()); + + for (auto& blob : t.WriteInfo->BlobsFromHead) { + TWriteMsg msg{Max(), Nothing(), TEvPQ::TEvWrite::TMsg{ + .SourceId = blob.SourceId, .SeqNo = blob.SeqNo, .PartNo = (ui16)(blob.PartData ? blob.PartData->PartNo : 0), .TotalParts = (ui16)(blob.PartData ? blob.PartData->TotalParts : 1), @@ -2103,12 +2172,17 @@ void TPartition::CommitWriteOperations(TTransaction& t) .External = false, .IgnoreQuotaDeadline = true, .HeartbeatVersion = std::nullopt, - }, std::nullopt}; - msg.Internal = true; - TMessage message(std::move(msg), ctx.Now() - TInstant::Zero()); + }, std::nullopt}; + msg.Internal = true; - UserActionAndTxPendingCommit.emplace_front(std::move(message)); + ExecRequest(msg, *Parameters, PersistRequest.Get()); + + auto& info = TxSourceIdForPostPersist[blob.SourceId]; + info.SeqNo = blob.SeqNo; + info.Offset = NewHead.Offset; + } } + WriteInfosApplied.emplace_back(std::move(t.WriteInfo)); } diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index d9ede898d6d0..f907594bbfbb 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -287,6 +287,7 @@ class TPartition : public TActorBootstrapped { ui32 NextChannel(bool isHead, ui32 blobSize); ui64 GetSizeLag(i64 offset); std::pair GetNewWriteKey(bool headCleared); + std::pair GetNewWriteKeyImpl(bool headCleared, bool needCompaction, ui32 headSize); THashMap::iterator DropOwner(THashMap::iterator& it, const TActorContext& ctx); // will return rcount and rsize also @@ -637,10 +638,16 @@ class TPartition : public TActorBootstrapped { TPartitionGraph PartitionGraph; TPartitionSourceManager SourceManager; + struct TSourceIdPostPersistInfo { + ui64 SeqNo = 0; + ui64 Offset = 0; + }; + THashSet TxAffectedSourcesIds; THashSet WriteAffectedSourcesIds; THashSet TxAffectedConsumers; THashSet SetOffsetAffectedConsumers; + THashMap TxSourceIdForPostPersist; ui32 MaxBlobSize; const ui32 TotalLevels = 4; @@ -932,6 +939,16 @@ class TPartition : public TActorBootstrapped { void DestroyActor(const TActorContext& ctx); TActorId OffloadActor; + + void AddCmdWrite(const std::optional& newWrite, + TEvKeyValue::TEvRequest* request, + const TActorContext& ctx); + void RenameFormedBlobs(const std::deque& formedBlobs, + ProcessParameters& parameters, + ui32 curWrites, + TEvKeyValue::TEvRequest* request, + const TActorContext& ctx); + ui32 RenameTmpCmdWrites(TEvKeyValue::TEvRequest* request); }; } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index e914205436d4..2f2b9b4c6f42 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -19,8 +19,8 @@ bool ValidateResponse(const TInitializerStep& step, TEvKeyValue::TEvResponse::TP TInitializer::TInitializer(TPartition* partition) : Partition(partition) - , InProgress(false) { - + , InProgress(false) +{ Steps.push_back(MakeHolder(this)); Steps.push_back(MakeHolder(this)); Steps.push_back(MakeHolder(this)); diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 2a34ce33ec7c..100ce3e6a827 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -338,6 +338,13 @@ static void AddResultDebugInfo(const TEvPQ::TEvBlobResponse* response, T* readRe readResult->SetBlobsFromDisk(diskBlobs); } +ui64 GetFirstHeaderOffset(const TKey& key, const TString& blob) +{ + TBlobIterator it(key, blob); + Y_ABORT_UNLESS(it.IsValid()); + return it.GetBatch().GetOffset(); +} + TReadAnswer TReadInfo::FormAnswer( const TActorContext& ctx, const TEvPQ::TEvBlobResponse& blobResponse, @@ -438,16 +445,19 @@ TReadAnswer TReadInfo::FormAnswer( Y_ABORT_UNLESS(offset <= Offset); Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo); TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false); + ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue); for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) { TBatch batch = it.GetBatch(); auto& header = batch.Header; batch.Unpack(); + ui64 trueOffset = blobs[pos].Key.GetOffset() + (header.GetOffset() - firstHeaderOffset); ui32 pos = 0; - if (header.GetOffset() > Offset || header.GetOffset() == Offset && header.GetPartNo() >= PartNo) { + if (trueOffset > Offset || trueOffset == Offset && header.GetPartNo() >= PartNo) { pos = 0; } else { - pos = batch.FindPos(Offset, PartNo); + ui64 trueSearchOffset = Offset - blobs[pos].Key.GetOffset() + firstHeaderOffset; + pos = batch.FindPos(trueSearchOffset, PartNo); } offset += header.GetCount(); @@ -768,7 +778,6 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim << " offset " << read->Offset << " count " << read->Count << " size " << read->Size << " endOffset " << EndOffset << " max time lag " << read->MaxTimeLagMs << "ms effective offset " << offset); - if (offset == EndOffset) { if (read->Timeout > 30000) { LOG_DEBUG_S( @@ -1015,7 +1024,6 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u THolder request(new TEvPQ::TEvBlobRequest(user, cookie, Partition, lastOffset, std::move(blobs))); - ctx.Send(BlobCache, request.Release()); } diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 5a328df71205..e0346e2f9010 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -485,6 +485,17 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { } HaveWriteMsg = false; + for (auto& [sourceId, info] : TxSourceIdForPostPersist) { + auto it = SourceIdStorage.GetInMemorySourceIds().find(sourceId); + if (it.IsEnd()) { + SourceIdStorage.RegisterSourceId(sourceId, info.SeqNo, info.Offset, ctx.Now()); + } else { + ui64 seqNo = std::max(info.SeqNo, it->second.SeqNo); + SourceIdStorage.RegisterSourceId(sourceId, it->second.Updated(seqNo, info.Offset, ctx.Now())); + } + } + TxSourceIdForPostPersist.clear(); + TxAffectedSourcesIds.clear(); WriteAffectedSourcesIds.clear(); TxAffectedConsumers.clear(); @@ -1001,6 +1012,77 @@ TPartition::EProcessResult TPartition::PreProcessRequest(TWriteMsg& p) { return EProcessResult::Continue; } +void TPartition::AddCmdWrite(const std::optional& newWrite, + TEvKeyValue::TEvRequest* request, + const TActorContext& ctx) +{ + auto write = request->Record.AddCmdWrite(); + write->SetKey(newWrite->Key.ToString()); + write->SetValue(newWrite->Value); + Y_ABORT_UNLESS(!newWrite->Key.IsHead()); + auto channel = GetChannel(NextChannel(newWrite->Key.IsHead(), newWrite->Value.Size())); + write->SetStorageChannel(channel); + write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); + + TKey resKey = newWrite->Key; + resKey.SetType(TKeyPrefix::TypeData); + write->SetKeyToCache(resKey.ToString()); + WriteCycleSize += newWrite->Value.size(); +} + +void TPartition::RenameFormedBlobs(const std::deque& formedBlobs, + ProcessParameters& parameters, + ui32 curWrites, + TEvKeyValue::TEvRequest* request, + const TActorContext& ctx) +{ + for (ui32 i = 0; i < formedBlobs.size(); ++i) { + const auto& x = formedBlobs[i]; + if (i + curWrites < formedBlobs.size()) { //this KV pair is already writed, rename needed + auto rename = request->Record.AddCmdRename(); + rename->SetOldKey(x.OldKey.ToString()); + rename->SetNewKey(x.NewKey.ToString()); + } + if (!DataKeysBody.empty() && CompactedKeys.empty()) { + Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(), + "LAST KEY %s, HeadOffset %lu, NEWKEY %s", + DataKeysBody.back().Key.ToString().c_str(), + Head.Offset, + x.NewKey.ToString().c_str()); + } + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "writing blob: topic '" << TopicName() << "' partition " << Partition + << " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds() + ); + + CompactedKeys.emplace_back(x.NewKey, x.Size); + } + + if (!formedBlobs.empty()) { + parameters.HeadCleared = true; + + NewHead.Clear(); + NewHead.Offset = PartitionedBlob.GetOffset(); + NewHead.PartNo = PartitionedBlob.GetHeadPartNo(); + NewHead.PackedSize = 0; + } +} + +ui32 TPartition::RenameTmpCmdWrites(TEvKeyValue::TEvRequest* request) +{ + ui32 curWrites = 0; + for (ui32 i = 0; i < request->Record.CmdWriteSize(); ++i) { //change keys for yet to be writed KV pairs + TKey key(request->Record.GetCmdWrite(i).GetKey()); + if (key.GetType() == TKeyPrefix::TypeTmpData) { + key.SetType(TKeyPrefix::TypeData); + request->Record.MutableCmdWrite(i)->SetKey(TString(key.Data(), key.Size())); + ++curWrites; + } + } + return curWrites; +} + bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request) { if (!CanWrite()) { ScheduleReplyError(p.Cookie, InactivePartitionErrorCode, @@ -1200,73 +1282,29 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey //will return compacted tmp blob auto newWrite = PartitionedBlob.Add(std::move(blob)); - if (newWrite && !newWrite->second.empty()) { - auto write = request->Record.AddCmdWrite(); - write->SetKey(newWrite->first.Data(), newWrite->first.Size()); - write->SetValue(newWrite->second); - Y_ABORT_UNLESS(!newWrite->first.IsHead()); - auto channel = GetChannel(NextChannel(newWrite->first.IsHead(), newWrite->second.Size())); - write->SetStorageChannel(channel); - write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); - - TKey resKey = newWrite->first; - resKey.SetType(TKeyPrefix::TypeData); - write->SetKeyToCache(resKey.Data(), resKey.Size()); - WriteCycleSize += newWrite->second.size(); + if (newWrite && !newWrite->Value.empty()) { + AddCmdWrite(newWrite, request, ctx); LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "Topic '" << TopicName() << + ctx, NKikimrServices::PERSQUEUE, + "Topic '" << TopicName() << "' partition " << Partition << " part blob sourceId '" << EscapeC(p.Msg.SourceId) << "' seqNo " << p.Msg.SeqNo << " partNo " << p.Msg.PartNo << - " result is " << TStringBuf(newWrite->first.Data(), newWrite->first.Size()) << - " size " << newWrite->second.size() - ); + " result is " << newWrite->Key.ToString() << + " size " << newWrite->Value.size() + ); } if (lastBlobPart) { Y_ABORT_UNLESS(PartitionedBlob.IsComplete()); - ui32 curWrites = 0; - for (ui32 i = 0; i < request->Record.CmdWriteSize(); ++i) { //change keys for yet to be writed KV pairs - TKey key(request->Record.GetCmdWrite(i).GetKey()); - if (key.GetType() == TKeyPrefix::TypeTmpData) { - key.SetType(TKeyPrefix::TypeData); - request->Record.MutableCmdWrite(i)->SetKey(TString(key.Data(), key.Size())); - ++curWrites; - } - } + ui32 curWrites = RenameTmpCmdWrites(request); Y_ABORT_UNLESS(curWrites <= PartitionedBlob.GetFormedBlobs().size()); - auto formedBlobs = PartitionedBlob.GetFormedBlobs(); - for (ui32 i = 0; i < formedBlobs.size(); ++i) { - const auto& x = formedBlobs[i]; - if (i + curWrites < formedBlobs.size()) { //this KV pair is already writed, rename needed - auto rename = request->Record.AddCmdRename(); - TKey key = x.first; - rename->SetOldKey(TString(key.Data(), key.Size())); - key.SetType(TKeyPrefix::TypeData); - rename->SetNewKey(TString(key.Data(), key.Size())); - } - if (!DataKeysBody.empty() && CompactedKeys.empty()) { - Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.first.GetOffset(), - "LAST KEY %s, HeadOffset %lu, NEWKEY %s", DataKeysBody.back().Key.ToString().c_str(), Head.Offset, x.first.ToString().c_str()); - } - LOG_DEBUG_S( - ctx, NKikimrServices::PERSQUEUE, - "writing blob: topic '" << TopicName() << "' partition " << Partition - << " " << x.first.ToString() << " size " << x.second << " WTime " << ctx.Now().MilliSeconds() - ); - - CompactedKeys.push_back(x); - CompactedKeys.back().first.SetType(TKeyPrefix::TypeData); - } - if (PartitionedBlob.HasFormedBlobs()) { //Head and newHead are cleared - parameters.HeadCleared = true; - NewHead.Clear(); - NewHead.Offset = PartitionedBlob.GetOffset(); - NewHead.PartNo = PartitionedBlob.GetHeadPartNo(); - NewHead.PackedSize = 0; - } + RenameFormedBlobs(PartitionedBlob.GetFormedBlobs(), + parameters, + curWrites, + request, + ctx); ui32 countOfLastParts = 0; for (auto& x : PartitionedBlob.GetClientBlobs()) { if (NewHead.Batches.empty() || NewHead.Batches.back().Packed) { @@ -1308,19 +1346,8 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey return true; } -std::pair TPartition::GetNewWriteKey(bool headCleared) { - bool needCompaction = false; - ui32 HeadSize = headCleared ? 0 : Head.PackedSize; - if (HeadSize + NewHead.PackedSize > 0 && HeadSize + NewHead.PackedSize - >= Min(MaxBlobSize, Config.GetPartitionConfig().GetLowWatermark())) - needCompaction = true; - - if (PartitionedBlob.IsInited()) { //has active partitioned blob - compaction is forbiden, head and newHead will be compacted when this partitioned blob is finished - needCompaction = false; - } - - Y_ABORT_UNLESS(NewHead.PackedSize > 0 || needCompaction); //smthing must be here - +std::pair TPartition::GetNewWriteKeyImpl(bool headCleared, bool needCompaction, ui32 HeadSize) +{ TKey key(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount(), !needCompaction); if (NewHead.PackedSize > 0) @@ -1347,6 +1374,22 @@ std::pair TPartition::GetNewWriteKey(bool headCleared) { return res; } +std::pair TPartition::GetNewWriteKey(bool headCleared) { + bool needCompaction = false; + ui32 HeadSize = headCleared ? 0 : Head.PackedSize; + if (HeadSize + NewHead.PackedSize > 0 && HeadSize + NewHead.PackedSize + >= Min(MaxBlobSize, Config.GetPartitionConfig().GetLowWatermark())) + needCompaction = true; + + if (PartitionedBlob.IsInited()) { //has active partitioned blob - compaction is forbiden, head and newHead will be compacted when this partitioned blob is finished + needCompaction = false; + } + + Y_ABORT_UNLESS(NewHead.PackedSize > 0 || needCompaction); //smthing must be here + + return GetNewWriteKeyImpl(headCleared, needCompaction, HeadSize); +} + void TPartition::AddNewWriteBlob(std::pair& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx) { PQ_LOG_T("TPartition::AddNewWriteBlob."); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 8982991610fa..5e89d767f0e9 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -4366,6 +4366,7 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR TxQueue.clear(); std::deque> plannedTxs; + const auto& ctx = ActorContext(); for (size_t i = 0; i < readRange.PairSize(); ++i) { auto& pair = readRange.GetPair(i); @@ -4373,6 +4374,9 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR NKikimrPQ::TTransaction tx; Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue())); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " " << + "Tx: " << tx.DebugString()); + Txs.emplace(tx.GetTxId(), tx); if (tx.HasStep()) { @@ -4387,6 +4391,11 @@ void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadR TxQueue.push(item); } + if (!TxQueue.empty()) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " " << + "top tx queue (" << TxQueue.front().first << ", " << TxQueue.front().second << ")"); + } + Y_UNUSED(partitionTxs); } diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index a9c7c6947d0b..52ba8e874706 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -1,5 +1,6 @@ #include "transaction.h" #include "utils.h" +#include "partition_log.h" namespace NKikimr::NPQ { @@ -323,6 +324,8 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque Y_ABORT_UNLESS(SourceActor != TActorId()); ActorIdToProto(SourceActor, tx.MutableSourceActor()); + PQ_LOG_D("save tx " << tx.DebugString()); + TString value; Y_ABORT_UNLESS(tx.SerializeToString(&value)); diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp index b5a7e8be9016..f24e5e4f604a 100644 --- a/ydb/core/persqueue/ut/internals_ut.cpp +++ b/ydb/core/persqueue/ut/internals_ut.cpp @@ -75,7 +75,7 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead) ui32 maxBlobSize = 8 << 20; TPartitionedBlob blob(TPartitionId(0), newHead.GetNextOffset(), "sourceId3", 1, parts, parts * value2.size(), head, newHead, headCompacted, false, maxBlobSize); - TVector> formed; + TVector formed; TString error; for (ui32 i = 0; i < parts; ++i) { @@ -88,23 +88,23 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead) ); all.push_back(clientBlob); auto res = blob.Add(std::move(clientBlob)); - if (res && !res->second.empty()) - formed.push_back(*res); + if (res && !res->Value.empty()) + formed.emplace_back(*res); } UNIT_ASSERT(blob.IsComplete()); UNIT_ASSERT(formed.size() == blob.GetFormedBlobs().size()); for (ui32 i = 0; i < formed.size(); ++i) { - UNIT_ASSERT(formed[i].first == blob.GetFormedBlobs()[i].first); - UNIT_ASSERT(formed[i].second.size() == blob.GetFormedBlobs()[i].second); - UNIT_ASSERT(formed[i].second.size() <= 8_MB); - UNIT_ASSERT(formed[i].second.size() > 6_MB); + UNIT_ASSERT(formed[i].Key == blob.GetFormedBlobs()[i].OldKey); + UNIT_ASSERT(formed[i].Value.size() == blob.GetFormedBlobs()[i].Size); + UNIT_ASSERT(formed[i].Value.size() <= 8_MB); + UNIT_ASSERT(formed[i].Value.size() > 6_MB); } TVector real; ui32 nextOffset = headCompacted ? newHead.Offset : head.Offset; for (auto& p : formed) { - const char* data = p.second.c_str(); - const char* end = data + p.second.size(); - ui64 offset = p.first.GetOffset(); + const char* data = p.Value.c_str(); + const char* end = data + p.Value.size(); + ui64 offset = p.Key.GetOffset(); UNIT_ASSERT(offset == nextOffset); while(data < end) { auto header = ExtractHeader(data, end - data); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index debc6229a212..0c2cb14892b6 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include @@ -108,6 +109,7 @@ class TFixture : public NUnitTest::TBaseFixture { void TestTheCompletionOfATransaction(const TTransactionCompletionTestDescription& d); void RestartLongTxService(); void RestartPQTablet(const TString& topicPath, ui32 partition); + void DumpPQTabletKeys(const TString& topicName, ui32 partition); void DeleteSupportivePartition(const TString& topicName, ui32 partition); @@ -129,9 +131,25 @@ class TFixture : public NUnitTest::TBaseFixture { NTable::TTransaction* tx); size_t GetTableRecordsCount(const TString& tablePath); + enum ERestartPQTabletMode { + ERestartNo, + ERestartBeforeCommit, + ERestartAfterCommit, + }; + + struct TTestTxWithBigBlobsParams { + size_t OldHeadCount = 0; + size_t BigBlobsCount = 2; + size_t NewHeadCount = 0; + ERestartPQTabletMode RestartMode = ERestartNo; + }; + + void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params); + const TDriver& GetDriver() const; void CheckTabletKeys(const TString& topicName); + void DumpPQTabletKeys(const TString& topicName); private: template @@ -142,8 +160,8 @@ class TFixture : public NUnitTest::TBaseFixture { ui64 GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition); - THashSet GetTabletKeys(const TActorId& actorId, - ui64 tabletId); + TVector GetTabletKeys(const TActorId& actorId, + ui64 tabletId); ui64 GetTransactionWriteId(const TActorId& actorId, ui64 tabletId); void SendLongTxLockStatus(const TActorId& actorId, @@ -498,6 +516,7 @@ auto TFixture::CreateTopicWriteSession(const TString& topicPath, options.ProducerId(messageGroupId); options.MessageGroupId(messageGroupId); options.PartitionId(partitionId); + options.Codec(ECodec::RAW); return client.CreateWriteSession(options); } @@ -655,6 +674,7 @@ TVector TFixture::ReadFromTopic(const TString& topicPath, for (auto& event : session->GetEvents(settings)) { if (auto* e = std::get_if(&event)) { + Cerr << e->HasCompressedMessages() << " " << e->GetMessagesCount() << Endl; for (auto& m : e->GetMessages()) { messages.push_back(m.GetData()); } @@ -778,7 +798,7 @@ ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPat return Max(); } -THashSet TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId) +TVector TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId) { using TEvKeyValue = NKikimr::TEvKeyValue; @@ -803,12 +823,12 @@ THashSet TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345); UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadRangeResultSize(), 1); - THashSet keys; + TVector keys; auto& result = response->Record.GetReadRangeResult(0); for (size_t i = 0; i < result.PairSize(); ++i) { auto& kv = result.GetPair(i); - keys.insert(kv.GetKey()); + keys.emplace_back(kv.GetKey()); } return keys; @@ -1309,6 +1329,16 @@ void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId, } } +void TFixture::RestartPQTablet(const TString& topicName, ui32 partition) +{ + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition); + runtime.SendToPipe(tabletId, edge, new TEvents::TEvPoison()); + + Sleep(TDuration::Seconds(2)); +} + void TFixture::DeleteSupportivePartition(const TString& topicName, ui32 partition) { auto& runtime = Setup->GetRuntime(); @@ -1336,7 +1366,7 @@ void TFixture::CheckTabletKeys(const TString& topicName) }; bool found; - THashSet keys; + TVector keys; for (size_t i = 0; i < 20; ++i) { keys = GetTabletKeys(edge, tabletId); @@ -1371,6 +1401,17 @@ void TFixture::CheckTabletKeys(const TString& topicName) } } +void TFixture::DumpPQTabletKeys(const TString& topicName) +{ + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, 0); + auto keys = GetTabletKeys(edge, tabletId); + for (const auto& key : keys) { + Cerr << key << Endl; + } +} + void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestDescription& d) { for (auto& topic : d.Topics) { @@ -1405,16 +1446,6 @@ void TFixture::TestTheCompletionOfATransaction(const TTransactionCompletionTestD } } -void TFixture::RestartPQTablet(const TString& topicName, ui32 partition) -{ - auto& runtime = Setup->GetRuntime(); - TActorId edge = runtime.AllocateEdgeActor(); - ui64 tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition); - runtime.SendToPipe(tabletId, edge, new TEvents::TEvPoison()); - - Sleep(TDuration::Seconds(2)); -} - Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture) { for (auto endOfTransaction : {Commit, Rollback, CloseTableSession}) { @@ -1514,6 +1545,125 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_16, TFixture) UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2"); } +Y_UNIT_TEST_F(WriteToTopic_Demo_17, TFixture) +{ + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(22'000'000, 'x')); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100, 'x')); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(200, 'x')); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(300, 'x')); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(10'000'000, 'x')); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString( 6'000'000, 'x'), &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(20'000'000, 'x'), &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString( 7'000'000, 'x'), &tx); + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + CommitTx(tx, EStatus::SUCCESS); + + //RestartPQTablet("topic_A", 0); + + auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 8); + UNIT_ASSERT_VALUES_EQUAL(messages[0].size(), 22'000'000); + UNIT_ASSERT_VALUES_EQUAL(messages[1].size(), 100); + UNIT_ASSERT_VALUES_EQUAL(messages[2].size(), 200); + UNIT_ASSERT_VALUES_EQUAL(messages[3].size(), 300); + UNIT_ASSERT_VALUES_EQUAL(messages[4].size(), 10'000'000); + UNIT_ASSERT_VALUES_EQUAL(messages[5].size(), 6'000'000); + UNIT_ASSERT_VALUES_EQUAL(messages[6].size(), 20'000'000); + UNIT_ASSERT_VALUES_EQUAL(messages[7].size(), 7'000'000); +} + +void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params) +{ + size_t oldHeadMsgCount = 0; + size_t bigBlobMsgCount = 0; + size_t newHeadMsgCount = 0; + + CreateTopic("topic_A"); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + for (size_t i = 0; i < params.OldHeadCount; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x')); + ++oldHeadMsgCount; + } + + for (size_t i = 0; i < params.BigBlobsCount; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx); + ++bigBlobMsgCount; + } + + for (size_t i = 0; i < params.NewHeadCount; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx); + ++newHeadMsgCount; + } + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + if (params.RestartMode == ERestartBeforeCommit) { + RestartPQTablet("topic_A", 0); + } + + CommitTx(tx, EStatus::SUCCESS); + + if (params.RestartMode == ERestartAfterCommit) { + RestartPQTablet("topic_A", 0); + } + + TVector messages; + for (size_t i = 0; (i < 10) && (messages.size() < (oldHeadMsgCount + bigBlobMsgCount + newHeadMsgCount)); ++i) { + auto block = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2)); + for (auto& m : block) { + messages.push_back(std::move(m)); + } + } + + UNIT_ASSERT_VALUES_EQUAL(messages.size(), oldHeadMsgCount + bigBlobMsgCount + newHeadMsgCount); + + size_t start = 0; + + for (size_t i = 0; i < oldHeadMsgCount; ++i) { + UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 100'000); + } + start += oldHeadMsgCount; + + for (size_t i = 0; i < bigBlobMsgCount; ++i) { + UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000); + } + start += bigBlobMsgCount; + + for (size_t i = 0; i < newHeadMsgCount; ++i) { + UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 100'000); + } +} + +#define Y_UNIT_TEST_WITH_REBOOTS(name, oldHeadCount, bigBlobsCount, newHeadCount) \ +Y_UNIT_TEST_F(name##_RestartNo, TFixture) { \ + TestTxWithBigBlobs({.OldHeadCount = oldHeadCount, .BigBlobsCount = bigBlobsCount, .NewHeadCount = newHeadCount, .RestartMode = ERestartNo}); \ +} \ +Y_UNIT_TEST_F(name##_RestartBeforeCommit, TFixture) { \ + TestTxWithBigBlobs({.OldHeadCount = oldHeadCount, .BigBlobsCount = bigBlobsCount, .NewHeadCount = newHeadCount, .RestartMode = ERestartBeforeCommit}); \ +} \ +Y_UNIT_TEST_F(name##_RestartAfterCommit, TFixture) { \ + TestTxWithBigBlobs({.OldHeadCount = oldHeadCount, .BigBlobsCount = bigBlobsCount, .NewHeadCount = newHeadCount, .RestartMode = ERestartAfterCommit}); \ +} + +Y_UNIT_TEST_WITH_REBOOTS(WriteToTopic_Demo_18, 10, 2, 10); +Y_UNIT_TEST_WITH_REBOOTS(WriteToTopic_Demo_19, 10, 0, 10); +Y_UNIT_TEST_WITH_REBOOTS(WriteToTopic_Demo_20, 10, 2, 0); + +Y_UNIT_TEST_WITH_REBOOTS(WriteToTopic_Demo_21, 0, 2, 10); +Y_UNIT_TEST_WITH_REBOOTS(WriteToTopic_Demo_22, 0, 0, 10); +Y_UNIT_TEST_WITH_REBOOTS(WriteToTopic_Demo_23, 0, 2, 0); + void TFixture::CreateTable(const TString& tablePath) { UNIT_ASSERT(!tablePath.empty());