Skip to content

Commit

Permalink
recording a large amount of data in a transaction (ydb-platform#5902)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Jul 1, 2024
1 parent c7123e3 commit a874417
Show file tree
Hide file tree
Showing 14 changed files with 550 additions and 159 deletions.
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
, RequestType(requestType)
, InternalCall(internalCall)
{
if (token) {
if (token && !token->empty()) {
InternalToken = new NACLib::TUserToken(*token);
}
}
Expand Down
123 changes: 83 additions & 40 deletions ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ TBlobIterator::TBlobIterator(const TKey& key, const TString& blob)
void TBlobIterator::ParseBatch() {
Y_ABORT_UNLESS(Data < End);
Header = ExtractHeader(Data, End - Data);
Y_ABORT_UNLESS(Header.GetOffset() == Offset);
//Y_ABORT_UNLESS(Header.GetOffset() == Offset);
Count += Header.GetCount();
Offset += Header.GetCount();
InternalPartsCount += Header.GetInternalPartsCount();
Expand Down Expand Up @@ -686,6 +686,13 @@ ui32 THead::FindPos(const ui64 offset, const ui16 partNo) const {
return i - 1;
}

// Builds a rename record: the key a formed blob currently has, the key it
// should end up with, and the blob size in bytes.
TPartitionedBlob::TRenameFormedBlobInfo::TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size)
    : OldKey(oldKey)
    , NewKey(newKey)
    , Size(size)
{
}

TPartitionedBlob& TPartitionedBlob::operator=(const TPartitionedBlob& x)
{
Partition = x.Partition;
Expand Down Expand Up @@ -737,7 +744,8 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x)
{}

TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize)
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize,
const ui16 nextPartNo)
: Partition(partition)
, Offset(offset)
, InternalPartsCount(0)
Expand All @@ -747,7 +755,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off
, SeqNo(seqNo)
, TotalParts(totalParts)
, TotalSize(totalSize)
, NextPartNo(0)
, NextPartNo(nextPartNo)
, HeadPartNo(0)
, BlobsSize(0)
, Head(head)
Expand All @@ -773,7 +781,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 off
if (HeadSize == 0) {
StartOffset = offset;
NewHead.Offset = offset;
Y_ABORT_UNLESS(StartPartNo == 0);
//Y_ABORT_UNLESS(StartPartNo == 0);
}
}

Expand Down Expand Up @@ -804,58 +812,93 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe
return valueD;
}

std::optional<std::pair<TKey, TString>> TPartitionedBlob::Add(TClientBlob&& blob)
// Serializes everything accumulated so far (the heads flagged for gluing plus
// the pending client blobs in Blobs) into a single value and returns it
// together with the key it is to be stored under.
//
// size      - size of the incoming piece that triggered the flush; it is only
//             used by the consistency assertion on the resulting value size.
// useRename - when true, the value is returned under the TypeTmpData key and a
//             (tmpKey -> dataKey) record is appended to FormedBlobs;
//             when false, the value is returned under the TypeData key and no
//             record is appended.
//
// The returned optional is always engaged. As a side effect the accumulation
// state is reset: StartOffset/StartPartNo advance to Offset/NextPartNo, and
// InternalPartsCount, HeadSize, BlobsSize, GlueHead, GlueNewHead and Blobs are
// cleared.
auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optional<TFormedBlobInfo>
{
HeadPartNo = NextPartNo;
// Sum of GetCount() over the heads that are going to be glued into this blob.
ui32 count = (GlueHead ? Head.GetCount() : 0) + (GlueNewHead ? NewHead.GetCount() : 0);

Y_ABORT_UNLESS(Offset >= (GlueHead ? Head.Offset : NewHead.Offset));

Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));

// Both keys must be built before StartOffset/StartPartNo/InternalPartsCount
// are advanced below: they describe the blob being formed now, not the next one.
TKey tmpKey(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);
TKey dataKey(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);

StartOffset = Offset;
StartPartNo = NextPartNo;
InternalPartsCount = 0;

// The last argument is the estimated size of the result; room for one batch
// header is added only when there are pending blobs to append.
TString valueD = CompactHead(GlueHead, Head, GlueNewHead, NewHead, HeadSize + BlobsSize + (BlobsSize > 0 ? GetMaxHeaderSize() : 0));

GlueHead = GlueNewHead = false;
if (!Blobs.empty()) {
TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)};
// Blobs was moved from; clear() puts it back into a known empty state.
Blobs.clear();
batch.Pack();
Y_ABORT_UNLESS(batch.Packed);
batch.SerializeTo(valueD);
}

