Skip to content

Commit

Permalink
Fixed the use of the parameter ForcedCompactionRangeCountPerRun (#3085)
Browse files Browse the repository at this point in the history
  • Loading branch information
agalibin authored Feb 27, 2025
1 parent 3c9a25e commit 6a5bab7
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 87 deletions.
34 changes: 0 additions & 34 deletions cloud/blockstore/libs/storage/core/compaction_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,40 +537,6 @@ TVector<ui32> TCompactionMap::GetNonEmptyRanges() const
return result;
}

TVector<TCompactionCounter> TCompactionMap::GetNonEmptyRanges(
ui32 blockIndex,
ui32 rangesCount) const
{
if (!rangesCount) {
return {};
}

const ui32 groupStart = GetGroupStart(blockIndex, Impl->RangeSize);
auto rangeIndex = (blockIndex - groupStart) / Impl->RangeSize;

TVector<TCompactionCounter> result(Reserve(rangesCount));

for (auto groupIt = TImpl::TGroupByBlockIndexTree::TIterator(
Impl->GroupByBlockIndex.Find(groupStart));
groupIt != Impl->GroupByBlockIndex.End();
++groupIt, rangeIndex = 0)
{
const auto& group = static_cast<TImpl::TGroupNode&>(*groupIt);
for (; rangeIndex < group.Stats.size(); ++rangeIndex) {
if (group.Stats[rangeIndex].BlobCount > 0) {
const auto groupRangeStart =
group.BlockIndex + (rangeIndex * Impl->RangeSize);
result.emplace_back(groupRangeStart, group.Stats[rangeIndex]);
if (result.size() == rangesCount) {
return result;
}
}
}
}

return result;
}

ui32 TCompactionMap::GetNonEmptyRangeCount() const
{
return Impl->GetNonEmptyRangeCount();
Expand Down
2 changes: 0 additions & 2 deletions cloud/blockstore/libs/storage/core/compaction_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ class TCompactionMap
TVector<TCompactionCounter> GetTop(size_t count) const;
TVector<TCompactionCounter> GetTopByGarbageBlockCount(size_t count) const;
TVector<ui32> GetNonEmptyRanges() const;
// Returns non-empty ranges, starting from the blockIndex
TVector<TCompactionCounter> GetNonEmptyRanges(ui32 blockIndex, ui32 rangesCount) const;
ui32 GetNonEmptyRangeCount() const;
ui32 GetRangeStart(ui32 blockIndex) const;
ui32 GetRangeIndex(ui32 blockIndex) const;
Expand Down
26 changes: 0 additions & 26 deletions cloud/blockstore/libs/storage/core/compaction_map_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,42 +256,16 @@ Y_UNIT_TEST_SUITE(TCompactionMapTest)

{
const auto nonEmptyCount = map.GetNonEmptyRanges().size();
const auto nonEmptyRanges = map.GetNonEmptyRanges(GetGroupIndex(1), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyCount, map.GetNonEmptyRangeCount());
UNIT_ASSERT_VALUES_EQUAL(nonEmptyCount, 100);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges[0].BlockIndex, GetGroupIndex(1));
}

// empty range must be skipped
map.Update(GetGroupIndex(46), 0, blockCount, usedBlockCount, false);
{
const auto nonEmptyCount = map.GetNonEmptyRanges().size();
const auto nonEmptyRanges = map.GetNonEmptyRanges(GetGroupIndex(45), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyCount, map.GetNonEmptyRangeCount());
UNIT_ASSERT_VALUES_EQUAL(nonEmptyCount, 99);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges[1].BlockIndex, GetGroupIndex(47));
}

// if first range is empty need to start from first non empty after it
map.Update(GetGroupIndex(45), 0, blockCount, usedBlockCount, false);
{
const auto nonEmptyRanges = map.GetNonEmptyRanges(GetGroupIndex(45), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges[0].BlockIndex, GetGroupIndex(47));
}

{
const auto nonEmptyRanges = map.GetNonEmptyRanges(GetGroupIndex(95), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges.size(), 6);
}

