From 0279ea72dcac2d8769ce685727654814f77ef856 Mon Sep 17 00:00:00 2001 From: wangd Date: Sat, 18 Jan 2025 21:41:21 +0800 Subject: [PATCH] [Iceberg]Support setting warehouse data directory for Hadoop catalog --- presto-iceberg/pom.xml | 22 + .../presto/iceberg/IcebergConfig.java | 14 + .../iceberg/IcebergNativeCatalogFactory.java | 7 + .../presto/iceberg/IcebergNativeMetadata.java | 27 +- .../IcebergDistributedSmokeTestBase.java | 10 +- .../iceberg/IcebergDistributedTestBase.java | 45 +- .../presto/iceberg/IcebergQueryRunner.java | 13 +- .../presto/iceberg/TestIcebergConfig.java | 3 + .../presto/iceberg/TestIcebergFileWriter.java | 6 +- .../container/IcebergMinIODataLake.java | 117 +++++ .../TestIcebergDistributedOnS3Hadoop.java | 133 +++++ ...rgHadoopCatalogOnS3DistributedQueries.java | 91 ++++ .../hadoop/TestIcebergSmokeHadoop.java | 13 +- .../hadoop/TestIcebergSmokeOnS3Hadoop.java | 461 ++++++++++++++++++ .../hive/TestIcebergDistributedHive.java | 2 +- .../iceberg/hive/TestIcebergSmokeHive.java | 2 +- .../nessie/TestIcebergDistributedNessie.java | 4 +- .../nessie/TestIcebergSmokeNessie.java | 2 +- .../iceberg/rest/IcebergRestTestUtil.java | 5 +- 19 files changed, 931 insertions(+), 46 deletions(-) create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergHadoopCatalogOnS3DistributedQueries.java create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeOnS3Hadoop.java diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index 36cbe08db3b03..c8802ab6a61d6 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -263,6 +263,28 @@ + + org.testcontainers + testcontainers + test + + + org.slf4j + slf4j-api + + + + + + com.amazonaws + aws-java-sdk-core + + + + com.amazonaws + aws-java-sdk-s3 + + org.apache.iceberg iceberg-core diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java index 42328ddb4e7f4..bc9efc73420e2 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java @@ -50,6 +50,7 @@ public class IcebergConfig private HiveCompressionCodec compressionCodec = GZIP; private CatalogType catalogType = HIVE; private String catalogWarehouse; + private String catalogWarehouseDataDir; private int catalogCacheSize = 10; private int maxPartitionsPerWriter = 100; private List hadoopConfigResources = ImmutableList.of(); @@ -127,6 +128,19 @@ public IcebergConfig setCatalogWarehouse(String catalogWarehouse) return this; } + public String getCatalogWarehouseDataDir() + { + return catalogWarehouseDataDir; + } + + @Config("iceberg.catalog.warehouse.datadir") + @ConfigDescription("Iceberg catalog default root data writing directory") + public IcebergConfig setCatalogWarehouseDataDir(String catalogWarehouseDataDir) + { + this.catalogWarehouseDataDir = catalogWarehouseDataDir; + return this; + } + @Min(1) public int getCatalogCacheSize() { diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java index 4d23a97a3ff71..5a24e94f1d8ea 
100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeCatalogFactory.java @@ -52,6 +52,7 @@ public class IcebergNativeCatalogFactory private final String catalogName; protected final CatalogType catalogType; private final String catalogWarehouse; + private final String catalogWarehouseDataDir; protected final IcebergConfig icebergConfig; private final List hadoopConfigResources; @@ -69,6 +70,7 @@ public IcebergNativeCatalogFactory( this.icebergConfig = requireNonNull(config, "config is null"); this.catalogType = config.getCatalogType(); this.catalogWarehouse = config.getCatalogWarehouse(); + this.catalogWarehouseDataDir = config.getCatalogWarehouseDataDir(); this.hadoopConfigResources = icebergConfig.getHadoopConfigResources(); this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null"); this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null"); @@ -90,6 +92,11 @@ public Catalog getCatalog(ConnectorSession session) } } + public String getCatalogWarehouseDataDir() + { + return this.catalogWarehouseDataDir; + } + public SupportsNamespaces getNamespaces(ConnectorSession session) { Catalog catalog = getCatalog(session); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 123c45bf569f4..197cdab3fdde1 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; +import static com.facebook.presto.iceberg.CatalogType.HADOOP; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat; import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning; @@ -83,12 +84,14 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; public class IcebergNativeMetadata extends IcebergAbstractMetadata { private static final String VIEW_DIALECT = "presto"; + private final Optional warehouseDataDir; private final IcebergNativeCatalogFactory catalogFactory; private final CatalogType catalogType; private final ConcurrentMap icebergViews = new ConcurrentHashMap<>(); @@ -107,6 +110,7 @@ public IcebergNativeMetadata( super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache); this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); this.catalogType = requireNonNull(catalogType, "catalogType is null"); + this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir()); } @Override @@ -316,20 +320,30 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con try { TableIdentifier tableIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()); String targetPath = getTableLocation(tableMetadata.getProperties()); + Map tableProperties = populateTableProperties(tableMetadata, 
fileFormat, session, catalogType); + if (!tableProperties.containsKey(WRITE_DATA_LOCATION)) { + Optional dataLocation = getDataLocationBasedOnWarehouseDataDir(schemaTableName); + if (dataLocation.isPresent()) { + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); + tableProperties = propertiesBuilder.putAll(tableProperties) + .put(WRITE_DATA_LOCATION, dataLocation.get()) + .build(); + } + } if (!isNullOrEmpty(targetPath)) { transaction = catalogFactory.getCatalog(session).newCreateTableTransaction( tableIdentifier, schema, partitionSpec, targetPath, - populateTableProperties(tableMetadata, fileFormat, session, catalogType)); + tableProperties); } else { transaction = catalogFactory.getCatalog(session).newCreateTableTransaction( tableIdentifier, schema, partitionSpec, - populateTableProperties(tableMetadata, fileFormat, session, catalogType)); + tableProperties); } } catch (AlreadyExistsException e) { @@ -379,4 +393,13 @@ public void unregisterTable(ConnectorSession clientSession, SchemaTableName sche { catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), false); } + + private Optional getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName) + { + if (!catalogType.equals(HADOOP)) { + return Optional.empty(); + } + return Optional.ofNullable(warehouseDataDir.map(base -> base + schemaTableName.getSchemaName() + "/" + schemaTableName.getTableName()) + .orElse(null)); + } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java index 14f10c92b5928..42ec0fb2998b8 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedSmokeTestBase.java @@ -24,6 +24,7 @@ import com.facebook.presto.testing.assertions.Assert; import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest; import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.Table; import org.apache.iceberg.UpdateProperties; import org.intellij.lang.annotations.Language; @@ -32,7 +33,6 @@ import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; import java.util.function.BiConsumer; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -738,7 +738,7 @@ private void testSchemaEvolution(Session session, FileFormat fileFormat) } @Test - private void testCreateTableLike() + protected void testCreateTableLike() { Session session = getSession(); String schemaName = session.getSchema().get(); @@ -892,7 +892,7 @@ private void testWithAllFormatVersions(BiConsumer test) test.accept("2", "merge-on-read"); } - private String getTablePropertiesString(String tableName) + protected String getTablePropertiesString(String tableName) { MaterializedResult showCreateTable = computeActual("SHOW CREATE TABLE " + tableName); String createTable = (String) getOnlyElement(showCreateTable.getOnlyColumnAsSet()); @@ -1225,8 +1225,8 @@ protected String getLocation(String schema, String table) protected Path getCatalogDirectory() { - Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); - return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false); + java.nio.file.Path dataDirectory = 
getDistributedQueryRunner().getCoordinator().getDataDirectory(); + return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI()); } protected Table getIcebergTable(ConnectorSession session, String namespace, String tableName) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index cb341129b79df..76221f64ed259 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -32,6 +32,9 @@ import com.facebook.presto.hive.HiveHdfsConfiguration; import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.authentication.NoHdfsAuthentication; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater; +import com.facebook.presto.hive.s3.S3ConfigurationUpdater; import com.facebook.presto.iceberg.delete.DeleteFile; import com.facebook.presto.metadata.CatalogMetadata; import com.facebook.presto.metadata.Metadata; @@ -63,6 +66,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.FileScanTask; @@ -96,7 +100,6 @@ import java.lang.reflect.Field; import java.net.URI; import java.nio.ByteBuffer; -import java.nio.file.Path; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; @@ -1679,14 +1682,14 @@ public void testMetadataVersionsMaintainingProperties() // Table `test_table_with_default_setting_properties`'s current metadata record all 5 previous metadata files assertEquals(defaultTableMetadata.previousFiles().size(), 5); - FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new org.apache.hadoop.fs.Path(settingTable.location())); + FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new Path(settingTable.location())); // Table `test_table_with_setting_properties`'s all existing metadata files count is 2 - FileStatus[] settingTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); + FileStatus[] settingTableFiles = fileSystem.listStatus(new Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); assertEquals(settingTableFiles.length, 2); // Table `test_table_with_default_setting_properties`'s all existing metadata files count is 6 - FileStatus[] defaultTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); + FileStatus[] defaultTableFiles = fileSystem.listStatus(new Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION)); assertEquals(defaultTableFiles.length, 6); } finally { @@ -2261,12 +2264,12 @@ private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs)) .createWriterFunc(GenericParquetWriter::buildWriter) .forTable(icebergTable) @@ -2293,13 +2296,13 @@ private 
void writeEqualityDeleteToNationTable(Table icebergTable, Map overwriteValues, Map partitionValues) throws Exception { - Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile(); - org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI()); + Path metadataDir = new Path(metastoreDir.toURI()); String deleteFileName = "delete_file_" + randomUUID(); FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir); Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet()); - Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new org.apache.hadoop.fs.Path(metadataDir, deleteFileName), fs)) + Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs)) .forTable(icebergTable) .rowSchema(deleteRowSchema) .createWriterFunc(GenericParquetWriter::buildWriter) @@ -2320,13 +2323,19 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map {}), + ImmutableSet.of(), hiveClientConfig); return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication()); } @@ -2348,18 +2357,18 @@ protected Table loadTable(String tableName) protected Map getProperties() { - File metastoreDir = getCatalogDirectory(); + Path metastoreDir = getCatalogDirectory(); return ImmutableMap.of("warehouse", metastoreDir.toString()); } - protected File getCatalogDirectory() + protected Path getCatalogDirectory() { - Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); switch (catalogType) { case HIVE: case HADOOP: case NESSIE: - return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile(); + return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI()); } throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java index 6e37c8cb41bac..aae51883aa8c4 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java @@ -43,6 +43,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; @@ -202,13 +203,11 @@ public static DistributedQueryRunner createIcebergQueryRunner( String catalogType = extraConnectorProperties.getOrDefault("iceberg.catalog.type", HIVE.name()); Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType, format, addStorageFormatToPath); - Map icebergProperties = ImmutableMap.builder() - .put("iceberg.file-format", format.name()) - .putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory)) - 
.putAll(extraConnectorProperties) - .build(); - - queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); + Map icebergProperties = new HashMap<>(); + icebergProperties.put("iceberg.file-format", format.name()); + icebergProperties.putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory)); + icebergProperties.putAll(extraConnectorProperties); + queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.copyOf(icebergProperties)); if (addJmxPlugin) { queryRunner.installPlugin(new JmxPlugin()); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java index 588b7273d44c5..8912ef7937e98 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergConfig.java @@ -49,6 +49,7 @@ public void testDefaults() .setCompressionCodec(GZIP) .setCatalogType(HIVE) .setCatalogWarehouse(null) + .setCatalogWarehouseDataDir(null) .setCatalogCacheSize(10) .setHadoopConfigResources(null) .setHiveStatisticsMergeFlags("") @@ -81,6 +82,7 @@ public void testExplicitPropertyMappings() .put("iceberg.compression-codec", "NONE") .put("iceberg.catalog.type", "HADOOP") .put("iceberg.catalog.warehouse", "path") + .put("iceberg.catalog.warehouse.datadir", "path_data_dir") .put("iceberg.catalog.cached-catalog-num", "6") .put("iceberg.hadoop.config.resources", "/etc/hadoop/conf/core-site.xml") .put("iceberg.max-partitions-per-writer", "222") @@ -110,6 +112,7 @@ public void testExplicitPropertyMappings() .setCompressionCodec(NONE) .setCatalogType(HADOOP) .setCatalogWarehouse("path") + .setCatalogWarehouseDataDir("path_data_dir") .setCatalogCacheSize(6) .setHadoopConfigResources("/etc/hadoop/conf/core-site.xml") .setMaxPartitionsPerWriter(222) diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java index 734d47172ded8..44fc80cf2a01b 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java @@ -25,10 +25,13 @@ import com.facebook.presto.hive.FileFormatDataSourceStats; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.HiveCompressionCodec; import com.facebook.presto.hive.HiveDwrfEncryptionProvider; +import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.hive.OrcFileWriterConfig; +import com.facebook.presto.hive.s3.HiveS3Config; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.parquet.FileParquetDataSource; import com.facebook.presto.parquet.cache.MetadataReader; @@ -61,6 +64,7 @@ import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema; +import static com.facebook.presto.iceberg.IcebergDistributedTestBase.getHdfsEnvironment; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static com.facebook.presto.iceberg.IcebergSessionProperties.dataSizeSessionProperty; import static 
com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager; @@ -113,7 +117,7 @@ public void setup() throws Exception this.connectorSession = session.toConnectorSession(connectorId); TypeManager typeManager = new TestingTypeManager(); this.hdfsContext = new HdfsContext(connectorSession); - HdfsEnvironment hdfsEnvironment = IcebergDistributedTestBase.getHdfsEnvironment(); + HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config()); this.icebergFileWriterFactory = new IcebergFileWriterFactory(hdfsEnvironment, typeManager, new FileFormatDataSourceStats(), new NodeVersion("test"), new OrcFileWriterConfig(), HiveDwrfEncryptionProvider.NO_ENCRYPTION); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java new file mode 100644 index 0000000000000..69afec881af83 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java @@ -0,0 +1,117 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.container; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.facebook.presto.testing.containers.MinIOContainer; +import com.facebook.presto.util.AutoCloseableCloser; +import com.google.common.collect.ImmutableMap; +import org.testcontainers.containers.Network; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Objects.requireNonNull; +import static org.testcontainers.containers.Network.newNetwork; + +public class IcebergMinIODataLake + implements Closeable +{ + public static final String ACCESS_KEY = "minioadmin"; + public static final String SECRET_KEY = "minioadmin"; + + private final String bucketName; + private final String warehouseDir; + private final MinIOContainer minIOContainer; + + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private final AutoCloseableCloser closer = AutoCloseableCloser.create(); + + public IcebergMinIODataLake(String bucketName, String warehouseDir) + { + this.bucketName = requireNonNull(bucketName, "bucketName is null"); + this.warehouseDir = requireNonNull(warehouseDir, "warehouseDir is null"); + Network network = closer.register(newNetwork()); + this.minIOContainer = closer.register( + MinIOContainer.builder() + .withNetwork(network) + .withEnvVars(ImmutableMap.builder() + .put("MINIO_ACCESS_KEY", ACCESS_KEY) + .put("MINIO_SECRET_KEY", SECRET_KEY) + .build()) + .build()); + } + + public void start() + { + if (isStarted()) { + return; + } + try { + this.minIOContainer.start(); + AmazonS3 s3Client = 
AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + "http://localhost:" + minIOContainer.getMinioApiEndpoint().getPort(), + "us-east-1")) + .withPathStyleAccessEnabled(true) + .withCredentials(new AWSStaticCredentialsProvider( + new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY))) + .build(); + s3Client.createBucket(this.bucketName); + s3Client.putObject(this.bucketName, this.warehouseDir, ""); + } + finally { + isStarted.set(true); + } + } + + public boolean isStarted() + { + return isStarted.get(); + } + + public void stop() + { + if (!isStarted()) { + return; + } + try { + closer.close(); + } + catch (Exception e) { + throw new RuntimeException("Failed to stop IcebergMinioDataLake", e); + } + finally { + isStarted.set(false); + } + } + + public MinIOContainer getMinio() + { + return minIOContainer; + } + + @Override + public void close() + throws IOException + { + stop(); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java new file mode 100644 index 0000000000000..866c0aef18570 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java @@ -0,0 +1,133 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.iceberg.hadoop; + +import com.facebook.presto.hive.HdfsContext; +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.MetastoreClientConfig; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.iceberg.IcebergDistributedTestBase; +import com.facebook.presto.iceberg.IcebergQueryRunner; +import com.facebook.presto.iceberg.container.IcebergMinIODataLake; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import java.net.URI; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY; +import static com.facebook.presto.testing.TestingConnectorSession.SESSION; +import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; +import static com.google.common.io.Files.createTempDir; +import static java.lang.String.format; + +public class TestIcebergDistributedOnS3Hadoop + extends IcebergDistributedTestBase +{ + static final String WAREHOUSE_DATA_DIR = "warehouse_data/"; + final String bucketName; + final String catalogWarehouseDir; + private IcebergMinIODataLake dockerizedS3DataLake; + HostAndPort hostAndPort; + + public TestIcebergDistributedOnS3Hadoop() + { + super(HADOOP); + bucketName = "forhadoop-" + randomTableSuffix(); + catalogWarehouseDir = createTempDir().toURI().toString(); + } + + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), HADOOP, + ImmutableMap.of( + "iceberg.catalog.warehouse", catalogWarehouseDir, + "iceberg.catalog.warehouse.datadir", getCatalogDataDirectory().toString(), + "hive.s3.use-instance-credentials", "false", + "hive.s3.aws-access-key", ACCESS_KEY, + "hive.s3.aws-secret-key", SECRET_KEY, + "hive.s3.endpoint", format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()), + "hive.s3.path-style-access", "true")); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + this.dockerizedS3DataLake = new IcebergMinIODataLake(bucketName, WAREHOUSE_DATA_DIR); + this.dockerizedS3DataLake.start(); + hostAndPort = this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint(); + super.init(); + } + + @AfterClass + public void tearDown() + { + if (dockerizedS3DataLake != null) { + dockerizedS3DataLake.stop(); + } + } + + @Override + public void testCreateTableWithCustomLocation() + { + String tableName = "test_hadoop_table_with_custom_location"; + URI tableTargetURI = createTempDir().toURI(); + assertQueryFails(format("create table %s (a int, b varchar)" + " with (location = '%s')", tableName, tableTargetURI.toString()), + "Cannot set a custom location for a path-based table.*"); + } + + protected Path getCatalogDataDirectory() + { + return new Path(URI.create(format("s3://%s/%s", bucketName, WAREHOUSE_DATA_DIR))); + } + + protected Path getCatalogDirectory() + { + return new Path(catalogWarehouseDir); + } + + protected 
HdfsEnvironment getHdfsEnvironment() + { + HiveClientConfig hiveClientConfig = new HiveClientConfig(); + MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig(); + HiveS3Config hiveS3Config = new HiveS3Config() + .setS3AwsAccessKey(ACCESS_KEY) + .setS3AwsSecretKey(SECRET_KEY) + .setS3PathStyleAccess(true) + .setS3Endpoint(format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort())) + .setS3UseInstanceCredentials(false); + return getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config); + } + + protected Table loadTable(String tableName) + { + Configuration configuration = getHdfsEnvironment().getConfiguration(new HdfsContext(SESSION), getCatalogDataDirectory()); + Catalog catalog = CatalogUtil.loadCatalog(HADOOP.getCatalogImpl(), "test-hive", getProperties(), configuration); + return catalog.loadTable(TableIdentifier.of("tpch", tableName)); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergHadoopCatalogOnS3DistributedQueries.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergHadoopCatalogOnS3DistributedQueries.java new file mode 100644 index 0000000000000..44535b377d560 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergHadoopCatalogOnS3DistributedQueries.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.iceberg.hadoop; + +import com.facebook.presto.iceberg.IcebergQueryRunner; +import com.facebook.presto.iceberg.TestIcebergDistributedQueries; +import com.facebook.presto.iceberg.container.IcebergMinIODataLake; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY; +import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; +import static com.google.common.io.Files.createTempDir; +import static java.lang.String.format; + +public class TestIcebergHadoopCatalogOnS3DistributedQueries + extends TestIcebergDistributedQueries +{ + static final String WAREHOUSE_DATA_DIR = "warehouse_data/"; + final String bucketName; + final String catalogWarehouseDir; + private IcebergMinIODataLake dockerizedS3DataLake; + HostAndPort hostAndPort; + + public TestIcebergHadoopCatalogOnS3DistributedQueries() + { + super(HADOOP); + bucketName = "forhadoop-" + randomTableSuffix(); + catalogWarehouseDir = createTempDir().toURI().toString(); + } + + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), HADOOP, + ImmutableMap.of( + "iceberg.catalog.warehouse", catalogWarehouseDir, + "iceberg.catalog.warehouse.datadir", format("s3://%s/%s", bucketName, WAREHOUSE_DATA_DIR), + "hive.s3.use-instance-credentials", "false", + "hive.s3.aws-access-key", ACCESS_KEY, + "hive.s3.aws-secret-key", SECRET_KEY, + "hive.s3.endpoint", format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()), + "hive.s3.path-style-access", "true")); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + this.dockerizedS3DataLake = new IcebergMinIODataLake(bucketName, WAREHOUSE_DATA_DIR); + this.dockerizedS3DataLake.start(); + hostAndPort = this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint(); + super.init(); + } + + @AfterClass + public void tearDown() + { + if (dockerizedS3DataLake != null) { + dockerizedS3DataLake.stop(); + } + } + + protected boolean supportsViews() + { + return false; + } + + @Override + public void testRenameTable() + { + // Rename table are not supported by hadoop catalog + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java index bcb41f0f87eb0..8e1ae66d31a6c 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeHadoop.java @@ -24,13 +24,12 @@ import com.facebook.presto.iceberg.IcebergUtil; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; +import org.apache.hadoop.fs.Path; import org.apache.iceberg.Table; import org.testng.annotations.Test; -import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; import static com.facebook.presto.iceberg.CatalogType.HADOOP; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; @@ -117,15 +116,15 @@ public void 
testPartitionedTableWithSpecifiedWriteDataLocation() @Override protected String getLocation(String schema, String table) { - File tempLocation = getCatalogDirectory().toFile(); - return format("%s%s/%s", tempLocation.toURI(), schema, table); + Path tempLocation = getCatalogDirectory(); + return format("%s%s/%s", tempLocation.toUri(), schema, table); } @Override protected Path getCatalogDirectory() { - Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); - Path catalogDirectory = getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false); + java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); + Path catalogDirectory = new Path(getIcebergDataDirectoryPath(dataDirectory, HADOOP.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI()); return catalogDirectory; } @@ -134,7 +133,7 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String { IcebergConfig icebergConfig = new IcebergConfig(); icebergConfig.setCatalogType(HADOOP); - icebergConfig.setCatalogWarehouse(getCatalogDirectory().toFile().getPath()); + icebergConfig.setCatalogWarehouse(getCatalogDirectory().toString()); IcebergNativeCatalogFactory catalogFactory = new IcebergNativeCatalogFactory(icebergConfig, new IcebergCatalogName(ICEBERG_CATALOG), diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeOnS3Hadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeOnS3Hadoop.java new file mode 100644 index 0000000000000..03d8a23bfa66f --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergSmokeOnS3Hadoop.java @@ -0,0 +1,461 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.iceberg.hadoop; + +import com.facebook.presto.Session; +import com.facebook.presto.hive.gcs.HiveGcsConfig; +import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater; +import com.facebook.presto.iceberg.FileFormat; +import com.facebook.presto.iceberg.IcebergCatalogName; +import com.facebook.presto.iceberg.IcebergConfig; +import com.facebook.presto.iceberg.IcebergDistributedSmokeTestBase; +import com.facebook.presto.iceberg.IcebergNativeCatalogFactory; +import com.facebook.presto.iceberg.IcebergQueryRunner; +import com.facebook.presto.iceberg.IcebergUtil; +import com.facebook.presto.iceberg.container.IcebergMinIODataLake; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.testing.MaterializedResult; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Table; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URI; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY; +import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; +import static com.google.common.collect.Iterables.getOnlyElement; +import static com.google.common.io.Files.createTempDir; +import static java.lang.String.format; +import static java.util.Locale.ENGLISH; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; + +public class TestIcebergSmokeOnS3Hadoop + extends IcebergDistributedSmokeTestBase +{ + static final String WAREHOUSE_DATA_DIR = "warehouse_data/"; + final String bucketName; + final String catalogWarehouseDir; + + private IcebergMinIODataLake dockerizedS3DataLake; + HostAndPort hostAndPort; + + public TestIcebergSmokeOnS3Hadoop() + { + super(HADOOP); + bucketName = "forhadoop-" + randomTableSuffix(); + catalogWarehouseDir = createTempDir().toURI().toString(); + } + + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), HADOOP, + ImmutableMap.of( + "iceberg.catalog.warehouse", catalogWarehouseDir, + "iceberg.catalog.warehouse.datadir", getCatalogDataDirectory().toString(), + "hive.s3.use-instance-credentials", "false", + "hive.s3.aws-access-key", ACCESS_KEY, + "hive.s3.aws-secret-key", SECRET_KEY, + "hive.s3.endpoint", format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()), + "hive.s3.path-style-access", "true")); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + this.dockerizedS3DataLake = new IcebergMinIODataLake(bucketName, WAREHOUSE_DATA_DIR); + this.dockerizedS3DataLake.start(); + hostAndPort = this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint(); + super.init(); + } + + @AfterClass + public void tearDown() + { + if (dockerizedS3DataLake != null) { + dockerizedS3DataLake.stop(); + } + } + + @Test + public void 
testShowCreateTableWithSpecifiedWriteDataLocation() + { + String tableName = "test_table_with_specified_write_data_location"; + String dataWriteLocation = getPathBasedOnDataDirectory("test-" + randomTableSuffix()); + try { + assertUpdate(format("CREATE TABLE %s(a int, b varchar) with (write_data_path = '%s')", tableName, dataWriteLocation)); + String schemaName = getSession().getSchema().get(); + String location = getLocation(schemaName, tableName); + assertThat(computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue()) + .isEqualTo(format("CREATE TABLE iceberg.%s.%s (\n" + + " \"a\" integer,\n" + + " \"b\" varchar\n" + + ")\n" + + "WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = 'PARQUET',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " write_data_path = '%s'\n" + + ")", schemaName, tableName, location, dataWriteLocation)); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + tableName); + } + } + + @Test + public void testTableWithSpecifiedWriteDataLocation() + { + String tableName = "test_table_with_specified_write_data_location2"; + String dataWriteLocation = getPathBasedOnDataDirectory("test-" + randomTableSuffix()); + try { + assertUpdate(format("create table %s(a int, b varchar) with (write_data_path = '%s')", tableName, dataWriteLocation)); + assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')"); + assertUpdate(format("delete from %s where a > 2", tableName), 1); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')"); + } + finally { + assertUpdate("drop table if exists " + tableName); + } + } + + @Test + public void testPartitionedTableWithSpecifiedWriteDataLocation() + { + String tableName = "test_table_with_specified_write_data_location3"; + String dataWriteLocation = getPathBasedOnDataDirectory("test-" + randomTableSuffix()); + try { + assertUpdate(format("create table %s(a int, b varchar) with (partitioning = ARRAY['a'], write_data_path = '%s')", tableName, dataWriteLocation)); + assertUpdate(format("insert into %s values(1, '1001'), (2, '1002'), (3, '1003')", tableName), 3); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002'), (3, '1003')"); + assertUpdate(format("delete from %s where a > 2", tableName), 1); + assertQuery("select * from " + tableName, "values(1, '1001'), (2, '1002')"); + } + finally { + assertUpdate("drop table if exists " + tableName); + } + } + + @Override + protected void testCreatePartitionedTableAs(Session session, FileFormat fileFormat) + { + @Language("SQL") String createTable = "" + + "CREATE TABLE test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH) + " " + + "WITH (" + + "format = '" + fileFormat + "', " + + "partitioning = ARRAY['ORDER_STATUS', 'Ship_Priority', 'Bucket(order_key,9)']" + + ") " + + "AS " + + "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " + + "FROM tpch.tiny.orders"; + + assertUpdate(session, createTable, "SELECT count(*) from orders"); + + String createTableSql = format("" + + "CREATE TABLE %s.%s.%s (\n" + + " \"order_key\" bigint,\n" + + " \"ship_priority\" integer,\n" + + " \"order_status\" varchar\n" + + ")\n" + + "WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = '" + fileFormat + 
"',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " partitioning = ARRAY['order_status','ship_priority','bucket(order_key, 9)'],\n" + + " write_data_path = '%s'\n" + + ")", + getSession().getCatalog().get(), + getSession().getSchema().get(), + "test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH), + getLocation(getSession().getSchema().get(), "test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH)), + getPathBasedOnDataDirectory(getSession().getSchema().get() + "/test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH))); + + MaterializedResult actualResult = computeActual("SHOW CREATE TABLE test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH)); + assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()), createTableSql); + + assertQuery(session, "SELECT * from test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH), "SELECT orderkey, shippriority, orderstatus FROM orders"); + + dropTable(session, "test_create_partitioned_table_as_" + fileFormat.toString().toLowerCase(ENGLISH)); + } + + @Override + protected void testCreateTableLike() + { + Session session = getSession(); + String schemaName = session.getSchema().get(); + + assertUpdate(session, "CREATE TABLE test_create_table_like_original (col1 INTEGER, aDate DATE) WITH(format = 'PARQUET', partitioning = ARRAY['aDate'])"); + assertEquals(getTablePropertiesString("test_create_table_like_original"), format("WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = 'PARQUET',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " partitioning = ARRAY['adate'],\n" + + " write_data_path = '%s'\n" + + ")", + getLocation(schemaName, "test_create_table_like_original"), + getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_original"))); + + assertUpdate(session, "CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)"); + assertUpdate(session, "INSERT INTO test_create_table_like_copy0 (col1, aDate, col2) VALUES (1, CAST('1950-06-28' AS DATE), 3)", 1); + assertQuery(session, "SELECT * from test_create_table_like_copy0", "VALUES(1, CAST('1950-06-28' AS DATE), 3)"); + dropTable(session, "test_create_table_like_copy0"); + + assertUpdate(session, "CREATE TABLE test_create_table_like_copy1 (LIKE test_create_table_like_original)"); + assertEquals(getTablePropertiesString("test_create_table_like_copy1"), format("WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = 'PARQUET',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " write_data_path = '%s'\n" + + ")", + getLocation(schemaName, "test_create_table_like_copy1"), + getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_copy1"))); + dropTable(session, "test_create_table_like_copy1"); + + assertUpdate(session, "CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)"); + assertEquals(getTablePropertiesString("test_create_table_like_copy2"), format("WITH (\n" + + " delete_mode = 'merge-on-read',\n" 
+ + " format = 'PARQUET',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " write_data_path = '%s'\n" + + ")", + getLocation(schemaName, "test_create_table_like_copy2"), + getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_copy2"))); + dropTable(session, "test_create_table_like_copy2"); + + assertUpdate(session, "CREATE TABLE test_create_table_like_copy5 (LIKE test_create_table_like_original INCLUDING PROPERTIES)" + + " WITH (location = '', write_data_path = '', format = 'ORC')"); + assertEquals(getTablePropertiesString("test_create_table_like_copy5"), format("WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = 'ORC',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " partitioning = ARRAY['adate'],\n" + + " write_data_path = '%s'\n" + + ")", + getLocation(schemaName, "test_create_table_like_copy5"), + getPathBasedOnDataDirectory(schemaName + "/test_create_table_like_copy5"))); + dropTable(session, "test_create_table_like_copy5"); + + assertQueryFails(session, "CREATE TABLE test_create_table_like_copy6 (LIKE test_create_table_like_original INCLUDING PROPERTIES)", + "Cannot set a custom location for a path-based table.*"); + + dropTable(session, "test_create_table_like_original"); + } + + @Override + protected void testCreateTableWithFormatVersion(String formatVersion, String defaultDeleteMode) + { + @Language("SQL") String createTable = "" + + "CREATE TABLE test_create_table_with_format_version_" + formatVersion + " " + + "WITH (" + + "format = 'PARQUET', " + + "format_version = '" + formatVersion + "'" + + ") " + + "AS " + + "SELECT orderkey AS order_key, shippriority AS ship_priority, orderstatus AS order_status " + + "FROM tpch.tiny.orders"; + + Session session = getSession(); + + assertUpdate(session, createTable, "SELECT count(*) from orders"); + + String createTableSql = format("" + + "CREATE TABLE %s.%s.%s (\n" + + " \"order_key\" bigint,\n" + + " \"ship_priority\" integer,\n" + + " \"order_status\" varchar\n" + + ")\n" + + "WITH (\n" + + " delete_mode = '%s',\n" + + " format = 'PARQUET',\n" + + " format_version = '%s',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " write_data_path = '%s'\n" + + ")", + getSession().getCatalog().get(), + getSession().getSchema().get(), + "test_create_table_with_format_version_" + formatVersion, + defaultDeleteMode, + formatVersion, + getLocation(getSession().getSchema().get(), "test_create_table_with_format_version_" + formatVersion), + getPathBasedOnDataDirectory(getSession().getSchema().get() + "/test_create_table_with_format_version_" + formatVersion)); + + MaterializedResult actualResult = computeActual("SHOW CREATE TABLE test_create_table_with_format_version_" + formatVersion); + assertEquals(getOnlyElement(actualResult.getOnlyColumnAsSet()), createTableSql); + + dropTable(session, "test_create_table_with_format_version_" + formatVersion); + } + + @Override + public void testShowCreateTable() + { + String schemaName = getSession().getSchema().get(); + assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue()) + .isEqualTo(format("CREATE TABLE iceberg.%s.orders (\n" + + " \"orderkey\" 
bigint,\n" + + " \"custkey\" bigint,\n" + + " \"orderstatus\" varchar,\n" + + " \"totalprice\" double,\n" + + " \"orderdate\" date,\n" + + " \"orderpriority\" varchar,\n" + + " \"clerk\" varchar,\n" + + " \"shippriority\" integer,\n" + + " \"comment\" varchar\n" + + ")\n" + + "WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = 'PARQUET',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " write_data_path = '%s'\n" + + ")", + schemaName, + getLocation(schemaName, "orders"), + getPathBasedOnDataDirectory(schemaName + "/orders"))); + } + + @Test + public void testTableComments() + { + Session session = getSession(); + String schemaName = session.getSchema().get(); + + @Language("SQL") String createTable = "" + + "CREATE TABLE iceberg.%s.test_table_comments (\n" + + " \"_x\" bigint\n" + + ")\n" + + "COMMENT '%s'\n" + + "WITH (\n" + + " format = 'ORC',\n" + + " format_version = '2'\n" + + ")"; + + assertUpdate(format(createTable, schemaName, "test table comment")); + + String createTableTemplate = "" + + "CREATE TABLE iceberg.%s.test_table_comments (\n" + + " \"_x\" bigint\n" + + ")\n" + + "COMMENT '%s'\n" + + "WITH (\n" + + " delete_mode = 'merge-on-read',\n" + + " format = 'ORC',\n" + + " format_version = '2',\n" + + " location = '%s',\n" + + " metadata_delete_after_commit = false,\n" + + " metadata_previous_versions_max = 100,\n" + + " metrics_max_inferred_column = 100,\n" + + " write_data_path = '%s'\n" + + ")"; + String createTableSql = format(createTableTemplate, schemaName, "test table comment", + getLocation(schemaName, "test_table_comments"), + getPathBasedOnDataDirectory(schemaName + "/test_table_comments")); + + MaterializedResult resultOfCreate = computeActual("SHOW CREATE TABLE test_table_comments"); + assertEquals(getOnlyElement(resultOfCreate.getOnlyColumnAsSet()), createTableSql); + + dropTable(session, "test_table_comments"); + } + + @Override + protected String getLocation(String schema, String table) + { + Path tempLocation = getCatalogDirectory(); + return format("%s/%s/%s", tempLocation.toUri(), schema, table); + } + + @Override + protected Table getIcebergTable(ConnectorSession session, String schema, String tableName) + { + IcebergConfig icebergConfig = new IcebergConfig(); + icebergConfig.setCatalogType(HADOOP); + icebergConfig.setCatalogWarehouse(getCatalogDirectory().toString()); + + HiveS3Config hiveS3Config = new HiveS3Config() + .setS3AwsAccessKey(ACCESS_KEY) + .setS3AwsSecretKey(SECRET_KEY) + .setS3PathStyleAccess(true) + .setS3Endpoint(format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort())) + .setS3UseInstanceCredentials(false); + + IcebergNativeCatalogFactory catalogFactory = new IcebergNativeCatalogFactory(icebergConfig, + new IcebergCatalogName(ICEBERG_CATALOG), + new PrestoS3ConfigurationUpdater(hiveS3Config), + new HiveGcsConfigurationInitializer(new HiveGcsConfig())); + + return IcebergUtil.getNativeIcebergTable(catalogFactory, + session, + SchemaTableName.valueOf(schema + "." 
+ tableName)); + } + + protected Path getCatalogDirectory() + { + return new Path(catalogWarehouseDir); + } + + private Path getCatalogDataDirectory() + { + return new Path(URI.create(format("s3://%s/%s", bucketName, WAREHOUSE_DATA_DIR))); + } + + private String getPathBasedOnDataDirectory(String name) + { + return new Path(getCatalogDataDirectory(), name).toString(); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java index 3954524b959b2..fdb1f7b487152 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java @@ -83,7 +83,7 @@ protected Table loadTable(String tableName) protected ExtendedHiveMetastore getFileHiveMetastore() { FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(), - getCatalogDirectory().getPath(), + getCatalogDirectory().toString(), "test"); return memoizeMetastore(fileHiveMetastore, false, 1000, 0); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java index d83095286dfa7..c4cf8cc916bfb 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java @@ -69,7 +69,7 @@ protected static HdfsEnvironment getHdfsEnvironment() protected ExtendedHiveMetastore getFileHiveMetastore() { FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(), - getCatalogDirectory().toFile().getPath(), + getCatalogDirectory().toString(), "test"); return memoizeMetastore(fileHiveMetastore, false, 1000, 0); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java index d9a5884ad52c2..c72612e799fa7 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java @@ -18,11 +18,11 @@ import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.testing.containers.NessieContainer; import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.Path; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.io.File; import java.util.Map; import static com.facebook.presto.iceberg.CatalogType.NESSIE; @@ -43,7 +43,7 @@ protected TestIcebergDistributedNessie() @Override protected Map getProperties() { - File metastoreDir = getCatalogDirectory(); + Path metastoreDir = getCatalogDirectory(); return ImmutableMap.of("warehouse", metastoreDir.toString(), "uri", nessieContainer.getRestApiUri()); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java index efebec74b12b5..6a36d204c56df 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java +++ 
b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSmokeNessie.java @@ -103,7 +103,7 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String { IcebergConfig icebergConfig = new IcebergConfig(); icebergConfig.setCatalogType(NESSIE); - icebergConfig.setCatalogWarehouse(getCatalogDirectory().toFile().getPath()); + icebergConfig.setCatalogWarehouse(getCatalogDirectory().toString()); IcebergNessieConfig nessieConfig = new IcebergNessieConfig().setServerUri(nessieContainer.getRestApiUri()); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java index 4baf4560b28c6..193fc0a22975d 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java @@ -20,6 +20,9 @@ import com.facebook.airlift.node.NodeInfo; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.MetastoreClientConfig; +import com.facebook.presto.hive.s3.HiveS3Config; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.testing.TestingConnectorSession; import com.google.common.collect.ImmutableList; @@ -62,7 +65,7 @@ public static Map restConnectorProperties(String serverUri) public static TestingHttpServer getRestServer(String location) { JdbcCatalog backingCatalog = new JdbcCatalog(); - HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(); + HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config()); backingCatalog.setConf(hdfsEnvironment.getConfiguration(new HdfsContext(SESSION), new Path(location))); Map properties = ImmutableMap.builder()
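Reviewer note, not part of the patch: a minimal usage sketch of the new iceberg.catalog.warehouse.datadir property for the Hadoop catalog. The class name, bucket, and directory names below are made up for illustration. Note that getDataLocationBasedOnWarehouseDataDir concatenates the configured base directly with "<schema>/<table>", so the configured value should end with a slash, as the S3 tests above do with "warehouse_data/".

    // Hypothetical sketch only; assumes the presto-iceberg module is on the classpath.
    // Equivalent catalog properties:
    //   iceberg.catalog.type=HADOOP
    //   iceberg.catalog.warehouse=s3://example-bucket/warehouse/
    //   iceberg.catalog.warehouse.datadir=s3://example-bucket/warehouse_data/
    import com.facebook.presto.iceberg.CatalogType;
    import com.facebook.presto.iceberg.IcebergConfig;

    public class WarehouseDataDirExample
    {
        public static void main(String[] args)
        {
            IcebergConfig config = new IcebergConfig();
            config.setCatalogType(CatalogType.HADOOP);
            config.setCatalogWarehouse("s3://example-bucket/warehouse/");
            config.setCatalogWarehouseDataDir("s3://example-bucket/warehouse_data/");

            // When a table is created without an explicit write_data_path, the Hadoop
            // catalog now derives it as <datadir><schema>/<table>:
            String dataLocation = config.getCatalogWarehouseDataDir() + "tpch" + "/" + "orders";
            System.out.println(dataLocation); // s3://example-bucket/warehouse_data/tpch/orders
        }
    }

The same behavior is exercised end-to-end in the tests above by passing iceberg.catalog.warehouse.datadir to IcebergQueryRunner.createIcebergQueryRunner together with the MinIO-backed hive.s3.* endpoint settings.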