Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,6 @@ class V2SessionCatalog(catalog: SessionCatalog)
}

val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(
catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE")
val comment = properties.get(TableCatalog.PROP_COMMENT)
val collation = properties.get(TableCatalog.PROP_COLLATION)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
Expand All @@ -300,16 +298,23 @@ class V2SessionCatalog(catalog: SessionCatalog)
catalogTable.storage
}

val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)
val finalProperties = CatalogV2Util.applyClusterByChanges(properties,
catalogTable.schema, changes)
try {
if (changes.exists(!_.isInstanceOf[TableChange.ColumnChange])) {
catalog.alterTable(
catalogTable.copy(
properties = finalProperties, schema = schema, owner = owner, comment = comment,
collation = collation, storage = storage))
properties = finalProperties,
schema = catalogTable.schema,
owner = owner,
comment = comment,
collation = collation,
storage = storage))
}
if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
val newDataSchema = CatalogV2Util.applySchemaChanges(
catalogTable.dataSchema, changes, catalogTable.provider, "ALTER TABLE")
catalog.alterTableDataSchema(ident.asTableIdentifier, newDataSchema)
}
} catch {
case _: NoSuchTableException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableCatalog, TableChange, TableInfo}
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.connector.expressions.Expressions
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
Expand Down Expand Up @@ -3432,4 +3433,47 @@ class HiveDDLSuite
any[String], any[String], any[StructType])
}
}

test("SPARK-52638: V2 Session Catalog alter table add column with partition") {
val catalog = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]

withTable("t1") {
val identifier = Identifier.of(Array("default"), "t1")
val outputSchema = new StructType()
.add("a", IntegerType, true, "comment1")
.add("b", IntegerType, true, "comment2")
.add("c", IntegerType, true, "comment3")
.add("d", IntegerType, true, "comment4")
catalog.createTable(
identifier,
new TableInfo.Builder()
.withProperties(Map.empty.asJava)
.withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
.withPartitions(Array(Expressions.identity("a")))
.build()
)
val table1 = catalog.loadTable(identifier)
val cols = table1.columns()

assert(cols.length == 4)
assert(cols(0).name() == "b")
assert(cols(1).name() == "c")
assert(cols(2).name() == "d")
assert(cols(3).name() == "a")

catalog.alterTable(
identifier,
TableChange.addColumn(Array("e"), IntegerType)
)

val table2 = catalog.loadTable(identifier)
val cols2 = table2.columns()

assert(cols2(0).name() == "b")
assert(cols2(1).name() == "c")
assert(cols2(2).name() == "d")
assert(cols2(3).name() == "e")
assert(cols2(4).name() == "a")
}
}
}