map.Update(GetGroupIndex(96), 0, blockCount, usedBlockCount, false);
map.Update(GetGroupIndex(99), 0, blockCount, usedBlockCount, false);
{
const auto nonEmptyRanges = map.GetNonEmptyRanges(GetGroupIndex(95), 10);
UNIT_ASSERT_VALUES_EQUAL(nonEmptyRanges.size(), 4);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1314,15 +1314,12 @@ void TPartitionActor::HandleCompaction(

const auto& cm = State->GetCompactionMap();

if (msg->BlockIndex.Defined()) {
if (batchCompactionEnabled) {
tops = cm.GetNonEmptyRanges(
*msg->BlockIndex, Config->GetForcedCompactionRangeCountPerRun());
} else {
const auto startIndex = cm.GetRangeStart(*msg->BlockIndex);
if (!msg->RangeBlockIndices.empty()) {
for (const auto blockIndex: msg->RangeBlockIndices) {
const auto startIndex = cm.GetRangeStart(blockIndex);
tops.push_back({startIndex, cm.Get(startIndex)});
}
State->OnNewCompactionRange();
State->OnNewCompactionRange(msg->RangeBlockIndices.size());
} else if (msg->Mode == TEvPartitionPrivate::GarbageCompaction) {
if (batchCompactionEnabled &&
Config->GetGarbageCompactionRangeCountPerRun() > 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@ class TForcedCompactionActor final
const TActorId Tablet;
const TVector<ui32> RangesToCompact;
const TDuration RetryTimeout;
const ui32 RangeCountPerRun;

size_t CurrentBlock = 0;
size_t CurrentRangeIndex = 0;

public:
TForcedCompactionActor(
const TActorId& tablet,
TVector<ui32> rangesToCompact,
TDuration retryTimeout);
TDuration retryTimeout,
ui32 rangeCountPerRun);

void Bootstrap(const TActorContext& ctx);

Expand Down Expand Up @@ -76,10 +78,12 @@ class TForcedCompactionActor final
TForcedCompactionActor::TForcedCompactionActor(
const TActorId& tablet,
TVector<ui32> rangesToCompact,
TDuration retryTimeout)
TDuration retryTimeout,
ui32 rangeCountPerRun)
: Tablet(tablet)
, RangesToCompact(std::move(rangesToCompact))
, RetryTimeout(retryTimeout)
, RangeCountPerRun(rangeCountPerRun)
{}

void TForcedCompactionActor::Bootstrap(const TActorContext& ctx)
Expand All @@ -90,9 +94,15 @@ void TForcedCompactionActor::Bootstrap(const TActorContext& ctx)

void TForcedCompactionActor::SendCompactionRequest(const TActorContext& ctx)
{
TVector<ui32> ranges(Reserve(RangeCountPerRun));
ranges.assign(
RangesToCompact.begin() + CurrentRangeIndex,
RangesToCompact.begin() +
Min(CurrentRangeIndex + RangeCountPerRun, RangesToCompact.size()));

auto request = std::make_unique<TEvPartitionPrivate::TEvCompactionRequest>(
MakeIntrusive<TCallContext>(),
RangesToCompact[CurrentBlock],
std::move(ranges),
TCompactionOptions().
set(ToBit(ECompactionOption::Forced)).
set(ToBit(ECompactionOption::Full)));
Expand Down Expand Up @@ -153,8 +163,8 @@ void TForcedCompactionActor::HandleCompactionResponse(
return;
}

++CurrentBlock;
if (CurrentBlock < RangesToCompact.size()) {
CurrentRangeIndex += RangeCountPerRun;
if (CurrentRangeIndex < RangesToCompact.size()) {
SendCompactionRequest(ctx);
} else {
ReplyAndDie(ctx);
Expand Down Expand Up @@ -322,11 +332,23 @@ void TPartitionActor::EnqueueForcedCompaction(const TActorContext& ctx)
compactInfo.OperationId,
compactInfo.RangesToCompact.size());

const bool batchCompactionEnabledForCloud =
Config->IsBatchCompactionFeatureEnabled(
PartitionConfig.GetCloudId(),
PartitionConfig.GetFolderId(),
PartitionConfig.GetDiskId());
const bool batchCompactionEnabled =
Config->GetBatchCompactionEnabled() ||
batchCompactionEnabledForCloud;

auto actorId = NCloud::Register<TForcedCompactionActor>(
ctx,
SelfId(),
std::move(compactInfo.RangesToCompact),
Config->GetCompactionRetryTimeout());
Config->GetCompactionRetryTimeout(),
batchCompactionEnabled
? Config->GetForcedCompactionRangeCountPerRun()
: 1);

PendingForcedCompactionRequests.pop_front();

Expand Down
18 changes: 14 additions & 4 deletions cloud/blockstore/libs/storage/partition/part_events_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,30 @@ struct TEvPartitionPrivate
struct TCompactionRequest
{
ECompactionMode Mode = RangeCompaction;
TMaybe<ui32> BlockIndex;
TVector<ui32> RangeBlockIndices;
TCompactionOptions CompactionOptions;

TCompactionRequest() = default;

TCompactionRequest(
ui32 blockIndex,
TVector<ui32> rangeBlockIndices,
TCompactionOptions compactionOptions)
: BlockIndex(blockIndex)
: RangeBlockIndices(std::move(rangeBlockIndices))
, CompactionOptions(compactionOptions)
{}

TCompactionRequest(
ui32 blockIndex,
TCompactionOptions compactionOptions)
: TCompactionRequest(TVector<ui32>{blockIndex}, compactionOptions)
{}

TCompactionRequest(ui32 blockIndex)
: TCompactionRequest(blockIndex, {})
: TCompactionRequest(TVector<ui32>{blockIndex}, {})
{}

TCompactionRequest(TVector<ui32> rangeBlockIndices)
: TCompactionRequest(std::move(rangeBlockIndices), {})
{}

TCompactionRequest(ECompactionMode mode)
Expand Down
4 changes: 2 additions & 2 deletions cloud/blockstore/libs/storage/partition/part_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -929,9 +929,9 @@ class TPartitionState
ForcedCompactionState.OperationId = operationId;
}

void OnNewCompactionRange()
void OnNewCompactionRange(ui32 rangesCount)
{
++ForcedCompactionState.Progress;
ForcedCompactionState.Progress += rangesCount;
}

void ResetForcedCompaction()
Expand Down
39 changes: 34 additions & 5 deletions cloud/blockstore/libs/storage/partition/part_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11452,10 +11452,27 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
UNIT_ASSERT_VALUES_EQUAL(0, stats.GetMergedBlobsCount());
}

TVector<size_t> rangeSizes;
const auto interceptCompactionRequest =
[&rangeSizes](TAutoPtr<IEventHandle>& event)
{
if (event->GetTypeRewrite() ==
TEvPartitionPrivate::EvCompactionRequest)
{
auto* msg =
event->Get<TEvPartitionPrivate::TEvCompactionRequest>();
rangeSizes.push_back(msg->RangeBlockIndices.size());
}
return TTestActorRuntime::DefaultObserverFunc(event);
};
runtime->SetObserverFunc(interceptCompactionRequest);

const auto blockRange1 = TBlockRange32::WithLength(0, 1024);
const auto blockRange2 = TBlockRange32::WithLength(1024 * 1024, 1024);
const auto blockRange3 =
TBlockRange32::WithLength(2 * 1024 * 1024, 1024);
const auto blockRange4 =
TBlockRange32::WithLength(3 * 1024 * 1024, 1024);

partition.WriteBlocks(blockRange1, 1);
partition.WriteBlocks(blockRange1, 2);
Expand All @@ -11468,17 +11485,22 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
partition.WriteBlocks(blockRange3, 7);
partition.WriteBlocks(blockRange3, 8);

partition.WriteBlocks(blockRange4, 9);

{
const auto response = partition.StatPartition();
const auto& stats = response->Record.GetStats();
UNIT_ASSERT_VALUES_EQUAL(8, stats.GetMergedBlobsCount());
UNIT_ASSERT_VALUES_EQUAL(9, stats.GetMergedBlobsCount());
}

TCompactionOptions options;
options.set(ToBit(ECompactionOption::Forced));
partition.Compaction(0, options);
partition.SendCompactRangeRequest(0, 0);
runtime->DispatchEvents(TDispatchOptions(), TDuration::Seconds(1));
partition.Cleanup();

UNIT_ASSERT_EQUAL(2, rangeSizes.size());
UNIT_ASSERT_EQUAL(3, rangeSizes[0]);
UNIT_ASSERT_EQUAL(1, rangeSizes[1]);

// checking that data wasn't corrupted
UNIT_ASSERT_VALUES_EQUAL(
GetBlockContent(2),
Expand All @@ -11501,11 +11523,18 @@ Y_UNIT_TEST_SUITE(TPartitionTest)
GetBlockContent(8),
GetBlockContent(partition.ReadBlocks(blockRange3.End)));

UNIT_ASSERT_VALUES_EQUAL(
GetBlockContent(9),
GetBlockContent(partition.ReadBlocks(blockRange4.Start)));
UNIT_ASSERT_VALUES_EQUAL(
GetBlockContent(9),
GetBlockContent(partition.ReadBlocks(blockRange4.End)));

// checking that we now have 1 blob in each of the ranges
{
const auto response = partition.StatPartition();
const auto& stats = response->Record.GetStats();
UNIT_ASSERT_VALUES_EQUAL(3, stats.GetMergedBlobsCount());
UNIT_ASSERT_VALUES_EQUAL(4, stats.GetMergedBlobsCount());
}
}

Expand Down

0 comments on commit 6a5bab7

Please sign in to comment.