Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -2127,6 +2127,9 @@ public static enum ConfVars {
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),
HIVE_ICEBERG_MATERIALIZEDVIEW_METADATA_LOCATION("hive.iceberg.materializedview.metadata.location", "metastore",

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking about dropping this config since we introduce external materialized views. We can extend the grammar to allow the external keyword in create materialized view statements.

create external materialized view mat_native stored by iceberg stored as orc tblproperties ('format-version'='1') as
select b, c from tbl_ice_native where c > 52;

For external MVs, store the MV metadata via the storage handler API, and keep the existing HMS storage logic for non-external MVs.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I'm not sure what is the proper approach: As HIVE-29578: Iceberg: add support for logical views is merged, it changes the whole picture:
This implementation uses a HiveConf setting.
That implementation uses a table property, 'view-format'='iceberg'.

My problem is if we introduce the external keyword here and don't change the existing view-format solution, we provide an inconsistent interface for the users.

But I also don't think if the view-format table property is a proper approach for materialized views: it works great until we stay within the Hive engine. But what happens if we not?
How we will handle the case when the user creates a materialized view (or a simple logical view) in Spark and we want to use it from Hive?
Should create external create the view if it doesn't exist and just fill the Hive side of metadata if the view physically exists but Hive doesn't know about it yet? Like how we do it with external tables?

Tagging @difin (as the author for Iceberg views) and @zabetak (as you are on PTO now) here to get their opinion to get a consistent Hive behavior.

new StringSet("metastore", "iceberg"),
"Location of materialized view metadata stored by iceberg"),
HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveCatalog extends BaseMetastoreViewCatalog
implements SupportsNamespaces, Configurable {
public class HiveCatalog extends BaseMetastoreViewCatalog implements SupportsNamespaces, Configurable {

public static final String LIST_ALL_TABLES = "list-all-tables";
public static final String LIST_ALL_TABLES_DEFAULT = "false";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,51 @@ private static boolean isValidIcebergTable(Table table) {
}

static void validateTableIsIceberg(Table table, String fullName) {
String tableType = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
NoSuchIcebergTableException.check(
isValidIcebergTable(table), "Not an iceberg table: %s (type=%s)", fullName, tableType);
(TableType.MANAGED_TABLE.name().equalsIgnoreCase(table.getTableType()) ||
TableType.EXTERNAL_TABLE.name().equalsIgnoreCase(table.getTableType()) ||
TableType.MATERIALIZED_VIEW.name().equalsIgnoreCase(table.getTableType())) &&
(
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableTypeProp) ||
ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp)
),
"Not an iceberg table: %s (type=%s) (tableType=%s)",
fullName,
tableTypeProp,
table.getTableType());
}

static void validateTableOrMVIsIceberg(Table table, String fullName) {
String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
NoSuchIcebergTableException.check(
(TableType.MANAGED_TABLE.name().equalsIgnoreCase(table.getTableType()) ||
TableType.EXTERNAL_TABLE.name().equalsIgnoreCase(table.getTableType())) &&
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(tableTypeProp) ||
(TableType.MATERIALIZED_VIEW.name().equalsIgnoreCase(table.getTableType()) ||
TableType.EXTERNAL_MATERIALIZED_VIEW.name().equalsIgnoreCase(table.getTableType())) &&
ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp),
"Not an iceberg table: %s (type=%s) (tableType=%s)",
fullName,
tableTypeProp,
table.getTableType());

/*
table tbl_ice
tableTypeProp ICEBERG-VIEW
table.getTableType() EXTERNAL_TABLE


*/
}

static void validateTableIsIcebergView(Table table, String fullName) {
String tableTypeProp = table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);

