[Iceberg] Support setting warehouse data directory for Hadoop catalog
hantangwangd committed Jan 18, 2025
1 parent 57dc966 commit 0279ea7
Showing 19 changed files with 931 additions and 46 deletions.
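
For context, the new iceberg.catalog.warehouse.datadir property introduced by this commit sits alongside the existing Hadoop-catalog settings in a Presto catalog properties file. The following is a minimal sketch of such a file; the paths and bucket name are illustrative, not taken from this commit, and the trailing slash on the data directory matches the direct string concatenation used in IcebergNativeMetadata below.

connector.name=iceberg
iceberg.catalog.type=HADOOP
iceberg.catalog.warehouse=hdfs://namenode:8020/warehouse
# New in this commit: default root directory for table data files (HADOOP catalog only).
# Illustrative value; any warehouse-reachable HDFS or S3 location should work.
iceberg.catalog.warehouse.datadir=s3://my-bucket/iceberg-data/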
22 changes: 22 additions & 0 deletions presto-iceberg/pom.xml
@@ -263,6 +263,28 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>

@@ -50,6 +50,7 @@ public class IcebergConfig
private HiveCompressionCodec compressionCodec = GZIP;
private CatalogType catalogType = HIVE;
private String catalogWarehouse;
private String catalogWarehouseDataDir;
private int catalogCacheSize = 10;
private int maxPartitionsPerWriter = 100;
private List<String> hadoopConfigResources = ImmutableList.of();
@@ -127,6 +128,19 @@ public IcebergConfig setCatalogWarehouse(String catalogWarehouse)
return this;
}

public String getCatalogWarehouseDataDir()
{
return catalogWarehouseDataDir;
}

@Config("iceberg.catalog.warehouse.datadir")
@ConfigDescription("Iceberg catalog default root data writing directory")
public IcebergConfig setCatalogWarehouseDataDir(String catalogWarehouseDataDir)
{
this.catalogWarehouseDataDir = catalogWarehouseDataDir;
return this;
}

@Min(1)
public int getCatalogCacheSize()
{

@@ -52,6 +52,7 @@ public class IcebergNativeCatalogFactory
private final String catalogName;
protected final CatalogType catalogType;
private final String catalogWarehouse;
private final String catalogWarehouseDataDir;
protected final IcebergConfig icebergConfig;

private final List<String> hadoopConfigResources;
@@ -69,6 +70,7 @@ public IcebergNativeCatalogFactory(
this.icebergConfig = requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.catalogWarehouse = config.getCatalogWarehouse();
this.catalogWarehouseDataDir = config.getCatalogWarehouseDataDir();
this.hadoopConfigResources = icebergConfig.getHadoopConfigResources();
this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null");
this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null");
@@ -90,6 +92,11 @@ public Catalog getCatalog(ConnectorSession session)
}
}

public String getCatalogWarehouseDataDir()
{
return this.catalogWarehouseDataDir;
}

public SupportsNamespaces getNamespaces(ConnectorSession session)
{
Catalog catalog = getCatalog(session);

@@ -56,6 +56,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
@@ -83,12 +84,14 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;

public class IcebergNativeMetadata
extends IcebergAbstractMetadata
{
private static final String VIEW_DIALECT = "presto";

private final Optional<String> warehouseDataDir;
private final IcebergNativeCatalogFactory catalogFactory;
private final CatalogType catalogType;
private final ConcurrentMap<SchemaTableName, View> icebergViews = new ConcurrentHashMap<>();
@@ -107,6 +110,7 @@ public IcebergNativeMetadata(
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.catalogType = requireNonNull(catalogType, "catalogType is null");
this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir());
}

@Override
@@ -316,20 +320,30 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
try {
TableIdentifier tableIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled());
String targetPath = getTableLocation(tableMetadata.getProperties());
Map<String, String> tableProperties = populateTableProperties(tableMetadata, fileFormat, session, catalogType);
if (!tableProperties.containsKey(WRITE_DATA_LOCATION)) {
Optional<String> dataLocation = getDataLocationBasedOnWarehouseDataDir(schemaTableName);
if (dataLocation.isPresent()) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
tableProperties = propertiesBuilder.putAll(tableProperties)
.put(WRITE_DATA_LOCATION, dataLocation.get())
.build();
}
}
if (!isNullOrEmpty(targetPath)) {
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
tableIdentifier,
schema,
partitionSpec,
targetPath,
populateTableProperties(tableMetadata, fileFormat, session, catalogType));
tableProperties);
}
else {
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
tableIdentifier,
schema,
partitionSpec,
populateTableProperties(tableMetadata, fileFormat, session, catalogType));
tableProperties);
}
}
catch (AlreadyExistsException e) {
@@ -379,4 +393,13 @@ public void unregisterTable(ConnectorSession clientSession, SchemaTableName sche
{
catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), false);
}

private Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
{
if (!catalogType.equals(HADOOP)) {
return Optional.empty();
}
return Optional.ofNullable(warehouseDataDir.map(base -> base + schemaTableName.getSchemaName() + "/" + schemaTableName.getTableName())
.orElse(null));
}
}
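
To make the effect of getDataLocationBasedOnWarehouseDataDir above concrete, here is a small stand-alone sketch (not part of the commit) of how a table's data location is derived from the configured directory. The class, bucket, schema, and table names are made up for illustration, and the value is assumed to end with a path separator, as the direct concatenation in the method above implies.

import java.util.Optional;

// Stand-alone illustration only; the class and values are hypothetical, not part of this commit.
public class WarehouseDataDirExample
{
    public static void main(String[] args)
    {
        // Illustrative value for iceberg.catalog.warehouse.datadir; assumed to end with "/"
        // because getDataLocationBasedOnWarehouseDataDir appends the schema name directly.
        Optional<String> warehouseDataDir = Optional.of("s3://my-bucket/iceberg-data/");

        String schemaName = "tpch";
        String tableName = "orders";

        // Mirrors the mapping above: <datadir><schema>/<table> becomes the table's
        // WRITE_DATA_LOCATION property, while table metadata stays under the catalog warehouse.
        String dataLocation = warehouseDataDir
                .map(base -> base + schemaName + "/" + tableName)
                .orElse(null);

        System.out.println(dataLocation); // prints s3://my-bucket/iceberg-data/tpch/orders
    }
}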

@@ -24,6 +24,7 @@
import com.facebook.presto.testing.assertions.Assert;
import com.facebook.presto.tests.AbstractTestIntegrationSmokeTest;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateProperties;
import org.intellij.lang.annotations.Language;
@@ -32,7 +33,6 @@

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -738,7 +738,7 @@ private void testSchemaEvolution(Session session, FileFormat fileFormat)
}

@Test
private void testCreateTableLike()
protected void testCreateTableLike()
{
Session session = getSession();
String schemaName = session.getSchema().get();
@@ -892,7 +892,7 @@ private void testWithAllFormatVersions(BiConsumer<String, String> test)
test.accept("2", "merge-on-read");
}

private String getTablePropertiesString(String tableName)
protected String getTablePropertiesString(String tableName)
{
MaterializedResult showCreateTable = computeActual("SHOW CREATE TABLE " + tableName);
String createTable = (String) getOnlyElement(showCreateTable.getOnlyColumnAsSet());
@@ -1225,8 +1225,8 @@ protected String getLocation(String schema, String table)

protected Path getCatalogDirectory()
{
Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false);
java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI());
}

protected Table getIcebergTable(ConnectorSession session, String namespace, String tableName)

@@ -32,6 +32,9 @@
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
import com.facebook.presto.hive.s3.S3ConfigurationUpdater;
import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.metadata.CatalogMetadata;
import com.facebook.presto.metadata.Metadata;
@@ -63,6 +66,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.FileScanTask;
@@ -96,7 +100,6 @@
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
@@ -1679,14 +1682,14 @@ public void testMetadataVersionsMaintainingProperties()
// Table `test_table_with_default_setting_properties`'s current metadata record all 5 previous metadata files
assertEquals(defaultTableMetadata.previousFiles().size(), 5);

FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new org.apache.hadoop.fs.Path(settingTable.location()));
FileSystem fileSystem = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), new Path(settingTable.location()));

// Table `test_table_with_setting_properties`'s all existing metadata files count is 2
FileStatus[] settingTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
FileStatus[] settingTableFiles = fileSystem.listStatus(new Path(settingTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(settingTableFiles.length, 2);

// Table `test_table_with_default_setting_properties`'s all existing metadata files count is 6
FileStatus[] defaultTableFiles = fileSystem.listStatus(new org.apache.hadoop.fs.Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
FileStatus[] defaultTableFiles = fileSystem.listStatus(new Path(defaultTable.location(), "metadata"), name -> name.getName().contains(METADATA_FILE_EXTENSION));
assertEquals(defaultTableFiles.length, 6);
}
finally {
@@ -2261,12 +2264,12 @@ private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<Fil
private void writePositionDeleteToNationTable(Table icebergTable, String dataFilePath, long deletePos)
throws IOException
{
Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI());
Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + randomUUID();
FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir);
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(metadataDir, deleteFileName);
Path path = new Path(metadataDir, deleteFileName);
PositionDeleteWriter<Record> writer = Parquet.writeDeletes(HadoopOutputFile.fromPath(path, fs))
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
@@ -2293,13 +2296,13 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Ob
private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Object> overwriteValues, Map<String, Object> partitionValues)
throws Exception
{
Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
File metastoreDir = getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
org.apache.hadoop.fs.Path metadataDir = new org.apache.hadoop.fs.Path(metastoreDir.toURI());
Path metadataDir = new Path(metastoreDir.toURI());
String deleteFileName = "delete_file_" + randomUUID();
FileSystem fs = getHdfsEnvironment().getFileSystem(new HdfsContext(SESSION), metadataDir);
Schema deleteRowSchema = icebergTable.schema().select(overwriteValues.keySet());
Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new org.apache.hadoop.fs.Path(metadataDir, deleteFileName), fs))
Parquet.DeleteWriteBuilder writerBuilder = Parquet.writeDeletes(HadoopOutputFile.fromPath(new Path(metadataDir, deleteFileName), fs))
.forTable(icebergTable)
.rowSchema(deleteRowSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
@@ -2320,13 +2323,19 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Ob
icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
}

public static HdfsEnvironment getHdfsEnvironment()
protected HdfsEnvironment getHdfsEnvironment()
{
HiveClientConfig hiveClientConfig = new HiveClientConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
ImmutableSet.of(),
hiveClientConfig);
HiveS3Config hiveS3Config = new HiveS3Config();
return getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config);
}

public static HdfsEnvironment getHdfsEnvironment(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig, HiveS3Config hiveS3Config)
{
S3ConfigurationUpdater s3ConfigurationUpdater = new PrestoS3ConfigurationUpdater(hiveS3Config);
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig, s3ConfigurationUpdater, ignored -> {}),
ImmutableSet.of(), hiveClientConfig);
return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
}

@@ -2348,18 +2357,18 @@ protected Table loadTable(String tableName)

protected Map<String, String> getProperties()
{
File metastoreDir = getCatalogDirectory();
Path metastoreDir = getCatalogDirectory();
return ImmutableMap.of("warehouse", metastoreDir.toString());
}

protected File getCatalogDirectory()
protected Path getCatalogDirectory()
{
Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
java.nio.file.Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
switch (catalogType) {
case HIVE:
case HADOOP:
case NESSIE:
return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
return new Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI());
}

throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);

@@ -43,6 +43,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
@@ -202,13 +203,11 @@ public static DistributedQueryRunner createIcebergQueryRunner(
String catalogType = extraConnectorProperties.getOrDefault("iceberg.catalog.type", HIVE.name());
Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType, format, addStorageFormatToPath);

Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
.put("iceberg.file-format", format.name())
.putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory))
.putAll(extraConnectorProperties)
.build();

queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);
Map<String, String> icebergProperties = new HashMap<>();
icebergProperties.put("iceberg.file-format", format.name());
icebergProperties.putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory));
icebergProperties.putAll(extraConnectorProperties);
queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.copyOf(icebergProperties));

if (addJmxPlugin) {
queryRunner.installPlugin(new JmxPlugin());

@@ -49,6 +49,7 @@ public void testDefaults()
.setCompressionCodec(GZIP)
.setCatalogType(HIVE)
.setCatalogWarehouse(null)
.setCatalogWarehouseDataDir(null)
.setCatalogCacheSize(10)
.setHadoopConfigResources(null)
.setHiveStatisticsMergeFlags("")
@@ -81,6 +82,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.compression-codec", "NONE")
.put("iceberg.catalog.type", "HADOOP")
.put("iceberg.catalog.warehouse", "path")
.put("iceberg.catalog.warehouse.datadir", "path_data_dir")
.put("iceberg.catalog.cached-catalog-num", "6")
.put("iceberg.hadoop.config.resources", "/etc/hadoop/conf/core-site.xml")
.put("iceberg.max-partitions-per-writer", "222")
@@ -110,6 +112,7 @@ public void testExplicitPropertyMappings()
.setCompressionCodec(NONE)
.setCatalogType(HADOOP)
.setCatalogWarehouse("path")
.setCatalogWarehouseDataDir("path_data_dir")
.setCatalogCacheSize(6)
.setHadoopConfigResources("/etc/hadoop/conf/core-site.xml")
.setMaxPartitionsPerWriter(222)