Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Improved Q9 by moving BuildFlatmapStage to a later stage and adding a Flatmap pushdown (#11860)" #13774

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
Expand Down Expand Up @@ -86,15 +87,14 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>));
AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
AddHandler(0, &TCoFlatMapBase::Match, HNDL(PushFlatmapToStage<false>));

AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateMergeFinalize::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateMergeManyFinalize::Match, HNDL(ExpandAggregatePhase));
AddHandler(0, &TCoAggregateFinalize::Match, HNDL(ExpandAggregatePhase));

AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
Expand Down Expand Up @@ -297,15 +297,6 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> PushFlatmapToStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
{
TExprBase output = DqPushFlatmapToStage(node, ctx, optCtx, *getParents(), IsGlobal);
DumpAppliedRule("DqPushFlatmapToStage", node.Ptr(), output.Ptr(), ctx);
return output;
}

template <bool IsGlobal>
TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx,
IOptimizationContext& optCtx, const TGetParents& getParents)
Expand Down
84 changes: 0 additions & 84 deletions ydb/core/kqp/ut/join/data/queries/tpcds9.sql

This file was deleted.

22 changes: 0 additions & 22 deletions ydb/core/kqp/ut/join/data/queries/tpcds9_small.sql

This file was deleted.

8 changes: 0 additions & 8 deletions ydb/core/kqp/ut/join/kqp_join_order_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,6 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpch21.sql", "stats/tpch1000s.json", StreamLookupJoin, ColumnStore);
}

Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9, StreamLookupJoin, ColumnStore) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
}

Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9_SMALL, StreamLookupJoin, ColumnStore) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9_small.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
}

Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS16, StreamLookupJoin, ColumnStore) {
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds16.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
}
Expand Down
89 changes: 8 additions & 81 deletions ydb/library/yql/dq/opt/dq_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,8 +748,6 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
Y_UNUSED(optCtx);

if (!node.Maybe<TCoFlatMapBase>().Input().Maybe<TDqCnUnionAll>()) {
return node;
}
Expand Down Expand Up @@ -781,25 +779,14 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage)));
}

TCoLambda lambda = flatmap.Lambda();

if (flatmap.Maybe<TCoFlatMap>()) {
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
.Args({"stream"})
.Body<TCoFlatMap>()
.Input("stream")
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Build()
.Done();
} else {
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
.Args({"stream"})
.Body<TCoOrderedFlatMap>()
.Input("stream")
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Build()
.Done();
}
auto lambda = TCoLambda(ctx.Builder(flatmap.Lambda().Pos())
.Lambda()
.Param("stream")
.Callable(flatmap.Ref().Content())
.Arg(0, "stream")
.Add(1, ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Seal()
.Seal().Build());

auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
if (pushResult) {
Expand All @@ -825,66 +812,6 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
.Done();
}

TExprBase DqPushFlatmapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage)
{
if (!node.Maybe<TCoFlatMapBase>().Input().Maybe<TDqCnUnionAll>()) {
return node;
}

auto flatmap = node.Cast<TCoFlatMapBase>();
if (!IsDqSelfContainedExpr(flatmap.Lambda())) {
return node;
}
auto dqUnion = flatmap.Input().Cast<TDqCnUnionAll>();
if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
return node;
}

bool isPure;
TVector<TDqConnection> innerConnections;
FindDqConnections(flatmap.Lambda(), innerConnections, isPure);
if (!isPure) {
return node;
}

TMaybeNode<TDqStage> flatmapStage;
if (!innerConnections.empty()) {
return node;
} else {
if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) {
return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage)));
}

TCoLambda lambda = flatmap.Lambda();

if (flatmap.Maybe<TCoFlatMap>()) {
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
.Args({"stream"})
.Body<TCoFlatMap>()
.Input("stream")
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Build()
.Done();
} else {
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
.Args({"stream"})
.Body<TCoOrderedFlatMap>()
.Input("stream")
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
.Build()
.Done();
}

auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
if (pushResult) {
return pushResult.Cast();
}
}

return node;
}

template <typename BaseLMap>
TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true)
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/yql/dq/opt/dq_opt_phy.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ NNodes::TExprBase DqBuildPureFlatmapStage(NNodes::TExprBase node, TExprContext&
NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);

NNodes::TExprBase DqPushFlatmapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);

NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@
"PlanNodeType": "Connection",
"Plans": [
{
"CTE Name": "precompute_0_0",
"CTE Name": "precompute_0_1",
"Node Type": "ConstantExpr",
"Operators": [
{
"Inputs": [],
"Iterator": "precompute_0_0",
"Iterator": "precompute_0_1",
"Name": "Iterator"
}
],
Expand All @@ -201,12 +201,12 @@
"PlanNodeType": "Connection",
"Plans": [
{
"CTE Name": "precompute_0_1",
"CTE Name": "precompute_0_0",
"Node Type": "ConstantExpr",
"Operators": [
{
"Inputs": [],
"Iterator": "precompute_0_1",
"Iterator": "precompute_0_0",
"Name": "Iterator"
}
],
Expand All @@ -232,11 +232,11 @@
"PlanNodeType": "Materialize",
"Plans": [
{
"CTE Name": "precompute_0_0",
"CTE Name": "precompute_0_1",
"Node Type": "Aggregate",
"Operators": [
{
"Input": "precompute_0_0",
"Input": "precompute_0_1",
"Inputs": [],
"Name": "PartitionByKey"
}
Expand Down
Loading