Skip to content

Commit

Permalink
Revert "Improved Q9 by moving BuildFlatmapStage to a later stage and …
Browse files Browse the repository at this point in the history
…adding a Flatmap pushdown (#11860)" (#13774)

Co-authored-by: Pavel Velikhov <[email protected]>
  • Loading branch information
pavelvelikhov and Pavel Velikhov authored Jan 24, 2025
1 parent 9ca35af commit cdeec5d
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 215 deletions.
13 changes: 2 additions & 11 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
Expand Down Expand Up @@ -86,15 +87,14 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>));
AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(PushFlatmapToStage<false>));

AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateMergeFinalize::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateMergeManyFinalize::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateFinalize::Match, HNDL(ExpandAggregatePhase));

AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
Expand Down Expand Up @@ -297,15 +297,6 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

// Optimizer handler: delegates to DqPushFlatmapToStage, passing IsGlobal as its
// last (allowStageMultiUsage) argument, then hands the original and resulting
// nodes to DumpAppliedRule under the name "DqPushFlatmapToStage".
template <bool IsGlobal>
TMaybeNode<TExprBase> PushFlatmapToStage(TExprBase node, TExprContext& ctx,
    IOptimizationContext& optCtx, const TGetParents& getParents)
{
    // Keep the result of getParents() alive for the duration of the call below.
    auto parentsHolder = getParents();
    TExprBase rewritten = DqPushFlatmapToStage(node, ctx, optCtx, *parentsHolder, IsGlobal);

    DumpAppliedRule("DqPushFlatmapToStage", node.Ptr(), rewritten.Ptr(), ctx);
    return rewritten;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
Expand Down
84 changes: 0 additions & 84 deletions ydb/core/kqp/ut/join/data/queries/tpcds9.sql

This file was deleted.

22 changes: 0 additions & 22 deletions ydb/core/kqp/ut/join/data/queries/tpcds9_small.sql

This file was deleted.

8 changes: 0 additions & 8 deletions ydb/core/kqp/ut/join/kqp_join_order_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,6 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpch21.sql", "stats/tpch1000s.json", StreamLookupJoin, ColumnStore);
}

// Join-order test case TPCDS9: runs queries/tpcds9.sql together with the
// stats/tpcds1000s.json statistics file, forwarding both flag parameters.
// NOTE(review): the macro name suggests the case is instantiated only for flag
// combinations where the two flags are not both true — confirm against the macro.
Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9, StreamLookupJoin, ColumnStore) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
}

// Join-order test case TPCDS9_SMALL: runs queries/tpcds9_small.sql together with
// the stats/tpcds1000s.json statistics file, forwarding both flag parameters.
// NOTE(review): the macro name suggests the case is instantiated only for flag
// combinations where the two flags are not both true — confirm against the macro.
Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9_SMALL, StreamLookupJoin, ColumnStore) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9_small.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
}

// Join-order test case TPCDS16: runs queries/tpcds16.sql together with the
// stats/tpcds1000s.json statistics file, forwarding both flag parameters.
// NOTE(review): the macro name suggests the case is instantiated only for flag
// combinations where the two flags are not both true — confirm against the macro.
Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS16, StreamLookupJoin, ColumnStore) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds16.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
}
Expand Down
89 changes: 8 additions & 81 deletions ydb/library/yql/dq/opt/dq_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,6 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
Y_UNUSED(optCtx);

if (!node.Maybe<TCoFlatMapBase>().Input().Maybe<TDqCnUnionAll>()) {
return node;
}
Expand Down Expand Up @@ -781,25 +779,14 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage)));
}

TCoLambda lambda = flatmap.Lambda();

