Skip to content

Commit

Permalink
sys view for granules (ydb-platform#3498)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Apr 5, 2024
1 parent c97ef92 commit a833a43
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 54 deletions.
2 changes: 2 additions & 0 deletions ydb/core/sys_view/common/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,10 @@ class TSystemViewResolver : public ISystemViewResolver {

RegisterOlapStoreSystemView<Schema::PrimaryIndexStats>(StorePrimaryIndexStatsName);
RegisterOlapStoreSystemView<Schema::PrimaryIndexPortionStats>(StorePrimaryIndexPortionStatsName);
RegisterOlapStoreSystemView<Schema::PrimaryIndexGranuleStats>(StorePrimaryIndexGranuleStatsName);
RegisterColumnTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName);
RegisterColumnTableSystemView<Schema::PrimaryIndexPortionStats>(TablePrimaryIndexPortionStatsName);
RegisterColumnTableSystemView<Schema::PrimaryIndexGranuleStats>(TablePrimaryIndexGranuleStatsName);

RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName);
RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName);
Expand Down
19 changes: 19 additions & 0 deletions ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ constexpr TStringBuf QueryMetricsName = "query_metrics_one_minute";

constexpr TStringBuf StorePrimaryIndexStatsName = "store_primary_index_stats";
constexpr TStringBuf StorePrimaryIndexPortionStatsName = "store_primary_index_portion_stats";
constexpr TStringBuf StorePrimaryIndexGranuleStatsName = "store_primary_index_granule_stats";
constexpr TStringBuf TablePrimaryIndexStatsName = "primary_index_stats";
constexpr TStringBuf TablePrimaryIndexPortionStatsName = "primary_index_portion_stats";
constexpr TStringBuf TablePrimaryIndexGranuleStatsName = "primary_index_granule_stats";

constexpr TStringBuf TopPartitions1MinuteName = "top_partitions_one_minute";
constexpr TStringBuf TopPartitions1HourName = "top_partitions_one_hour";
Expand Down Expand Up @@ -532,6 +534,23 @@ struct Schema : NIceDb::Schema {
>;
};

struct PrimaryIndexGranuleStats: Table<14> {
struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
struct TabletId: Column<2, NScheme::NTypeIds::Uint64> {};
struct PortionsCount: Column<3, NScheme::NTypeIds::Uint64> {};
struct HostName: Column<4, NScheme::NTypeIds::Utf8> {};
struct NodeId: Column<5, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<PathId, TabletId>;
using TColumns = TableColumns<
PathId,
TabletId,
PortionsCount,
HostName,
NodeId
>;
};

};

bool MaybeSystemViewPath(const TVector<TString>& path);
Expand Down
92 changes: 65 additions & 27 deletions ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,47 @@
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/abstract.h>
#include <ydb/core/tx/columnshard/engines/storage/granule.h>