NoSuchIcebergViewException.check(
isValidIcebergView(table),
(TableType.VIRTUAL_VIEW.name().equalsIgnoreCase(table.getTableType()) ||
TableType.EXTERNAL_MATERIALIZED_VIEW.name().equalsIgnoreCase(table.getTableType())) &&
ICEBERG_VIEW_TYPE_VALUE.equalsIgnoreCase(tableTypeProp),
"Not an iceberg view: %s (type=%s) (tableType=%s)",
fullName,
tableTypeProp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations
private final long maxHiveTablePropertySize;
private final int metadataRefreshMaxRetries;
private final FileIO fileIO;

private final ClientPool<IMetaStoreClient, TException> metaClients;

protected HiveTableOperations(
Expand Down Expand Up @@ -120,8 +122,12 @@ protected void doRefresh() {

// Check if we are trying to load an Iceberg View as a Table
HiveOperationsBase.validateIcebergViewNotLoadedAsIcebergTable(table, fullName);
// Check if it is a valid Iceberg Table
HiveOperationsBase.validateTableIsIceberg(table, fullName);

if ("iceberg".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_MATERIALIZEDVIEW_METADATA_LOCATION))) {
HiveOperationsBase.validateTableOrMVIsIceberg(table, fullName);
} else {
HiveOperationsBase.validateTableIsIceberg(table, fullName);
}

metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);

Expand Down
4 changes: 4 additions & 0 deletions iceberg/iceberg-handler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>patched-iceberg-core</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,43 @@

package org.apache.iceberg.mr;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.CreationMetadata;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HMSTablePropertyHelper;
import org.apache.iceberg.hive.IcebergCatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.view.ImmutableRefreshState;
import org.apache.iceberg.view.ImmutableSourceState;
import org.apache.iceberg.view.RefreshState;
import org.apache.iceberg.view.RefreshStateParser;
import org.apache.iceberg.view.SourceState;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewVersion;

