diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index ffaf73ea42cd..cfffab5447fb 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -50,6 +50,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage)); AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage)); AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage)); + AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage)); AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage)); AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage)); AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage)); @@ -86,7 +87,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset)); AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake)); AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap)); - AddHandler(0, &TCoFlatMapBase::Match, HNDL(PushFlatmapToStage)); + AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase)); AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase)); AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase)); @@ -94,7 +95,6 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { AddHandler(0, &TCoAggregateMergeManyFinalize::Match, HNDL(ExpandAggregatePhase)); AddHandler(0, &TCoAggregateFinalize::Match, HNDL(ExpandAggregatePhase)); - AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage)); AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage)); AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage)); AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage)); @@ -297,15 +297,6 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { return output; } - template - TMaybeNode PushFlatmapToStage(TExprBase node, TExprContext& ctx, - IOptimizationContext& optCtx, const TGetParents& getParents) - { - TExprBase output = DqPushFlatmapToStage(node, ctx, optCtx, *getParents(), IsGlobal); - DumpAppliedRule("DqPushFlatmapToStage", node.Ptr(), output.Ptr(), ctx); - return output; - } - template TMaybeNode PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) diff --git a/ydb/core/kqp/ut/join/data/queries/tpcds9.sql b/ydb/core/kqp/ut/join/data/queries/tpcds9.sql deleted file mode 100644 index 370fce6aa35e..000000000000 --- a/ydb/core/kqp/ut/join/data/queries/tpcds9.sql +++ /dev/null @@ -1,84 +0,0 @@ -pragma TablePathPrefix = "/Root/test/ds/"; - -$count_1_20 = (select count(*) - from store_sales as store_sales - where ss_quantity between 1 and 20); - -$avg_ss_ext_discount_amt_1_20 = (select avg(ss_ext_discount_amt) - from store_sales as store_sales - where ss_quantity between 1 and 20); - -$avg_ss_net_profit_1_20 = (select avg(ss_net_profit) - from store_sales as store_sales - where ss_quantity between 1 and 20); - - -$count_21_40 = (select count(*) - from store_sales as store_sales - where ss_quantity between 21 and 40); - -$avg_ss_ext_discount_amt_21_40 = (select avg(ss_ext_discount_amt) - from store_sales as store_sales - where ss_quantity between 21 and 40); - -$avg_ss_net_profit_21_40 = (select avg(ss_net_profit) - from store_sales as store_sales - where ss_quantity between 21 and 40); - - -$count_41_60 = (select count(*) - from store_sales as store_sales - where ss_quantity between 41 and 60); - -$avg_ss_ext_discount_amt_41_60 = (select avg(ss_ext_discount_amt) - from store_sales as store_sales - where ss_quantity between 41 and 60); - -$avg_ss_net_profit_41_60 = (select avg(ss_net_profit) - from store_sales as store_sales - where ss_quantity between 41 and 60); - - -$count_61_80 = (select count(*) - from store_sales as store_sales - where ss_quantity between 61 and 80); - -$avg_ss_ext_discount_amt_61_80 = (select avg(ss_ext_discount_amt) - from store_sales as store_sales - where ss_quantity between 61 and 80); - -$avg_ss_net_profit_61_80 = (select avg(ss_net_profit) - from store_sales as store_sales - where ss_quantity between 61 and 80); - - -$count_81_100 = (select count(*) - from store_sales as store_sales - where ss_quantity between 81 and 100); - -$avg_ss_ext_discount_amt_81_100 = (select avg(ss_ext_discount_amt) - from store_sales as store_sales - where ss_quantity between 81 and 100); - -$avg_ss_net_profit_81_100 = (select avg(ss_net_profit) - from store_sales as store_sales - where ss_quantity between 81 and 100); - - -select case when $count_1_20 > 98972190 - then $avg_ss_ext_discount_amt_1_20 - else $avg_ss_net_profit_1_20 end bucket1 , - case when $count_21_40 > 160856845 - then $avg_ss_ext_discount_amt_21_40 - else $avg_ss_net_profit_21_40 end bucket2, - case when $count_41_60 > 12733327 - then $avg_ss_ext_discount_amt_41_60 - else $avg_ss_net_profit_41_60 end bucket3, - case when $count_61_80 > 96251173 - then $avg_ss_ext_discount_amt_61_80 - else $avg_ss_net_profit_61_80 end bucket4, - case when $count_81_100 > 80049606 - then $avg_ss_ext_discount_amt_81_100 - else $avg_ss_net_profit_81_100 end bucket5 -from reason as reason -where r_reason_sk = 1; \ No newline at end of file diff --git a/ydb/core/kqp/ut/join/data/queries/tpcds9_small.sql b/ydb/core/kqp/ut/join/data/queries/tpcds9_small.sql deleted file mode 100644 index e5abf3345490..000000000000 --- a/ydb/core/kqp/ut/join/data/queries/tpcds9_small.sql +++ /dev/null @@ -1,22 +0,0 @@ -pragma TablePathPrefix = "/Root/test/ds/"; - ----$count_1_20 = (select count(*) --- from store_sales as store_sales --- where ss_quantity between 1 and 20); - -$avg_ss_ext_discount_amt_1_20 = (select avg(ss_ext_discount_amt) - from store_sales as store_sales - where ss_quantity between 1 and 20); - -$avg_ss_net_profit_1_20 = (select avg(ss_net_profit) - from store_sales as store_sales - where ss_quantity between 1 and 20); - - ---select case when $count_1_20 > 98972190 -select case when 1e20 > 98972190 - then $avg_ss_ext_discount_amt_1_20 - else $avg_ss_net_profit_1_20 end bucket1 - -from reason as reason -where r_reason_sk = 1; \ No newline at end of file diff --git a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp index 9f8cc21b0320..98f14236d903 100644 --- a/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_join_order_ut.cpp @@ -468,14 +468,6 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) { ExecuteJoinOrderTestDataQueryWithStats("queries/tpch21.sql", "stats/tpch1000s.json", StreamLookupJoin, ColumnStore); } - Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9, StreamLookupJoin, ColumnStore) { - ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore); - } - - Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9_SMALL, StreamLookupJoin, ColumnStore) { - ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9_small.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore); - } - Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS16, StreamLookupJoin, ColumnStore) { ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds16.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore); } diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 17a952e2c2df..379a08565dc7 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -748,8 +748,6 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) { TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) { - Y_UNUSED(optCtx); - if (!node.Maybe().Input().Maybe()) { return node; } @@ -781,25 +779,14 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage))); } - TCoLambda lambda = flatmap.Lambda(); - - if (flatmap.Maybe()) { - lambda = Build(ctx, flatmap.Lambda().Pos()) - .Args({"stream"}) - .Body() - .Input("stream") - .Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref())) - .Build() - .Done(); - } else { - lambda = Build(ctx, flatmap.Lambda().Pos()) - .Args({"stream"}) - .Body() - .Input("stream") - .Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref())) - .Build() - .Done(); - } + auto lambda = TCoLambda(ctx.Builder(flatmap.Lambda().Pos()) + .Lambda() + .Param("stream") + .Callable(flatmap.Ref().Content()) + .Arg(0, "stream") + .Add(1, ctx.DeepCopyLambda(flatmap.Lambda().Ref())) + .Seal() + .Seal().Build()); auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx); if (pushResult) { @@ -825,66 +812,6 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo .Done(); } -TExprBase DqPushFlatmapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage) -{ - if (!node.Maybe().Input().Maybe()) { - return node; - } - - auto flatmap = node.Cast(); - if (!IsDqSelfContainedExpr(flatmap.Lambda())) { - return node; - } - auto dqUnion = flatmap.Input().Cast(); - if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) { - return node; - } - - bool isPure; - TVector innerConnections; - FindDqConnections(flatmap.Lambda(), innerConnections, isPure); - if (!isPure) { - return node; - } - - TMaybeNode flatmapStage; - if (!innerConnections.empty()) { - return node; - } else { - if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) { - return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage))); - } - - TCoLambda lambda = flatmap.Lambda(); - - if (flatmap.Maybe()) { - lambda = Build(ctx, flatmap.Lambda().Pos()) - .Args({"stream"}) - .Body() - .Input("stream") - .Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref())) - .Build() - .Done(); - } else { - lambda = Build(ctx, flatmap.Lambda().Pos()) - .Args({"stream"}) - .Body() - .Input("stream") - .Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref())) - .Build() - .Done(); - } - - auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx); - if (pushResult) { - return pushResult.Cast(); - } - } - - return node; -} - template TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true) diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 689cd7351f34..b2912ad24334 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -38,9 +38,6 @@ NNodes::TExprBase DqBuildPureFlatmapStage(NNodes::TExprBase node, TExprContext& NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); -NNodes::TExprBase DqPushFlatmapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage = true); - NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage = true); diff --git a/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join2.test_/query_12.plan b/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join2.test_/query_12.plan index 859bc60e1a85..17c137295633 100644 --- a/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join2.test_/query_12.plan +++ b/ydb/tests/functional/suite_tests/canondata/test_postgres.TestPGSQL.test_sql_suite_plan-jointest_join2.test_/query_12.plan @@ -178,12 +178,12 @@ "PlanNodeType": "Connection", "Plans": [ { - "CTE Name": "precompute_0_0", + "CTE Name": "precompute_0_1", "Node Type": "ConstantExpr", "Operators": [ { "Inputs": [], - "Iterator": "precompute_0_0", + "Iterator": "precompute_0_1", "Name": "Iterator" } ], @@ -201,12 +201,12 @@ "PlanNodeType": "Connection", "Plans": [ { - "CTE Name": "precompute_0_1", + "CTE Name": "precompute_0_0", "Node Type": "ConstantExpr", "Operators": [ { "Inputs": [], - "Iterator": "precompute_0_1", + "Iterator": "precompute_0_0", "Name": "Iterator" } ], @@ -232,11 +232,11 @@ "PlanNodeType": "Materialize", "Plans": [ { - "CTE Name": "precompute_0_0", + "CTE Name": "precompute_0_1", "Node Type": "Aggregate", "Operators": [ { - "Input": "precompute_0_0", + "Input": "precompute_0_1", "Inputs": [], "Name": "PartitionByKey" }