namespace NKikimr::NOlap::NReader::NSysView::NAbstract {

class TGranuleMetaView {
private:
using TPortions = std::deque<std::shared_ptr<TPortionInfo>>;
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY_DEF(TPortions, Portions);
public:
TGranuleMetaView(const TGranuleMeta& granule, const bool reverse)
: PathId(granule.GetPathId())
{
for (auto&& i : granule.GetPortions()) {
Portions.emplace_back(i.second);
}

const auto predSort = [](const std::shared_ptr<TPortionInfo>& l, const std::shared_ptr<TPortionInfo>& r) {
return l->GetPortionId() < r->GetPortionId();
};

std::sort(Portions.begin(), Portions.end(), predSort);
if (reverse) {
std::reverse(Portions.begin(), Portions.end());
}
}

bool operator<(const TGranuleMetaView& item) const {
return PathId < item.PathId;
}

std::shared_ptr<TPortionInfo> PopFrontPortion() {
if (Portions.empty()) {
return nullptr;
}
auto result = Portions.front();
Portions.pop_front();
return result;
}
};

struct TReadStatsMetadata: public TReadMetadataBase {
private:
using TBase = TReadMetadataBase;
Expand All @@ -15,9 +53,10 @@ struct TReadStatsMetadata: public TReadMetadataBase {
const ui64 TabletId;
std::vector<ui32> ReadColumnIds;
std::vector<ui32> ResultColumnIds;
std::deque<std::shared_ptr<TPortionInfo>> IndexPortions;
std::deque<TGranuleMetaView> IndexGranules;

explicit TReadStatsMetadata(const std::shared_ptr<TVersionedIndex>& info, ui64 tabletId, const ESorting sorting, const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
explicit TReadStatsMetadata(const std::shared_ptr<TVersionedIndex>& info, ui64 tabletId, const ESorting sorting,
const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
: TBase(info, sorting, ssaProgram, schema, requestSnapshot)
, TabletId(tabletId) {
}
Expand Down Expand Up @@ -63,31 +102,27 @@ class TStatsIterator : public TScanIteratorBase {

TStatsIterator(const NAbstract::TReadStatsMetadata::TConstPtr& readMetadata)
: ReadMetadata(readMetadata)
, Reverse(ReadMetadata->IsDescSorted())
, KeySchema(MakeArrowSchema(StatsSchema.Columns, StatsSchema.KeyColumns))
, ResultSchema(MakeArrowSchema(StatsSchema.Columns, ReadMetadata->ResultColumnIds))
, IndexPortions(ReadMetadata->IndexPortions)
, IndexGranules(ReadMetadata->IndexGranules)
{
if (ResultSchema->num_fields() == 0) {
ResultSchema = KeySchema;
}
if (Reverse) {
std::reverse(IndexPortions.begin(), IndexPortions.end());
}
}

bool Finished() const override {
return IndexPortions.empty();
return IndexGranules.empty();
}
protected:
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionInfo& portion) const = 0;
virtual ui32 GetConstructionRecordsCount(const TPortionInfo& portion) const = 0;
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, TGranuleMetaView& granule) const = 0;
virtual ui32 PredictRecordsCount(const TGranuleMetaView& granule) const = 0;
TReadStatsMetadata::TConstPtr ReadMetadata;
const bool Reverse = false;
std::shared_ptr<arrow::Schema> KeySchema;
std::shared_ptr<arrow::Schema> ResultSchema;

std::deque<std::shared_ptr<TPortionInfo>> IndexPortions;
std::deque<TGranuleMetaView> IndexGranules;

virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() override {
// Take next raw batch
Expand Down Expand Up @@ -117,31 +152,34 @@ class TStatsIterator : public TScanIteratorBase {
}

std::shared_ptr<arrow::RecordBatch> FillStatsBatch() {
std::vector<std::shared_ptr<TPortionInfo>> portions;
ui32 recordsCount = 0;
while (IndexPortions.size()) {
auto& i = IndexPortions.front();
recordsCount += GetConstructionRecordsCount(*i);
portions.emplace_back(i);
IndexPortions.pop_front();
if (recordsCount > 10000) {
break;
}
}
std::vector<ui32> allColumnIds;
for (const auto& c : StatsSchema.Columns) {
allColumnIds.push_back(c.second.Id);
}
std::sort(allColumnIds.begin(), allColumnIds.end());
auto schema = MakeArrowSchema(StatsSchema.Columns, allColumnIds);
auto builders = NArrow::MakeBuilders(schema, recordsCount);

for (auto&& p : portions) {
AppendStats(builders, *p);
std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders;
if (IndexGranules.size()) {
builders = NArrow::MakeBuilders(schema, PredictRecordsCount(IndexGranules.front()));
AppendStats(builders, IndexGranules.front());
if (IndexGranules.front().GetPortions().empty()) {
IndexGranules.pop_front();
}
} else {
builders = NArrow::MakeBuilders(schema);
}

auto columns = NArrow::Finish(std::move(builders));
return arrow::RecordBatch::Make(schema, recordsCount, columns);
AFL_VERIFY(columns.size());
std::optional<ui32> count;
for (auto&& i : columns) {
if (!count) {
count = i->length();
} else {
AFL_VERIFY(*count == i->length());
}
}
return arrow::RecordBatch::Make(schema, columns.front()->length(), columns);
}

void ApplyRangePredicates(std::shared_ptr<arrow::RecordBatch>& batch) {
Expand Down
22 changes: 22 additions & 0 deletions ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,26 @@ std::shared_ptr<NKikimr::NOlap::NReader::NSysView::NAbstract::TReadStatsMetadata
read.GetProgram(), index ? index->GetVersionedIndex().GetSchema(read.GetSnapshot()) : nullptr, read.GetSnapshot());
}

void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
ui64 recordsCount = 0;
while (auto portion = granule.PopFrontPortion()) {
recordsCount += portion->GetRecords().size() + portion->GetIndexes().size();
AppendStats(builders, *portion);
if (recordsCount > 10000) {
break;
}
}
}

ui32 TStatsIterator::PredictRecordsCount(const NAbstract::TGranuleMetaView& granule) const {
ui32 recordsCount = 0;
for (auto&& portion : granule.GetPortions()) {
recordsCount += portion->GetRecords().size() + portion->GetIndexes().size();
if (recordsCount > 10000) {
break;
}
}
return recordsCount;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ class TReadStatsMetadata: public NAbstract::TReadStatsMetadata, std::enable_shar
class TStatsIterator: public NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexStats> {
private:
using TBase = NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexStats>;
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionInfo& portion) const override;
virtual ui32 GetConstructionRecordsCount(const TPortionInfo& portion) const override {
return portion.GetRecords().size() + portion.GetIndexes().size();
}

virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const override;
virtual ui32 PredictRecordsCount(const NAbstract::TGranuleMetaView& granule) const override;
void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionInfo& portion) const;
public:
using TBase::TBase;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,39 +36,39 @@ class TStatScannerConstructor: public IScannerConstructor {
if (!logsIndex) {
return dynamic_pointer_cast<TReadMetadataBase>(out);
}
THashMap<ui64, THashSet<ui64>> portionsInUse;
const auto predStatSchema = [](const std::shared_ptr<TPortionInfo>& l, const std::shared_ptr<TPortionInfo>& r) {
return std::tuple(l->GetPathId(), l->GetPortionId()) < std::tuple(r->GetPathId(), r->GetPortionId());
};
THashSet<ui64> pathIds;
for (auto&& filter : read.PKRangesFilter) {
const ui64 fromPathId = *filter.GetPredicateFrom().Get<arrow::UInt64Array>(0, 0, 1);
const ui64 toPathId = *filter.GetPredicateTo().Get<arrow::UInt64Array>(0, 0, Max<ui64>());
if (read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_STATS_TABLE) || read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_PORTION_STATS_TABLE)) {
if (fromPathId <= read.PathId && toPathId >= read.PathId) {
if (read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_STATS_TABLE)
|| read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_PORTION_STATS_TABLE)
|| read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_GRANULE_STATS_TABLE)
) {
if (fromPathId <= read.PathId && read.PathId <= toPathId) {
auto pathInfo = logsIndex->GetGranuleOptional(read.PathId);
if (!pathInfo) {
continue;
}
for (auto&& p : pathInfo->GetPortions()) {
if (portionsInUse[read.PathId].emplace(p.first).second) {
out->IndexPortions.emplace_back(p.second);
}
if (pathIds.emplace(pathInfo->GetPathId()).second) {
out->IndexGranules.emplace_back(NAbstract::TGranuleMetaView(*pathInfo, out->IsDescSorted()));
}
}
std::sort(out->IndexPortions.begin(), out->IndexPortions.end(), predStatSchema);
} else if (read.TableName.EndsWith(IIndexInfo::STORE_INDEX_STATS_TABLE) || read.TableName.EndsWith(IIndexInfo::STORE_INDEX_PORTION_STATS_TABLE)) {
} else if (read.TableName.EndsWith(IIndexInfo::STORE_INDEX_STATS_TABLE)
|| read.TableName.EndsWith(IIndexInfo::STORE_INDEX_PORTION_STATS_TABLE)
|| read.TableName.EndsWith(IIndexInfo::STORE_INDEX_GRANULE_STATS_TABLE)
) {
auto pathInfos = logsIndex->GetTables(fromPathId, toPathId);
for (auto&& pathInfo : pathInfos) {
for (auto&& p : pathInfo->GetPortions()) {
if (portionsInUse[p.second->GetPathId()].emplace(p.first).second) {
out->IndexPortions.emplace_back(p.second);
}
if (pathIds.emplace(pathInfo->GetPathId()).second) {
out->IndexGranules.emplace_back(NAbstract::TGranuleMetaView(*pathInfo, out->IsDescSorted()));
}
}
std::sort(out->IndexPortions.begin(), out->IndexPortions.end(), predStatSchema);
}
}

std::sort(out->IndexGranules.begin(), out->IndexGranules.end());
if (out->IsDescSorted()) {
std::reverse(out->IndexGranules.begin(), out->IndexGranules.end());
}
return dynamic_pointer_cast<TReadMetadataBase>(out);
}
public:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "granules.h"
#include <ydb/core/formats/arrow/switch/switch_type.h>
#include <ydb/core/tx/columnshard/blobs_action/common/const.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <util/system/hostname.h>

namespace NKikimr::NOlap::NReader::NSysView::NGranules {

void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
NArrow::Append<arrow::UInt64Type>(*builders[0], granule.GetPathId());
NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->TabletId);
NArrow::Append<arrow::UInt64Type>(*builders[2], granule.GetPortions().size());
NArrow::Append<arrow::StringType>(*builders[3], HostNameField);
NArrow::Append<arrow::UInt64Type>(*builders[4], NActors::TActivationContext::AsActorContext().SelfID.NodeId());
while (granule.PopFrontPortion()) {
}
}

std::unique_ptr<TScanIteratorBase> TReadStatsMetadata::StartScan(const std::shared_ptr<TReadContext>& readContext) const {
return std::make_unique<TStatsIterator>(readContext->GetReadMetadataPtrVerifiedAs<TReadStatsMetadata>());
}

std::vector<std::pair<TString, NKikimr::NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSchema() const {
return GetColumns(TStatsIterator::StatsSchema, TStatsIterator::StatsSchema.KeyColumns);
}

std::shared_ptr<NAbstract::TReadStatsMetadata> TConstructor::BuildMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
auto* index = self->GetIndexOptional();
return std::make_shared<TReadStatsMetadata>(index ? index->CopyVersionedIndexPtr() : nullptr, self->TabletID(),
IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC,
read.GetProgram(), index ? index->GetVersionedIndex().GetSchema(read.GetSnapshot()) : nullptr, read.GetSnapshot());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#pragma once
#include <ydb/core/sys_view/common/schema.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h>
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h>
#include <util/system/hostname.h>

namespace NKikimr::NOlap::NReader::NSysView::NGranules {

class TConstructor: public TStatScannerConstructor<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats> {
private:
using TBase = TStatScannerConstructor<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats>;
protected:
virtual std::shared_ptr<NAbstract::TReadStatsMetadata> BuildMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const override;

public:
using TBase::TBase;
};

struct TReadStatsMetadata: public NAbstract::TReadStatsMetadata {
private:
using TBase = NAbstract::TReadStatsMetadata;
using TSysViewSchema = NKikimr::NSysView::Schema::PrimaryIndexGranuleStats;
public:
using TBase::TBase;

virtual std::unique_ptr<TScanIteratorBase> StartScan(const std::shared_ptr<TReadContext>& readContext) const override;
virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;
};

class TStatsIterator : public NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats> {
private:
const std::string HostNameField = HostName();
using TBase = NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats>;
virtual ui32 PredictRecordsCount(const NAbstract::TGranuleMetaView& /*granule*/) const override {
return 1;
}
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const override;
public:
using TBase::TBase;
};

}
12 changes: 12 additions & 0 deletions ydb/core/tx/columnshard/engines/reader/sys_view/granules/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
LIBRARY()

PEERDIR(
ydb/core/tx/columnshard/engines/reader/sys_view/abstract
)

SRCS(
granules.cpp
)

END()

Loading

0 comments on commit a833a43

Please sign in to comment.