diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q new file mode 100644 index 000000000000..4ec645616b13 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_partition_pruner_cache_key.q @@ -0,0 +1,50 @@ +set hive.fetch.task.conversion=none; +set hive.explain.user=false; + +create external table tbl_ice_pp_key(a int, b string) + partitioned by spec (a) stored by iceberg; + +insert into tbl_ice_pp_key values (1, 'one'), (2, 'two'); +alter table tbl_ice_pp_key create tag s1; + +insert into tbl_ice_pp_key values + (3, 'three'), (4, 'four'), (5, 'five'), + (6, 'six'), (7, 'seven'),(8, 'eight'), + (9, 'nine'), (10, 'ten'); + +explain select count(*) from tbl_ice_pp_key; +select count(*) from tbl_ice_pp_key; + +explain select count(*) from tbl_ice_pp_key for system_version as of 's1'; +select count(*) from tbl_ice_pp_key for system_version as of 's1'; + +explain +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap; + +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap; + +-- with a partition predicate +explain +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 2; + +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 2; + +drop table tbl_ice_pp_key; \ No newline at end of file diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out index b14d10551b0e..f574825d2bdd 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_metadata_table_alias.q.out @@ -23,7 +23,7 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_t POSTHOOK: Output: hdfs://### HDFS PATH ### OPTIMIZED SQL: SELECT `committed_at` AS `ice_t.snapshots.committed_at`, `snapshot_id` AS `ice_t.snapshots.snapshot_id`, `parent_id` AS `ice_t.snapshots.parent_id`, `operation` AS `ice_t.snapshots.operation`, `manifest_list` AS `ice_t.snapshots.manifest_list`, `summary` AS `ice_t.snapshots.summary` -FROM `default`.`ice_t` +FROM `default`.`ice_t`.`snapshots` STAGE DEPENDENCIES: Stage-0 is a root stage diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out new file mode 100644 index 000000000000..b87b1a62286b --- /dev/null +++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_partition_pruner_cache_key.q.out @@ -0,0 +1,386 @@ +PREHOOK: query: create external table tbl_ice_pp_key(a int, b string) + partitioned by spec (a) stored by iceberg +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: create external table tbl_ice_pp_key(a int, b string) + partitioned by spec (a) stored by iceberg +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_ice_pp_key +PREHOOK: query: insert into tbl_ice_pp_key values (1, 'one'), (2, 'two') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: insert into tbl_ice_pp_key values (1, 'one'), (2, 'two') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_pp_key +PREHOOK: query: alter table tbl_ice_pp_key create tag s1 +PREHOOK: type: ALTERTABLE_CREATETAG +PREHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: query: alter table tbl_ice_pp_key create tag s1 +POSTHOOK: type: ALTERTABLE_CREATETAG +POSTHOOK: Input: default@tbl_ice_pp_key +PREHOOK: query: insert into tbl_ice_pp_key values + (3, 'three'), (4, 'four'), (5, 'five'), + (6, 'six'), (7, 'seven'),(8, 'eight'), + (9, 'nine'), (10, 'ten') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: insert into tbl_ice_pp_key values + (3, 'three'), (4, 'four'), (5, 'five'), + (6, 'six'), (7, 'seven'),(8, 'eight'), + (9, 'nine'), (10, 'ten') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@tbl_ice_pp_key +PREHOOK: query: explain select count(*) from tbl_ice_pp_key +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select count(*) from tbl_ice_pp_key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: 1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tbl_ice_pp_key +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from tbl_ice_pp_key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 +PREHOOK: query: explain select count(*) from tbl_ice_pp_key for system_version as of 's1' +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain select count(*) from tbl_ice_pp_key for system_version as of 's1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Fetch Operator + limit: 1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from tbl_ice_pp_key for system_version as of 's1' +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select count(*) from tbl_ice_pp_key for system_version as of 's1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +2 +Warning: Shuffle Join MERGEJOIN[17][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 3' is a cross product +PREHOOK: query: explain +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain +select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE) + Reducer 3 <- Reducer 2 (XPROD_EDGE), Reducer 5 (XPROD_EDGE) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + Statistics: Num rows: 10 Data size: 63810 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 10 Data size: 63810 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.9 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Map 4 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + As of version: s1 + Statistics: Num rows: 2 Data size: 12636 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 2 Data size: 12636 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.5 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Reducer 2 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Reducer 3 + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 + 1 + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +Warning: Shuffle Join MERGEJOIN[17][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 3' is a cross product +PREHOOK: query: select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select cur.cnt as cur_cnt, snap.cnt as snap_cnt +from ( + select count(*) as cnt from tbl_ice_pp_key +) cur +cross join ( + select count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +) snap +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +10 2 +PREHOOK: query: explain +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 2 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: explain +select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Union 3 (CONTAINS) + Reducer 5 <- Map 4 (CUSTOM_SIMPLE_EDGE), Union 3 (CONTAINS) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + filterExpr: (a > 2) (type: boolean) + Statistics: Num rows: 8 Data size: 51174 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + Statistics: Num rows: 8 Data size: 51174 Basic stats: COMPLETE Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.875 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Map 4 + Map Operator Tree: + TableScan + alias: tbl_ice_pp_key + As of version: s1 + filterExpr: (a > 2) (type: boolean) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE + Filter Operator + predicate: (a > 2) (type: boolean) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE + Group By Operator + aggregations: count() + minReductionHashAggr: 0.99 + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + Reduce Output Operator + null sort order: + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + value expressions: _col0 (type: bigint) + Execution mode: vectorized + Reducer 2 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: 'current' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 99 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 107 Basic stats: PARTIAL Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Reducer 5 + Execution mode: vectorized + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + Select Operator + expressions: 'asof_s1' (type: string), _col0 (type: bigint) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 1 Data size: 8 Basic stats: PARTIAL Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 2 Data size: 107 Basic stats: PARTIAL Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Union 3 + Vertex: Union 3 + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 2 +PREHOOK: type: QUERY +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: hdfs://### HDFS PATH ### +POSTHOOK: query: select 'current' as ver, count(*) as cnt from tbl_ice_pp_key where a > 2 +union all +select 'asof_s1' as ver, count(*) as cnt from tbl_ice_pp_key for system_version as of 's1' +where a > 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: hdfs://### HDFS PATH ### +current 8 +asof_s1 0 +PREHOOK: query: drop table tbl_ice_pp_key +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tbl_ice_pp_key +PREHOOK: Output: database:default +PREHOOK: Output: default@tbl_ice_pp_key +POSTHOOK: query: drop table tbl_ice_pp_key +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tbl_ice_pp_key +POSTHOOK: Output: database:default +POSTHOOK: Output: default@tbl_ice_pp_key diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index f857e7d505f1..05faa38d7caf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -32,6 +32,7 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -1355,6 +1356,12 @@ public void setSnapshotRef(String snapshotRef) { this.snapshotRef = snapshotRef; } + public String getQualifier() { + return Stream.of(metaTable, snapshotRef, asOfVersion, asOfTimestamp) + .filter(Objects::nonNull).findFirst() + .orElse(""); + } + public SourceTable createSourceTable() { SourceTable sourceTable = new SourceTable(); sourceTable.setTable(this.tTable); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java index 5a038b4e7182..26a5a85b174e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java @@ -646,9 +646,12 @@ protected boolean areMergeable(ParseContext pctx, TableScanOperator tsOp1, Table return false; } - // HIVE-29509: Include snapshotRef to ensure different Iceberg branches/tags are treated as distinct tables - if (!Objects.equals(tsOp1.getConf().getSnapshotRef(), tsOp2.getConf().getSnapshotRef())) { - LOG.debug("Snapshot Ref differ {} ~ {}", tsOp1.getConf().getSnapshotRef(), tsOp2.getConf().getSnapshotRef()); + // Time-travel qualifier (snapshotRef, asOfVersion, asOfTimestamp) must match, + // otherwise the two scans address different snapshots of the same table. + String qualifier1 = tsOp1.getConf().getTableMetadata().getQualifier(); + String qualifier2 = tsOp2.getConf().getTableMetadata().getQualifier(); + if (!Objects.equals(qualifier1, qualifier2)) { + LOG.debug("Qualifiers differ {} ~ {}", qualifier1, qualifier2); return false; } // If partitions do not match, we currently do not merge @@ -1861,6 +1864,7 @@ private static boolean compareOperator(ParseContext pctx, Operator op1, Opera Table tableMeta1 = op1Conf.getTableMetadata(); Table tableMeta2 = op2Conf.getTableMetadata(); if (StringUtils.equals(tableMeta1.getFullyQualifiedName(), tableMeta2.getFullyQualifiedName()) + && StringUtils.equals(tableMeta1.getQualifier(), tableMeta2.getQualifier()) && op1Conf.getNeededColumns().equals(op2Conf.getNeededColumns()) && StringUtils.equals(op1Conf.getFilterExprString(), op2Conf.getFilterExprString()) && pctx.getPrunedPartitions(tsOp1).getPartitions().equals( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java index b3e0249b778b..f44d89671955 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/RelOptHiveTable.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.plan.RelOptSchema; @@ -82,7 +81,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; public class RelOptHiveTable implements RelOptTable { @@ -125,7 +123,7 @@ public RelOptHiveTable(RelOptSchema calciteSchema, RelDataTypeFactory typeFactor this.schema = calciteSchema; this.typeFactory = typeFactory; this.qualifiedTblName = ImmutableList.copyOf(qualifiedTblName); - this.name = this.qualifiedTblName.stream().collect(Collectors.joining(".")); + this.name = String.join(".", this.qualifiedTblName); this.rowType = rowType; this.hiveTblMetadata = hiveTblMetadata; this.hiveColStatsMap = new HashMap<>(); @@ -192,15 +190,15 @@ public List getColumnStrategies() { public RelOptHiveTable copy(RelDataType newRowType) { // 1. Build map of column name to col index of original schema // Assumption: Hive Table can not contain duplicate column names - Map nameToColIndxMap = new HashMap(); + Map nameToColIndxMap = new HashMap<>(); for (RelDataTypeField f : this.rowType.getFieldList()) { nameToColIndxMap.put(f.getName(), f.getIndex()); } // 2. Build nonPart/Part/Virtual column info for new RowSchema - List newHiveNonPartitionCols = new ArrayList(); - List newHivePartitionCols = new ArrayList(); - List newHiveVirtualCols = new ArrayList(); + List newHiveNonPartitionCols = new ArrayList<>(); + List newHivePartitionCols = new ArrayList<>(); + List newHiveVirtualCols = new ArrayList<>(); Map virtualColInfoMap = HiveCalciteUtil.getVColsMap(this.hiveVirtualCols, this.noOfNonVirtualCols); Integer originalColIndx; @@ -329,8 +327,8 @@ private List generateReferentialConstraints() { ImmutableList.Builder builder = ImmutableList.builder(); if (foreignKeyInfo != null && !foreignKeyInfo.getForeignKeys().isEmpty()) { for (List fkCols : foreignKeyInfo.getForeignKeys().values()) { - String parentDatabaseName = fkCols.get(0).parentDatabaseName; - String parentTableName = fkCols.get(0).parentTableName; + String parentDatabaseName = fkCols.getFirst().parentDatabaseName; + String parentTableName = fkCols.getFirst().parentTableName; String qualifiedName; List parentTableQualifiedName = new ArrayList<>(); if (parentDatabaseName != null && !parentDatabaseName.isEmpty()) { @@ -390,7 +388,7 @@ public T unwrap(Class arg0) { @Override public List getCollationList() { - ImmutableList.Builder collationList = new ImmutableList.Builder(); + ImmutableList.Builder collationList = new ImmutableList.Builder<>(); for (Order sortColumn : this.hiveTblMetadata.getSortCols()) { for (int i=0; i getCollationList() { @Override public RelDistribution getDistribution() { - ImmutableList.Builder columnPositions = new ImmutableList.Builder(); + ImmutableList.Builder columnPositions = new ImmutableList.Builder<>(); for (String bucketColumn : this.hiveTblMetadata.getBucketCols()) { for (int i=0; i()); + computePartitionList(hiveConf, null, new HashSet<>()); } rowCount = StatsUtils.getNumRows(hiveConf, getNonPartColumns(), hiveTblMetadata, partitionList, noColsMissingStats); @@ -465,7 +463,7 @@ private String getColNamesForLogging(Set colLst) { public void computePartitionList(HiveConf conf, RexNode pruneNode, Set partOrVirtualCols) { try { if (!hiveTblMetadata.isPartitioned() || pruneNode == null - || InputFinder.bits(pruneNode).length() == 0) { + || InputFinder.bits(pruneNode).isEmpty()) { // there is no predicate on partitioning column, we need all partitions // in this case. partitionList = PartitionPruner.prune(hiveTblMetadata, null, conf, getName(), @@ -485,11 +483,11 @@ public void computePartitionList(HiveConf conf, RexNode pruneNode, Set } private void updateColStats(Set projIndxLst, boolean allowMissingStats) { - List nonPartColNamesThatRqrStats = new ArrayList(); - List nonPartColIndxsThatRqrStats = new ArrayList(); - List partColNamesThatRqrStats = new ArrayList(); - List partColIndxsThatRqrStats = new ArrayList(); - Set colNamesFailedStats = new HashSet(); + List nonPartColNamesThatRqrStats = new ArrayList<>(); + List nonPartColIndxsThatRqrStats = new ArrayList<>(); + List partColNamesThatRqrStats = new ArrayList<>(); + List partColIndxsThatRqrStats = new ArrayList<>(); + Set colNamesFailedStats = new HashSet<>(); // 1. Separate required columns to Non Partition and Partition Cols ColumnInfo tmp; @@ -514,19 +512,19 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) if (null == partitionList) { // We could be here either because its an unpartitioned table or because // there are no pruning predicates on a partitioned table. - computePartitionList(hiveConf, null, new HashSet()); + computePartitionList(hiveConf, null, new HashSet<>()); } - String partitionListKey = partitionList.getKey().orElse(null); - ColumnStatsList colStatsCached = colStatsCache.get(partitionListKey); - if (colStatsCached == null) { - colStatsCached = new ColumnStatsList(); - colStatsCache.put(partitionListKey, colStatsCached); - } + String partitionListKey = partitionList.getKey(); + + ColumnStatsList colStatsCached = colStatsCache.computeIfAbsent( + partitionListKey, + k -> new ColumnStatsList() + ); // 2. Obtain Col Stats for Non Partition Cols - if (nonPartColNamesThatRqrStats.size() > 0) { - List hiveColStats = new ArrayList(); + if (!nonPartColNamesThatRqrStats.isEmpty()) { + List hiveColStats = new ArrayList<>(); if (!hiveTblMetadata.isPartitioned()) { // 2.1 Handle the case for unpartitioned table. @@ -547,9 +545,9 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) if (hiveColStats.isEmpty()) { colNamesFailedStats.addAll(nonPartColNamesThatRqrStats); } else if (hiveColStats.size() != nonPartColNamesThatRqrStats.size()) { - Set setOfFiledCols = new HashSet(nonPartColNamesThatRqrStats); + Set setOfFiledCols = new HashSet<>(nonPartColNamesThatRqrStats); - Set setOfObtainedColStats = new HashSet(); + Set setOfObtainedColStats = new HashSet<>(); for (ColStatistics cs : hiveColStats) { setOfObtainedColStats.add(cs.getColumnName()); } @@ -561,7 +559,7 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) // nonPartColNamesThatRqrStats. reorder hiveColStats so we can build hiveColStatsMap // using nonPartColIndxsThatRqrStats as below Map columnStatsMap = - new HashMap(hiveColStats.size()); + new HashMap<>(hiveColStats.size()); for (ColStatistics cs : hiveColStats) { columnStatsMap.put(cs.getColumnName(), cs); // even though the stats were estimated we need to warn user that @@ -586,7 +584,7 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) if (partitionList.getNotDeniedPartns().isEmpty()) { // no need to make a metastore call rowCount = 0; - hiveColStats = new ArrayList(); + hiveColStats = new ArrayList<>(); for (int i = 0; i < nonPartColNamesThatRqrStats.size(); i++) { // add empty stats object for each column hiveColStats.add( @@ -594,14 +592,13 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) nonPartColNamesThatRqrStats.get(i), hiveNonPartitionColsMap.get(nonPartColIndxsThatRqrStats.get(i)).getTypeName())); } - colNamesFailedStats.clear(); colStatsCached.updateState(State.COMPLETE); } else { Statistics stats = StatsUtils.collectStatistics(hiveConf, partitionList, hiveTblMetadata, hiveNonPartitionCols, nonPartColNamesThatRqrStats, colStatsCached, nonPartColNamesThatRqrStats, true); rowCount = stats.getNumRows(); - hiveColStats = new ArrayList(); + hiveColStats = new ArrayList<>(); for (String c : nonPartColNamesThatRqrStats) { ColStatistics cs = stats.getColumnStatisticsFromColName(c); if (cs != null) { @@ -622,7 +619,7 @@ private void updateColStats(Set projIndxLst, boolean allowMissingStats) } } - if (hiveColStats != null && hiveColStats.size() == nonPartColNamesThatRqrStats.size()) { + if (hiveColStats.size() == nonPartColNamesThatRqrStats.size()) { for (int i = 0; i < hiveColStats.size(); i++) { // the columns in nonPartColIndxsThatRqrStats/nonPartColNamesThatRqrStats/hiveColStats // are in same order @@ -754,7 +751,7 @@ public int hashCode() { } public String getPartitionListKey() { - return partitionList != null ? partitionList.getKey().orElse(null) : null; + return partitionList != null ? partitionList.getKey() : null; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java index b1bc9eaf0a75..7574ad5f6d24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java @@ -181,12 +181,8 @@ public static PrunedPartitionList prune(Table tab, ExprNodeDesc prunerExpr, LOG.trace("prune Expression = " + (prunerExpr == null ? "" : prunerExpr)); } - String key = tab.getFullyQualifiedName() + ";"; - if (tab.getMetaTable() != null) { - key = tab.getFullyQualifiedName() + "." + tab.getMetaTable() + ";"; - } else if (tab.getSnapshotRef() != null) { - key = tab.getFullyQualifiedName() + "." + tab.getSnapshotRef() + ";"; - } + String qualifier = tab.getQualifier(); + String key = tab.getFullyQualifiedName() + (qualifier.isEmpty() ? "" : "." + qualifier) + ";"; if (!tab.isPartitioned()) { // If the table is not partitioned, return empty list. @@ -441,7 +437,7 @@ static private boolean hasUserFunctions(ExprNodeDesc expr) { return false; } - private static PrunedPartitionList getPartitionsFromServer(Table tab, String key, ExprNodeDesc compactExpr, + private static PrunedPartitionList getPartitionsFromServer(Table tab, String key, ExprNodeDesc compactExpr, HiveConf conf, Set partColsUsedInFilter, boolean isPruningByExactFilter) throws SemanticException { try { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index d5c683daa303..ffd0e5756976 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -3220,8 +3220,11 @@ private RelNode genTableLogicalPlan(String tableAlias, QB qb) throws SemanticExc fullyQualifiedTabName.add(tabMetaData.getDbName()); } fullyQualifiedTabName.add(tabMetaData.getTableName()); - if (tabMetaData.getSnapshotRef() != null) { - fullyQualifiedTabName.add(tabMetaData.getSnapshotRef()); + // Include time-travel qualifier (snapshotRef / asOfVersion / asOfTimestamp) + // in the table identity so two scans at different snapshots stay distinct. + String qualifier = tabMetaData.getQualifier(); + if (!qualifier.isEmpty()) { + fullyQualifiedTabName.add(qualifier); } optTable = new RelOptHiveTable(relOptSchema, relOptSchema.getTypeFactory(), fullyQualifiedTabName, rowType, tabMetaData, nonPartitionColumns, partitionColumns, virtualCols, conf, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java index b2627bf3bf2b..6ea3b7cc1c75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java @@ -128,7 +128,7 @@ public class ParseContext { private boolean needViewColumnAuthorization; private Map rsToRuntimeValuesInfo = - new LinkedHashMap(); + new LinkedHashMap<>(); /** * Mapping holding information about semijoins. * @@ -451,7 +451,7 @@ public Map getColStatsCache() { * @return col stats */ public ColumnStatsList getColStatsCached(PrunedPartitionList partList) { - return ctx.getOpContext().getColStatsCache().get(partList.getKey().orElse(null)); + return ctx.getOpContext().getColStatsCache().get(partList.getKey()); } /** @@ -515,8 +515,7 @@ public Set getSemanticInputs() { return semanticInputs; } - public void replaceRootTask(Task rootTask, - List> tasks) { + public void replaceRootTask(Task rootTask, List> tasks) { this.rootTasks.remove(rootTask); this.rootTasks.addAll(tasks); } @@ -663,7 +662,7 @@ public void setColumnStatsAutoGatherContexts( public Collection getAllOps() { List ops = new ArrayList<>(); - Set visited = new HashSet(); + Set visited = new HashSet<>(); for (Operator op : getTopOps().values()) { getAllOps(ops, visited, op); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java index 398dbf555def..3329d4ae749d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PrunedPartitionList.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hive.ql.parse; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -37,7 +35,7 @@ public class PrunedPartitionList { private final Table source; /** Key to identify this partition list. */ - private final Optional ppListKey; + private final String ppListKey; /** Partitions that either satisfy the partition criteria, or may satisfy it. */ private final Set partitions; @@ -56,7 +54,7 @@ public PrunedPartitionList(Table source, Set partitions, public PrunedPartitionList(Table source, String key, Set partitions, List referred, boolean hasUnknowns) { this.source = Objects.requireNonNull(source); - this.ppListKey = Optional.ofNullable(key); + this.ppListKey = key; this.referred = Objects.requireNonNull(referred); this.partitions = Objects.requireNonNull(partitions); this.hasUnknowns = hasUnknowns; @@ -66,7 +64,7 @@ public Table getSourceTable() { return source; } - public Optional getKey() { + public String getKey() { return ppListKey; } @@ -82,7 +80,7 @@ public Set getPartitions() { * @return all partitions. */ public List getNotDeniedPartns() { - return Collections.unmodifiableList(new ArrayList<>(partitions)); + return List.copyOf(partitions); } /**