From eb46a88a04916f96135defb809836632c122d22c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Wa=C5=9B?= Date: Wed, 9 Oct 2024 13:13:35 +0200 Subject: [PATCH 1/4] Fix redirecting from some Delta Lake tables Don't attempt to read Delta Lake table columns when performing table redirect to a different catalog. This allows redirecting from incomplete or invalid Delta Lake tables. --- ...redGlueMetastoreWithTableRedirections.java | 76 +++++++++++++++++++ .../hive/metastore/glue/GlueConverter.java | 7 +- .../metastore/glue/TestGlueConverter.java | 7 +- 3 files changed, 83 insertions(+), 7 deletions(-) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java index 317956370506a..0e9e43c6b1eeb 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSharedGlueMetastoreWithTableRedirections.java @@ -20,13 +20,25 @@ import io.trino.testing.DistributedQueryRunner; import io.trino.testing.QueryRunner; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.SerDeInfo; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.TableInput; import java.nio.file.Path; +import java.util.Map; import static io.trino.plugin.hive.metastore.glue.TestingGlueHiveMetastore.createTestingGlueHiveMetastore; import static io.trino.testing.TestingSession.testSessionBuilder; import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; /** @@ -108,4 +120,68 @@ protected String getExpectedDeltaLakeCreateSchema(String catalogName) ")"; return format(expectedDeltaLakeCreateSchema, catalogName, schema, dataDirectory.toUri()); } + + @Test + public void testUnsupportedHiveTypeRedirect() + { + String tableName = "unsupported_types"; + // Use another complete table location so `SHOW CREATE TABLE` doesn't fail on reading metadata + String location; + try (GlueClient glueClient = GlueClient.create()) { + GetTableResponse existingTable = glueClient.getTable(GetTableRequest.builder() + .databaseName(schema) + .name("delta_table") + .build()); + location = existingTable.table().storageDescriptor().location(); + } + // Create a table directly in Glue, simulating an external table being created in Spark, + // with a custom AWS data type not mapped to HiveType when + Column timestampColumn = Column.builder() + .name("last_hour_load") + .type("timestamp_ntz") + .build(); + StorageDescriptor sd = StorageDescriptor.builder() + .columns(timestampColumn) + .location(location) + .inputFormat("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + .outputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat") + .serdeInfo(SerDeInfo.builder() + .serializationLibrary("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe") + .parameters(Map.of( + "serialization.format", "1", + "path", location)) + .build()) + .build(); + TableInput tableInput = TableInput.builder() + .name(tableName) + .storageDescriptor(sd) + .parameters(Map.of( + "spark.sql.sources.provider", "delta")) + .tableType("EXTERNAL_TABLE") + .partitionKeys(timestampColumn) + .build(); + + CreateTableRequest createTableRequest = CreateTableRequest.builder() + .databaseName(schema) + .tableInput(tableInput) + .build(); + try (GlueClient glueClient = GlueClient.create()) { + glueClient.createTable(createTableRequest); + + String tableDefinition = (String) computeScalar("SHOW CREATE TABLE hive_with_redirections." + schema + "." + tableName); + String expected = """ + CREATE TABLE delta_with_redirections.%s.%s ( + a_varchar varchar + ) + WITH ( + location = '%s' + )"""; + assertThat(tableDefinition).isEqualTo(expected.formatted(schema, tableName, location)); + + glueClient.deleteTable(DeleteTableRequest.builder() + .databaseName(schema) + .name(tableInput.name()) + .build()); + } + } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueConverter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueConverter.java index 227da0312c718..e36dacc6c04a2 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueConverter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueConverter.java @@ -164,7 +164,7 @@ public static Table fromGlueTable(software.amazon.awssdk.services.glue.model.Tab storage = FAKE_PARQUET_STORAGE; } else if (isIcebergTable(tableParameters)) { - // todo: any reason to not do this for delta and trino mv? + // todo: any reason to not do this for trino mv? if (sd.columns() == null) { dataColumns = ImmutableList.of(FAKE_COLUMN); } @@ -174,6 +174,11 @@ else if (isIcebergTable(tableParameters)) { partitionColumns = ImmutableList.of(); storage = FAKE_PARQUET_STORAGE; } + else if (isDeltaLakeTable(tableParameters)) { + dataColumns = ImmutableList.of(FAKE_COLUMN); + partitionColumns = ImmutableList.of(); + storage = fromGlueStorage(sd, databaseName + "." + glueTable.name()); + } else { boolean isCsv = sd.serdeInfo() != null && HiveStorageFormat.CSV.getSerde().equals(sd.serdeInfo().serializationLibrary()); dataColumns = fromGlueColumns(sd.columns(), ColumnType.DATA, isCsv); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java index ee350bd9adb6e..261e10fcc1591 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java @@ -41,7 +41,6 @@ import java.util.Optional; import java.util.Random; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.metastore.HiveType.HIVE_STRING; import static io.trino.metastore.Table.TABLE_COMMENT; import static io.trino.metastore.TableInfo.ICEBERG_MATERIALIZED_VIEW_COMMENT; @@ -403,11 +402,7 @@ void testDeltaTableNonNullStorageDescriptor() .build(); assertThat(table.storageDescriptor()).isNotNull(); io.trino.metastore.Table trinoTable = GlueConverter.fromGlueTable(table, table.databaseName()); - assertThat(trinoTable.getDataColumns().stream() - .map(Column::getName) - .collect(toImmutableSet())).isEqualTo(glueTable.storageDescriptor().columns().stream() - .map(software.amazon.awssdk.services.glue.model.Column::name) - .collect(toImmutableSet())); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test From 21010fac441f8f44e4a6387ecc07016a29b228ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Wa=C5=9B?= Date: Fri, 25 Oct 2024 09:41:39 +0200 Subject: [PATCH 2/4] Simplify size asserts in TestGlueConverter --- .../hive/metastore/glue/TestGlueConverter.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java index 261e10fcc1591..4eda8e5522545 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/TestGlueConverter.java @@ -219,7 +219,7 @@ void testToGlueFunctionInput() .build(); LanguageFunction actual = GlueConverter.fromGlueFunction(function); - assertThat(input.resourceUris().size()).isEqualTo(3); + assertThat(input.resourceUris()).hasSize(3); assertThat(actual).isEqualTo(expected); // verify that the owner comes from the metastore @@ -281,7 +281,7 @@ void testConvertTableWithOpenCSVSerDe() assertThat(trinoTable.getTableType()).isEqualTo(glueTable.tableType()); assertThat(trinoTable.getOwner().orElse(null)).isEqualTo(glueTable.owner()); assertThat(trinoTable.getParameters()).isEqualTo(glueTable.parameters()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); assertThat(trinoTable.getDataColumns().getFirst().getType()).isEqualTo(HIVE_STRING); assertColumnList(glueTable.partitionKeys(), trinoTable.getPartitionColumns()); @@ -369,7 +369,7 @@ void testIcebergTableNullStorageDescriptor() .storageDescriptor((StorageDescriptor) null) .build(); io.trino.metastore.Table trinoTable = GlueConverter.fromGlueTable(table, table.databaseName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -380,7 +380,7 @@ void testIcebergTableNonNullStorageDescriptor() .build(); assertThat(table.storageDescriptor()).isNotNull(); io.trino.metastore.Table trinoTable = GlueConverter.fromGlueTable(table, table.databaseName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -391,7 +391,7 @@ void testDeltaTableNullStorageDescriptor() .storageDescriptor((StorageDescriptor) null) .build(); io.trino.metastore.Table trinoTable = GlueConverter.fromGlueTable(table, table.databaseName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -410,7 +410,7 @@ public void testIcebergMaterializedViewNullStorageDescriptor() { assertThat(glueMaterializedView.storageDescriptor()).isNull(); Table trinoTable = GlueConverter.fromGlueTable(glueMaterializedView, glueMaterializedView.databaseName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -428,7 +428,7 @@ private static void assertColumnList(List Date: Mon, 28 Oct 2024 11:15:27 +0100 Subject: [PATCH 3/4] Fix redirecting from some Delta Lake tables for Glue V1 Don't attempt to read Delta Lake table columns when performing table redirect to a different catalog. This allows redirecting from incomplete or invalid Delta Lake tables. --- .../glue/v1/converter/GlueToTrinoConverter.java | 16 ++++++++++++++-- .../glue/v1/TestGlueToTrinoConverter.java | 7 +------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/converter/GlueToTrinoConverter.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/converter/GlueToTrinoConverter.java index 549ca17beb369..53bed07ae4935 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/converter/GlueToTrinoConverter.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/v1/converter/GlueToTrinoConverter.java @@ -153,14 +153,26 @@ public static Table convertTable(com.amazonaws.services.glue.model.Table glueTab Optional storageDescriptor = getStorageDescriptor(glueTable); if (isIcebergTable(tableParameters) || - (storageDescriptor.isEmpty() && isDeltaLakeTable(tableParameters)) || (storageDescriptor.isEmpty() && isTrinoMaterializedView(tableType, tableParameters))) { // Iceberg tables do not need to read the StorageDescriptor field, but we still need to return dummy properties for compatibility - // Delta Lake tables only need to provide a dummy properties if a StorageDescriptor was not explicitly configured. // Materialized views do not need to read the StorageDescriptor, but we still need to return dummy properties for compatibility tableBuilder.setDataColumns(ImmutableList.of(new Column("dummy", HIVE_INT, Optional.empty(), ImmutableMap.of()))); tableBuilder.getStorageBuilder().setStorageFormat(HiveStorageFormat.PARQUET.toStorageFormat()); } + else if (isDeltaLakeTable(tableParameters)) { + tableBuilder.setDataColumns(ImmutableList.of(new Column("dummy", HIVE_INT, Optional.empty(), ImmutableMap.of()))); + tableBuilder.setPartitionColumns(ImmutableList.of()); + if (storageDescriptor.isEmpty()) { + tableBuilder.getStorageBuilder().setStorageFormat(HiveStorageFormat.PARQUET.toStorageFormat()); + } + else { + StorageDescriptor sd = storageDescriptor.get(); + if (sd.getSerdeInfo() == null) { + throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Table SerdeInfo is null for table '%s' %s".formatted(table, glueTable)); + } + new StorageConverter().setStorageBuilder(sd, tableBuilder.getStorageBuilder()); + } + } else { if (storageDescriptor.isEmpty()) { throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Table StorageDescriptor is null for table '%s' %s".formatted(table, glueTable)); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java index 345e11007fb18..4f6127041d25d 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java @@ -35,7 +35,6 @@ import java.util.Optional; import static com.amazonaws.util.CollectionUtils.isNullOrEmpty; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.metastore.HiveType.HIVE_STRING; import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; import static io.trino.plugin.hive.metastore.glue.v1.TestingMetastoreObjects.getGlueTestColumn; @@ -251,11 +250,7 @@ public void testDeltaTableNonNullStorageDescriptor() testTable.setParameters(ImmutableMap.of(SPARK_TABLE_PROVIDER_KEY, DELTA_LAKE_PROVIDER)); assertThat(getStorageDescriptor(testTable)).isPresent(); io.trino.metastore.Table trinoTable = GlueToTrinoConverter.convertTable(testTable, testDatabase.getName()); - assertThat(trinoTable.getDataColumns().stream() - .map(Column::getName) - .collect(toImmutableSet())).isEqualTo(getStorageDescriptor(testTable).orElseThrow().getColumns().stream() - .map(com.amazonaws.services.glue.model.Column::getName) - .collect(toImmutableSet())); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test From 6763e986a292cc71e4e85eada37d4f0350d2c2a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Wa=C5=9B?= Date: Mon, 28 Oct 2024 11:16:46 +0100 Subject: [PATCH 4/4] Simplify size asserts in TestGlueToTrinoConverter --- .../metastore/glue/v1/TestGlueToTrinoConverter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java index 4f6127041d25d..e7231d6ee9287 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/glue/v1/TestGlueToTrinoConverter.java @@ -118,7 +118,7 @@ public void testConvertTableWithOpenCSVSerDe() assertThat(trinoTable.getTableType()).isEqualTo(getTableTypeNullable(glueTable)); assertThat(trinoTable.getOwner().orElse(null)).isEqualTo(glueTable.getOwner()); assertThat(trinoTable.getParameters()).isEqualTo(getTableParameters(glueTable)); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); assertThat(trinoTable.getDataColumns().get(0).getType()).isEqualTo(HIVE_STRING); assertColumnList(trinoTable.getPartitionColumns(), glueTable.getPartitionKeys()); @@ -223,7 +223,7 @@ public void testIcebergTableNullStorageDescriptor() testTable.setParameters(ImmutableMap.of(ICEBERG_TABLE_TYPE_NAME, ICEBERG_TABLE_TYPE_VALUE)); testTable.setStorageDescriptor(null); io.trino.metastore.Table trinoTable = GlueToTrinoConverter.convertTable(testTable, testDatabase.getName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -232,7 +232,7 @@ public void testIcebergTableNonNullStorageDescriptor() testTable.setParameters(ImmutableMap.of(ICEBERG_TABLE_TYPE_NAME, ICEBERG_TABLE_TYPE_VALUE)); assertThat(getStorageDescriptor(testTable)).isPresent(); io.trino.metastore.Table trinoTable = GlueToTrinoConverter.convertTable(testTable, testDatabase.getName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -241,7 +241,7 @@ public void testDeltaTableNullStorageDescriptor() testTable.setParameters(ImmutableMap.of(SPARK_TABLE_PROVIDER_KEY, DELTA_LAKE_PROVIDER)); testTable.setStorageDescriptor(null); io.trino.metastore.Table trinoTable = GlueToTrinoConverter.convertTable(testTable, testDatabase.getName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -259,7 +259,7 @@ public void testIcebergMaterializedViewNullStorageDescriptor() Table testMaterializedView = getGlueTestTrinoMaterializedView(testDatabase.getName()); assertThat(getStorageDescriptor(testMaterializedView)).isEmpty(); io.trino.metastore.Table trinoTable = GlueToTrinoConverter.convertTable(testMaterializedView, testDatabase.getName()); - assertThat(trinoTable.getDataColumns().size()).isEqualTo(1); + assertThat(trinoTable.getDataColumns()).hasSize(1); } @Test @@ -274,7 +274,7 @@ private static void assertColumnList(List actual, List