if (flatmap.Maybe<TCoFlatMap>()) {
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
.Args({"stream"})
.Body<TCoFlatMap>()
.Input("stream")
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Build()
.Done();
} else {
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
.Args({"stream"})
.Body<TCoOrderedFlatMap>()
.Input("stream")
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Build()
.Done();
}
auto lambda = TCoLambda(ctx.Builder(flatmap.Lambda().Pos())
.Lambda()
.Param("stream")
.Callable(flatmap.Ref().Content())
.Arg(0, "stream")
.Add(1, ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Seal()
.Seal().Build());

auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
if (pushResult) {
Expand All @@ -825,66 +812,6 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
.Done();
}

// Tries to push a FlatMap / OrderedFlatMap that reads directly from a
// TDqCnUnionAll connection into the stage behind that connection.
//
// Parameters:
//   node                 - candidate expression; must be a TCoFlatMapBase whose
//                          input is a TDqCnUnionAll, otherwise it is returned as is.
//   ctx                  - expression context used to build the new nodes.
//   optCtx               - forwarded to DqPushLambdaToStageUnionAll.
//   parentsMap           - forwarded to IsSingleConsumerConnection.
//   allowStageMultiUsage - forwarded to IsSingleConsumerConnection.
//
// Returns:
//   * `node` unchanged when any precondition fails (lambda is not accepted by
//     IsDqSelfContainedExpr, the connection is rejected by
//     IsSingleConsumerConnection, FindDqConnections reports the lambda as not
//     pure or finds connections inside it, or the push itself yields nothing);
//   * `node` with its input replaced when DqBuildPushableStage yields a connection;
//   * otherwise the result of DqPushLambdaToStageUnionAll.
TExprBase DqPushFlatmapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
    const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
    if (!node.Maybe<TCoFlatMapBase>().Input().Maybe<TDqCnUnionAll>()) {
        return node;
    }

    auto flatmap = node.Cast<TCoFlatMapBase>();
    if (!IsDqSelfContainedExpr(flatmap.Lambda())) {
        return node;
    }
    auto dqUnion = flatmap.Input().Cast<TDqCnUnionAll>();
    if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
        return node;
    }

    // NOTE(review): isPure is presumably always assigned by FindDqConnections —
    // confirm against its definition.
    bool isPure;
    TVector<TDqConnection> innerConnections;
    FindDqConnections(flatmap.Lambda(), innerConnections, isPure);

    // This rule only handles pure lambdas that contain no connections of their own.
    if (!isPure || !innerConnections.empty()) {
        return node;
    }

    // If DqBuildPushableStage yields a connection, only swap the flatmap input
    // for it and return; the lambda is not pushed in this invocation.
    if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) {
        return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage)));
    }

    // Wrap the flatmap lambda into a single-argument lambda that applies the same
    // kind of flatmap (ordered or not) to its argument.
    TCoLambda lambda = flatmap.Lambda();

    if (flatmap.Maybe<TCoFlatMap>()) {
        lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
            .Args({"stream"})
            .Body<TCoFlatMap>()
                .Input("stream")
                .Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
                .Build()
            .Done();
    } else {
        lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
            .Args({"stream"})
            .Body<TCoOrderedFlatMap>()
                .Input("stream")
                .Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
                .Build()
            .Done();
    }

    auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
    if (pushResult) {
        return pushResult.Cast();
    }

    return node;
}

template <typename BaseLMap>
TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true)
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/yql/dq/opt/dq_opt_phy.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ NNodes::TExprBase DqBuildPureFlatmapStage(NNodes::TExprBase node, TExprContext&
NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);

// Tries to push a flatmap whose input is a TDqCnUnionAll connection into the
// stage behind that connection; returns the node unchanged when the rewrite
// does not apply. allowStageMultiUsage defaults to true.
NNodes::TExprBase DqPushFlatmapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);

NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@
"PlanNodeType": "Connection",
"Plans": [
{
"CTE Name": "precompute_0_0",
"CTE Name": "precompute_0_1",
"Node Type": "ConstantExpr",
"Operators": [
{
"Inputs": [],
"Iterator": "precompute_0_0",
"Iterator": "precompute_0_1",
"Name": "Iterator"
}
],
Expand All @@ -201,12 +201,12 @@
"PlanNodeType": "Connection",
"Plans": [
{
"CTE Name": "precompute_0_1",
"CTE Name": "precompute_0_0",
"Node Type": "ConstantExpr",
"Operators": [
{
"Inputs": [],
"Iterator": "precompute_0_1",
"Iterator": "precompute_0_0",
"Name": "Iterator"
}
],
Expand All @@ -232,11 +232,11 @@
"PlanNodeType": "Materialize",
"Plans": [
{
"CTE Name": "precompute_0_0",
"CTE Name": "precompute_0_1",
"Node Type": "Aggregate",
"Operators": [
{
"Input": "precompute_0_0",
"Input": "precompute_0_1",
"Inputs": [],
"Name": "PartitionByKey"
}
Expand Down

0 comments on commit cdeec5d

Please sign in to comment.