Skip to content

Commit

Permalink
Merge stable-24-1-16-analytics into stable-24-1 (#5596)
Browse files Browse the repository at this point in the history
Co-authored-by: Vitalii Gridnev <[email protected]>
Co-authored-by: Andrey Neporada <[email protected]>
Co-authored-by: niksaveliev <[email protected]>
Co-authored-by: Sergey Veselov <[email protected]>
Co-authored-by: alexnick88 <[email protected]>
Co-authored-by: Ilnaz Nizametdinov <[email protected]>
Co-authored-by: Iuliia Sidorina <[email protected]>
Co-authored-by: kungurtsev <[email protected]>
Co-authored-by: Nikolay Shestakov <[email protected]>
Co-authored-by: azevaykin <[email protected]>
Co-authored-by: DimasKovas <[email protected]>
Co-authored-by: ijon <[email protected]>
Co-authored-by: vporyadke <[email protected]>
Co-authored-by: kruall <[email protected]>
Co-authored-by: ivanmorozov333 <[email protected]>
Co-authored-by: Alexander Rutkovsky <[email protected]>
Co-authored-by: ildar-khisambeev <[email protected]>
Co-authored-by: nsofya <[email protected]>
Co-authored-by: nsofya <[email protected]>
Co-authored-by: Sofya Novozhilova <[email protected]>
Co-authored-by: Олег <[email protected]>
Co-authored-by: Ivan Morozov <[email protected]>
Co-authored-by: Andrei Rykov <[email protected]>
Co-authored-by: qyryq <[email protected]>
Co-authored-by: Daniil Cherednik <[email protected]>
Co-authored-by: Aleksei Borzenkov <[email protected]>
  • Loading branch information
1 parent fbfd87d commit 1339035
Show file tree
Hide file tree
Showing 921 changed files with 44,977 additions and 17,007 deletions.
2 changes: 0 additions & 2 deletions .github/config/muted_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ ydb-core-blobstorage-ut_blobstorage/SpaceCheckForDiskReassign::*
ydb-services-ydb-sdk_sessions_pool_ut/YdbSdkSessionsPool::StressTestSync10
ydb-tests-functional-kqp-kqp_query_session/KqpQuerySession::NoLocalAttach
ydb-core-blobstorage-ut_blobstorage/VDiskAssimilation::Test
ydb-core-tx-columnshard-ut_schema/TColumnShardTestSchema::ForgetAfterFail
ydb-core-tx-columnshard-ut_schema/TColumnShardTestSchema::RebootForgetAfterFail
ydb-library-yql-sql-pg-ut/PgSqlParsingAutoparam::AutoParamValues_DifferentTypes
ydb-core-blobstorage-ut_blobstorage/[6/10]*
ydb/core/blobstorage/ut_blobstorage/Defragmentation::DoesItWork
11 changes: 7 additions & 4 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ ydb/core/kafka_proxy/ut KafkaProtocol.CreatePartitionsScenario
ydb/core/kafka_proxy/ut KafkaProtocol.ProduceScenario
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/federated_query/generic *
ydb/core/kqp/ut/olap *
ydb/core/kqp/ut/olap KqpOlapAggregations.Json_Exists
ydb/core/kqp/ut/olap KqpOlapIndexes.Indexes
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
Expand All @@ -32,9 +38,6 @@ ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpQueryServiceScripts.ForgetScriptExecutionRace
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [38/50]*
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.ForgetAfterFail
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.RebootForgetAfterFail
ydb/core/tx/columnshard/engines/ut *
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
ydb/core/tx/replication/ydb_proxy/ut YdbProxyTests.ReadTopic
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -981,14 +981,16 @@ struct TEvBlobStorage {
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
mutable NLWTrace::TOrbit Orbit;
std::shared_ptr<TExecutionRelay> ExecutionRelay;
const TString StorageId;

TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
: Status(status)
, Id(id)
, StatusFlags(statusFlags)
, GroupId(groupId)
, ApproximateFreeSpaceShare(approximateFreeSpaceShare)
, StorageId(storageId)
{}

TString Print(bool isFull) const {
Expand Down
18 changes: 12 additions & 6 deletions ydb/core/formats/arrow/arrow_batch_builder.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "arrow_batch_builder.h"
#include "switch/switch_type.h"
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>

namespace NKikimr::NArrow {

namespace {
Expand Down Expand Up @@ -195,12 +195,18 @@ TArrowBatchBuilder::TArrowBatchBuilder(arrow::Compression::type codec, const std
WriteOptions.use_threads = false;
}

bool TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbColumns) {
arrow::Status TArrowBatchBuilder::Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbColumns) {
YdbSchema = ydbColumns;
auto schema = MakeArrowSchema(ydbColumns, NotNullColumns);
auto status = arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), RowsToReserve, &BatchBuilder);
if (!schema.ok()) {
return arrow::Status::FromArgs(schema.status().code(), "Cannot make arrow schema: ", schema.status().ToString());
}
auto status = arrow::RecordBatchBuilder::Make(*schema, arrow::default_memory_pool(), RowsToReserve, &BatchBuilder);
NumRows = NumBytes = 0;
return status.ok();
if (!status.ok()) {
return arrow::Status::FromArgs(schema.status().code(), "Cannot make arrow builder: ", status.ToString());
}
return arrow::Status::OK();
}

void TArrowBatchBuilder::AppendCell(const TCell& cell, ui32 colNum) {
Expand Down Expand Up @@ -259,7 +265,7 @@ void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) {
Y_ABORT_UNLESS(columnNo < YdbSchema.size());
auto type = YdbSchema[columnNo].second;

SwitchYqlTypeToArrowType(type, [&](const auto& type) {
Y_ABORT_UNLESS(SwitchYqlTypeToArrowType(type, [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
using TBuilder = typename arrow::TypeTraits<typename TWrap::T>::BuilderType;

Expand All @@ -270,7 +276,7 @@ void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) {
Y_ABORT_UNLESS(status.ok());
}
return true;
});
}));
}

std::shared_ptr<arrow::RecordBatch> TArrowBatchBuilder::FlushBatch(bool reinitialize) {
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/formats/arrow/arrow_batch_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "arrow_helpers.h"
#include <ydb/core/formats/factory.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/library/conclusion/status.h>

namespace NKikimr::NArrow {

Expand Down Expand Up @@ -155,8 +156,11 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
ui64 maxRowsInBlock, ui64 maxBytesInBlock, TString& err) override {
Y_UNUSED(maxRowsInBlock);
Y_UNUSED(maxBytesInBlock);
Y_UNUSED(err);
return Start(columns);
const auto result = Start(columns);
if (!result.ok()) {
err = result.ToString();
}
return result.ok();
}

void AddRow(const NKikimr::TDbTupleRef& key, const NKikimr::TDbTupleRef& value) override;
Expand All @@ -175,7 +179,7 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
return NumBytes;
}

bool Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
arrow::Status Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
std::shared_ptr<arrow::RecordBatch> FlushBatch(bool reinitialize);
std::shared_ptr<arrow::RecordBatch> GetBatch() const { return Batch; }

Expand Down
30 changes: 13 additions & 17 deletions ydb/core/formats/arrow/arrow_filter.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "arrow_filter.h"
#include "switch_type.h"
#include "common/container.h"
#include "common/adapter.h"

#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/chunked_array.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api_vector.h>
Expand Down Expand Up @@ -307,7 +310,7 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(const arrow::D
return NArrow::TColumnFilter(std::move(bits));
}

template <arrow::Datum::Kind kindExpected, class TData>
template <class TData>
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) {
if (!batch || !batch->num_rows()) {
return false;
Expand All @@ -322,33 +325,26 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
}
}
if (filter.IsTotalDenyFilter()) {
batch = batch->Slice(0, 0);
batch = NAdapter::TDataBuilderPolicy<TData>::GetEmptySame(batch);
return true;
}
if (filter.IsTotalAllowFilter()) {
return true;
}
auto res = arrow::compute::Filter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
Y_VERIFY_S(res.ok(), res.status().message());
Y_ABORT_UNLESS((*res).kind() == kindExpected);
if constexpr (kindExpected == arrow::Datum::TABLE) {
batch = (*res).table();
return batch->num_rows();
}
if constexpr (kindExpected == arrow::Datum::RECORD_BATCH) {
batch = (*res).record_batch();
return batch->num_rows();
}
AFL_VERIFY(false);
return false;
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(batch, filter.BuildArrowFilter(batch->num_rows(), startPos, count));
return batch->num_rows();
}

bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl(*this, batch, startPos, count);
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::TABLE>(*this, batch, startPos, count);
return ApplyImpl(*this, batch, startPos, count);
}

bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos, const std::optional<ui32> count) const {
return ApplyImpl<arrow::Datum::RECORD_BATCH>(*this, batch, startPos, count);
return ApplyImpl(*this, batch, startPos, count);
}

void TColumnFilter::Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/formats/arrow/arrow_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

namespace NKikimr::NArrow {

class TGeneralContainer;

enum class ECompareType {
LESS = 1,
LESS_OR_EQUAL,
Expand Down Expand Up @@ -62,6 +64,10 @@ class TColumnFilter {
return Filter.capacity() * sizeof(ui32) + Count * sizeof(bool);
}

static ui64 GetPredictedMemorySize(const ui32 recordsCount) {
return 2 /* capacity */ * recordsCount * (sizeof(ui32) + sizeof(bool));
}

class TIterator {
private:
i64 InternalPosition = 0;
Expand Down Expand Up @@ -172,6 +178,7 @@ class TColumnFilter {
// It makes a filter using composite predicate
static TColumnFilter MakePredicateFilter(const arrow::Datum& datum, const arrow::Datum& border, ECompareType compareType);

bool Apply(std::shared_ptr<TGeneralContainer>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::Table>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const;
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;
Expand Down
Loading

0 comments on commit 1339035

Please sign in to comment.