/**
* Class for catalog resolution and accessing the common functions for {@link Catalog} API.
Expand Down Expand Up @@ -76,6 +93,12 @@ public final class Catalogs {
ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME,
InputFormatConfig.CATALOG_NAME);

public static final String MATERIALIZED_VIEW_PROPERTY_KEY = "iceberg.materialized.view";
public static final String MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY =
"iceberg.materialized.view.storage.table";
public static final String MATERIALIZED_VIEW_ORIGINAL_TEXT = "iceberg.materialized.view.original.text";
public static final String MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX = "_storage_table";

private Catalogs() {
}

Expand Down Expand Up @@ -279,4 +302,139 @@ private static Map<String, String> filterIcebergTableProperties(Properties props
}
return map;
}

public static MaterializedView createMaterializedView(
Configuration conf, Properties props, String viewOriginalText, String viewExpandedText,
CreationMetadata creationMetadata) {

boolean isExternalMaterializedView = "iceberg".equals(
HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_MATERIALIZEDVIEW_METADATA_LOCATION));

Schema schema = schema(props);
PartitionSpec spec = spec(props, schema);
String location = props.getProperty(LOCATION);
String storageTableLocation = location +
(isExternalMaterializedView ? MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX : "");
String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);

Optional<Catalog> catalog = loadCatalog(conf, catalogName);
SortOrder sortOrder = HMSTablePropertyHelper.getSortOrder(props, schema);
if (catalog.isEmpty()) {
throw new IllegalStateException("Unable to load catalog: " + catalogName);
}
String name = props.getProperty(NAME);
Preconditions.checkNotNull(name, "Table identifier not set");

ViewCatalog viewCatalog = (ViewCatalog) catalog.get();

Map<String, String> map = filterIcebergTableProperties(props);
String storageTableIdentifier = name +
(isExternalMaterializedView ? MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX : "");
Table storageTable = catalog.get().buildTable(TableIdentifier.parse(storageTableIdentifier), schema)
.withPartitionSpec(spec).withLocation(storageTableLocation).withProperties(map).withSortOrder(sortOrder)
.create();

Map<String, String> viewProperties = Maps.newHashMapWithExpectedSize(2);
viewProperties.put(MATERIALIZED_VIEW_PROPERTY_KEY, "true");
viewProperties.put(MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY, storageTableIdentifier);
viewProperties.put(MATERIALIZED_VIEW_ORIGINAL_TEXT, viewOriginalText);

long createTime = System.currentTimeMillis();

List<SourceState> sourceStates = Lists.newArrayList();

for (var sourceTable : creationMetadata.getSourceTables()) {
SourceState.SourceStateType type = TableType.EXTERNAL_TABLE.name()
.equalsIgnoreCase(sourceTable.getTable().getTableType()) ?
SourceState.SourceStateType.TABLE :
SourceState.SourceStateType.VIEW;

String dbName = sourceTable.getTable().getDbName();
String sourceTableName = sourceTable.getTable().getTableName();
String sourceTableNamespace = "default";
String sourceTableCatalog = sourceTable.getTable().isSetCatName() ? sourceTable.getTable().getCatName() : null;
Catalog tableCatalog = loadCatalog(conf, sourceTableCatalog).orElse(catalog.get());
UUID uuid = null;
Snapshot snapshot = null;
ViewVersion version = null;

switch (type) {
case TABLE -> {
Table icebergTable = tableCatalog.loadTable(TableIdentifier.of(dbName, sourceTableName));
uuid = icebergTable.uuid();
snapshot = icebergTable.currentSnapshot();

SourceState sourcestate = ImmutableSourceState.of(type, sourceTableName,
Namespace.of(sourceTableNamespace), catalogName,
uuid, snapshot == null ? null : snapshot.snapshotId(), null, null);
sourceStates.add(sourcestate);
}
case VIEW -> {
var icebergView = ((ViewCatalog) tableCatalog).loadView(TableIdentifier.parse(sourceTableName));
uuid = icebergView.uuid();
version = icebergView.currentVersion();

SourceState sourcestate = ImmutableSourceState.of(type, sourceTableName,
Namespace.of(sourceTableNamespace), catalogName,
uuid, null, null, version.versionId());
sourceStates.add(sourcestate);
}
}
}

RefreshState refreshState = ImmutableRefreshState.of(1, sourceStates, createTime);

SessionStateUtil.addResource(conf, "refresh-state", RefreshStateParser.toJson(refreshState));

TableIdentifier viewIdentifier = TableIdentifier.parse(name);
View mv = viewCatalog.buildView(viewIdentifier)
.withLocation(location)
.withDefaultNamespace(viewIdentifier.namespace())
.withQuery("hive", viewExpandedText)
.withSchema(schema)
.withProperties(viewProperties)
.withStorageTable(storageTableIdentifier)
.withCreateTime(createTime)
.create();

return new MaterializedView(mv, storageTable);
}

public static class MaterializedView {
private View view;
private Table storageTable;

public MaterializedView(View view, Table storageTable) {
this.view = view;
this.storageTable = storageTable;
}

public View getView() {
return view;
}

public Table getStorageTable() {
return storageTable;
}
}

public static MaterializedView loadMaterializedView(Configuration conf, Properties props) {
return loadMaterializedView(conf, props.getProperty(NAME), props.getProperty(InputFormatConfig.CATALOG_NAME));
}

public static MaterializedView loadMaterializedView(
Configuration conf, String tableIdentifier, String catalogName) {
Optional<Catalog> catalog = loadCatalog(conf, catalogName);

if (catalog.isPresent()) {
Preconditions.checkArgument(tableIdentifier != null, "View identifier not set");
ViewCatalog viewCatalog = (ViewCatalog) catalog.get();
View view = viewCatalog.loadView(TableIdentifier.parse(tableIdentifier));
String storageTableIdentifier = view.properties().get(MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY);
Table storageTable = catalog.get().loadTable(TableIdentifier.parse(storageTableIdentifier));
return new MaterializedView(view, storageTable);
}

throw new UnsupportedOperationException("Catalog " + catalogName + " not found!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ private InputFormatConfig() {
public static final String TABLE_LOCATION = "iceberg.mr.table.location";
public static final String TABLE_METADATA_LOCATION = "iceberg.mr.table.metadata.location";
public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
public static final String TABLE_TYPE = "iceberg.mr.table.type";
public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
Expand Down
Loading
Loading