From 8507a003e9358db123e9414ac99d06deba017be7 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Thu, 19 Mar 2026 18:40:53 +0200 Subject: [PATCH 1/4] HIVE-27190: Implement col stats cache for hive iceberg table --- .../mr/hive/HiveIcebergStorageHandler.java | 7 ++ .../hive/ql/metadata/HiveStorageHandler.java | 10 +++ .../ql/optimizer/calcite/RelOptHiveTable.java | 67 +++++++++---------- .../ql/optimizer/ppr/PartitionPruner.java | 9 ++- .../hadoop/hive/ql/parse/ParseContext.java | 9 ++- .../hive/ql/parse/PrunedPartitionList.java | 10 ++- 6 files changed, 63 insertions(+), 49 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 65852f1a8553..dfd4941019cb 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -712,6 +712,13 @@ private boolean writeColStats(List colStats, Table tbl) { return false; } + @Override + public long getSnapshotId(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { + Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); + Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable); + return snapshot != null ? snapshot.snapshotId() : -1; + } + @Override public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index 520c52a24a8f..88b2b868ad76 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -325,6 +325,16 @@ default boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table, return false; } + /** + * Resolves time-travel context (snapshot ref, version, timestamp) to a canonical snapshot ID. + * Used to build cache keys that are consistent regardless of how a particular version was referenced. + * @param table table object with time-travel attributes set + * @return a snapshot ID, or -1 if not applicable + */ + default long getSnapshotId(org.apache.hadoop.hive.ql.metadata.Table table) { + return -1; + } + /** * Check if the storage handler can provide column statistics. * @param table table object diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java index b3e0249b778b..f44d89671955 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.plan.RelOptSchema; @@ -82,7 +81,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; public class RelOptHiveTable implements RelOptTable { @@ -125,7 +123,7 @@ public RelOptHiveTable(RelOptSchema calciteSchema, RelDataTypeFactory typeFactor this.schema = calciteSchema; this.typeFactory = typeFactory; this.qualifiedTblName = ImmutableList.copyOf(qualifiedTblName); - this.name = this.qualifiedTblName.stream().collect(Collectors.joining(".")); + this.name = String.join(".", this.qualifiedTblName); this.rowType = rowType; this.hiveTblMetadata = hiveTblMetadata; this.hiveColStatsMap = new HashMap<>(); @@ -192,15 +190,15 @@ public List getColumnStrategies() { public RelOptHiveTable copy(RelDataType newRowType) { // 1. Build map of column name to col index of original schema // Assumption: Hive Table can not contain duplicate column names - Map nameToColIndxMap = new HashMap(); + Map nameToColIndxMap = new HashMap<>(); for (RelDataTypeField f : this.rowType.getFieldList()) { nameToColIndxMap.put(f.getName(), f.getIndex()); } // 2. Build nonPart/Part/Virtual column info for new RowSchema - List newHiveNonPartitionCols = new ArrayList(); - List newHivePartitionCols = new ArrayList(); - List newHiveVirtualCols = new ArrayList(); + List newHiveNonPartitionCols = new ArrayList<>(); + List newHivePartitionCols = new ArrayList<>(); + List newHiveVirtualCols = new ArrayList<>(); Map virtualColInfoMap = HiveCalciteUtil.getVColsMap(this.hiveVirtualCols, this.noOfNonVirtualCols); Integer originalColIndx; @@ -329,8 +327,8 @@ private List generateReferentialConstraints() { ImmutableList.Builder builder = ImmutableList.builder(); if (foreignKeyInfo != null && !foreignKeyInfo.getForeignKeys().isEmpty()) { for (List fkCols : foreignKeyInfo.getForeignKeys().values()) { - String parentDatabaseName = fkCols.get(0).parentDatabaseName; - String parentTableName = fkCols.get(0).parentTableName; + String parentDatabaseName = fkCols.getFirst().parentDatabaseName; + String parentTableName = fkCols.getFirst().parentTableName; String qualifiedName; List parentTableQualifiedName = new ArrayList<>(); if (parentDatabaseName != null && !parentDatabaseName.isEmpty()) { @@ -390,7 +388,7 @@ public T unwrap(Class arg0) { @Override public List getCollationList() { - ImmutableList.Builder collationList = new ImmutableList.Builder(); + ImmutableList.Builder collationList = new ImmutableList.Builder<>(); for (Order sortColumn : this.hiveTblMetadata.getSortCols()) { for (int i=0; i getCollationList() { @Override public RelDistribution getDistribution() { - ImmutableList.Builder columnPositions = new ImmutableList.Builder(); + ImmutableList.Builder columnPositions = new ImmutableList.Builder<>(); for (String bucketColumn : this.hiveTblMetadata.getBucketCols()) { for (int i=0; i()); + computePartitionList(hiveConf, null, new HashSet<>()); } rowCount = StatsUtils.getNumRows(hiveConf, getNonPartColumns(), hiveTblMetadata, partitionList, noColsMissingStats); @@ -465,7 +463,7 @@ private String getColNamesForLogging(Set colLst) { public void computePartitionList(HiveConf conf, RexNode pruneNode, Set partOrVirtualCols) { try { if (!hiveTblMetadata.isPartitioned() || pruneNode == null - || InputFinder.bits(pruneNode).length() == 0) { + || InputFinder.bits(pruneNode).isEmpty()) { // there is no predicate on partitioning column, we need all partitions // in this case. partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(), @@ -485,11 +483,11 @@ public void computePartitionList(HiveConf conf, RexNode pruneNode, Set } private void updateColStats(Set projIndxLst, boolean allowMissingStats) { - List nonPartColNamesThatRqrStats = new ArrayList(); - List nonPartColIndxsThatRqrStats = new ArrayList(); - List partColNamesThatRqrStats = new ArrayList(); - List partColIndxsThatRqrStats = new ArrayList(); - Set colNamesFailedStats = new HashSet(); + List nonPartColNamesThatRqrStats = new ArrayList<>(); + List nonPartColIndxsThatRqrStats = new ArrayList<>(); + List partColNamesThatRqrStats = new ArrayList<>(); + List partColIndxsThatRqrStats = new ArrayList<>(); + Set colNamesFailedStats = new HashSet<>(); // 1. Separate required columns to Non Partition and Partition Cols ColumnInfo tmp; @@ -514,19 +512,19 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) if (null == partitionList) { // We could be here either because its an unpartitioned table or because // there are no pruning predicates on a partitioned table. - computePartitionList(hiveConf, null, new HashSet()); + computePartitionList(hiveConf, null, new HashSet<>()); } - String partitionListKey = partitionList.getKey().orElse(null); - ColumnStatsList colStatsCached = colStatsCache.get(partitionListKey); - if (colStatsCached == null) { - colStatsCached = new ColumnStatsList(); - colStatsCache.put(partitionListKey, colStatsCached); - } + String partitionListKey = partitionList.getKey(); + + ColumnStatsList colStatsCached = colStatsCache.computeIfAbsent( + partitionListKey, + k -> new ColumnStatsList() + ); // 2. Obtain Col Stats for Non Partition Cols - if (nonPartColNamesThatRqrStats.size() > 0) { - List hiveColStats = new ArrayList(); + if (!nonPartColNamesThatRqrStats.isEmpty()) { + List hiveColStats = new ArrayList<>(); if (!hiveTblMetadata.isPartitioned()) { // 2.1 Handle the case for unpartitioned table. @@ -547,9 +545,9 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) if (hiveColStats.isEmpty()) { colNamesFailedStats.addAll(nonPartColNamesThatRqrStats); } else if (hiveColStats.size() != nonPartColNamesThatRqrStats.size()) { - Set setOfFiledCols = new HashSet(nonPartColNamesThatRqrStats); + Set setOfFiledCols = new HashSet<>(nonPartColNamesThatRqrStats); - Set setOfObtainedColStats = new HashSet(); + Set setOfObtainedColStats = new HashSet<>(); for (ColStatistics cs : hiveColStats) { setOfObtainedColStats.add(cs.getColumnName()); } @@ -561,7 +559,7 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) // nonPartColNamesThatRqrStats. reorder hiveColStats so we can build hiveColStatsMap // using nonPartColIndxsThatRqrStats as below Map columnStatsMap = - new HashMap(hiveColStats.size()); + new HashMap<>(hiveColStats.size()); for (ColStatistics cs : hiveColStats) { columnStatsMap.put(cs.getColumnName(), cs); // even though the stats were estimated we need to warn user that @@ -586,7 +584,7 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) if (partitionList.getNotDeniedPartns().isEmpty()) { // no need to make a metastore call rowCount = 0; - hiveColStats = new ArrayList(); + hiveColStats = new ArrayList<>(); for (int i = 0; i < nonPartColNamesThatRqrStats.size(); i++) { // add empty stats object for each column hiveColStats.add( @@ -594,14 +592,13 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) nonPartColNamesThatRqrStats.get(i), hiveNonPartitionColsMap.get(nonPartColIndxsThatRqrStats.get(i)).getTypeName())); } - colNamesFailedStats.clear(); colStatsCached.updateState(State.COMPLETE); } else { Statistics stats = StatsUtils.collectStatistics(hiveConf, partitionList, hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached, nonPartColNamesThatRqrStats, true); rowCount = stats.getNumRows(); - hiveColStats = new ArrayList(); + hiveColStats = new ArrayList<>(); for (String c : nonPartColNamesThatRqrStats) { ColStatistics cs = stats.getColumnStatisticsFromColName(c); if (cs != null) { @@ -622,7 +619,7 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) } } - if (hiveColStats != null && hiveColStats.size() == nonPartColNamesThatRqrStats.size()) { + if (hiveColStats.size() == nonPartColNamesThatRqrStats.size()) { for (int i = 0; i < hiveColStats.size(); i++) { // the columns in nonPartColIndxsThatRqrStats/nonPartColNamesThatRqrStats/hiveColStats // are in same order @@ -754,7 +751,7 @@ public int hashCode() { } public String getPartitionListKey() { - return partitionList != null ? partitionList.getKey().orElse(null) : null; + return partitionList != null ? partitionList.getKey() : null; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java index b1bc9eaf0a75..f678f58c8835 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java @@ -184,8 +184,11 @@ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, String key = tab.getFullyQualifiedName() + ";"; if (tab.getMetaTable() != null) { key = tab.getFullyQualifiedName() + "." + tab.getMetaTable() + ";"; - } else if (tab.getSnapshotRef() != null) { - key = tab.getFullyQualifiedName() + "." + tab.getSnapshotRef() + ";"; + } else if (tab.isNonNative()) { + long snapshotId = tab.getStorageHandler().getSnapshotId(tab); + if (snapshotId > 0) { + key = tab.getFullyQualifiedName() + "." + snapshotId + ";"; + } } if (!tab.isPartitioned()) { @@ -441,7 +444,7 @@ static private boolean hasUserFunctions(ExprNodeDesc expr) { return false; } - private static PrunedPartitionList getPartitionsFromServer(Table tab, String key, ExprNodeDesc compactExpr, + private static PrunedPartitionList getPartitionsFromServer(Table tab, String key, ExprNodeDesc compactExpr, HiveConf conf, Set partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException { try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index b2627bf3bf2b..6ea3b7cc1c75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -128,7 +128,7 @@ public class ParseContext { private boolean needViewColumnAuthorization; private Map rsToRuntimeValuesInfo = - new LinkedHashMap(); + new LinkedHashMap<>(); /** * Mapping holding information about semijoins. * @@ -451,7 +451,7 @@ public Map getColStatsCache() { * @return col stats */ public ColumnStatsList getColStatsCached(PrunedPartitionList partList) { - return ctx.getOpContext().getColStatsCache().get(partList.getKey().orElse(null)); + return ctx.getOpContext().getColStatsCache().get(partList.getKey()); } /** @@ -515,8 +515,7 @@ public Set getSemanticInputs() { return semanticInputs; } - public void replaceRootTask(Task rootTask, - List> tasks) { + public void replaceRootTask(Task rootTask, List> tasks) { this.rootTasks.remove(rootTask); this.rootTasks.addAll(tasks); } @@ -663,7 +662,7 @@ public void setColumnStatsAutoGatherContexts( public Collection getAllOps() { List ops = new ArrayList<>(); - Set visited = new HashSet(); + Set visited = new HashSet<>(); for (Operator op : getTopOps().values()) { getAllOps(ops, visited, op); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java index 398dbf555def..3329d4ae749d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hive.ql.parse; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -37,7 +35,7 @@ public class PrunedPartitionList { private final Table source; /** Key to identify this partition list. */ - private final Optional ppListKey; + private final String ppListKey; /** Partitions that either satisfy the partition criteria, or may satisfy it. */ private final Set partitions; @@ -56,7 +54,7 @@ public PrunedPartitionList(Table source, Set partitions, public PrunedPartitionList(Table source, String key, Set partitions, List referred, boolean hasUnknowns) { this.source = Objects.requireNonNull(source); - this.ppListKey = Optional.ofNullable(key); + this.ppListKey = key; this.referred = Objects.requireNonNull(referred); this.partitions = Objects.requireNonNull(partitions); this.hasUnknowns = hasUnknowns; @@ -66,7 +64,7 @@ public Table getSourceTable() { return source; } - public Optional getKey() { + public String getKey() { return ppListKey; } @@ -82,7 +80,7 @@ public Set getPartitions() { * @return all partitions. */ public List getNotDeniedPartns() { - return Collections.unmodifiableList(new ArrayList<>(partitions)); + return List.copyOf(partitions); } /** From 03435da0eee7fbd6e7164889ca5f8a1038352054 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Tue, 12 May 2026 17:17:03 +0300 Subject: [PATCH 2/4] refactor, drop SH API --- .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 7 ------- .../hadoop/hive/ql/metadata/HiveStorageHandler.java | 10 ---------- .../java/org/apache/hadoop/hive/ql/metadata/Table.java | 8 ++++++++ .../hadoop/hive/ql/optimizer/ppr/PartitionPruner.java | 10 +--------- 4 files changed, 9 insertions(+), 26 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index dfd4941019cb..65852f1a8553 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -712,13 +712,6 @@ private boolean writeColStats(List colStats, Table tbl) { return false; } - @Override - public long getSnapshotId(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { - Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); - Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable); - return snapshot != null ? snapshot.snapshotId() : -1; - } - @Override public boolean canProvideColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTable) { Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java index 88b2b868ad76..520c52a24a8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java @@ -325,16 +325,6 @@ default boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table table, return false; } - /** - * Resolves time-travel context (snapshot ref, version, timestamp) to a canonical snapshot ID. - * Used to build cache keys that are consistent regardless of how a particular version was referenced. - * @param table table object with time-travel attributes set - * @return a snapshot ID, or -1 if not applicable - */ - default long getSnapshotId(org.apache.hadoop.hive.ql.metadata.Table table) { - return -1; - } - /** * Check if the storage handler can provide column statistics. * @param table table object diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index f857e7d505f1..89caa9e64083 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -32,6 +32,7 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -1355,6 +1356,13 @@ public void setSnapshotRef(String snapshotRef) { this.snapshotRef = snapshotRef; } + public String getQualifier() { + return Stream.of(metaTable, snapshotRef, asOfVersion, asOfTimestamp) + .filter(Objects::nonNull).findFirst() + .map(s -> "." + s) + .orElse(""); + } + public SourceTable createSourceTable() { SourceTable sourceTable = new SourceTable(); sourceTable.setTable(this.tTable); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java index f678f58c8835..73b7e6356481 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java @@ -181,15 +181,7 @@ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, LOG.trace("prune Expression = " + (prunerExpr == null ? "" : prunerExpr)); } - String key = tab.getFullyQualifiedName() + ";"; - if (tab.getMetaTable() != null) { - key = tab.getFullyQualifiedName() + "." + tab.getMetaTable() + ";"; - } else if (tab.isNonNative()) { - long snapshotId = tab.getStorageHandler().getSnapshotId(tab); - if (snapshotId > 0) { - key = tab.getFullyQualifiedName() + "." + snapshotId + ";"; - } - } + String key = tab.getFullyQualifiedName() + tab.getQualifier() + ";"; if (!tab.isPartitioned()) { // If the table is not partitioned, return empty list. From 584159f19da35cb857d606d1ba5a1874a901ff62 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Tue, 12 May 2026 20:40:01 +0300 Subject: [PATCH 3/4] tests --- .../iceberg_partition_pruner_cache_key.q | 49 +++ .../iceberg_metadata_table_alias.q.out | 2 +- .../iceberg_partition_pruner_cache_key.q.out | 387 ++++++++++++++++++ .../apache/hadoop/hive/ql/metadata/Table.java | 1 - .../ql/optimizer/SharedWorkOptimizer.java | 9 +- .../ql/optimizer/ppr/PartitionPruner.java | 3 +- .../hadoop/hive/ql/parse/CalcitePlanner.java | 7 +- 7 files changed, 450 insertions(+), 8 deletions(-) create mode 100644 iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q create mode 100644 iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q new file mode 100644 index 000000000000..7975df70267c --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q @@ -0,0 +1,49 @@ +set hive.fetch.task.conversion=none; +set hive.explain.user=false; + +create external table tbl_ice_pp_key(a int, b string) stored by iceberg; + +insert into tbl_ice_pp_key values (1, 'one'), (2, 'two'); +alter table tbl_ice_pp_key create tag s1; + +insert into tbl_ice_pp_key values + (3, 'three'), (4, 'four'), (5, 'five'), + (6, 'six'), (7, 'seven'),(8, 'eight'), + (9, 'nine'), (10, 'ten'); + +explain select count(*) from tbl_ice_pp_key; +select count(*) from tbl_ice_pp_key; + +explain select count(*) from tbl_ice_pp_key for system_version as of 's1'; +select count(*) from tbl_ice_pp_key for system_version as of 's1'; + +explain +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap; + +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap; + +-- with a partition predicate +explain +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 0; + +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 0; + +drop table tbl_ice_pp_key; \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out index b14d10551b0e..f574825d2bdd 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out @@ -23,7 +23,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_t POSTHOOK: Output: hdfs://### HDFS PATH ### OPTIMIZED SQL: SELECT `committed_at` AS `ice_t.snapshots.committed_at`, `snapshot_id` AS `ice_t.snapshots.snapshot_id`, `parent_id` AS `ice_t.snapshots.parent_id`, `operation` AS `ice_t.snapshots.operation`, `manifest_list` AS `ice_t.snapshots.manifest_list`, `summary` AS `ice_t.snapshots.summary` -FROM `default`.`ice_t` +FROM `default`.`ice_t`.`snapshots` STAGE DEPENDENCIES: Stage-0 is a root stage diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out new file mode 100644 index 000000000000..5c2d33b3d605 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out @@ -0,0 +1,387 @@ +PREHOOK: query: create external table tbl_ice_pp_key(a int, b string) stored by iceberg +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: create external table tbl_ice_pp_key(a int, b string) stored by iceberg +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_ice_pp_key +PREHOOK: query: insert into tbl_ice_pp_key values (1, 'one'), (2, 'two') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: insert into tbl_ice_pp_key values (1, 'one'), (2, 'two') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_pp_key +PREHOOK: query: alter table tbl_ice_pp_key create tag s1 +PREHOOK: type: ALTERTABLE_CREATETAG +PREHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: query: alter table tbl_ice_pp_key create tag s1 +POSTHOOK: type: ALTERTABLE_CREATETAG +POSTHOOK: Input: default@tbl_ice_pp_key +PREHOOK: query: insert into tbl_ice_pp_key values + (3, 'three'), (4, 'four'), (5, 'five'), + (6, 'six'), (7, 'seven'),(8, 'eight'), + (9, 'nine'), (10, 'ten') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: insert into tbl_ice_pp_key values + (3, 'three'), (4, 'four'), (5, 'five'), + (6, 'six'), (7, 'seven'),(8, 'eight'), + (9, 'nine'), (10, 'ten') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_pp_key +PREHOOK: query: explain select count(*) from tbl_ice_pp_key +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select count(*) from tbl_ice_pp_key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: 1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tbl_ice_pp_key +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from tbl_ice_pp_key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: explain select count(*) from tbl_ice_pp_key for system_version as of 's1' +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select count(*) from tbl_ice_pp_key for system_version as of 's1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: 1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tbl_ice_pp_key for system_version as of 's1' +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from tbl_ice_pp_key for system_version as of 's1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +2 +Warning: Shuffle Join MERGEJOIN[17][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 3' is a cross product +PREHOOK: query: explain +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (XPROD_EDGE), Reducer 5 (XPROD_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + Statistics: Num rows: 10 Data size: 13420 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 10 Data size: 13420 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.9 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Map 4 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + As of version: s1 + Statistics: Num rows: 2 Data size: 6310 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 2 Data size: 6310 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Reducer 2 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Shuffle Join MERGEJOIN[17][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 3' is a cross product +PREHOOK: query: select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 2 +PREHOOK: query: explain +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE), Union 3 (CONTAINS) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + filterExpr: (a > 0) (type: boolean) + Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (a > 0) (type: boolean) + Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.9 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Map 4 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + As of version: s1 + filterExpr: (a > 0) (type: boolean) + Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: (a > 0) (type: boolean) + Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Reducer 2 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'current' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 198 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'asof_s1' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 198 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 3 + Vertex: Union 3 + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 0 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 0 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +current 10 +asof_s1 2 +PREHOOK: query: drop table tbl_ice_pp_key +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: drop table tbl_ice_pp_key +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_ice_pp_key diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index 89caa9e64083..05faa38d7caf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -1359,7 +1359,6 @@ public void setSnapshotRef(String snapshotRef) { public String getQualifier() { return Stream.of(metaTable, snapshotRef, asOfVersion, asOfTimestamp) .filter(Objects::nonNull).findFirst() - .map(s -> "." + s) .orElse(""); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index 5a038b4e7182..3c01817033b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -646,9 +646,12 @@ protected boolean areMergeable(ParseContext pctx, TableScanOperator tsOp1, Table return false; } - // HIVE-29509: Include snapshotRef to ensure different Iceberg branches/tags are treated as distinct tables - if (!Objects.equals(tsOp1.getConf().getSnapshotRef(), tsOp2.getConf().getSnapshotRef())) { - LOG.debug("Snapshot Ref differ {} ~ {}", tsOp1.getConf().getSnapshotRef(), tsOp2.getConf().getSnapshotRef()); + // Time-travel qualifier (snapshotRef, asOfVersion, asOfTimestamp) must match, + // otherwise the two scans address different snapshots of the same table. + String qualifier1 = tsOp1.getConf().getTableMetadata().getQualifier(); + String qualifier2 = tsOp2.getConf().getTableMetadata().getQualifier(); + if (!Objects.equals(qualifier1, qualifier2)) { + LOG.debug("Qualifier differ {} ~ {}", qualifier1, qualifier2); return false; } // If partitions do not match, we currently do not merge diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java index 73b7e6356481..7574ad5f6d24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java @@ -181,7 +181,8 @@ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, LOG.trace("prune Expression = " + (prunerExpr == null ? "" : prunerExpr)); } - String key = tab.getFullyQualifiedName() + tab.getQualifier() + ";"; + String qualifier = tab.getQualifier(); + String key = tab.getFullyQualifiedName() + (qualifier.isEmpty() ? "" : "." + qualifier) + ";"; if (!tab.isPartitioned()) { // If the table is not partitioned, return empty list. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index d5c683daa303..ffd0e5756976 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -3220,8 +3220,11 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc fullyQualifiedTabName.add(tabMetaData.getDbName()); } fullyQualifiedTabName.add(tabMetaData.getTableName()); - if (tabMetaData.getSnapshotRef() != null) { - fullyQualifiedTabName.add(tabMetaData.getSnapshotRef()); + // Include time-travel qualifier (snapshotRef / asOfVersion / asOfTimestamp) + // in the table identity so two scans at different snapshots stay distinct. + String qualifier = tabMetaData.getQualifier(); + if (!qualifier.isEmpty()) { + fullyQualifiedTabName.add(qualifier); } optTable = new RelOptHiveTable(relOptSchema, relOptSchema.getTypeFactory(), fullyQualifiedTabName, rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf, From 859facf1e5d41c64c19842f2211021c023d563a9 Mon Sep 17 00:00:00 2001 From: Denys Kuzmenko Date: Mon, 25 May 2026 12:33:32 +0300 Subject: [PATCH 4/4] review comments #1 --- .../iceberg_partition_pruner_cache_key.q | 11 +-- .../iceberg_partition_pruner_cache_key.q.out | 89 +++++++++---------- .../ql/optimizer/SharedWorkOptimizer.java | 3 +- 3 files changed, 52 insertions(+), 51 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q index 7975df70267c..4ec645616b13 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q @@ -1,7 +1,8 @@ set hive.fetch.task.conversion=none; set hive.explain.user=false; -create external table tbl_ice_pp_key(a int, b string) stored by iceberg; +create external table tbl_ice_pp_key(a int, b string) + partitioned by spec (a) stored by iceberg; insert into tbl_ice_pp_key values (1, 'one'), (2, 'two'); alter table tbl_ice_pp_key create tag s1; @@ -36,14 +37,14 @@ cross join ( -- with a partition predicate explain -select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 union all select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' -where a > 0; +where a > 2; -select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 union all select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' -where a > 0; +where a > 2; drop table tbl_ice_pp_key; \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out index 5c2d33b3d605..b87b1a62286b 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out @@ -1,8 +1,10 @@ -PREHOOK: query: create external table tbl_ice_pp_key(a int, b string) stored by iceberg +PREHOOK: query: create external table tbl_ice_pp_key(a int, b string) + partitioned by spec (a) stored by iceberg PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@tbl_ice_pp_key -POSTHOOK: query: create external table tbl_ice_pp_key(a int, b string) stored by iceberg +POSTHOOK: query: create external table tbl_ice_pp_key(a int, b string) + partitioned by spec (a) stored by iceberg POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@tbl_ice_pp_key @@ -129,9 +131,9 @@ STAGE PLANS: Map Operator Tree: TableScan alias: tbl_ice_pp_key - Statistics: Num rows: 10 Data size: 13420 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 10 Data size: 63810 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 10 Data size: 13420 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 10 Data size: 63810 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() minReductionHashAggr: 0.9 @@ -149,9 +151,9 @@ STAGE PLANS: TableScan alias: tbl_ice_pp_key As of version: s1 - Statistics: Num rows: 2 Data size: 6310 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 12636 Basic stats: COMPLETE Column stats: COMPLETE Select Operator - Statistics: Num rows: 2 Data size: 6310 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 12636 Basic stats: COMPLETE Column stats: COMPLETE Group By Operator aggregations: count() minReductionHashAggr: 0.5 @@ -237,18 +239,18 @@ POSTHOOK: Input: default@tbl_ice_pp_key POSTHOOK: Output: hdfs://### HDFS PATH ### 10 2 PREHOOK: query: explain -select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 union all select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' -where a > 0 +where a > 2 PREHOOK: type: QUERY PREHOOK: Input: default@tbl_ice_pp_key PREHOOK: Output: hdfs://### HDFS PATH ### POSTHOOK: query: explain -select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 union all select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' -where a > 0 +where a > 2 POSTHOOK: type: QUERY POSTHOOK: Input: default@tbl_ice_pp_key POSTHOOK: Output: hdfs://### HDFS PATH ### @@ -269,47 +271,44 @@ STAGE PLANS: Map Operator Tree: TableScan alias: tbl_ice_pp_key - filterExpr: (a > 0) (type: boolean) - Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE - Filter Operator - predicate: (a > 0) (type: boolean) - Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE - Select Operator - Statistics: Num rows: 10 Data size: 40 Basic stats: COMPLETE Column stats: COMPLETE - Group By Operator - aggregations: count() - minReductionHashAggr: 0.9 - mode: hash - outputColumnNames: _col0 + filterExpr: (a > 2) (type: boolean) + Statistics: Num rows: 8 Data size: 51174 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 8 Data size: 51174 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.875 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - Reduce Output Operator - null sort order: - sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE - value expressions: _col0 (type: bigint) + value expressions: _col0 (type: bigint) Execution mode: vectorized Map 4 Map Operator Tree: TableScan alias: tbl_ice_pp_key As of version: s1 - filterExpr: (a > 0) (type: boolean) - Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + filterExpr: (a > 2) (type: boolean) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Filter Operator - predicate: (a > 0) (type: boolean) - Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + predicate: (a > 2) (type: boolean) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Select Operator - Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE Group By Operator aggregations: count() - minReductionHashAggr: 0.5 + minReductionHashAggr: 0.99 mode: hash outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE Reduce Output Operator null sort order: sort order: - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE value expressions: _col0 (type: bigint) Execution mode: vectorized Reducer 2 @@ -326,7 +325,7 @@ STAGE PLANS: Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE File Output Operator compressed: false - Statistics: Num rows: 2 Data size: 198 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 107 Basic stats: PARTIAL Column stats: COMPLETE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -338,14 +337,14 @@ STAGE PLANS: aggregations: count(VALUE._col0) mode: mergepartial outputColumnNames: _col0 - Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE Select Operator expressions: 'asof_s1' (type: string), _col0 (type: bigint) outputColumnNames: _col0, _col1 - Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE File Output Operator compressed: false - Statistics: Num rows: 2 Data size: 198 Basic stats: COMPLETE Column stats: COMPLETE + Statistics: Num rows: 2 Data size: 107 Basic stats: PARTIAL Column stats: COMPLETE table: input format: org.apache.hadoop.mapred.SequenceFileInputFormat output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat @@ -359,22 +358,22 @@ STAGE PLANS: Processor Tree: ListSink -PREHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +PREHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 union all select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' -where a > 0 +where a > 2 PREHOOK: type: QUERY PREHOOK: Input: default@tbl_ice_pp_key PREHOOK: Output: hdfs://### HDFS PATH ### -POSTHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 0 +POSTHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 union all select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' -where a > 0 +where a > 2 POSTHOOK: type: QUERY POSTHOOK: Input: default@tbl_ice_pp_key POSTHOOK: Output: hdfs://### HDFS PATH ### -current 10 -asof_s1 2 +current 8 +asof_s1 0 PREHOOK: query: drop table tbl_ice_pp_key PREHOOK: type: DROPTABLE PREHOOK: Input: default@tbl_ice_pp_key diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index 3c01817033b5..26a5a85b174e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -651,7 +651,7 @@ protected boolean areMergeable(ParseContext pctx, TableScanOperator tsOp1, Table String qualifier1 = tsOp1.getConf().getTableMetadata().getQualifier(); String qualifier2 = tsOp2.getConf().getTableMetadata().getQualifier(); if (!Objects.equals(qualifier1, qualifier2)) { - LOG.debug("Qualifier differ {} ~ {}", qualifier1, qualifier2); + LOG.debug("Qualifiers differ {} ~ {}", qualifier1, qualifier2); return false; } // If partitions do not match, we currently do not merge @@ -1864,6 +1864,7 @@ private static boolean compareOperator(ParseContext pctx, Operator op1, Opera Table tableMeta1 = op1Conf.getTableMetadata(); Table tableMeta2 = op2Conf.getTableMetadata(); if (StringUtils.equals(tableMeta1.getFullyQualifiedName(), tableMeta2.getFullyQualifiedName()) + && StringUtils.equals(tableMeta1.getQualifier(), tableMeta2.getQualifier()) && op1Conf.getNeededColumns().equals(op2Conf.getNeededColumns()) && StringUtils.equals(op1Conf.getFilterExprString(), op2Conf.getFilterExprString()) && pctx.getPrunedPartitions(tsOp1).getPartitions().equals(