diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index b7de80510e223..1aaebc60e1a17 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4123,6 +4123,12 @@ }, "sqlState" : "KD002" }, + "INVALID_METRIC_VIEW_YAML" : { + "message" : [ + "Failed to parse metric view YAML: " + ], + "sqlState" : "42K0L" + }, "INVALID_NAME_IN_USE_COMMAND" : { "message" : [ "Invalid name '' in command. Reason: " @@ -8261,6 +8267,11 @@ "The table is a Spark data source table. Please use SHOW CREATE TABLE without AS SERDE instead." ] }, + "ON_METRIC_VIEW" : { + "message" : [ + "The command is not supported on a metric view ." + ] + }, "ON_TEMPORARY_VIEW" : { "message" : [ "The command is not supported on a temporary view ." diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Dependency.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Dependency.java new file mode 100644 index 0000000000000..4de02606b981f --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Dependency.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.annotation.Evolving; + +/** + * Represents a dependency of a SQL object such as a view or metric view. + *
+ * <p>
+ * A dependency is one of: {@link TableDependency} or {@link FunctionDependency}. The
+ * {@code sealed} declaration enforces this structurally.
+ * <p>
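As a consumer-side illustration of the sealed hierarchy (a sketch, not part of this patch: the table name and the `rendered` value are invented for the example):

```scala
import org.apache.spark.sql.connector.catalog.{Dependency, FunctionDependency, TableDependency}

// Build through the static factories this patch adds on Dependency.
val dep: Dependency = Dependency.table(Array("spark_catalog", "sales", "orders"))

// `sealed ... permits TableDependency, FunctionDependency` makes these two arms
// the complete set of cases, so the match needs no default arm.
val rendered = dep match {
  case t: TableDependency    => "table: " + t.nameParts().mkString(".")
  case f: FunctionDependency => "function: " + f.nameParts().mkString(".")
}
```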
+ * Note: today the only producer in Spark itself is metric-view dependency extraction, which + * emits {@link TableDependency} only. {@link FunctionDependency} and the + * {@link #function(String[])} factory are exposed as groundwork for future producers + * (e.g. SQL UDF dependency tracking); consumers iterating a {@link DependencyList} received + * from Spark today should expect to see only {@link TableDependency} instances. + * + * @since 4.2.0 + */ +@Evolving +public sealed interface Dependency permits TableDependency, FunctionDependency { + + /** + * Construct a {@link TableDependency} from the structural multi-part name of the dependent + * table. {@code nameParts} should contain at least one element; for catalog-managed tables + * the first element is typically the catalog name and subsequent elements are namespace + * components followed by the table name. + */ + static TableDependency table(String[] nameParts) { + return new TableDependency(nameParts); + } + + /** + * Construct a {@link FunctionDependency} from the structural multi-part name of the + * dependent function. {@code nameParts} should contain at least one element; for + * catalog-managed functions the first element is typically the catalog name and subsequent + * elements are namespace components followed by the function name. + */ + static FunctionDependency function(String[] nameParts) { + return new FunctionDependency(nameParts); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DependencyList.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DependencyList.java new file mode 100644 index 0000000000000..21c1e662fda62 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DependencyList.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Arrays; +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; + +/** + * A list of dependencies for a SQL object such as a view or metric view. + *
+ * <p>
+ * <ul>
+ *   <li>When {@code null}, the dependency information is not provided.</li>
+ *   <li>When the array is empty, dependencies are provided but the object has none.</li>
+ *   <li>When the array is non-empty, each entry describes one dependency.</li>
+ * </ul>
+ * <p>
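A sketch of how a consumer might branch on the three states above; `describeLineage` is a hypothetical helper, and only the null/empty/non-empty contract comes from this Javadoc:

```scala
import org.apache.spark.sql.connector.catalog.{DependencyList, FunctionDependency, TableDependency}

def describeLineage(deps: DependencyList): String = {
  if (deps == null) {
    "lineage not provided"              // the producer did not compute dependencies
  } else {
    val entries = deps.dependencies()   // accessor returns a defensive copy
    if (entries.isEmpty) {
      "no dependencies"                 // computed, and the object depends on nothing
    } else {
      entries.map {
        case t: TableDependency    => t.nameParts().mkString(".")
        case f: FunctionDependency => f.nameParts().mkString(".") + "()"
      }.mkString(", ")
    }
  }
}
```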
+ * Records' auto-generated {@code equals}/{@code hashCode} on array fields fall through to + * {@link Object#equals} (reference equality), so this record overrides them to use + * {@link Arrays#equals(Object[], Object[])} / {@link Arrays#hashCode(Object[])} on + * {@code dependencies}; per-element equality delegates to the element's overridden + * {@code equals} ({@link TableDependency} / {@link FunctionDependency} both implement value + * semantics on their {@code nameParts} array). The defensive-copy accessor override clones + * on read so callers cannot mutate the record's internal array. + * + * @param dependencies array of dependencies; must contain no null elements (defensive + * copy made; not validated element-wise -- callers passing nulls will + * surface NPEs in downstream consumers) + * @since 4.2.0 + */ +@Evolving +public record DependencyList(Dependency[] dependencies) { + + public DependencyList { + Objects.requireNonNull(dependencies, "dependencies must not be null"); + dependencies = dependencies.clone(); + } + + /** Returns a defensive copy of the underlying dependencies array. */ + @Override + public Dependency[] dependencies() { return dependencies.clone(); } + + @Override + public boolean equals(Object o) { + return o instanceof DependencyList that && Arrays.equals(dependencies, that.dependencies); + } + + @Override + public int hashCode() { return Arrays.hashCode(dependencies); } + + @Override + public String toString() { + return "DependencyList[dependencies=" + Arrays.toString(dependencies) + "]"; + } + + public static DependencyList of(Dependency[] dependencies) { + return new DependencyList(dependencies); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionDependency.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionDependency.java new file mode 100644 index 0000000000000..c19d118043afa --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/FunctionDependency.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Arrays; +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; + +/** + * A function dependency of a SQL object. + *
+ * <p>
+ * The dependent function is identified by its structural multi-part name. See
+ * {@link TableDependency} for the parts-form contract.
+ * <p>
+ * Records' auto-generated {@code equals}/{@code hashCode} on array fields fall through to + * {@link Object#equals} (reference equality), so this record overrides them to use + * {@link Arrays#equals(Object[], Object[])} / {@link Arrays#hashCode(Object[])} on + * {@code nameParts} and give value-based semantics. The defensive-copy accessor override + * also clones on read so callers cannot mutate the record's internal array. + * + * @param nameParts structural multi-part identifier; must be non-empty and contain no + * null elements (defensive copy made; not validated element-wise -- + * callers passing nulls will surface NPEs in downstream consumers) + * @since 4.2.0 + */ +@Evolving +public record FunctionDependency(String[] nameParts) implements Dependency { + public FunctionDependency { + Objects.requireNonNull(nameParts, "nameParts must not be null"); + if (nameParts.length == 0) { + throw new IllegalArgumentException("nameParts must not be empty"); + } + nameParts = nameParts.clone(); + } + + /** Returns a defensive copy of the underlying parts array. */ + @Override + public String[] nameParts() { return nameParts.clone(); } + + @Override + public boolean equals(Object o) { + return o instanceof FunctionDependency that && Arrays.equals(nameParts, that.nameParts); + } + + @Override + public int hashCode() { return Arrays.hashCode(nameParts); } + + @Override + public String toString() { + return "FunctionDependency[nameParts=" + Arrays.toString(nameParts) + "]"; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableDependency.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableDependency.java new file mode 100644 index 0000000000000..0aa9a47311607 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableDependency.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import java.util.Arrays; +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; + +/** + * A table dependency of a SQL object. + *
+ * <p>
+ * The dependent table is identified by its structural multi-part name. {@code nameParts}
+ * arity matches the catalog's namespace depth plus one for the table name -- for a catalog
+ * with single-level namespaces the parts are {@code [catalog, schema, table]}; for a catalog
+ * with multi-level namespaces (e.g. Iceberg with {@code db1.db2}) the parts are
+ * {@code [catalog, db1, db2, ..., table]}; for v1 sources resolved through the session
+ * catalog, producers should normalize to {@code [spark_catalog, db, table]} so consumers see
+ * a stable arity per source kind. The structural form preserves arity and is unambiguous
+ * against quoted identifiers containing a literal {@code .}; consumers that need a flat
+ * string should join the parts themselves with a quoting scheme appropriate to their wire
+ * format.
+ * <p>
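To make the arity contract concrete, a sketch with invented table names; the backtick-quoting scheme at the end is one possible flattening, not something this API mandates:

```scala
import org.apache.spark.sql.connector.catalog.TableDependency

// v1 source, normalized by the producer:  [spark_catalog, db, table]
val v1Dep = new TableDependency(Array("spark_catalog", "sales", "orders"))
// v2 catalog with a two-level namespace:  [catalog, db1, db2, table]
val v2Dep = new TableDependency(Array("iceberg", "db1", "db2", "orders"))

// Quote every part so a literal `.` inside an identifier can never be
// mistaken for a separator, e.g. `iceberg`.`db1`.`db2`.`orders`.
def flatName(dep: TableDependency): String =
  dep.nameParts().map(p => "`" + p.replace("`", "``") + "`").mkString(".")
```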
+ * Records' auto-generated {@code equals}/{@code hashCode} on array fields fall through to + * {@link Object#equals} (reference equality), so this record overrides them to use + * {@link Arrays#equals(Object[], Object[])} / {@link Arrays#hashCode(Object[])} on + * {@code nameParts} and give value-based semantics. The defensive-copy accessor override + * also clones on read so callers cannot mutate the record's internal array. + * + * @param nameParts structural multi-part identifier; must be non-empty and contain no + * null elements (defensive copy made; not validated element-wise -- + * callers passing nulls will surface NPEs in downstream consumers) + * @since 4.2.0 + */ +@Evolving +public record TableDependency(String[] nameParts) implements Dependency { + public TableDependency { + Objects.requireNonNull(nameParts, "nameParts must not be null"); + if (nameParts.length == 0) { + throw new IllegalArgumentException("nameParts must not be empty"); + } + nameParts = nameParts.clone(); + } + + /** Returns a defensive copy of the underlying parts array. */ + @Override + public String[] nameParts() { return nameParts.clone(); } + + @Override + public boolean equals(Object o) { + return o instanceof TableDependency that && Arrays.equals(nameParts, that.nameParts); + } + + @Override + public int hashCode() { return Arrays.hashCode(nameParts); } + + @Override + public String toString() { + return "TableDependency[nameParts=" + Arrays.toString(nameParts) + "]"; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableSummary.java index 8f46a372342a8..17a4f23bdd1f2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableSummary.java @@ -27,6 +27,7 @@ public interface TableSummary { String EXTERNAL_TABLE_TYPE = "EXTERNAL"; String VIEW_TABLE_TYPE = "VIEW"; String FOREIGN_TABLE_TYPE = "FOREIGN"; + String METRIC_VIEW_TABLE_TYPE = "METRIC_VIEW"; Identifier identifier(); String tableType(); diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewInfo.java index 307a5ff486e58..0f46e915a9be2 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewInfo.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewInfo.java @@ -48,6 +48,7 @@ public class ViewInfo extends TableInfo { private final Map sqlConfigs; private final String schemaMode; private final String[] queryColumnNames; + private final DependencyList viewDependencies; protected ViewInfo(Builder builder) { super(builder); @@ -57,11 +58,11 @@ protected ViewInfo(Builder builder) { this.sqlConfigs = Collections.unmodifiableMap(builder.sqlConfigs); this.schemaMode = builder.schemaMode; this.queryColumnNames = builder.queryColumnNames; - // Force PROP_TABLE_TYPE = VIEW so that `properties()` reflects the typed ViewInfo - // classification. Catalogs and generic viewers reading PROP_TABLE_TYPE from the properties - // bag (e.g. TableCatalog.listTableSummaries default impl, DESCRIBE) see "VIEW" without - // requiring authors to remember to call withTableType(VIEW). 
- properties().put(TableCatalog.PROP_TABLE_TYPE, TableSummary.VIEW_TABLE_TYPE); + this.viewDependencies = builder.viewDependencies; + // Default PROP_TABLE_TYPE = VIEW so `properties()` reflects the typed ViewInfo + // classification. Callers can refine to a more specific view kind (for example, + // METRIC_VIEW) by calling BaseBuilder.withTableType(...) on the builder before build(). + properties().putIfAbsent(TableCatalog.PROP_TABLE_TYPE, TableSummary.VIEW_TABLE_TYPE); } /** The SQL text of the view. */ @@ -102,6 +103,14 @@ protected ViewInfo(Builder builder) { */ public String[] queryColumnNames() { return queryColumnNames; } + /** + * Returns the structured list of objects this view depends on (source tables and functions), + * or {@code null} if no dependency list was supplied. Unlike other view metadata which is + * encoded into {@link #properties()}, dependency lists are a first-class field because their + * nested structure does not round-trip cleanly through flat string properties. + */ + public DependencyList viewDependencies() { return viewDependencies; } + public static class Builder extends BaseBuilder { private String queryText; private String currentCatalog; @@ -109,6 +118,7 @@ public static class Builder extends BaseBuilder { private Map sqlConfigs = new HashMap<>(); private String schemaMode; private String[] queryColumnNames = new String[0]; + private DependencyList viewDependencies = null; @Override protected Builder self() { return this; } @@ -143,6 +153,16 @@ public Builder withQueryColumnNames(String[] queryColumnNames) { return this; } + /** + * Sets the structured dependency list for this view. Source tables and functions referenced + * by the view text should be recorded here so downstream consumers (e.g. catalogs persisting + * lineage) can access them without re-analyzing the view body. 
+ */ + public Builder withViewDependencies(DependencyList viewDependencies) { + this.viewDependencies = viewDependencies; + return this; + } + @Override public ViewInfo build() { Objects.requireNonNull(columns, "columns should not be null"); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2d09759056c5a..df7f5c662d8f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1226,7 +1226,7 @@ class Analyzer( ) { CatalogV2Util.loadTable(catalog, ident).map { case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) && - v1Table.v1Table.tableType == CatalogTableType.VIEW => + v1Table.v1Table.isViewLike => val v1Ident = v1Table.catalogTable.identifier val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier) ResolvedPersistentView( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 528e4ad0387a6..ef5862547574b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.{ CatalogTable, - CatalogTableType, TemporaryViewRelation, UnresolvedCatalogRelation } @@ -398,7 +397,7 @@ class RelationResolution( timeTravelSpec: Option[TimeTravelSpec]): Option[LogicalPlan] = { def createDataSourceV1Scan(v1Table: CatalogTable): LogicalPlan = { if (isStreaming) { - if (v1Table.tableType == CatalogTableType.VIEW) { + if (v1Table.isViewLike) { throw QueryCompilationErrors.permanentViewNotSupportedByStreamingReadingAPIError( ident.quoted ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index cd4a5645151b6..1aa5c483db88b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -394,7 +394,9 @@ class InMemoryCatalog( override def listViews(db: String, pattern: String): Seq[String] = synchronized { requireDbExists(db) - val views = catalog(db).tables.filter(_._2.table.tableType == CatalogTableType.VIEW).keySet + val views = catalog(db).tables.filter { case (_, t) => + t.table.isViewLike + }.keySet StringUtils.filterPattern(views.toSeq.sorted, pattern) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index a3efade9b9a1c..32aa8cccbd93a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1256,7 +1256,7 @@ class SessionCatalog( import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ val ident = nameParts.asTableIdentifier try { - getTempViewOrPermanentTableMetadata(ident).tableType == CatalogTableType.VIEW + 
getTempViewOrPermanentTableMetadata(ident).isViewLike } catch { case _: NoSuchTableException => false case _: NoSuchNamespaceException => false diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 1c4362bfd3ed7..efd4bf621921e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -461,6 +461,14 @@ case class CatalogTable( */ def fullIdent: Seq[String] = multipartIdentifier.getOrElse(identifier.nameParts) + /** + * Returns whether this table behaves like a view at resolution / DDL time. Today: VIEW or + * METRIC_VIEW. Forks may extend this set with additional view-like types, so call sites + * that need a uniform "is this view-like?" check should prefer this helper over inline + * disjunctions on `tableType`. + */ + def isViewLike: Boolean = CatalogTable.isViewLike(tableType) + /** * schema of this table's partition columns */ @@ -667,7 +675,7 @@ case class CatalogTable( if (comment.isDefined) map += "Comment" -> JString(comment.get) if (collation.isDefined) map += "Collation" -> JString(collation.get) - if (tableType == CatalogTableType.VIEW) { + if (isViewLike) { if (viewText.isDefined) { map += "View Text" -> JString(viewText.get) } @@ -742,15 +750,30 @@ object CatalogTable { val VIEW_CATALOG_AND_NAMESPACE = VIEW_PREFIX + "catalogAndNamespace.numParts" val VIEW_CATALOG_AND_NAMESPACE_PART_PREFIX = VIEW_PREFIX + "catalogAndNamespace.part." - // Property to indicate that a VIEW is actually a METRIC VIEW - val VIEW_WITH_METRICS = VIEW_PREFIX + "viewWithMetrics" + /** + * View sub-type marker persisted in `properties` so the metric-view distinction survives a + * round-trip through external catalogs whose enum can't carry it (e.g. the Hive Metastore, + * which only knows `VIRTUAL_VIEW`). When this property is set, the in-memory `tableType` + * upgrades from [[CatalogTableType.VIEW]] back to [[CatalogTableType.METRIC_VIEW]] on read. + */ + val VIEW_SUB_TYPE = VIEW_PREFIX + "subType" + val VIEW_SUB_TYPE_METRIC_VIEW = "METRIC_VIEW" /** - * Check if a CatalogTable is a metric view by looking at its properties. + * Check if a CatalogTable is a metric view. */ def isMetricView(table: CatalogTable): Boolean = { - table.tableType == CatalogTableType.VIEW && - table.properties.get(VIEW_WITH_METRICS).contains("true") + table.tableType == CatalogTableType.METRIC_VIEW + } + + /** + * Type-only form of [[CatalogTable.isViewLike]]; returns whether the given table type + * behaves like a view at resolution / DDL time. Use this overload when you have a + * [[CatalogTableType]] but no surrounding [[CatalogTable]] (e.g. inside `match`/`case` + * patterns or [[org.apache.spark.sql.catalyst.catalog.SessionCatalog.isView]]). + */ + def isViewLike(tableType: CatalogTableType): Boolean = { + tableType == CatalogTableType.VIEW || tableType == CatalogTableType.METRIC_VIEW } // Convert the current catalog and namespace to properties. 
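A sketch of the upgrade-on-read that the `VIEW_SUB_TYPE` comment above describes; `restoreSubType` is a hypothetical name, and the real restore lives in the external-catalog read path, which this hunk does not show:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}

// External catalogs like HMS can only persist VIRTUAL_VIEW, so the metric-view
// kind rides along in `properties` and is re-applied when metadata is read back.
def restoreSubType(table: CatalogTable): CatalogTable = {
  val isMetric = table.tableType == CatalogTableType.VIEW &&
    table.properties.get(CatalogTable.VIEW_SUB_TYPE)
      .contains(CatalogTable.VIEW_SUB_TYPE_METRIC_VIEW)
  if (isMetric) table.copy(tableType = CatalogTableType.METRIC_VIEW) else table
}
```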
@@ -1089,8 +1112,9 @@ object CatalogTableType { val EXTERNAL = new CatalogTableType("EXTERNAL") val MANAGED = new CatalogTableType("MANAGED") val VIEW = new CatalogTableType("VIEW") + val METRIC_VIEW = new CatalogTableType("METRIC_VIEW") - val tableTypes = Seq(EXTERNAL, MANAGED, VIEW) + val tableTypes = Seq(EXTERNAL, MANAGED, VIEW, METRIC_VIEW) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala index a1fb2c1c84e40..8a47cac8e7962 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V1Table.scala @@ -105,6 +105,7 @@ private[sql] object V1Table { case CatalogTableType.EXTERNAL => Some(TableSummary.EXTERNAL_TABLE_TYPE) case CatalogTableType.MANAGED => Some(TableSummary.MANAGED_TABLE_TYPE) case CatalogTableType.VIEW => Some(TableSummary.VIEW_TABLE_TYPE) + case CatalogTableType.METRIC_VIEW => Some(TableSummary.METRIC_VIEW_TABLE_TYPE) case _ => None } } @@ -195,9 +196,15 @@ private[sql] object V1Table { val schemaModeProps = Option(info.schemaMode) .map(m => Map(CatalogTable.VIEW_SCHEMA_MODE -> m)) .getOrElse(Map.empty) + // ViewInfo always represents a view-like table, but PROP_TABLE_TYPE may further refine the + // kind (e.g. METRIC_VIEW). Default to plain VIEW when no refinement is supplied. + val tableType = props.get(TableCatalog.PROP_TABLE_TYPE) match { + case Some(TableSummary.METRIC_VIEW_TABLE_TYPE) => CatalogTableType.METRIC_VIEW + case _ => CatalogTableType.VIEW + } CatalogTable( identifier = ident.asLegacyTableIdentifier(catalog.name()), - tableType = CatalogTableType.VIEW, + tableType = tableType, storage = CatalogStorageFormat.empty, schema = CatalogV2Util.v2ColumnsToStructType(info.columns), owner = props.getOrElse(TableCatalog.PROP_OWNER, ""), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 8c8fd4a4428be..c2f7d4ebcf46a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2826,6 +2826,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map.empty) } + def invalidMetricViewYamlError(message: String, cause: Throwable): Throwable = { + new AnalysisException( + errorClass = "INVALID_METRIC_VIEW_YAML", + messageParameters = Map("message" -> message), + cause = Some(cause)) + } + def noSuchStructFieldInGivenFieldsError( fieldName: String, fields: Array[StructField]): Throwable = { new AnalysisException( @@ -3323,6 +3330,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat messageParameters = Map("tableName" -> toSQLId(table))) } + def showCreateTableNotSupportedOnMetricViewError(table: String): Throwable = { + new AnalysisException( + errorClass = "UNSUPPORTED_SHOW_CREATE_TABLE.ON_METRIC_VIEW", + messageParameters = Map("tableName" -> toSQLId(table))) + } + def showCreateTableNotSupportTransactionalHiveTableError(table: CatalogTable): Throwable = { new AnalysisException( errorClass = "UNSUPPORTED_SHOW_CREATE_TABLE.ON_TRANSACTIONAL_HIVE_TABLE", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala index 2e76a13741d09..1b4718ebd385e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala @@ -94,7 +94,7 @@ private[sql] object Source { if (sourceText.isEmpty) { throw MetricViewValidationException("Source cannot be empty") } - Try(CatalystSqlParser.parseTableIdentifier(sourceText)) match { + Try(CatalystSqlParser.parseMultipartIdentifier(sourceText)) match { case Success(_) => AssetSource(sourceText) case Failure(_) => Try(CatalystSqlParser.parseQuery(sourceText)) match { @@ -167,4 +167,42 @@ private[sql] case class MetricView( version: String, from: Source, where: Option[String] = None, - select: Seq[Column]) + select: Seq[Column]) { + + /** + * Returns a set of table properties describing this metric view's source and + * filter clauses. Mirrors the property keys used by the canonical metric view + * representation on other Spark platforms so consumers of the catalog see a + * consistent property layout. + * + * Note: `metric_view.from.sql` and `metric_view.where` values are truncated to + * [[Constants.MAXIMUM_PROPERTY_SIZE]] characters, so these are descriptive values + * for catalog browsers / lineage tooling -- not round-trippable representations + * of the source. Consumers that need the full SQL or filter expression for + * re-execution should read [[ViewInfo#queryText]] (the YAML body) and re-parse it + * rather than reconstruct the query from these properties; for any source whose + * SQL exceeds the size limit, this property would silently return a truncated + * string. + */ + def getProperties: Map[String, String] = { + val base = Map(MetricView.PROP_FROM_TYPE -> from.sourceType.toString) + val fromProps = from match { + case asset: AssetSource => + base + (MetricView.PROP_FROM_NAME -> asset.name) + case sql: SQLSource => + base + (MetricView.PROP_FROM_SQL -> MetricView.truncate(sql.sql)) + } + where.fold(fromProps)(w => + fromProps + (MetricView.PROP_WHERE -> MetricView.truncate(w))) + } +} + +private[sql] object MetricView { + final val PROP_FROM_TYPE = "metric_view.from.type" + final val PROP_FROM_NAME = "metric_view.from.name" + final val PROP_FROM_SQL = "metric_view.from.sql" + final val PROP_WHERE = "metric_view.where" + + private def truncate(value: String): String = + value.take(Constants.MAXIMUM_PROPERTY_SIZE) +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala index 121d908eda90b..2ac20ebeaa958 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala @@ -64,10 +64,13 @@ object MetricViewPlanner { val metricView = try { MetricViewFactory.fromYAML(yaml) } catch { + // Both cases are user-correctable errors in the YAML body, not internal Spark bugs; + // surface them as `INVALID_METRIC_VIEW_YAML` AnalysisExceptions so the message is + // categorized as user input error rather than "please contact support". 
case e: MetricViewValidationException => - throw QueryCompilationErrors.invalidLiteralForWindowDurationError() + throw QueryCompilationErrors.invalidMetricViewYamlError(e.getMessage, e) case e: MetricViewYAMLParsingException => - throw QueryCompilationErrors.invalidLiteralForWindowDurationError() + throw QueryCompilationErrors.invalidMetricViewYamlError(e.getMessage, e) } val source = metricView.from match { case asset: AssetSource => UnresolvedRelation(sqlParser.parseMultipartIdentifier(asset.name)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 29bcb98f3f099..8f0b664e10c5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -342,9 +342,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) DropTableCommand(ident, ifExists, isView = true, purge = false) // ViewCatalog catalogs fall through to `DataSourceV2Strategy`, which routes DROP VIEW to - // `ViewCatalog.dropView`. Other non-session catalogs get `MISSING_CATALOG_ABILITY.VIEWS`, - // matching the error raised from `CheckViewReferences` for CREATE/ALTER VIEW and from the - // analyzer gate on UnresolvedView. + // `ViewCatalog.dropView` (this also covers METRIC_VIEW since metric views are persisted + // through the same ViewCatalog interface). Other non-session catalogs get + // `MISSING_CATALOG_ABILITY.VIEWS`, matching the error raised from `CheckViewReferences` for + // CREATE/ALTER VIEW and from the analyzer gate on UnresolvedView. case DropView(r @ ResolvedIdentifier(catalog, ident), ifExists) if !catalog.isInstanceOf[ViewCatalog] => if (catalog == FakeSystemCatalog) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala index 38483395ec8c5..3e27605940383 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataStreamWriter.scala @@ -28,7 +28,7 @@ import org.apache.spark.annotation.Evolving import org.apache.spark.api.java.function.VoidFunction2 import org.apache.spark.sql.{streaming, Dataset => DS, ForeachWriter} import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.{ColumnDefinition, CreateTable, OptionList, UnresolvedTableSpec} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes @@ -190,7 +190,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) extends streaming.D val tableInstance = catalog.asTableCatalog.loadTable(identifier) def writeToV1Table(table: CatalogTable): StreamingQuery = { - if (table.tableType == CatalogTableType.VIEW) { + if (table.isViewLike) { throw QueryCompilationErrors.streamingIntoViewNotSupportedError(tableName) } require(table.provider.isDefined) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 157f0071a3dc6..924e8b820a7c3 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.classic.ClassicConversions.castToImpl @@ -104,7 +104,7 @@ case class AnalyzeColumnCommand( private def analyzeColumnInCatalog(sparkSession: SparkSession): Unit = { val sessionState = sparkSession.sessionState val tableMeta = sessionState.catalog.getTableMetadata(tableIdent) - if (tableMeta.tableType == CatalogTableType.VIEW) { + if (tableMeta.isViewLike) { // Analyzes a catalog view if the view is cached val plan = sparkSession.table(tableIdent.quotedString).logicalPlan if (!analyzeColumnInCachedData(plan, sparkSession)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 8f1e05c87c8f3..39169f8d22df4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.classic.ClassicConversions.castToImpl import org.apache.spark.sql.errors.QueryCompilationErrors @@ -75,7 +75,7 @@ case class AnalyzePartitionCommand( val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) - if (tableMeta.tableType == CatalogTableType.VIEW) { + if (tableMeta.isViewLike) { throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 23055037ac4cf..b9ff24139f07d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.LogKeys.{COUNT, DATABASE_NAME, ERROR, TABLE_NAM import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType, ExternalCatalogUtils} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, ExternalCatalogUtils} import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -240,7 +240,7 @@ object CommandUtils extends Logging { val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) - if (tableMeta.tableType == CatalogTableType.VIEW) { + if (tableMeta.isViewLike) { // Analyzes a catalog view if the view is cached val table = sparkSession.table(tableIdent.quotedString) val cacheManager = sparkSession.sharedState.cacheManager diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeRelationJsonCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeRelationJsonCommand.scala index 5f8528b679542..64317a04547a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeRelationJsonCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DescribeRelationJsonCommand.scala @@ -27,7 +27,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.{ResolvedPersistentView, ResolvedTable, ResolvedTempView} -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -309,7 +309,7 @@ case class DescribeRelationJsonCommand( catalog: SessionCatalog, metadata: CatalogTable, jsonMap: mutable.LinkedHashMap[String, JValue]): Unit = { - if (metadata.tableType == CatalogTableType.VIEW) { + if (metadata.isViewLike) { throw QueryCompilationErrors.descPartitionNotAllowedOnView(metadata.identifier.identifier) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 0415a33e2d6dd..7519216f1b367 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -50,7 +50,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo extends LeafRunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - assert(table.tableType != CatalogTableType.VIEW) + assert(!table.isViewLike) assert(table.provider.isDefined) val sessionState = sparkSession.sessionState @@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand( override def innerChildren: Seq[LogicalPlan] = query :: Nil override def run(sparkSession: SparkSession): Seq[Row] = { - assert(table.tableType != CatalogTableType.VIEW) + assert(!table.isViewLike) assert(table.provider.isDefined) val sessionState = sparkSession.sessionState diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 160b007b547f6..fe3ec61ce0f11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -232,7 +232,8 @@ case class DropTableCommand( // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view // issue an exception. catalog.getTableMetadata(tableName).tableType match { - case CatalogTableType.VIEW if !isView => + // Both VIEW and METRIC_VIEW are conceptually views and must be dropped via DROP VIEW. + case t if CatalogTable.isViewLike(t) && !isView => throw QueryCompilationErrors.wrongCommandForObjectTypeError( operation = "DROP TABLE", requiredType = s"${CatalogTableType.EXTERNAL.name} or ${CatalogTableType.MANAGED.name}", @@ -240,10 +241,11 @@ case class DropTableCommand( foundType = catalog.getTableMetadata(tableName).tableType.name, alternative = "DROP VIEW" ) - case o if o != CatalogTableType.VIEW && isView => + case o if !CatalogTable.isViewLike(o) && isView => throw QueryCompilationErrors.wrongCommandForObjectTypeError( operation = "DROP VIEW", - requiredType = CatalogTableType.VIEW.name, + requiredType = + s"${CatalogTableType.VIEW.name} or ${CatalogTableType.METRIC_VIEW.name}", objectName = catalog.getTableMetadata(tableName).qualifiedName, foundType = o.name, alternative = "DROP TABLE" @@ -1087,11 +1089,11 @@ object DDLUtils extends Logging { isView: Boolean): Unit = { if (!catalog.isTempView(tableMetadata.identifier)) { tableMetadata.tableType match { - case CatalogTableType.VIEW if !isView => + case t if CatalogTable.isViewLike(t) && !isView => throw QueryCompilationErrors.cannotAlterViewWithAlterTableError( viewName = tableMetadata.identifier.table ) - case o if o != CatalogTableType.VIEW && isView => + case o if !CatalogTable.isViewLike(o) && isView => throw QueryCompilationErrors.cannotAlterTableWithAlterViewError( tableName = tableMetadata.identifier.table ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala index 623685f6c20a7..5937ad2300cd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala @@ -21,9 +21,15 @@ import org.apache.spark.SparkException import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, SchemaUnsupported} -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HiveTableRelation, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View} +import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.connector.catalog.CatalogV2Util import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.metricview.serde.MetricView import org.apache.spark.sql.metricview.util.MetricViewPlanner import org.apache.spark.sql.types.StructType @@ -39,15 +45,15 @@ case class CreateMetricViewCommand( import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val name = child match { - case v: ResolvedIdentifier => - v.identifier.asTableIdentifier + child match { + case v: ResolvedIdentifier if CatalogV2Util.isSessionCatalog(v.catalog) => + createMetricViewInSessionCatalog(sparkSession, v) case _ => throw SparkException.internalError( s"Failed to resolve identifier for creating metric view") } - val analyzed = MetricViewHelper.analyzeMetricViewText(sparkSession, name, originalText) + } + private def validateUserColumns(name: TableIdentifier, analyzed: LogicalPlan): Unit = { if (userSpecifiedColumns.nonEmpty) { if (userSpecifiedColumns.length > analyzed.output.length) { throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError( @@ -57,14 +63,29 @@ case class CreateMetricViewCommand( name.nameParts, userSpecifiedColumns.map(_._1), analyzed) } } + } + + private def createMetricViewInSessionCatalog( + sparkSession: SparkSession, + resolved: ResolvedIdentifier): Seq[Row] = { + val catalog = sparkSession.sessionState.catalog + val name = resolved.identifier.asTableIdentifier + val (analyzed, metricView) = MetricViewHelper.analyzeMetricViewText( + sparkSession, name.nameParts, originalText) + validateUserColumns(name, analyzed) + // Merge the descriptor `metric_view.*` properties (`from.type`, `from.name`/`from.sql`, + // `where`) into the user-supplied properties so v1 DESCRIBE TABLE EXTENDED surfaces the + // same descriptor rows as the v2 path in `DataSourceV2Strategy`. + val mergedProps = properties ++ metricView.getProperties catalog.createTable( ViewHelper.prepareTable( sparkSession, name, Some(originalText), analyzed, userSpecifiedColumns, - properties, SchemaUnsupported, comment, + mergedProps, SchemaUnsupported, comment, None, isMetricView = true), ignoreIfExists = allowExisting) Seq.empty } + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = { copy(child = newChild) } @@ -73,25 +94,110 @@ case class CreateMetricViewCommand( case class AlterMetricViewCommand(child: LogicalPlan, originalText: String) object MetricViewHelper { + + /** + * Walks the analyzed plan to collect direct table/view dependencies. Each dependency is + * returned as a structural multi-part name (`Seq[String]`); v1 sources (resolved through + * the session catalog) are normalized to a stable 3-part shape + * `[spark_catalog, db, table]` -- `TableIdentifier.nameParts` returns 1, 2, or 3 parts + * depending on whether the analyzer captured the catalog / database, so without + * normalization the same source can produce a different shape across runs. v2 sources + * already arrive fully qualified (catalog + namespace + table) and are returned as-is so + * multi-level namespaces survive. + * + * Stops recursion at relation leaf nodes and persistent `View` nodes so only direct + * (not transitive) dependencies are recorded. + */ + private[execution] def collectTableDependencies(plan: LogicalPlan): Seq[Seq[String]] = { + val tables = scala.collection.mutable.ArrayBuffer.empty[Seq[String]] + def traverse(p: LogicalPlan): Unit = p match { + case v: View if !v.isTempView => + tables += qualifyV1(v.desc.identifier.nameParts) + case r: DataSourceV2Relation if r.catalog.isDefined && r.identifier.isDefined => + val ident = r.identifier.get + // V2 catalogs may have multi-level namespaces; preserve the full arity rather than + // dot-joining the namespace into a single component. 
+ tables += (r.catalog.get.name() +: ident.namespace().toIndexedSeq) :+ ident.name() + case r: HiveTableRelation => + tables += qualifyV1(r.tableMeta.identifier.nameParts) + case r: LogicalRelation if r.catalogTable.isDefined => + tables += qualifyV1(r.catalogTable.get.identifier.nameParts) + case other => + other.children.foreach(traverse) + other.expressions.foreach(_.foreach { + case s: SubqueryExpression => traverse(s.plan) + case _ => + }) + } + traverse(plan) + tables.distinct.toSeq + } + + /** + * Normalizes v1 source identifiers to a stable 3-part `[spark_catalog, db, table]` shape. + * `TableIdentifier.nameParts` may return 1, 2, or 3 parts depending on whether the analyzer + * captured the catalog / database components, which would otherwise leak through to + * dependency consumers as nondeterministic arity. + */ + private def qualifyV1(parts: Seq[String]): Seq[String] = parts match { + case Seq(t) => Seq(SESSION_CATALOG_NAME, SessionCatalog.DEFAULT_DATABASE, t) + case Seq(db, t) => Seq(SESSION_CATALOG_NAME, db, t) + case Seq(_, _, _) => parts + case other => other // Unexpected arity; pass through unchanged. + } + + /** + * Analyzes a metric-view YAML body so the create / alter path can capture the source plan + * and its dependencies. Returns the analyzed plan together with the parsed [[MetricView]] + * descriptor (the latter is grabbed off the un-analyzed [[MetricViewPlaceholder]] before + * the analyzer rewrites it away, so callers needing the descriptor for property emission + * don't have to re-parse the YAML). + * + * `nameParts` is the multi-part target identifier (catalog + namespace + table). The synthetic + * [[CatalogTable]] used as analysis context still carries a [[TableIdentifier]] (capped at + * 3 parts: catalog + database + table); for multi-level v2 namespaces we collapse the + * intermediate namespace components into the synthetic `database` slot. The synthetic identifier + * is not used to resolve the view body itself, so this collapse is observationally invisible to + * the analyzed plan; `verifyTemporaryObjectsNotExists` continues to receive the full + * `nameParts` so error messages still render the multi-part form. + */ def analyzeMetricViewText( session: SparkSession, - name: TableIdentifier, - viewText: String): LogicalPlan = { + nameParts: Seq[String], + viewText: String): (LogicalPlan, MetricView) = { val analyzer = session.sessionState.analyzer + val syntheticIdent = nameParts match { + case Seq(table) => + TableIdentifier(table) + case Seq(db, table) => + TableIdentifier(table, Some(db)) + case parts => + // 3+ parts: catalog is the head, table is the last, the middle (1..n-1) collapses + // into the synthetic `database` slot. We dot-join the intermediate components so a + // human inspecting the synthetic identifier can still see them. 
+ TableIdentifier( + parts.last, + Some(parts.slice(1, parts.length - 1).mkString(".")), + Some(parts.head)) + } // this metadata is used for analysis check, it'll be replaced during create/update with // more accurate information val tableMeta = CatalogTable( - identifier = name, + identifier = syntheticIdent, tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, schema = new StructType(), viewOriginalText = Some(viewText), viewText = Some(viewText)) - val metricViewNode = MetricViewPlanner.planWrite( + val placeholder = MetricViewPlanner.planWrite( tableMeta, viewText, session.sessionState.sqlParser) - val analyzed = analyzer.executeAndCheck(metricViewNode, new QueryPlanningTracker) + // Grab the parsed descriptor BEFORE analysis -- the placeholder gets replaced by + // ResolvedMetricView during analyzer rules, after which `MetricView` is no longer + // recoverable from the plan tree. + val metricView = placeholder.desc + val analyzed = analyzer.executeAndCheck(placeholder, new QueryPlanningTracker) ViewHelper.verifyTemporaryObjectsNotExists( - isTemporary = false, name.nameParts, analyzed, Seq.empty) - analyzed + isTemporary = false, nameParts, analyzed, Seq.empty) + (analyzed, metricView) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index fb2dc0a684943..ca534706635a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -103,7 +103,7 @@ case class CreateTableLikeCommand( provider } else if (fileFormat.inputFormat.isDefined) { Some(DDLUtils.HIVE_PROVIDER) - } else if (sourceTableDesc.tableType == CatalogTableType.VIEW) { + } else if (sourceTableDesc.isViewLike) { Some(sparkSession.sessionState.conf.defaultDataSourceName) } else { sourceTableDesc.provider @@ -267,7 +267,7 @@ case class AlterTableAddColumnsCommand( table: TableIdentifier): CatalogTable = { val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table) - if (catalogTable.tableType == CatalogTableType.VIEW) { + if (catalogTable.isViewLike) { throw QueryCompilationErrors.alterAddColNotSupportViewError(table) } @@ -730,7 +730,7 @@ case class DescribeTableCommand( catalog: SessionCatalog, metadata: CatalogTable, result: ArrayBuffer[Row]): Unit = { - if (metadata.tableType == CatalogTableType.VIEW) { + if (metadata.isViewLike) { throw QueryCompilationErrors.descPartitionNotAllowedOnView(table.identifier) } DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION") @@ -1210,6 +1210,14 @@ case class ShowCreateTableCommand( } else { val tableMetadata = catalog.getTableRawMetadata(table) + // SHOW CREATE TABLE / VIEW does not have a WITH METRICS round-trippable form yet, + // so explicitly reject metric views rather than emit a misleading `CREATE VIEW` + // statement that loses the METRIC_VIEW kind. Tracked as follow-up. + if (tableMetadata.tableType == METRIC_VIEW) { + throw QueryCompilationErrors.showCreateTableNotSupportedOnMetricViewError( + table.identifier) + } + // TODO: [SPARK-28692] unify this after we unify the // CREATE TABLE syntax for hive serde and data source table. 
val metadata = if (DDLUtils.isDatasourceTable(tableMetadata)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 807342c0e90c2..8407f20777d98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -172,7 +172,7 @@ case class CreateViewCommand( if (allowExisting) { // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. - } else if (tableMetadata.tableType != CatalogTableType.VIEW) { + } else if (!tableMetadata.isViewLike) { throw QueryCompilationErrors.unsupportedCreateOrReplaceViewOnTableError( name.nameParts, replace) } else if (replace) { @@ -866,24 +866,23 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig { if (originalText.isEmpty) { throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError() } + // For metric views, preserve the per-column metadata (`metric_view.type` / `metric_view.expr`) + // that the analyzer attaches to each dimension/measure `Alias`, even when the user supplies + // column names with comments. val aliasedSchema = CharVarcharUtils.getRawSchema( - aliasPlan(session, analyzedPlan, userSpecifiedColumns).schema, session.sessionState.conf) + aliasPlan(session, analyzedPlan, userSpecifiedColumns, retainMetadata = isMetricView).schema, + session.sessionState.conf) val newProperties = generateViewProperties( properties, session, analyzedPlan.schema.fieldNames, aliasedSchema.fieldNames, viewSchemaMode) - // Add property to indicate if this is a metric view - val finalProperties = if (isMetricView) { - newProperties + (CatalogTable.VIEW_WITH_METRICS -> "true") - } else { - newProperties - } + val tableType = if (isMetricView) CatalogTableType.METRIC_VIEW else CatalogTableType.VIEW CatalogTable( identifier = name, - tableType = CatalogTableType.VIEW, + tableType = tableType, storage = CatalogStorageFormat.empty, schema = aliasedSchema, - properties = finalProperties, + properties = newProperties, viewOriginalText = originalText, viewText = originalText, comment = comment, @@ -894,18 +893,30 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig { /** * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns, * else return the analyzed plan directly. + * + * When `retainMetadata` is true, any existing column metadata on the analyzed attribute + * (for example the `metric_view.type` / `metric_view.expr` keys the analyzer attaches to + * metric-view columns) is preserved in the re-aliased projection. The no-comment branch + * already preserves `attr.metadata` transitively via `child.metadata` on the new `Alias`; + * the comment branch needs an explicit merge because it sets `explicitMetadata` to a + * freshly constructed metadata object. 
*/ def aliasPlan( session: SparkSession, analyzedPlan: LogicalPlan, - userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = { + userSpecifiedColumns: Seq[(String, Option[String])], + retainMetadata: Boolean = false): LogicalPlan = { if (userSpecifiedColumns.isEmpty) { analyzedPlan } else { val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map { case (attr, (colName, None)) => Alias(attr, colName)() case (attr, (colName, Some(colComment))) => - val meta = new MetadataBuilder().putString("comment", colComment).build() + val builder = new MetadataBuilder() + if (retainMetadata) { + builder.withMetadata(attr.metadata) + } + val meta = builder.putString("comment", colComment).build() Alias(attr, colName)(explicitMetadata = Some(meta)) } session.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index d1f61599e7ac8..ff6c1e067406d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -169,7 +169,7 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical val tableName = tableIdentWithDB.unquotedString val existingTable = catalog.getTableMetadata(tableIdentWithDB) - if (existingTable.tableType == CatalogTableType.VIEW) { + if (existingTable.isViewLike) { throw QueryCompilationErrors.saveDataIntoViewNotAllowedError() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala new file mode 100644 index 0000000000000..d25239aac949d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.CurrentUserContext +import org.apache.spark.sql.catalyst.analysis.{SchemaUnsupported, ViewSchemaMode} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.{DependencyList, Identifier, TableSummary, ViewCatalog} + +/** + * Physical plan node for `CREATE VIEW ... WITH METRICS` on a v2 [[ViewCatalog]]. 
Inherits the + * shared CREATE-side `run()` (viewExists short-circuit, OR REPLACE, cross-type collision + * decoding) from [[V2CreateViewPreparation]]; only supplies the metric-view-specific bits + * (no collation, schema-mode UNSUPPORTED, typed view dependencies, `PROP_TABLE_TYPE = + * METRIC_VIEW`) via the [[V2ViewPreparation]] hooks. + * + * Routed by [[DataSourceV2Strategy]] from + * [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] when the resolved + * catalog is a non-session v2 catalog. + */ +case class CreateV2MetricViewExec( + catalog: ViewCatalog, + identifier: Identifier, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + userProperties: Map[String, String], + originalText: String, + query: LogicalPlan, + allowExisting: Boolean, + replace: Boolean, + deps: Option[DependencyList]) extends V2CreateViewPreparation { + + // Metric views don't carry a default-collation override. + override def collation: Option[String] = None + + // CREATE stamps the current user, matching the v1 metric-view path (which goes through + // ViewHelper.prepareTable -> CatalogTable.owner default) and CreateV2ViewExec. + override def owner: Option[String] = Some(CurrentUserContext.getCurrentUser) + + // Metric views always have schema-mode UNSUPPORTED (mirroring the v1 path which passes + // SchemaUnsupported into ViewHelper.prepareTable). + override def viewSchemaMode: ViewSchemaMode = SchemaUnsupported + + override protected def viewDependencies: Option[DependencyList] = deps + + override protected def tableType: Option[String] = + Some(TableSummary.METRIC_VIEW_TABLE_TYPE) + + // The analyzer attaches `metric_view.type` / `metric_view.expr` keys to each output + // attribute's metadata; `aliasPlan`'s default re-projection drops them when the user + // supplies a column-rename clause. Mirror v1 `ViewHelper.prepareTable(isMetricView = true)` + // by retaining metadata across the rename. + override protected def retainColumnMetadata: Boolean = true +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala index 6cfa95a2eaf43..4e10c7d3ab284 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, SchemaEvoluti import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, ViewCatalog, ViewInfo} +import org.apache.spark.sql.connector.catalog.{DependencyList, Identifier, TableCatalog, ViewCatalog, ViewInfo} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.{CommandUtils, ViewHelper} @@ -57,6 +57,21 @@ private[v2] trait V2ViewPreparation extends LeafV2CommandExec { protected lazy val fullNameParts: Seq[String] = (catalog.name() +: identifier.asMultipartIdentifier).toSeq + /** Optional structured dependency list to stamp on the built `ViewInfo`. 
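+   * (Metric views override this with the table dependencies extracted from the analyzed
+   * YAML body; plain views leave it `None`.)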
*/ + protected def viewDependencies: Option[DependencyList] = None + + /** Optional view sub-kind to stamp on `PROP_TABLE_TYPE`; defaults to `VIEW` when `None`. */ + protected def tableType: Option[String] = None + + /** + * Whether `aliasPlan` should preserve any column metadata the analyzer attached to the + * source plan when re-aliasing user-specified column names. Plain views default to `false` + * (matches v1 `CreateViewCommand`); metric views override to `true` so the analyzer-injected + * `metric_view.type` / `metric_view.expr` keys survive a `CREATE VIEW (c1, c2, ...)` + * column rename (matches v1 `ViewHelper.prepareTable(isMetricView = true)`). + */ + protected def retainColumnMetadata: Boolean = false + override def output: Seq[Attribute] = Seq.empty protected def buildViewInfo(): ViewInfo = { @@ -79,7 +94,9 @@ private[v2] trait V2ViewPreparation extends LeafV2CommandExec { SchemaUtils.checkIndeterminateCollationInSchema(query.schema) val aliasedSchema = CharVarcharUtils.getRawSchema( - aliasPlan(session, query, userSpecifiedColumns).schema, session.sessionState.conf) + aliasPlan(session, query, userSpecifiedColumns, retainMetadata = retainColumnMetadata) + .schema, + session.sessionState.conf) SchemaUtils.checkColumnNameDuplication( aliasedSchema.fieldNames.toImmutableArraySeq, session.sessionState.conf.resolver) @@ -106,6 +123,8 @@ private[v2] trait V2ViewPreparation extends LeafV2CommandExec { owner.foreach(builder.withOwner) comment.foreach(builder.withComment) collation.foreach(builder.withCollation) + viewDependencies.foreach(builder.withViewDependencies) + tableType.foreach(builder.withTableType) builder.build() } @@ -114,27 +133,19 @@ private[v2] trait V2ViewPreparation extends LeafV2CommandExec { } /** - * Physical plan node for CREATE VIEW on a v2 [[ViewCatalog]]. Dispatches to - * [[ViewCatalog#createView]] for plain CREATE, [[ViewCatalog#createOrReplaceView]] for - * `OR REPLACE`, and short-circuits `IF NOT EXISTS` early via [[ViewCatalog#viewExists]] so - * the view body isn't analyzed when the view already exists. + * Shared CREATE-side `run()` for v2 view-create execs. Adds the `IF NOT EXISTS` short-circuit + * via [[ViewCatalog#viewExists]], dispatches `OR REPLACE` to + * [[ViewCatalog#createOrReplaceView]] vs. plain CREATE to [[ViewCatalog#createView]], and + * decodes `ViewAlreadyExistsException` into the dedicated cross-type collision error when a + * non-view table sits at the ident in a mixed catalog. Subclasses supply only the + * view-shape-specific fields (`allowExisting`, `replace`, plus the [[V2ViewPreparation]] hooks + * such as `viewDependencies` / `tableType`) and inherit `run()` unchanged. 
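+ * The two current subclasses: [[CreateV2ViewExec]] (plain CREATE VIEW; supplies
+ * `collation` / `viewSchemaMode` as case-class fields and stamps the current user as
+ * owner) and `CreateV2MetricViewExec` (CREATE VIEW ... WITH METRICS; fixes schema mode to
+ * UNSUPPORTED and stamps typed dependencies plus `PROP_TABLE_TYPE = METRIC_VIEW`).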
*/ -case class CreateV2ViewExec( - catalog: ViewCatalog, - identifier: Identifier, - userSpecifiedColumns: Seq[(String, Option[String])], - comment: Option[String], - collation: Option[String], - userProperties: Map[String, String], - originalText: String, - query: LogicalPlan, - allowExisting: Boolean, - replace: Boolean, - viewSchemaMode: ViewSchemaMode) extends V2ViewPreparation { - - override def owner: Option[String] = Some(CurrentUserContext.getCurrentUser) +private[v2] trait V2CreateViewPreparation extends V2ViewPreparation { + def allowExisting: Boolean + def replace: Boolean - override protected def run(): Seq[InternalRow] = { + override final protected def run(): Seq[InternalRow] = { // CREATE VIEW IF NOT EXISTS: short-circuit before `buildViewInfo` if a view already sits // at the ident -- avoids `aliasPlan` / config capture for the common no-op case (matches // v1 `CreateViewCommand.run`). The mixed-catalog "table at ident" no-op is handled in the @@ -173,3 +184,25 @@ case class CreateV2ViewExec( Seq.empty } } + +/** + * Physical plan node for CREATE VIEW on a v2 [[ViewCatalog]]. Inherits the create-side + * `run()` (viewExists short-circuit + OR REPLACE + cross-type decoding) from + * [[V2CreateViewPreparation]]; only supplies the case-class fields and stamps the current + * user as owner. + */ +case class CreateV2ViewExec( + catalog: ViewCatalog, + identifier: Identifier, + userSpecifiedColumns: Seq[(String, Option[String])], + comment: Option[String], + collation: Option[String], + userProperties: Map[String, String], + originalText: String, + query: LogicalPlan, + allowExisting: Boolean, + replace: Boolean, + viewSchemaMode: ViewSchemaMode) extends V2CreateViewPreparation { + + override def owner: Option[String] = Some(CurrentUserContext.getCurrentUser) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index d9e915f82e07f..e113475811092 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.TreePattern.SCALAR_SUBQUERY import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder} import org.apache.spark.sql.classic.SparkSession -import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TruncatableTable, V1Table, V1ViewInfo, ViewCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Dependency, DependencyList, Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, TableCapability, TableCatalog, TableSummary, TruncatableTable, V1Table, V1ViewInfo, ViewCatalog} import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.index.SupportsIndex import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} @@ -44,7 +44,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat import org.apache.spark.sql.connector.write.V1Write import 
org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, ScalarSubquery => ExecScalarSubquery, SparkPlan, SparkStrategy => Strategy} -import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.execution.command.{CommandUtils, CreateMetricViewCommand, MetricViewHelper} import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelationWithTable, PushableColumnAndNestedColumn} import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} import org.apache.spark.sql.internal.SQLConf @@ -323,6 +323,39 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat CreateV2ViewExec(catalog.asInstanceOf[ViewCatalog], ident, userSpecifiedColumns, comment, collation, properties, sqlText, child, allowExisting, replace, viewSchemaMode) :: Nil + // CREATE VIEW ... WITH METRICS on a non-session v2 catalog. Routes the metric-view path + // through `CreateV2MetricViewExec`, which extends `V2ViewPreparation` to share the + // `IF NOT EXISTS` short-circuit, `OR REPLACE`, and cross-type-collision decoding with + // `CreateV2ViewExec`. Session-catalog dispatch stays in `CreateMetricViewCommand.run`. + case CreateMetricViewCommand( + ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment, properties, + originalText, allowExisting, replace) if !CatalogV2Util.isSessionCatalog(catalog) => + val viewCatalog = catalog match { + case vc: ViewCatalog => vc + case _ => throw QueryCompilationErrors.missingCatalogViewsAbilityError(catalog) + } + // Parse + analyze the YAML body here (during planning). This mirrors the v1 path's + // late analysis in `CreateMetricViewCommand.run` -- the metric-view source plan is not + // a SQL string, so it can't ride along as a regular `query` `LogicalPlan` field on the + // logical command the way `CreateView` does. Pass the full multi-part name so v2 metric + // views with multi-level-namespace targets analyze correctly (`asTableIdentifier` would + // throw `requiresSinglePartNamespaceError` for namespace arity > 1). + val nameParts = (catalog.name() +: ident.namespace().toIndexedSeq) :+ ident.name() + val (analyzed, metricView) = MetricViewHelper.analyzeMetricViewText( + session, nameParts, originalText) + val mergedProps = properties ++ metricView.getProperties + val depParts = MetricViewHelper.collectTableDependencies(analyzed) + // Always emit a `Some(DependencyList)` for metric views (even when `depParts` is empty, + // e.g. `SQLSource("SELECT 1 AS x")`): per `DependencyList`'s contract, `null` means + // "no dependency list was supplied" while an empty list means "supplied but the + // object has none". Metric-view CREATE always *computes* deps, so the right empty + // representation is `Some(empty list)`, not `None`. 
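+      // Illustration only (not executed here) of the two shapes this produces:
+      //   Some(DependencyList.of(Array[Dependency](
+      //     Dependency.table(Array("cat", "ns", "t")))))    // one or more table deps
+      //   Some(DependencyList.of(Array.empty[Dependency]))  // computed, but none found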
+      val sparkDeps: Array[Dependency] =
+        depParts.map(parts => Dependency.table(parts.toArray): Dependency).toArray
+      val deps = Some(DependencyList.of(sparkDeps))
+      CreateV2MetricViewExec(viewCatalog, ident, userSpecifiedColumns, comment, mergedProps,
+        originalText, analyzed, allowExisting, replace, deps) :: Nil
+
    case AlterViewAs(rpv @ ResolvedPersistentView(catalog, ident, _), originalText, query, _, _) =>
      AlterV2ViewExec(catalog.asInstanceOf[ViewCatalog], ident, rpv.info,
@@ -354,6 +387,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
      RenameV2ViewExec(
        catalog.asInstanceOf[ViewCatalog], ident, newName.asIdentifier) :: Nil
+
+    case ShowCreateTable(rpv @ ResolvedPersistentView(catalog, ident, _), _, _)
+        if rpv.info.properties.get(TableCatalog.PROP_TABLE_TYPE) ==
+          TableSummary.METRIC_VIEW_TABLE_TYPE =>
+      // SHOW CREATE TABLE on a metric view is explicitly unsupported: `ShowCreateV2ViewExec`
+      // would emit a plain `CREATE VIEW ... AS ...`, which is not a round-trippable
+      // metric-view DDL form (the right form is `CREATE VIEW ... WITH METRICS LANGUAGE
+      // YAML AS $$ ... $$`). Reject up front with the same dedicated error class the v1
+      // path uses (`UNSUPPORTED_SHOW_CREATE_TABLE.ON_METRIC_VIEW`) so users see the same
+      // actionable message regardless of catalog kind.
+      val quoted = (catalog.name() +: ident.asMultipartIdentifier)
+        .map(quoteIfNeeded).mkString(".")
+      throw QueryCompilationErrors.showCreateTableNotSupportedOnMetricViewError(quoted)
+
    case ShowCreateTable(rpv @ ResolvedPersistentView(catalog, ident, _), _, output) =>
      val quoted = (catalog.name() +: ident.asMultipartIdentifier).map(quoteIfNeeded).mkString(".")
      ShowCreateV2ViewExec(output, quoted, rpv.info) :: Nil
@@ -564,7 +610,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
    case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
      val invalidateFunc = () => CommandUtils.uncacheTableOrView(session, r)
-      DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
+      DropTableExec(
+        r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
    case _: NoopCommand => LocalTableScanExec(Nil, Nil, None) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index d21b5c730f0ca..c268cd963b802 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -334,8 +334,9 @@ class V2SessionCatalog(catalog: SessionCatalog)
  private def dropTableInternal(ident: Identifier, purge: Boolean = false): Boolean = {
    try {
      loadTable(ident) match {
-        case V1Table(v1Table) if v1Table.tableType == CatalogTableType.VIEW &&
-            !SQLConf.get.getConf(SQLConf.DROP_TABLE_VIEW_ENABLED) =>
+        case V1Table(v1Table)
+            if v1Table.isViewLike &&
+              !SQLConf.get.getConf(SQLConf.DROP_TABLE_VIEW_ENABLED) =>
          throw QueryCompilationErrors.wrongCommandForObjectTypeError(
            operation = "DROP TABLE",
            requiredType = s"${CatalogTableType.EXTERNAL.name} or" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ViewInspectionExecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ViewInspectionExecs.scala
index 93227a01839b7..d1ceeba833ea7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ViewInspectionExecs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ViewInspectionExecs.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded, ResolveDefaultColumns} import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog, ViewInfo} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog, TableSummary, ViewInfo} import org.apache.spark.sql.errors.QueryCompilationErrors /** @@ -169,6 +169,15 @@ case class DescribeV2ViewExec( result += toCatalystRow("", "", "") result += toCatalystRow("# Detailed View Information", "", "") addIdentifierRows(result, catalogName, identifier, entityLabel = "View") + // Surface the view sub-kind so users see whether they're looking at a plain VIEW + // or a sub-kind like METRIC_VIEW. `ViewInfo`'s constructor unconditionally stamps + // `PROP_TABLE_TYPE` (defaulting to `VIEW`), so this row is always present and + // matches v1 `CatalogTable.toJsonLinkedHashMap`'s `Type` row for parity. + result += toCatalystRow( + "Type", + Option(viewInfo.properties.get(TableCatalog.PROP_TABLE_TYPE)) + .getOrElse(TableSummary.VIEW_TABLE_TYPE), + "") // Promote first-class reserved fields (Owner / Comment / Collation) to top-level rows // before the EXTENDED Properties block, mirroring v1 `CatalogTable.toJsonLinkedHashMap` // which renders these as their own rows rather than burying them in `Table Properties`. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala new file mode 100644 index 0000000000000..54caafe7b5f15 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala @@ -0,0 +1,1075 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.util.concurrent.ConcurrentHashMap + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.catalyst.analysis.{NoSuchViewException, ViewAlreadyExistsException} +import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataTable, Table, TableCatalog, TableDependency, TableSummary, TableViewCatalog, ViewInfo} +import org.apache.spark.sql.metricview.serde.{AssetSource, Column, Constants, DimensionExpression, MeasureExpression, MetricView, MetricViewFactory, SQLSource} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.Metadata + +/** + * Tests that exercise [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a + * non-session V2 catalog. Metric views are persisted through the same [[ViewCatalog]] interface + * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE = METRIC_VIEW` + * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording catalog used here is a + * minimal [[TableViewCatalog]] so the same instance can also host the source table referenced by + * the metric view's YAML. + */ +class MetricViewV2CatalogSuite extends QueryTest with SharedSparkSession { + + import testImplicits._ + + private val testCatalogName = "testcat" + private val testNamespace = "ns" + private val sourceTableName = "events" + private val fullSourceTableName = + s"$testCatalogName.$testNamespace.$sourceTableName" + private val metricViewName = "mv" + private val fullMetricViewName = + s"$testCatalogName.$testNamespace.$metricViewName" + + private val metricViewColumns = Seq( + Column("region", DimensionExpression("region"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1)) + + private val testTableData = Seq( + ("region_1", 1, 5.0), + ("region_2", 2, 10.0)) + + override protected def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set( + s"spark.sql.catalog.$testCatalogName", + classOf[MetricViewRecordingCatalog].getName) + // A catalog that does not implement ViewCatalog - used for the negative gate test. + spark.conf.set( + s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}", + classOf[InMemoryTableCatalog].getName) + } + + override protected def afterAll(): Unit = { + spark.conf.unset(s"spark.sql.catalog.$testCatalogName") + spark.conf.unset( + s"spark.sql.catalog.${MetricViewV2CatalogSuite.noViewCatalogName}") + super.afterAll() + } + + private def withTestCatalogTables(body: => Unit): Unit = { + MetricViewRecordingCatalog.reset() + testTableData.toDF("region", "count", "price") + .createOrReplaceTempView("metric_view_v2_source") + try { + sql( + s"""CREATE TABLE $fullSourceTableName + |USING foo AS SELECT * FROM metric_view_v2_source""".stripMargin) + body + } finally { + // The metric-view ident `mv` may have ended up as either a view (most tests) or as a + // pre-created table (a few negative tests pre-create a table at the same ident to + // exercise cross-type collisions). Sweep both kinds so subsequent tests in the suite + // start from a clean catalog state. Wrap each DROP in a Try because: + // - DROP VIEW IF EXISTS on a leftover *table* throws WRONG_COMMAND_FOR_OBJECT_TYPE + // under master's new DropViewExec active-rejection contract. + // - DROP TABLE IF EXISTS on a leftover *view* throws the symmetric error. + // - On a totally clean state both are silent no-ops. 
+ scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullMetricViewName")) + scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullMetricViewName")) + scala.util.Try(sql(s"DROP TABLE IF EXISTS $fullSourceTableName")) + spark.catalog.dropTempView("metric_view_v2_source") + MetricViewRecordingCatalog.reset() + } + } + + private def createMetricView( + name: String, + metricView: MetricView, + comment: Option[String] = None): String = { + val yaml = MetricViewFactory.toYAML(metricView) + val commentClause = comment.map(c => s"\nCOMMENT '$c'").getOrElse("") + sql( + s"""CREATE VIEW $name + |WITH METRICS$commentClause + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + yaml + } + + private def capturedViewInfo(): ViewInfo = { + val ident = Identifier.of(Array(testNamespace), metricViewName) + val info = MetricViewRecordingCatalog.capturedViews.get(ident) + assert(info != null, + s"Expected ViewInfo for $ident to be captured by the V2 catalog") + info + } + + // ============================================================ + // Section 1: CREATE-related tests + // ============================================================ + + + test("V2 catalog receives METRIC_VIEW table type and view text via ViewInfo") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = createMetricView(fullMetricViewName, metricView) + + val info = capturedViewInfo() + // PROP_TABLE_TYPE is overwritten to METRIC_VIEW after `ViewInfo`'s constructor stamps it + // to VIEW; this is the marker `V1Table.toCatalogTable` reads to map the round-tripped row + // back to `CatalogTableType.METRIC_VIEW`. + assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE) + === TableSummary.METRIC_VIEW_TABLE_TYPE) + // The captured queryText is the raw text between `$$ ... $$` -- including the leading + // and trailing newline our SQL fixture inserts -- so trim before comparing to the + // pre-substitution YAML body. + assert(info.queryText().trim === yaml.trim) + + val deps = info.viewDependencies() + assert(deps != null) + assert(deps.dependencies().length === 1) + val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("V2 catalog path populates metric_view.* + view context + sql configs on ViewInfo") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 0"), + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + + val info = capturedViewInfo() + val props = info.properties() + + // metric_view.* descriptive properties (mirrors the canonical metric-view property + // layout). + assert(props.get(MetricView.PROP_FROM_TYPE) === "ASSET") + assert(props.get(MetricView.PROP_FROM_NAME) === fullSourceTableName) + assert(props.get(MetricView.PROP_FROM_SQL) === null) + assert(props.get(MetricView.PROP_WHERE) === "count > 0") + + // SQL configs and current catalog/namespace are first-class typed fields on ViewInfo, no + // longer encoded into properties for V2 catalogs. 
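+      // (Illustrative, not exhaustive: any session conf the view-config capture rules pick
+      // up at CREATE time -- e.g. a modified `spark.sql.*` setting -- should be readable
+      // back from `info.sqlConfigs()`; the exact key set depends on those rules.)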
+ assert(info.sqlConfigs().size > 0, + s"Expected at least one captured SQL config; got ${info.sqlConfigs()}") + assert(info.currentCatalog() === + spark.sessionState.catalogManager.currentCatalog.name()) + assert(info.currentNamespace().toSeq === + spark.sessionState.catalogManager.currentNamespace.toSeq) + } + } + + test("V2 catalog path captures SQL source and comment") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + SQLSource(s"SELECT * FROM $fullSourceTableName"), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView, comment = Some("my mv")) + + val info = capturedViewInfo() + val props = info.properties() + assert(props.get(TableCatalog.PROP_TABLE_TYPE) + === TableSummary.METRIC_VIEW_TABLE_TYPE) + assert(props.get(MetricView.PROP_FROM_TYPE) === "SQL") + assert(props.get(MetricView.PROP_FROM_NAME) === null) + assert(props.get(MetricView.PROP_FROM_SQL) === + s"SELECT * FROM $fullSourceTableName") + assert(props.get(TableCatalog.PROP_COMMENT) === "my mv") + + val deps = info.viewDependencies() + assert(deps != null && deps.dependencies().length === 1) + val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("metric view columns carry metric_view.type / metric_view.expr in column metadata") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + + val cols = capturedViewInfo().columns() + assert(cols.length === metricViewColumns.length) + + val byName = cols.map(c => c.name() -> c).toMap + def metadataOf(name: String): Metadata = + Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}")) + + val regionMeta = metadataOf("region") + assert(regionMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "dimension") + assert(regionMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "region") + + val countMeta = metadataOf("count_sum") + assert(countMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure") + assert(countMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "sum(count)") + } + } + + test("user-specified column names with comments preserve metric_view.* metadata") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(metricView) + // Pins aliasPlan(retainMetadata = true): metric_view.* keys must survive a column + // rename with comments. + sql( + s"""CREATE VIEW $fullMetricViewName (reg COMMENT 'region alias', n COMMENT 'count') + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + + val cols = capturedViewInfo().columns() + val byName = cols.map(c => c.name() -> c).toMap + assert(byName.keySet === Set("reg", "n")) + + def metadataOf(name: String): Metadata = + Metadata.fromJson(Option(byName(name).metadataInJSON()).getOrElse("{}")) + + val regMeta = metadataOf("reg") + assert(regMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "dimension") + assert(regMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "region") + // `CatalogV2Util.structTypeToV2Columns` peels "comment" off into `Column.comment()` + // rather than leaving it inside `metadataInJSON`; assert via the V2 column accessor. 
+ assert(byName("reg").comment() === "region alias") + + val nMeta = metadataOf("n") + assert(nMeta.getString(Constants.COLUMN_TYPE_PROPERTY_KEY) === "measure") + assert(nMeta.getString(Constants.COLUMN_EXPR_PROPERTY_KEY) === "sum(count)") + assert(byName("n").comment() === "count") + } + } + + test("CREATE OR REPLACE VIEW ... WITH METRICS replaces an existing v2 metric view") { + withTestCatalogTables { + val first = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 0"), + select = metricViewColumns) + createMetricView(fullMetricViewName, first) + + // Replace with a new body (different WHERE clause). + val replacement = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 100"), + select = metricViewColumns) + val replacementYaml = MetricViewFactory.toYAML(replacement) + sql( + s"""CREATE OR REPLACE VIEW $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$replacementYaml + |$$$$""".stripMargin) + + val finalInfo = capturedViewInfo() + // Assert on the distinguishing fields of the replacement, not on diff vs. the original. + // queryText keeps the surrounding `\n` from the SQL `$$ ... $$` markers; trim first. + assert(finalInfo.queryText().trim === replacementYaml.trim) + assert(finalInfo.properties().get(MetricView.PROP_WHERE) === "count > 100") + val deps = finalInfo.viewDependencies() + assert(deps != null && deps.dependencies().length === 1) + val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when the view exists") { + withTestCatalogTables { + val original = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, original) + val originalYaml = capturedViewInfo().queryText() + + // Now CREATE VIEW IF NOT EXISTS with a different YAML body. The catalog should not see + // the second create at all (V2ViewPreparation's `viewExists` short-circuit fires before + // `buildViewInfo`), so the captured ViewInfo retains the original body. + val replacement = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 999"), + select = metricViewColumns) + val replacementYaml = MetricViewFactory.toYAML(replacement) + sql( + s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$replacementYaml + |$$$$""".stripMargin) + + assert(capturedViewInfo().queryText().trim === originalYaml.trim, + "IF NOT EXISTS over an existing metric view should be a no-op.") + } + } + + test("CREATE VIEW ... WITH METRICS over a v2 table at the ident throws " + + "EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE") { + withTestCatalogTables { + // Pre-create a regular v2 table at the same ident the metric view will target. The + // catalog's `createView` call below should raise `ViewAlreadyExistsException`, which + // `CreateV2MetricViewExec` then decodes (via `tableExists`) into the precise cross-type + // collision error that `CreateV2ViewExec` emits. 
+ sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo") + + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(mv) + val ex = intercept[AnalysisException] { + sql( + s"""CREATE VIEW $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + } + // SPARK-56655 added an analyzer-time pre-check for "ident already occupied by a table" + // before the v2 view-create exec runs, so the more specific + // `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE` decoded by `CreateV2MetricViewExec.run`'s catch + // block is no longer reachable when a *plain* table sits at the ident -- the analyzer + // raises `TABLE_OR_VIEW_ALREADY_EXISTS` first. Both errors carry the same actionable + // signal ("can't create a view here because something else already lives at this ident"). + assert(ex.getCondition === "TABLE_OR_VIEW_ALREADY_EXISTS", + s"Expected TABLE_OR_VIEW_ALREADY_EXISTS, got ${ex.getCondition}: ${ex.getMessage}") + } + } + + test("CREATE VIEW IF NOT EXISTS ... WITH METRICS is a no-op when a v2 table sits at the " + + "ident") { + withTestCatalogTables { + sql(s"CREATE TABLE $fullMetricViewName (x INT) USING foo") + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(mv) + // IF NOT EXISTS over a table is a no-op (v1 parity), not an error. + sql( + s"""CREATE VIEW IF NOT EXISTS $fullMetricViewName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + val ident = Identifier.of(Array(testNamespace), metricViewName) + assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident), + "IF NOT EXISTS over a v2 table should not register a view in the catalog.") + } + } + + test("CREATE VIEW ... WITH METRICS on a non-ViewCatalog catalog fails with " + + "MISSING_CATALOG_ABILITY.VIEWS") { + val ex = intercept[AnalysisException] { + sql( + s"""CREATE VIEW ${MetricViewV2CatalogSuite.noViewCatalogName}.default.mv + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |${MetricViewFactory.toYAML(MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns))} + |$$$$""".stripMargin) + } + // SPARK-56655 added the `.VIEWS` subclass; the bare `MISSING_CATALOG_ABILITY` no longer + // surfaces directly for the missing-view-ability case. + assert(ex.getCondition === "MISSING_CATALOG_ABILITY.VIEWS") + assert(ex.getMessage.contains("VIEWS")) + } + + test("CREATE VIEW ... WITH METRICS at a multi-level-namespace v2 target succeeds") { + val deepNamespace = Array("ns_a", "ns_b") + val deepMetricViewName = "mv_deep" + val fullDeepName = + s"$testCatalogName.${deepNamespace.mkString(".")}.$deepMetricViewName" + withTestCatalogTables { + // Pre-create the multi-level namespace + a source table inside it. The metric view + // *target* lives in the same multi-level namespace -- that's what exercises the + // `MetricViewHelper.analyzeMetricViewText` lift to multi-part nameParts. The pre-lift + // code path failed at `ident.asTableIdentifier` with `requiresSinglePartNamespaceError`. 
+ sql(s"CREATE NAMESPACE IF NOT EXISTS $testCatalogName.${deepNamespace.head}") + sql(s"CREATE NAMESPACE IF NOT EXISTS " + + s"$testCatalogName.${deepNamespace.mkString(".")}") + try { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = MetricViewFactory.toYAML(mv) + sql( + s"""CREATE VIEW $fullDeepName + |WITH METRICS + |LANGUAGE YAML + |AS + |$$$$ + |$yaml + |$$$$""".stripMargin) + + val deepIdent = Identifier.of(deepNamespace, deepMetricViewName) + val info = MetricViewRecordingCatalog.capturedViews.get(deepIdent) + assert(info != null, s"Expected ViewInfo for $deepIdent to be captured") + assert(info.properties().get(TableCatalog.PROP_TABLE_TYPE) + === TableSummary.METRIC_VIEW_TABLE_TYPE) + } finally { + scala.util.Try(sql(s"DROP VIEW IF EXISTS $fullDeepName")) + sql(s"DROP NAMESPACE IF EXISTS " + + s"$testCatalogName.${deepNamespace.mkString(".")} CASCADE") + sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${deepNamespace.head} CASCADE") + } + } + } + + // ============================================================ + // Section 2: Dependency extraction + // ============================================================ + + + test("dependency extraction: SQL source JOIN captures both tables") { + withTestCatalogTables { + val secondSource = s"$testCatalogName.$testNamespace.customers" + sql( + s"""CREATE TABLE $secondSource (id INT, name STRING) + |USING foo""".stripMargin) + try { + val joinSql = + s"SELECT c.name, t.count FROM $fullSourceTableName t " + + s"JOIN $secondSource c ON t.count = c.id" + val metricView = MetricView( + "0.1", + SQLSource(joinSql), + where = None, + select = Seq( + Column("name", DimensionExpression("name"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1))) + createMetricView(fullMetricViewName, metricView) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null) + val depParts = deps.dependencies() + .map(_.asInstanceOf[TableDependency].nameParts().toSeq).toSet + assert(depParts === Set( + Seq(testCatalogName, testNamespace, sourceTableName), + Seq(testCatalogName, testNamespace, "customers")), + s"Expected dependencies on both source tables, got $depParts") + } finally { + sql(s"DROP TABLE IF EXISTS $secondSource") + } + } + } + + test("dependency extraction: SQL source subquery deduplicates same-table references") { + withTestCatalogTables { + val subquerySql = + s"SELECT * FROM $fullSourceTableName " + + s"WHERE count > (SELECT avg(count) FROM $fullSourceTableName)" + val metricView = MetricView( + "0.1", + SQLSource(subquerySql), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().length === 1, + s"Expected 1 deduplicated dependency, got " + + s"${Option(deps).map(_.dependencies().length).getOrElse(0)}") + val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("dependency extraction: SQL source self-join deduplicates same-table references") { + withTestCatalogTables { + val selfJoinSql = + s"SELECT a.region AS a_region, a.count AS a_count " + + s"FROM $fullSourceTableName a JOIN $fullSourceTableName b " + + s"ON a.region = b.region" + val metricView = MetricView( + "0.1", + SQLSource(selfJoinSql), + where = None, + select = Seq( + Column("region", DimensionExpression("a_region"), 0), 
+ Column("count_sum", MeasureExpression("sum(a_count)"), 1))) + createMetricView(fullMetricViewName, metricView) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().length === 1, + s"Expected 1 deduplicated dependency for self-join, got " + + s"${Option(deps).map(_.dependencies().length).getOrElse(0)}") + val tableDep = deps.dependencies()(0).asInstanceOf[TableDependency] + assert(tableDep.nameParts().toSeq === + Seq(testCatalogName, testNamespace, sourceTableName)) + } + } + + test("dependency extraction: V1 session-catalog source emits 3-part nameParts") { + val v1Source = "metric_view_v2_v1source" + spark.range(0, 5).toDF("v") + .write.mode("overwrite").saveAsTable(v1Source) + try { + withTestCatalogTables { + val mv = MetricView( + "0.1", + // SQL source resolves through the current (session) catalog; the resolved + // `LogicalRelation` carries a session-catalog `CatalogTable`. + SQLSource(s"SELECT v AS region, v AS count FROM $v1Source"), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().length === 1) + val parts = + deps.dependencies()(0).asInstanceOf[TableDependency].nameParts().toSeq + // `MetricViewHelper.qualifyV1` normalizes any `TableIdentifier.nameParts` shape + // (1, 2, or 3 parts depending on what the analyzer captured) to the stable + // `[spark_catalog, db, table]` shape so downstream consumers see deterministic + // arity per source kind. + assert(parts.length === 3, + s"V1 nameParts should normalize to exactly 3 parts, got ${parts.length}: $parts") + assert(parts.head === "spark_catalog", + s"V1 nameParts head should be the session-catalog name, got $parts") + assert(parts.last === v1Source, s"Last part should be the table name, got $parts") + } + } finally { + sql(s"DROP TABLE IF EXISTS $v1Source") + } + } + + test("dependency extraction: multi-level V2 namespace source emits N+2 nameParts") { + val multiNamespace = Array("ns_a", "ns_b") + val multiTable = "events_deep" + val multiFull = s"$testCatalogName.${multiNamespace.mkString(".")}.$multiTable" + withTestCatalogTables { + // The InMemoryTableCatalog (TableViewCatalog mixin) supports multi-level namespaces. + sql(s"CREATE NAMESPACE IF NOT EXISTS $testCatalogName.${multiNamespace.head}") + sql(s"CREATE NAMESPACE IF NOT EXISTS " + + s"$testCatalogName.${multiNamespace.mkString(".")}") + sql(s"CREATE TABLE $multiFull (region STRING, count INT) USING foo") + try { + val mv = MetricView( + "0.1", + SQLSource(s"SELECT region, count FROM $multiFull"), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + + val deps = capturedViewInfo().viewDependencies() + assert(deps != null && deps.dependencies().length === 1) + val parts = + deps.dependencies()(0).asInstanceOf[TableDependency].nameParts().toSeq + assert(parts === Seq(testCatalogName, multiNamespace(0), multiNamespace(1), multiTable), + s"Multi-level nameParts should preserve every namespace component, got $parts") + } finally { + sql(s"DROP TABLE IF EXISTS $multiFull") + sql(s"DROP NAMESPACE IF EXISTS " + + s"$testCatalogName.${multiNamespace.mkString(".")} CASCADE") + sql(s"DROP NAMESPACE IF EXISTS $testCatalogName.${multiNamespace.head} CASCADE") + } + } + } + + // ============================================================ + // Section 3: SELECT cases + // ============================================================ + + + test("SELECT measure(...) 
from a v2 metric view returns aggregated rows") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + // The fixture's `events` source has rows ("region_1", 1, 5.0), ("region_2", 2, 10.0). + // The metric view aggregates by `region` summing `count`. Resolution flows through + // loadTableOrView -> MetadataTable(ViewInfo) -> V1Table.toCatalogTable(ViewInfo) -> + // CatalogTableType.METRIC_VIEW -> ResolveMetricView, which rewrites the view body + // into Aggregate(Seq(region), Seq(sum(count) AS count_sum)) over `events`. The + // `measure(...)` wrapper is required for measure columns -- selecting `count_sum` + // bare would fail (mirrors the v1 `MetricViewSuite` query syntax). + checkAnswer( + sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " + + "GROUP BY region ORDER BY region"), + sql(s"SELECT region, sum(count) FROM $fullSourceTableName " + + "GROUP BY region ORDER BY region")) + } + } + + test("SELECT measure(...) with a WHERE clause on a dimension") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + // Filter at the query layer (not on the metric view's own `where:`). + checkAnswer( + sql(s"SELECT measure(count_sum) FROM $fullMetricViewName " + + "WHERE region = 'region_2'"), + sql(s"SELECT sum(count) FROM $fullSourceTableName " + + "WHERE region = 'region_2'")) + } + } + + test("SELECT against a v2 metric view honors the view's pre-defined where clause") { + withTestCatalogTables { + // Pre-define a filter on the metric view itself: only rows with count > 1 should be + // visible to consumers (i.e. region_2 only). + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = Some("count > 1"), + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + checkAnswer( + sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " + + "GROUP BY region ORDER BY region"), + sql(s"SELECT region, sum(count) FROM $fullSourceTableName " + + "WHERE count > 1 GROUP BY region ORDER BY region")) + } + } + + test("SELECT from a v2 metric view supports multiple measures with different aggregations") { + withTestCatalogTables { + // Add a second measure (sum of price) so we exercise the multi-measure rewrite path. 
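+      // (In fact the column list below carries three measures -- sum(count), sum(price),
+      // max(price) -- so the single-query rewrite is exercised with distinct aggregate
+      // functions, not just repeated sums.)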
+ val cols = Seq( + Column("region", DimensionExpression("region"), 0), + Column("count_sum", MeasureExpression("sum(count)"), 1), + Column("price_sum", MeasureExpression("sum(price)"), 2), + Column("price_max", MeasureExpression("max(price)"), 3)) + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = cols) + createMetricView(fullMetricViewName, mv) + checkAnswer( + sql(s"SELECT measure(count_sum), measure(price_sum), measure(price_max) " + + s"FROM $fullMetricViewName"), + sql(s"SELECT sum(count), sum(price), max(price) FROM $fullSourceTableName")) + } + } + + test("SELECT from a v2 metric view supports ORDER BY and LIMIT on measures") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + checkAnswer( + sql(s"SELECT region, measure(count_sum) FROM $fullMetricViewName " + + "GROUP BY region ORDER BY 2 DESC LIMIT 1"), + sql(s"SELECT region, sum(count) FROM $fullSourceTableName " + + "GROUP BY region ORDER BY 2 DESC LIMIT 1")) + } + } + + // ============================================================ + // Section 4: DESCRIBE cases + // ============================================================ + + + test("DESCRIBE TABLE EXTENDED on a v2 metric view round-trips through loadTableOrView") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + val yaml = createMetricView(fullMetricViewName, mv) + + // DESCRIBE TABLE EXTENDED resolves the ident through `Analyzer.lookupTableOrView`, + // which calls `TableViewCatalog.loadTableOrView` once and gets back a + // `MetadataTable(ViewInfo)`. The analyzer wraps it as a `ResolvedPersistentView` and + // `DataSourceV2Strategy` routes through SPARK-56655's `DescribeV2ViewExec`, which + // reads the typed `ViewInfo` directly and emits the standard "Type" / "View Text" / + // "View Current Catalog" / "View Schema Mode" / etc. rows. Pins that `DescribeV2ViewExec` + // emits a "Type" row for parity with v1 `CatalogTable.toJsonLinkedHashMap`, so users + // can distinguish a plain VIEW from a sub-kind like METRIC_VIEW. + val rows = sql(s"DESCRIBE TABLE EXTENDED $fullMetricViewName").collect() + val rowMap = rows.map(r => r.getString(0) -> r.getString(1)).toMap + + assert(rowMap.contains("View Text"), + s"Expected 'View Text' row in DESCRIBE EXTENDED output, got keys: ${rowMap.keys}") + // `DescribeV2ViewExec` writes `viewInfo.queryText` directly, so trim handles the + // leading/trailing newline the SQL `$$ ... $$` fixture inserts vs. the bare yaml body. 
+ assert(rowMap("View Text").trim === yaml.trim, + s"View Text should round-trip the YAML body, got: ${rowMap("View Text")}") + assert(rowMap.get("Type").contains(TableSummary.METRIC_VIEW_TABLE_TYPE), + s"Type row should reflect METRIC_VIEW, got: ${rowMap.get("Type")}") + } + } + + test("DESCRIBE TABLE on a v2 metric view returns the aliased columns") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + val rows = sql(s"DESCRIBE TABLE $fullMetricViewName").collect() + val byName = rows.map(r => r.getString(0) -> r.getString(1)).toMap + assert(byName.contains("region"), s"Missing 'region' col, got: ${byName.keys}") + assert(byName.contains("count_sum"), s"Missing 'count_sum' col, got: ${byName.keys}") + } + } + + // ============================================================ + // Section 5: DROP / SHOW cases + // ============================================================ + + + test("DROP VIEW succeeds on a V2 metric view") { + withTestCatalogTables { + val metricView = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, metricView) + val ident = Identifier.of(Array(testNamespace), metricViewName) + + assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident)) + + sql(s"DROP VIEW $fullMetricViewName") + assert(!MetricViewRecordingCatalog.capturedViews.containsKey(ident)) + } + } + + test("DROP TABLE on a v2 metric view throws WRONG_COMMAND_FOR_OBJECT_TYPE") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + + // SPARK-56655's `DropTableExec` actively rejects with `WRONG_COMMAND_FOR_OBJECT_TYPE` + // ("Use DROP VIEW instead") when a view sits at the ident, replacing the prior + // `EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE` decoding. Same actionable signal for users. + val ex = intercept[AnalysisException] { + sql(s"DROP TABLE $fullMetricViewName") + } + assert(ex.getCondition === "WRONG_COMMAND_FOR_OBJECT_TYPE", + s"Expected WRONG_COMMAND_FOR_OBJECT_TYPE, got ${ex.getCondition}: ${ex.getMessage}") + assert(ex.getMessage.contains("DROP VIEW"), + s"Error message should mention 'DROP VIEW', got: ${ex.getMessage}") + + // The metric view is still present after the failed DROP TABLE. + val ident = Identifier.of(Array(testNamespace), metricViewName) + assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident), + "DROP TABLE on a metric view must not delete it.") + } + } + + test("DROP TABLE IF EXISTS on a v2 metric view also throws WRONG_COMMAND_FOR_OBJECT_TYPE") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + + // IF EXISTS does not silence the wrong-type error: the entity exists, just not as a + // table. (Mirrors the v1 `DropTableCommand` behavior; `IF EXISTS` only short-circuits + // the not-found branch.) 
+ val ex = intercept[AnalysisException] { + sql(s"DROP TABLE IF EXISTS $fullMetricViewName") + } + assert(ex.getCondition === "WRONG_COMMAND_FOR_OBJECT_TYPE", + s"Expected WRONG_COMMAND_FOR_OBJECT_TYPE, got ${ex.getCondition}: ${ex.getMessage}") + } + } + + test("SHOW CREATE TABLE on a v2 metric view is unsupported") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + + // SHOW CREATE TABLE on a metric view is rejected with the dedicated + // UNSUPPORTED_SHOW_CREATE_TABLE.ON_METRIC_VIEW error class (same one the v1 path uses + // in `tables.scala`'s `ShowCreateTableCommand`), so the message is identical no matter + // which catalog kind owns the view. There's no round-trippable + // `CREATE VIEW ... WITH METRICS` form yet, so explicit "unsupported" is the right + // answer rather than emitting a misleading plain `CREATE VIEW ...`. + val ex = intercept[AnalysisException] { + sql(s"SHOW CREATE TABLE $fullMetricViewName") + } + assert(ex.getCondition === "UNSUPPORTED_SHOW_CREATE_TABLE.ON_METRIC_VIEW", + s"Expected UNSUPPORTED_SHOW_CREATE_TABLE.ON_METRIC_VIEW, got " + + s"${ex.getCondition}: ${ex.getMessage}") + assert(ex.getMessage.contains("metric view"), + s"Error message should mention 'metric view', got: ${ex.getMessage}") + } + } + + test("DROP VIEW IF EXISTS on a non-existent V2 metric view is a no-op") { + withTestCatalogTables { + sql(s"DROP VIEW IF EXISTS $testCatalogName.$testNamespace.does_not_exist") + } + } + + test("ALTER VIEW RENAME TO ... succeeds and preserves metric view metadata") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + // Per upstream DataSourceV2SQLSuite convention (see lines 2477 / 2484 there), the + // RENAME TO clause takes a 2-part `namespace.name` -- the new ident is implicitly + // within the same catalog as the source view. Including a 3-part `catalog.ns.name` + // would leak the catalog component into `newName.asIdentifier` and the catalog's + // `renameView` would store under a key the loader can't find. + val renamedRelative = s"$testNamespace.mv_renamed" + val renamedFull = s"$testCatalogName.$renamedRelative" + try { + // RenameTable on a `ResolvedPersistentView` is routed by `DataSourceV2Strategy` to + // `RenameV2ViewExec`, which calls `ViewCatalog.renameView` -- the fixture + // `MetricViewRecordingCatalog.renameView` relocates both the `views` entry and the + // `capturedViews` entry under the new ident. Pin the wiring end-to-end so the + // metric view kind survives the rename. + sql(s"ALTER VIEW $fullMetricViewName RENAME TO $renamedRelative") + + // Old ident is gone from the v2 catalog -- DESCRIBE should fail to resolve. + val oldEx = intercept[AnalysisException] { + sql(s"DESCRIBE TABLE $fullMetricViewName").collect() + } + assert(oldEx.getCondition === "TABLE_OR_VIEW_NOT_FOUND", + s"Expected TABLE_OR_VIEW_NOT_FOUND for the old ident, got " + + s"${oldEx.getCondition}: ${oldEx.getMessage}") + + // New ident loads through `TableViewCatalog.loadTableOrView` and surfaces the same + // metric-view kind on `DESCRIBE TABLE EXTENDED`. 
+ val rows = sql(s"DESCRIBE TABLE EXTENDED $renamedFull").collect() + val rowMap = rows.map(r => r.getString(0) -> r.getString(1)).toMap + assert(rowMap.get("Type").contains(TableSummary.METRIC_VIEW_TABLE_TYPE), + s"Renamed view should still be a METRIC_VIEW, got Type=${rowMap.get("Type")}") + } finally { + sql(s"DROP VIEW IF EXISTS $renamedFull") + } + } + } + + test("SHOW TABLES on a v2 TableViewCatalog lists both tables and metric views") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + val tables = sql(s"SHOW TABLES IN $testCatalogName.$testNamespace") + .collect().map(_.getString(1)).toSet + // SPARK-56655 routes SHOW TABLES on a `TableViewCatalog` through `listRelationSummaries` + // so views appear alongside tables in the output (matching v1 SHOW TABLES on a session + // catalog). Pure `TableCatalog` catalogs continue to return tables only. + assert(tables.contains(sourceTableName), + s"SHOW TABLES should list the source table, got: $tables") + assert(tables.contains(metricViewName), + s"SHOW TABLES on a TableViewCatalog should also list metric views, got: $tables") + } + } + + test("SHOW VIEWS lists v2 metric views") { + withTestCatalogTables { + val mv = MetricView( + "0.1", + AssetSource(fullSourceTableName), + where = None, + select = metricViewColumns) + createMetricView(fullMetricViewName, mv) + val views = sql(s"SHOW VIEWS IN $testCatalogName.$testNamespace") + .collect().map(_.getString(1)).toSet + assert(views.contains(metricViewName), + s"SHOW VIEWS should list metric views, got: $views") + } + } +} + +object MetricViewV2CatalogSuite { + val noViewCatalogName: String = "testcat_no_view" +} + +/** + * Minimal [[TableViewCatalog]] used by [[MetricViewV2CatalogSuite]]. Layers `ViewCatalog` + * methods over [[InMemoryTableCatalog]] (which provides table storage + namespace ops) and + * captures every [[ViewInfo]] passed to `createView` so tests can inspect the typed payload. + * + * The metric-view CREATE path goes via `ViewCatalog.createView`, so the captured map keys are + * the view identifiers; the source table created by the test fixture is stored separately in + * the inherited table catalog. + */ +class MetricViewRecordingCatalog extends InMemoryTableCatalog with TableViewCatalog { + private val views = + new ConcurrentHashMap[(Seq[String], String), ViewInfo]() + + // -- ViewCatalog methods -- + + override def listViews(namespace: Array[String]): Array[Identifier] = { + val target = namespace.toSeq + val out = new java.util.ArrayList[Identifier]() + views.forEach { (key, _) => + if (key._1 == target) out.add(Identifier.of(key._1.toArray, key._2)) + } + out.asScala.toArray + } + + // `loadView`, `tableExists`, and `viewExists` are inherited from `TableViewCatalog`'s + // defaults, which derive from `loadTableOrView` -- a stored `ViewInfo` is wrapped in + // `MetadataTable` by `loadTableOrView` and the defaults unwrap it correctly. + + // Bypasses `TableViewCatalog.tableExists` (whose default delegates to `loadTableOrView`, + // which checks our `views` map first); we want a tables-only check here so the cross-type + // collision branches in `createView` / `replaceView` see only "is there a *table* at this + // ident?". 
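+  // Delegation chain being sidestepped (informal reading of the default methods; the
+  // authoritative defaults live in `TableViewCatalog`):
+  //   TableViewCatalog.tableExists(ident)
+  //     -> loadTableOrView(ident)          // overridden below; consults `views` first
+  //     -> view hit wrapped as a Table     // a view would report "table exists" here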
+ private def tableExistsTablesOnly(ident: Identifier): Boolean = + try { super[InMemoryTableCatalog].loadTable(ident); true } + catch { case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException => false } + + override def createView(ident: Identifier, info: ViewInfo): ViewInfo = { + // TableViewCatalog active-rejection contract: createView must throw + // ViewAlreadyExistsException when *either* a view *or* a table sits at the ident. + if (tableExistsTablesOnly(ident)) { + throw new ViewAlreadyExistsException(ident) + } + val key = (ident.namespace().toSeq, ident.name()) + if (views.putIfAbsent(key, info) != null) { + throw new ViewAlreadyExistsException(ident) + } + MetricViewRecordingCatalog.capturedViews.put(ident, info) + info + } + + override def replaceView(ident: Identifier, info: ViewInfo): ViewInfo = { + // Per the TableViewCatalog contract, replaceView must surface NoSuchViewException + // when a *table* sits at the ident (not silently succeed and shadow the table). + if (tableExistsTablesOnly(ident)) throw new NoSuchViewException(ident) + val key = (ident.namespace().toSeq, ident.name()) + if (!views.containsKey(key)) throw new NoSuchViewException(ident) + views.put(key, info) + MetricViewRecordingCatalog.capturedViews.put(ident, info) + info + } + + override def dropView(ident: Identifier): Boolean = { + val key = (ident.namespace().toSeq, ident.name()) + val removed = views.remove(key) != null + if (removed) { + MetricViewRecordingCatalog.capturedViews.remove(ident) + } + removed + } + + override def renameView(oldIdent: Identifier, newIdent: Identifier): Unit = { + val oldKey = (oldIdent.namespace().toSeq, oldIdent.name()) + val newKey = (newIdent.namespace().toSeq, newIdent.name()) + val existing = views.get(oldKey) + if (existing == null) throw new NoSuchViewException(oldIdent) + if (views.putIfAbsent(newKey, existing) != null) { + throw new ViewAlreadyExistsException(newIdent) + } + views.remove(oldKey) + val captured = MetricViewRecordingCatalog.capturedViews.remove(oldIdent) + if (captured != null) { + MetricViewRecordingCatalog.capturedViews.put(newIdent, captured) + } + } + + // -- TableViewCatalog single-RPC perf path -- + + override def loadTableOrView(ident: Identifier): Table = { + val key = (ident.namespace().toSeq, ident.name()) + Option(views.get(key)) match { + case Some(info) => new MetadataTable(info, ident.toString) + // Bypass `TableViewCatalog.loadTable` (whose default delegates back to `loadTableOrView`) + // and call `InMemoryTableCatalog.loadTable` directly to avoid infinite recursion. + case None => super[InMemoryTableCatalog].loadTable(ident) + } + } +} + +object MetricViewRecordingCatalog { + // Captures every ViewInfo that flows through createView / replaceView so individual tests + // can assert on it. Cleared between tests via `reset()`. 
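+  //
+  // Typical assertion pattern (illustrative; the helpers are defined in the suite):
+  //   MetricViewRecordingCatalog.reset()
+  //   createMetricView(fullMetricViewName, mv)
+  //   val ident = Identifier.of(Array(testNamespace), metricViewName)
+  //   assert(MetricViewRecordingCatalog.capturedViews.containsKey(ident))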
+ val capturedViews: ConcurrentHashMap[Identifier, ViewInfo] = + new ConcurrentHashMap[Identifier, ViewInfo]() + + def reset(): Unit = capturedViews.clear() +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b05ee5abd033a..c4715b6a37efc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1091,7 +1091,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase { "alternative" -> "DROP TABLE", "operation" -> "DROP VIEW", "foundType" -> "EXTERNAL", - "requiredType" -> "VIEW", + "requiredType" -> "VIEW or METRIC_VIEW", "objectName" -> "spark_catalog.dbx.tab1") ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index d31f69dcfdd8d..907aa895a562b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -139,6 +139,12 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest { .add("s", "string") .add("point", new StructType().add("x", "int").add("y", "int"))) when(t.tableType).thenReturn(tableType) + // Mockito returns false for unstubbed Boolean methods, so analyzer code paths that + // dispatch through `CatalogTable.isViewLike` (e.g. `Analyzer.lookupTableOrView`'s v1 + // session-catalog branch) would misclassify a mocked VIEW fixture as a table. Stub + // the method to compute from the just-stubbed `tableType` so any view-like type + // (VIEW today, METRIC_VIEW or future kinds) resolves correctly. 
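+      // Illustration of the pitfall (hypothetical snippet, not part of this suite):
+      //   val t = mock(classOf[CatalogTable])
+      //   when(t.tableType).thenReturn(CatalogTableType.VIEW)
+      //   t.isViewLike  // false -- Mockito's default answer for an unstubbed Boolean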
+ when(t.isViewLike).thenReturn(CatalogTable.isViewLike(tableType)) when(t.provider).thenReturn(Some(provider)) when(t.identifier).thenReturn( ident.asTableIdentifier.copy(catalog = Some(SESSION_CATALOG_NAME))) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala index fea878bcf05d5..10f520314865d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.LogKeys.{HIVE_OPERATION_TYPE, STATEMENT_ID} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, SessionCatalog} -import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, METRIC_VIEW, VIEW} import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} import org.apache.spark.util.Utils @@ -107,7 +107,7 @@ private[hive] trait SparkOperation extends Operation with Logging { def tableTypeString(tableType: CatalogTableType): String = tableType match { case EXTERNAL | MANAGED => "TABLE" - case VIEW => "VIEW" + case VIEW | METRIC_VIEW => "VIEW" case t => throw new IllegalArgumentException(s"Unknown table type is found: $t") } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 8ec4f97c43e85..5d3a872d047c1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -274,7 +274,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Spark-created views do not have to be Hive compatible. If the data type is not // Hive compatible, we can set schema to empty so that Spark can still read this // view as the schema is also encoded in the table properties. - case schema if tableDefinition.tableType == CatalogTableType.VIEW && + case schema if tableDefinition.isViewLike && schema.exists(f => !isHiveCompatibleDataType(f.dataType)) => EMPTY_DATA_SCHEMA case other => other @@ -294,7 +294,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat try { client.createTable(tableWithDataSourceProps, ignoreIfExists) } catch { - case NonFatal(e) if tableDefinition.tableType == CatalogTableType.VIEW && + case NonFatal(e) if tableDefinition.isViewLike && hiveCompatibleSchema != EMPTY_DATA_SCHEMA => // If for some reason we fail to store the schema we store it as empty there // since we already store the real schema in the table properties. This try-catch @@ -450,6 +450,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val properties = new mutable.HashMap[String, String] properties.put(CREATED_SPARK_VERSION, table.createVersion) + + // Hive's `HiveTableType` enum has no metric-view variant -- it stores both regular views + // and metric views as `VIRTUAL_VIEW`. Persist a property marker so `restoreTableMetadata` + // can lift the round-tripped `CatalogTableType.VIEW` back to `CatalogTableType.METRIC_VIEW`. 
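+    // The round trip, informally (constant values elided; both keys live on CatalogTable):
+    //   save:    METRIC_VIEW -> HiveTableType.VIRTUAL_VIEW + VIEW_SUB_TYPE marker
+    //   restore: VIRTUAL_VIEW -> CatalogTableType.VIEW -> METRIC_VIEW (marker present)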
+ if (table.tableType == CatalogTableType.METRIC_VIEW) { + properties.put(CatalogTable.VIEW_SUB_TYPE, CatalogTable.VIEW_SUB_TYPE_METRIC_VIEW) + } // This is for backward compatibility to Spark 2 to read tables with char/varchar created by // Spark 3.1. At read side, we will restore a table schema from its properties. So, we need to // clear the `varchar(n)` and `char(n)` and replace them with `string` as Spark 2 does not have @@ -595,7 +602,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat requireTableExists(db, tableDefinition.identifier.table) verifyTableProperties(tableDefinition) - if (tableDefinition.tableType == VIEW) { + if (tableDefinition.isViewLike) { val newTableProps = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition).toMap val schemaWithNoCollation = removeCollation(tableDefinition.schema) val hiveCompatibleSchema = @@ -834,8 +841,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat var table = inputTable + // HMS round-trips both regular views and metric views as `HiveTableType.VIRTUAL_VIEW`, + // which `HiveClientImpl.getTableOption` always maps back to `CatalogTableType.VIEW`. Lift + // it back to `CatalogTableType.METRIC_VIEW` when the persisted sub-type marker is present. + if (table.tableType == VIEW && + table.properties.get(CatalogTable.VIEW_SUB_TYPE) + .contains(CatalogTable.VIEW_SUB_TYPE_METRIC_VIEW)) { + table = table.copy(tableType = METRIC_VIEW) + } + table.properties.get(DATASOURCE_PROVIDER) match { - case None if table.tableType == VIEW => + case None if table.isViewLike => // If this is a view created by Spark 2.2 or higher versions, we should restore its schema // from table properties. getSchemaFromTableProperties(table.properties).foreach { schemaFromTableProps => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 898469221796b..21db79116b52e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -1163,7 +1163,7 @@ private[hive] object HiveClientImpl extends Logging { catalogTableType match { case CatalogTableType.EXTERNAL => HiveTableType.EXTERNAL_TABLE case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE - case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW + case t if CatalogTable.isViewLike(t) => HiveTableType.VIRTUAL_VIEW case t => throw new IllegalArgumentException( s"Unknown table type is found at toHiveTableType: $t") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index ff9be5ce759fe..8818983274ca0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1141,7 +1141,7 @@ class HiveDDLSuite "alternative" -> "DROP TABLE", "operation" -> "DROP VIEW", "foundType" -> "MANAGED", - "requiredType" -> "VIEW", + "requiredType" -> "VIEW or METRIC_VIEW", "objectName" -> s"$SESSION_CATALOG_NAME.default.tab1" ) )
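A minimal sketch, outside the diff, of how the Hive round trip above could be exercised
(hedged: `catalog` is assumed to be a `HiveExternalCatalog` instance and `metricViewDef` a
`CatalogTable` with `tableType = CatalogTableType.METRIC_VIEW`; `createTable` and `getTable`
are the existing `ExternalCatalog` methods):

    // Persist: tableMetaToTableProps adds the VIEW_SUB_TYPE marker, and toHiveTableType
    // maps METRIC_VIEW to HiveTableType.VIRTUAL_VIEW before handing off to HMS.
    catalog.createTable(metricViewDef, ignoreIfExists = false)

    // Restore: HiveClientImpl maps VIRTUAL_VIEW back to CatalogTableType.VIEW, and
    // restoreTableMetadata lifts it to METRIC_VIEW because the marker is present.
    val restored = catalog.getTable(metricViewDef.database, metricViewDef.identifier.table)
    assert(restored.tableType == CatalogTableType.METRIC_VIEW)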