// The value must fit into MaxBlobSize; additionally either it is within 1_MB
// of the limit once `size` is added, or the estimated total still fits.
Y_ABORT_UNLESS(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize));
HeadSize = 0;
BlobsSize = 0;
// Note: the blob is validated against tmpKey even when useRename is false.
TClientBlob::CheckBlob(tmpKey, valueD);
if (useRename) {
FormedBlobs.emplace_back(tmpKey, dataKey, valueD.size());
}
// NOTE(review): Blobs is already empty at this point (either it was empty on
// entry or it was cleared inside the branch above), so this looks redundant.
Blobs.clear();

return {{useRename ? tmpKey : dataKey, valueD}};
}

auto TPartitionedBlob::Add(TClientBlob&& blob) -> std::optional<TFormedBlobInfo>
{
Y_ABORT_UNLESS(NewHead.Offset >= Head.Offset);
ui32 size = blob.GetBlobSize();
Y_ABORT_UNLESS(InternalPartsCount < 1000); //just check for future packing
if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize)
if (HeadSize + BlobsSize + size + GetMaxHeaderSize() > MaxBlobSize) {
NeedCompactHead = true;
}
if (HeadSize + BlobsSize == 0) { //if nothing to compact at all
NeedCompactHead = false;
}

std::optional<std::pair<TKey, TString>> res;
std::optional<TFormedBlobInfo> res;
if (NeedCompactHead) { // need form blob without last chunk, on start or in case of big head
NeedCompactHead = false;
HeadPartNo = NextPartNo;
ui32 count = (GlueHead ? Head.GetCount() : 0) + (GlueNewHead ? NewHead.GetCount() : 0);

Y_ABORT_UNLESS(Offset >= (GlueHead ? Head.Offset : NewHead.Offset));

Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));

TKey key(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);

StartOffset = Offset;
StartPartNo = NextPartNo;
InternalPartsCount = 0;

TString valueD = CompactHead(GlueHead, Head, GlueNewHead, NewHead, HeadSize + BlobsSize + (BlobsSize > 0 ? GetMaxHeaderSize() : 0));

GlueHead = GlueNewHead = false;
if (!Blobs.empty()) {
TBatch batch{Offset, Blobs.front().GetPartNo(), std::move(Blobs)};
Blobs.clear();
batch.Pack();
Y_ABORT_UNLESS(batch.Packed);
batch.SerializeTo(valueD);
}

Y_ABORT_UNLESS(valueD.size() <= MaxBlobSize && (valueD.size() + size + 1_MB > MaxBlobSize || HeadSize + BlobsSize + size + GetMaxHeaderSize() <= MaxBlobSize));
HeadSize = 0;
BlobsSize = 0;
TClientBlob::CheckBlob(key, valueD);
FormedBlobs.emplace_back(key, valueD.size());
Blobs.clear();

res = {key, valueD};
res = CreateFormedBlob(size, true);
}
BlobsSize += size + GetMaxHeaderSize();
++NextPartNo;
Blobs.push_back(blob);
if (!IsComplete())
if (!IsComplete()) {
++InternalPartsCount;
}
return res;
}

// Registers an already written blob (identified by oldKey, `size` bytes long)
// for renaming into this partition's data key space.
//
// If a head compaction is pending, the accumulated data is first flushed via
// CreateFormedBlob (directly under a data key, with no rename record) and that
// formed blob is returned; otherwise the returned optional is empty.
auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TFormedBlobInfo>
{
    std::optional<TFormedBlobInfo> formed;

    if (NeedCompactHead) {
        NeedCompactHead = false;
        GlueNewHead = false;
        formed = CreateFormedBlob(0, false);
    }

    // The destination key is computed only after the optional flush above.
    // It keeps every attribute of the old key except the type, which becomes
    // TypeData, and the offset, which is shifted by NewHead.Offset.
    const auto shiftedOffset = NewHead.Offset + oldKey.GetOffset();
    FormedBlobs.emplace_back(
        oldKey,
        TKey(TKeyPrefix::TypeData,
             Partition,
             shiftedOffset,
             oldKey.GetPartNo(),
             oldKey.GetCount(),
             oldKey.GetInternalPartsCount(),
             oldKey.IsHead()),
        size);

    // Only StartOffset advances here; NewHead.Offset is left untouched.
    StartOffset += oldKey.GetCount();

    return formed;
}

Expand Down
36 changes: 31 additions & 5 deletions ydb/core/persqueue/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ struct TClientBlob {
return PartData ? PartData->PartNo : 0;
}

// Number of parts the message consists of; a blob without part data counts
// as a single part.
ui16 GetTotalParts() const {
    if (!PartData) {
        return 1;
    }
    return PartData->TotalParts;
}

ui16 GetTotalSize() const {
return PartData ? PartData->TotalSize : UncompressedSize;
}

