Commit 2300bb4

Use "mix" shuffle spec for target size with nil clusterBy. (#17810)
When a nil clusterBy is used, we have no way of achieving a particular target size, so we fall back to a "mix" spec (a single, unsorted partition). This comes up for queries like "SELECT COUNT(*) FROM FOO LIMIT 1" when the results are written with a target size, such as when inserting into another table or writing to durable storage.
1 parent bb0c502

File tree: 2 files changed, +46 −3 lines changed


extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/ShuffleSpecFactories.java (+14 −3)
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.querykit;
 
+import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
 import org.apache.druid.msq.kernel.GlobalSortTargetSizeShuffleSpec;
 import org.apache.druid.msq.kernel.MixShuffleSpec;
@@ -69,11 +70,21 @@ public static ShuffleSpecFactory globalSortWithMaxPartitionCount(final int parti
   }
 
   /**
-   * Factory that produces globally sorted partitions of a target size.
+   * Factory that produces globally sorted partitions of a target size, using the {@link ClusterBy} to partition
+   * rows across partitions.
+   *
+   * Produces {@link MixShuffleSpec}, ignoring the target size, if the provided {@link ClusterBy} is empty.
    */
   public static ShuffleSpecFactory getGlobalSortWithTargetSize(int targetSize)
   {
-    return (clusterBy, aggregate) ->
-        new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
+    return (clusterBy, aggregate) -> {
+      if (clusterBy.isEmpty()) {
+        // Cannot partition or sort meaningfully because there are no cluster-by keys. Generate a MixShuffleSpec
+        // so everything goes into a single partition.
+        return MixShuffleSpec.instance();
+      } else {
+        return new GlobalSortTargetSizeShuffleSpec(clusterBy, targetSize, aggregate);
+      }
+    };
   }
 }
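
For illustration, a minimal sketch of how a caller might observe the new fallback. This is not part of the commit: the class name MixFallbackSketch is hypothetical, the factory's functional method is assumed to be build(ClusterBy, boolean aggregate) (matching the (clusterBy, aggregate) lambda above), and ClusterBy.none() is assumed to produce an empty ClusterBy; any empty ClusterBy would do.

import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.msq.kernel.MixShuffleSpec;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;

public class MixFallbackSketch
{
  public static void main(String[] args)
  {
    // Factory configured for a 3 million-row target size, as a stage might request when
    // writing segments or durable storage results.
    final ShuffleSpecFactory factory = ShuffleSpecFactories.getGlobalSortWithTargetSize(3_000_000);

    // An empty ClusterBy models the "nil clusterBy" case from the commit message,
    // e.g. SELECT COUNT(*) FROM foo LIMIT 1. ClusterBy.none() is an assumed helper.
    final ClusterBy emptyClusterBy = ClusterBy.none();

    // "build" is the assumed name of the factory's functional method.
    final ShuffleSpec spec = factory.build(emptyClusterBy, false);

    // With this commit, the factory falls back to a MixShuffleSpec (single unsorted
    // partition) instead of a GlobalSortTargetSizeShuffleSpec.
    System.out.println(spec instanceof MixShuffleSpec); // expected: true
  }
}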

extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java (+32)
@@ -1606,6 +1606,38 @@ public void testInsertOnFoo1WithLimit(String contextName, Map<String, Object> co
         .verifyResults();
   }
 
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testInsertOnFoo1NoDimensionsWithLimit(String contextName, Map<String, Object> context)
+  {
+    Map<String, Object> queryContext = ImmutableMap.<String, Object>builder()
+                                                   .putAll(context)
+                                                   .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 2)
+                                                   .build();
+
+    List<Object[]> expectedRows = ImmutableList.of(new Object[]{DateTimes.utc(0L).getMillis(), 5L});
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .addTimeColumn()
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery()
+        .setSql("insert into foo1 select count(*) cnt from foo where dim1 != '' limit 4 partitioned by all")
+        .setExpectedDataSource("foo1")
+        .setQueryContext(queryContext)
+        .setExpectedRowSignature(rowSignature)
+        .setExpectedSegments(ImmutableSet.of(SegmentId.of("foo1", Intervals.ETERNITY, "test", 0)))
+        .setExpectedResultRows(expectedRows)
+        .setExpectedMSQSegmentReport(
+            new MSQSegmentReport(
+                NumberedShardSpec.class.getSimpleName(),
+                "Using NumberedShardSpec to generate segments since the query is inserting rows."
+            )
+        )
+        .verifyResults();
+  }
+
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testInsertOnRestricted(String contextName, Map<String, Object> context)
