Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,15 @@ class V2SessionCatalog(catalog: SessionCatalog)

val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)
try {
catalog.alterTable(
catalogTable.copy(
properties = finalProperties, schema = schema, owner = owner, comment = comment,
collation = collation, storage = storage))
if (changes.exists(!_.isInstanceOf[TableChange.ColumnChange])) {
catalog.alterTable(
catalogTable.copy(
properties = finalProperties, schema = schema, owner = owner, comment = comment,
collation = collation, storage = storage))
}
if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
}
} catch {
case _: NoSuchTableException =>
throw QueryCompilationErrors.noSuchTableError(ident)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,22 +590,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
parameters = Map("fieldName" -> "`missing_col`", "fields" -> "`id`, `data`"))
}

test("alterTable: rename top-level column") {
  // Renaming a top-level column through a TableChange must be visible when the
  // table is reloaded from the catalog.
  val sessionCatalog = newCatalog()
  sessionCatalog.createTable(testIdent, columns, emptyTrans, emptyProps)

  // Sanity check: the freshly created table exposes the original columns.
  assert(sessionCatalog.loadTable(testIdent).columns === columns)

  sessionCatalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))

  val renamed = sessionCatalog.loadTable(testIdent)
  val expectedSchema = new StructType()
    .add("some_id", IntegerType)
    .add("data", StringType)
  assert(renamed.schema == expectedSchema)
}

test("alterTable: rename nested column") {
val catalog = newCatalog()

Expand All @@ -627,26 +611,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(updated.columns === expectedColumns)
}

test("alterTable: rename struct column") {
  // A struct-typed top-level column can be renamed; its nested fields must be
  // left untouched by the rename.
  val sessionCatalog = newCatalog()

  val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
  val tableColumns = columns :+ Column.create("point", pointStruct)

  sessionCatalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps)
  // Sanity check before altering: created table carries the extra struct column.
  assert(sessionCatalog.loadTable(testIdent).columns === tableColumns)

  sessionCatalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p"))

  val renamed = sessionCatalog.loadTable(testIdent)
  // Same struct layout, new column name.
  val expectedStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
  val expectedColumns = columns :+ Column.create("p", expectedStruct)
  assert(renamed.columns === expectedColumns)
}

test("alterTable: rename missing column fails") {
val catalog = newCatalog()

Expand Down Expand Up @@ -686,21 +650,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
assert(updated.columns === expectedColumns)
}

test("alterTable: delete top-level column") {
  // Deleting a top-level column removes it from the schema seen on reload.
  val sessionCatalog = newCatalog()
  sessionCatalog.createTable(testIdent, columns, emptyTrans, emptyProps)

  // Sanity check: table starts with the full column set.
  assert(sessionCatalog.loadTable(testIdent).columns === columns)

  // ifExists = false: the column is known to exist, so deletion must not throw.
  sessionCatalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))

  val updated = sessionCatalog.loadTable(testIdent)
  assert(updated.schema == new StructType().add("data", StringType))
}

test("alterTable: delete nested column") {
val catalog = newCatalog()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import java.net.URI
import java.time.LocalDateTime
import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
Expand All @@ -32,12 +36,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
import org.apache.spark.sql.hive.orc.OrcFileOperator
Expand Down Expand Up @@ -3392,4 +3397,39 @@ class HiveDDLSuite
)
}
}

test("SPARK-52272: V2SessionCatalog does not alter schema on Hive Catalog") {
// Spy on the external (Hive) catalog so we can count which of its alter
// entry points V2SessionCatalog invokes for each kind of TableChange.
val spyCatalog = spy(spark.sessionState.catalog.externalCatalog)
val v1SessionCatalog = new SessionCatalog(spyCatalog)
val v2SessionCatalog = new V2SessionCatalog(v1SessionCatalog)
withTable("t1") {
val identifier = Identifier.of(Array("default"), "t1")
// Single column "a" with a comment, so a column-comment change below is a
// pure ColumnChange against an existing field.
val outputSchema = new StructType().add("a", IntegerType, true, "comment1")
v2SessionCatalog.createTable(
identifier,
new TableInfo.Builder()
.withProperties(Map.empty.asJava)
.withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
.withPartitions(Array.empty)
.build()
)
// A property-only change is not a ColumnChange: it must go through
// alterTable and must NOT touch alterTableDataSchema.
v2SessionCatalog.alterTable(identifier, TableChange.setProperty("foo", "bar"))
val loaded = v2SessionCatalog.loadTable(identifier)
assert(loaded.properties().get("foo") == "bar")

verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
verify(spyCatalog, times(0)).alterTableDataSchema(
any[String], any[String], any[StructType])

// A column-comment change IS a ColumnChange: it must be routed to
// alterTableDataSchema instead of a full alterTable.
v2SessionCatalog.alterTable(identifier,
TableChange.updateColumnComment(Array("a"), "comment2"))
val loaded2 = v2SessionCatalog.loadTable(identifier)
assert(loaded2.columns().length == 1)
assert(loaded2.columns.head.comment() == "comment2")

// NOTE: Mockito call counts are cumulative for the whole test, so
// times(1) here asserts alterTable was NOT called again by the
// column change — only alterTableDataSchema's count advanced.
verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
verify(spyCatalog, times(1)).alterTableDataSchema(
any[String], any[String], any[StructType])
}
}
}