-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-29133: Support Z-order indexing for Iceberg tables via CREATE TABLE DDL #6138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
|
||
package org.apache.iceberg.mr.hive; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
|
@@ -41,6 +42,8 @@ | |
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; | ||
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc; | ||
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields; | ||
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFieldDesc; | ||
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFields; | ||
import org.apache.hadoop.hive.ql.util.NullOrdering; | ||
import org.apache.iceberg.BaseMetastoreTableOperations; | ||
import org.apache.iceberg.BaseTable; | ||
|
@@ -74,6 +77,8 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ; | ||
import static org.apache.iceberg.mr.InputFormatConfig.SORT_COLUMNS; | ||
import static org.apache.iceberg.mr.InputFormatConfig.SORT_ORDER; | ||
|
||
public class BaseHiveIcebergMetaHook implements HiveMetaHook { | ||
private static final Logger LOG = LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class); | ||
|
@@ -217,28 +222,83 @@ private void validateCatalogConfigsDefined() { | |
} | ||
} | ||
|
||
/** | ||
* Persists the table's write sort order based on the HMS property 'default-sort-order' | ||
* that is populated by the DDL layer. | ||
* <p> | ||
* Behaviour: | ||
* - If the JSON represents Z-order, we remove DEFAULT_SORT_ORDER | ||
* as Iceberg does not have Z-order support in its spec. | ||
* So, we persist Z-order metadata in {@link org.apache.iceberg.mr.InputFormatConfig#SORT_ORDER} | ||
* and {@link org.apache.iceberg.mr.InputFormatConfig#SORT_COLUMNS} to be used by Hive Writer. | ||
* <p> | ||
* - Otherwise, the JSON is a list of SortFields; we convert it to Iceberg | ||
* SortOrder JSON and keep it in DEFAULT_SORT_ORDER for Iceberg to use it. | ||
*/ | ||
private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema, | ||
Properties properties) { | ||
String sortOderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER); | ||
SortFields sortFields = null; | ||
if (!Strings.isNullOrEmpty(sortOderJSONString)) { | ||
try { | ||
sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOderJSONString, SortFields.class); | ||
} catch (Exception e) { | ||
LOG.warn("Can not read write order json: {}", sortOderJSONString, e); | ||
return; | ||
} | ||
String sortOrderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER); | ||
if (Strings.isNullOrEmpty(sortOrderJSONString)) { | ||
return; | ||
} | ||
|
||
if (isZOrderJSON(sortOrderJSONString)) { | ||
properties.remove(TableProperties.DEFAULT_SORT_ORDER); | ||
setZOrderSortOrder(sortOrderJSONString, properties); | ||
return; | ||
} | ||
|
||
try { | ||
SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class); | ||
if (sortFields != null && !sortFields.getSortFields().isEmpty()) { | ||
SortOrder.Builder sortOderBuilder = SortOrder.builderFor(schema); | ||
SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema); | ||
sortFields.getSortFields().forEach(fieldDesc -> { | ||
NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ? | ||
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST; | ||
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST; | ||
SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ? | ||
SortDirection.ASC : SortDirection.DESC; | ||
sortOderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder); | ||
SortDirection.ASC : SortDirection.DESC; | ||
sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder); | ||
}); | ||
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOderBuilder.build())); | ||
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build())); | ||
} | ||
} catch (Exception e) { | ||
LOG.warn("Can not read write order json: {}", sortOrderJSONString); | ||
} | ||
} | ||
|
||
/** | ||
* Configures the Z-order sort order metadata in the given properties | ||
* based on the specified Z-order fields. | ||
* | ||
* @param jsonString the JSON string representing sort orders | ||
* @param properties the Properties object to store sort order metadata | ||
*/ | ||
private void setZOrderSortOrder(String jsonString, Properties properties) { | ||
try { | ||
ZOrderFields zorderFields = JSON_OBJECT_MAPPER.reader().readValue(jsonString, ZOrderFields.class); | ||
if (zorderFields != null && !zorderFields.getZOrderFields().isEmpty()) { | ||
List<String> columnNames = zorderFields.getZOrderFields().stream() | ||
.map(ZOrderFieldDesc::getColumnName) | ||
.collect(Collectors.toList()); | ||
|
||
LOG.info("Setting Z-order sort order for columns: {}", columnNames); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe LOG.debug( |
||
|
||
properties.put(SORT_ORDER, "ZORDER"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you please introduce the enum: enum SortType { |
||
properties.put(SORT_COLUMNS, String.join(",", columnNames)); | ||
|
||
LOG.info("Z-order sort order configured for Iceberg table with columns: {}", columnNames); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is redundant |
||
} | ||
} catch (Exception e) { | ||
LOG.warn("Failed to parse Z-order sort order", e); | ||
} | ||
} | ||
|
||
private boolean isZOrderJSON(String jsonString) { | ||
try { | ||
JsonNode node = JSON_OBJECT_MAPPER.readTree(jsonString); | ||
return node.has("zorderFields"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not move this to constans? |
||
} catch (Exception e) { | ||
return false; | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,6 +82,7 @@ | |
import org.apache.hadoop.hive.ql.ddl.table.create.like.CreateTableLikeDesc; | ||
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc; | ||
import org.apache.hadoop.hive.ql.exec.ColumnInfo; | ||
import org.apache.hadoop.hive.ql.exec.UDFArgumentException; | ||
import org.apache.hadoop.hive.ql.exec.Utilities; | ||
import org.apache.hadoop.hive.ql.hooks.WriteEntity; | ||
import org.apache.hadoop.hive.ql.io.IOConstants; | ||
|
@@ -119,6 +120,7 @@ | |
import org.apache.hadoop.hive.ql.session.SessionState; | ||
import org.apache.hadoop.hive.ql.session.SessionStateUtil; | ||
import org.apache.hadoop.hive.ql.stats.Partish; | ||
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; | ||
import org.apache.hadoop.hive.ql.util.NullOrdering; | ||
import org.apache.hadoop.hive.serde2.AbstractSerDe; | ||
import org.apache.hadoop.hive.serde2.Deserializer; | ||
|
@@ -184,6 +186,7 @@ | |
import org.apache.iceberg.mr.InputFormatConfig; | ||
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles; | ||
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction; | ||
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder; | ||
import org.apache.iceberg.puffin.Blob; | ||
import org.apache.iceberg.puffin.BlobMetadata; | ||
import org.apache.iceberg.puffin.Puffin; | ||
|
@@ -218,6 +221,8 @@ | |
import static org.apache.iceberg.SnapshotSummary.TOTAL_FILE_SIZE_PROP; | ||
import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP; | ||
import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP; | ||
import static org.apache.iceberg.mr.InputFormatConfig.SORT_COLUMNS; | ||
import static org.apache.iceberg.mr.InputFormatConfig.SORT_ORDER; | ||
|
||
public class HiveIcebergStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler { | ||
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandler.class); | ||
|
@@ -929,9 +934,64 @@ public DynamicPartitionCtx createDPContext( | |
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getSortTransformSpec(table)); | ||
} | ||
|
||
// Even if table has no explicit sort order, honor z-order if configured | ||
Map<String, String> props = table.properties(); | ||
if ("ZORDER".equalsIgnoreCase(props.getOrDefault(SORT_ORDER, ""))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please use enum |
||
createZOrderCustomSort(props, dpCtx, table, hmsTable, writeOperation); | ||
} | ||
|
||
return dpCtx; | ||
} | ||
|
||
/** | ||
* Adds a custom sort expression to the DynamicPartitionCtx that performs local Z-ordering on write. | ||
* | ||
* Behavior: | ||
* - Reads Z-order properties from 'sort.order' and 'sort.columns' (comma-separated). | ||
* - Resolves the referenced columns to their positions in the physical row (taking into account | ||
* ACID virtual columns offset for overwrite/update operations). | ||
* - Configures a single ASC sort key with NULLS FIRST and injects a custom key expression for | ||
* Z-order | ||
*/ | ||
private void createZOrderCustomSort(Map<String, String> props, DynamicPartitionCtx dpCtx, Table table, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe addZOrderExpr ? |
||
org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation) { | ||
String colsProp = props.get(SORT_COLUMNS); | ||
if (StringUtils.isNotBlank(colsProp)) { | ||
List<String> zCols = Arrays.stream(colsProp.split(",")).map(String::trim) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. List zCols = Arrays.stream(colsProp.split(",")) |
||
.filter(s -> !s.isEmpty()).collect(Collectors.toList()); | ||
|
||
Map<String, Integer> fieldOrderMap = Maps.newHashMap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Map<String, Integer> fieldOrderMap = new HashMap<>(fields.size()); |
||
List<Types.NestedField> fields = table.schema().columns(); | ||
for (int i = 0; i < fields.size(); ++i) { | ||
fieldOrderMap.put(fields.get(i).name(), i); | ||
} | ||
int offset = (shouldOverwrite(hmsTable, writeOperation) ? | ||
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, writeOperation)).size(); | ||
|
||
List<Integer> zIndices = zCols.stream().map(col -> { | ||
Integer base = fieldOrderMap.get(col); | ||
Preconditions.checkArgument(base != null, "Z-order column not found in schema: %s", col); | ||
return base + offset; | ||
}).collect(Collectors.toList()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. .toList() |
||
|
||
dpCtx.setCustomSortOrder(Lists.newArrayList(Collections.singletonList(1))); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we even need to set CustomSortOrder and CustomSortNullOrder for z-order? |
||
dpCtx.setCustomSortNullOrder(Lists.newArrayList(Collections.singletonList(NullOrdering.NULLS_FIRST.getCode()))); | ||
|
||
dpCtx.addCustomSortExpressions(Collections.singletonList(allCols -> { | ||
List<ExprNodeDesc> args = Lists.newArrayListWithExpectedSize(zIndices.size()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. List args = zIndices.stream() |
||
for (Integer idx : zIndices) { | ||
args.add(allCols.get(idx)); | ||
} | ||
try { | ||
GenericUDF udf = new GenericUDFIcebergZorder(); | ||
return ExprNodeGenericFuncDesc.newInstance(udf, "iceberg_zorder", args); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. name is redundant?
|
||
} catch (UDFArgumentException e) { | ||
throw new RuntimeException(e); | ||
} | ||
})); | ||
} | ||
} | ||
|
||
private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable, | ||
Operation writeOperation, DynamicPartitionCtx dpCtx, | ||
List<TransformSpec> transformSpecs) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sortOrder is ASC or DESC, should we rename to SORT_TYPE?