// True when this blob closes its message: either it has no part data at all,
// or its part number is the last one (PartNo + 1 == TotalParts).
bool IsLastPart() const {
    if (!PartData) {
        return true;
    }
    return PartData->PartNo + 1 == PartData->TotalParts;
}
Expand Down Expand Up @@ -184,7 +192,8 @@ struct TBatch {
: Packed(true)
, Header(header)
, PackedData(data, header.GetPayloadSize())
{}
{
}

ui32 GetPackedSize() const { Y_ABORT_UNLESS(Packed); return sizeof(ui16) + PackedData.size() + Header.ByteSize(); }
void Pack();
Expand Down Expand Up @@ -265,9 +274,16 @@ class TPartitionedBlob {
TPartitionedBlob(const TPartitionedBlob& x);

TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize);
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize,
ui16 nextPartNo = 0);

// A blob produced by Add(): the serialized value together with the key it is
// to be written under.
struct TFormedBlobInfo {
// Key for the formed blob.
TKey Key;
// Serialized blob contents.
TString Value;
};

std::optional<std::pair<TKey, TString>> Add(TClientBlob&& blob);
std::optional<TFormedBlobInfo> Add(TClientBlob&& blob);
std::optional<TFormedBlobInfo> Add(const TKey& key, ui32 size);

bool IsInited() const { return !SourceId.empty(); }

Expand All @@ -280,11 +296,21 @@ class TPartitionedBlob {

bool IsNextPart(const TString& sourceId, const ui64 seqNo, const ui16 partNo, TString *reason) const;

// Record of a formed blob that is tracked under two keys: the one it currently
// has (OldKey) and the one it should end up with (NewKey).
// NOTE(review): the rename itself is presumably performed by the consumer of
// GetFormedBlobs() - confirm against the caller.
struct TRenameFormedBlobInfo {
    TRenameFormedBlobInfo() = default;
    TRenameFormedBlobInfo(const TKey& oldKey, const TKey& newKey, ui32 size);

    // Key the blob currently has.
    TKey OldKey;
    // Key the blob should have after the rename.
    TKey NewKey;
    // Blob size in bytes. Zero-initialized so that a default-constructed
    // record never exposes an indeterminate value.
    ui32 Size = 0;
};

const std::deque<TClientBlob>& GetClientBlobs() const { return Blobs; }
const std::deque<std::pair<TKey, ui32>> GetFormedBlobs() const { return FormedBlobs; }
const std::deque<TRenameFormedBlobInfo>& GetFormedBlobs() const { return FormedBlobs; }

private:
TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize);
std::optional<TFormedBlobInfo> CreateFormedBlob(ui32 size, bool useRename);

private:
TPartitionId Partition;
Expand All @@ -300,7 +326,7 @@ class TPartitionedBlob {
ui16 HeadPartNo;
std::deque<TClientBlob> Blobs;
ui32 BlobsSize;
std::deque<std::pair<TKey, ui32>> FormedBlobs;
std::deque<TRenameFormedBlobInfo> FormedBlobs;
THead &Head;
THead &NewHead;
ui32 HeadSize;
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/persqueue/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,18 @@ TKey MakeKeyFromString(const TString& s, const TPartitionId& partition)
t.IsHead());
}

// Tells whether the type marker stored in this key prefix is one of the
// service type markers (info, data, tmp data, meta, tx meta).
bool TKeyPrefix::HasServiceType() const
{
    const auto marker = *PtrType();
    return marker == ServiceTypeInfo
        || marker == ServiceTypeData
        || marker == ServiceTypeTmpData
        || marker == ServiceTypeMeta
        || marker == ServiceTypeTxMeta;
}

}
8 changes: 6 additions & 2 deletions ydb/core/persqueue/key.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class TKeyPrefix : public TBuffer
virtual ~TKeyPrefix()
{}

// Copies the raw bytes of the key prefix buffer into a string.
TString ToString() const {
    TString text(Data(), Size());
    return text;
}

bool Marked(EMark mark) {
if (Size() >= MarkedSize())
return *PtrMark() == mark;
Expand All @@ -64,7 +68,7 @@ class TKeyPrefix : public TBuffer


void SetType(EType type) {
if (!IsServicePartition()) {
if (!IsServicePartition() && !HasServiceType()) {
*PtrType() = type;
return;
}
Expand Down Expand Up @@ -92,7 +96,6 @@ class TKeyPrefix : public TBuffer
}
}


EType GetType() const {
switch (*PtrType()) {
case TypeNone:
Expand Down Expand Up @@ -130,6 +133,7 @@ class TKeyPrefix : public TBuffer
Partition.InternalPartitionId = Partition.OriginalPartitionId;
}

bool HasServiceType() const;

private:
enum EServiceType : char {
Expand Down
Loading

0 comments on commit a874417

Please sign in to comment.