Skip to content

Commit

Permalink
[Iceberg] Enable setting separate data write location on table creation
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Jan 18, 2025
1 parent dfc6304 commit 57dc966
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.facebook.presto.spi.statistics.TableStatisticsMetadata;
import com.google.common.base.Functions;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -134,6 +135,7 @@
import static com.facebook.presto.iceberg.IcebergTableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static com.facebook.presto.iceberg.IcebergTableProperties.METRICS_MAX_INFERRED_COLUMN;
import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.WRITE_DATA_LOCATION_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
Expand All @@ -150,7 +152,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetSchema;
import static com.facebook.presto.iceberg.IcebergUtil.validateTableMode;
Expand Down Expand Up @@ -180,6 +181,7 @@
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;

public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -487,7 +489,7 @@ protected ConnectorInsertTableHandle beginIcebergTableInsert(ConnectorSession se
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getWriteDataLocation(icebergTable),
getFileFormat(icebergTable),
getCompressionCodec(session),
icebergTable.properties());
Expand Down Expand Up @@ -608,6 +610,11 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
properties.put(LOCATION_PROPERTY, icebergTable.location());
}

String writeDataLocation = icebergTable.properties().get(WRITE_DATA_LOCATION);
if (!Strings.isNullOrEmpty(writeDataLocation)) {
properties.put(WRITE_DATA_LOCATION_PROPERTY, writeDataLocation);
}

properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable));
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
Expand Down Expand Up @@ -826,7 +833,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
tableName.getSchemaName(),
new IcebergTableName(name.getTableName(), name.getTableType(), tableSnapshotId, name.getChangelogEndSnapshot()),
name.getSnapshotId().isPresent(),
tryGetLocation(table),
Optional.of(getWriteDataLocation(table)),
tryGetProperties(table),
tableSchemaJson,
Optional.empty(),
Expand Down Expand Up @@ -1083,4 +1090,10 @@ else if (tableVersion.getVersionExpressionType() instanceof VarcharType) {
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported table version type: " + tableVersion.getVersionType());
}

/**
 * Resolves the location that is used for writing the data files of the given table.
 *
 * @param table the Iceberg table whose properties and location are consulted
 * @return the value of the table's {@code WRITE_DATA_LOCATION} property when it is set and
 *         non-empty, otherwise {@code table.location()}
 */
protected String getWriteDataLocation(Table table)
{
    String configuredLocation = table.properties().get(WRITE_DATA_LOCATION);
    if (Strings.isNullOrEmpty(configuredLocation)) {
        // No dedicated data path configured: fall back to the table's own location
        return table.location();
    }
    return configuredLocation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyAndPopulateViews;
import static com.facebook.presto.hive.metastore.Statistics.createComputedStatisticsToPartitionMap;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.HiveTableOperations.STORAGE_FORMAT;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static com.facebook.presto.iceberg.IcebergSchemaProperties.getSchemaLocation;
Expand Down Expand Up @@ -319,7 +320,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
}

FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());
TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat, session));
TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat, session, HIVE));
transaction = createTableTransaction(tableName, operations, metadata);

return new IcebergOutputTableHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,14 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
schema,
partitionSpec,
targetPath,
populateTableProperties(tableMetadata, fileFormat, session));
populateTableProperties(tableMetadata, fileFormat, session, catalogType));
}
else {
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
tableIdentifier,
schema,
partitionSpec,
populateTableProperties(tableMetadata, fileFormat, session));
populateTableProperties(tableMetadata, fileFormat, session, catalogType));
}
}
catch (AlreadyExistsException e) {
Expand All @@ -343,7 +343,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
toPrestoSchema(icebergTable.schema(), typeManager),
toPrestoPartitionSpec(icebergTable.spec(), typeManager),
getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
icebergTable.location(),
getWriteDataLocation(icebergTable),
fileFormat,
getCompressionCodec(session),
icebergTable.properties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class IcebergTableProperties
public static final String FILE_FORMAT_PROPERTY = "format";
public static final String PARTITIONING_PROPERTY = "partitioning";
public static final String LOCATION_PROPERTY = "location";
public static final String WRITE_DATA_LOCATION_PROPERTY = "write_data_path";
public static final String FORMAT_VERSION = "format_version";
public static final String COMMIT_RETRIES = "commit_retries";
public static final String DELETE_MODE = "delete_mode";
Expand Down Expand Up @@ -78,6 +79,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
"File system location URI for the table",
null,
false))
.add(stringProperty(
WRITE_DATA_LOCATION_PROPERTY,
"File system location URI for the table's data",
null,
false))
.add(stringProperty(
FORMAT_VERSION,
"Format version for the table",
Expand Down Expand Up @@ -148,6 +154,11 @@ public static String getTableLocation(Map<String, Object> tableProperties)
return (String) tableProperties.get(LOCATION_PROPERTY);
}

/**
 * Reads the {@code write_data_path} table property from the given property map.
 *
 * @param tableProperties table properties keyed by property name
 * @return the value stored under {@code WRITE_DATA_LOCATION_PROPERTY}, or {@code null}
 *         when the map holds no value for that key
 */
public static String getWriteDataLocation(Map<String, Object> tableProperties)
{
    Object writeDataLocation = tableProperties.get(WRITE_DATA_LOCATION_PROPERTY);
    return (String) writeDataLocation;
}

public static String getFormatVersion(Map<String, Object> tableProperties)
{
return (String) tableProperties.get(FORMAT_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionOperator;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -130,7 +131,6 @@
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
import static com.facebook.presto.iceberg.FileFormat.PARQUET;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION;
Expand Down Expand Up @@ -197,6 +197,7 @@
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
Expand Down Expand Up @@ -498,17 +499,6 @@ public static Optional<Snapshot> tryGetCurrentSnapshot(Table table)
}
}

/**
 * Fetches the location of the given table without propagating a {@code TableNotFoundException}.
 *
 * @param table the Iceberg table to query
 * @return the table location; empty when {@code table.location()} returns {@code null} or
 *         throws {@code TableNotFoundException} (in which case a warning is logged)
 */
public static Optional<String> tryGetLocation(Table table)
{
    String location;
    try {
        location = table.location();
    }
    catch (TableNotFoundException e) {
        log.warn(String.format("Unable to fetch location for table %s: %s", table.name(), e.getMessage()));
        return Optional.empty();
    }
    return Optional.ofNullable(location);
}

private static boolean isValidPartitionType(Type type)
{
return type instanceof DecimalType ||
Expand Down Expand Up @@ -1120,9 +1110,20 @@ public void close()
}
}

public static Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, FileFormat fileFormat, ConnectorSession session)
public static Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, FileFormat fileFormat, ConnectorSession session, CatalogType catalogType)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(5);

String writeDataLocation = IcebergTableProperties.getWriteDataLocation(tableMetadata.getProperties());
if (!Strings.isNullOrEmpty(writeDataLocation)) {
if (catalogType.equals(CatalogType.HADOOP)) {
propertiesBuilder.put(WRITE_DATA_LOCATION, writeDataLocation);
}
else {
throw new PrestoException(NOT_SUPPORTED, "Not support set write_data_path on catalog: " + catalogType);
}
}

Integer commitRetries = getCommitRetries(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
Expand Down Expand Up @@ -1249,7 +1250,7 @@ public static String metadataLocation(Table icebergTable)
public static String dataLocation(Table icebergTable)
{
Map<String, String> properties = icebergTable.properties();
String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
String dataLocation = properties.get(WRITE_DATA_LOCATION);
if (dataLocation == null) {
dataLocation = properties.get(TableProperties.OBJECT_STORE_PATH);
if (dataLocation == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -150,6 +152,34 @@ public void testShowCreateTable()
")", schemaName, getLocation(schemaName, "orders")));
}

/**
 * Verifies that creating a table with the {@code write_data_path} property fails with the
 * "Not support set write_data_path on catalog" message for this test's {@code catalogType}.
 *
 * @throws IOException if the temporary directory cannot be created or removed
 */
@Test
public void testTableWithSpecifiedWriteDataLocation()
        throws IOException
{
    Path dataWriteLocation = Files.createTempDirectory("test_table_with_specified_write_data_location2").toAbsolutePath();
    try {
        assertQueryFails(String.format("create table test_table_with_specified_write_data_location2(a int, b varchar) with (write_data_path = '%s')", dataWriteLocation),
                "Not support set write_data_path on catalog: " + catalogType);
    }
    finally {
        // The CREATE TABLE is expected to be rejected, so the directory should still be empty;
        // remove it so the test does not leave temporary directories behind.
        Files.deleteIfExists(dataWriteLocation);
    }
}

/**
 * Verifies that creating a <em>partitioned</em> table with the {@code write_data_path} property
 * fails with the "Not support set write_data_path on catalog" message for this test's
 * {@code catalogType}.
 *
 * @throws IOException if the temporary directory cannot be created or removed
 */
@Test
public void testPartitionedTableWithSpecifiedWriteDataLocation()
        throws IOException
{
    Path dataWriteLocation = Files.createTempDirectory("test_table_with_specified_write_data_location3").toAbsolutePath();
    try {
        // The partitioning clause is what distinguishes this case from the unpartitioned test
        assertQueryFails(String.format("create table test_table_with_specified_write_data_location3(a int, b varchar) with (partitioning = ARRAY['a'], write_data_path = '%s')", dataWriteLocation),
                "Not support set write_data_path on catalog: " + catalogType);
    }
    finally {
        // The CREATE TABLE is expected to be rejected, so the directory should still be empty;
        // remove it so the test does not leave temporary directories behind.
        Files.deleteIfExists(dataWriteLocation);
    }
}

/**
 * Verifies that a {@code CREATE TABLE} statement carrying the {@code write_data_path} property
 * fails with the "Not support set write_data_path on catalog" message for this test's
 * {@code catalogType}.
 *
 * @throws IOException if the temporary directory cannot be created or removed
 */
@Test
public void testShowCreateTableWithSpecifiedWriteDataLocation()
        throws IOException
{
    String tableName = "test_table_with_specified_write_data_location";
    Path dataWriteLocation = Files.createTempDirectory("test1").toAbsolutePath();
    try {
        assertQueryFails(format("CREATE TABLE %s(a int, b varchar) with (write_data_path = '%s')", tableName, dataWriteLocation),
                "Not support set write_data_path on catalog: " + catalogType);
    }
    finally {
        // The CREATE TABLE is expected to be rejected, so the directory should still be empty;
        // remove it so the test does not leave temporary directories behind.
        Files.deleteIfExists(dataWriteLocation);
    }
}

@Test
public void testDecimal()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,25 @@ public void testSessionPropertiesInManuallyStartedTransaction()
}
}

/**
 * Checks the {@code $properties} system table of a table that was created with an explicit
 * data write location. Delegates to the five-argument overload, expecting 8 properties in
 * total and a "write.data.path" entry equal to {@code dataWriteLocation}.
 *
 * @param schemaName schema containing the table
 * @param tableName name of the table whose properties are checked
 * @param deleteMode expected delete mode, passed through to the delegate
 * @param dataWriteLocation expected value of the "write.data.path" property
 */
protected void checkTableProperties(String schemaName, String tableName, String deleteMode, String dataWriteLocation)
{
    Map<String, String> expectedExtraProperties = ImmutableMap.of("write.data.path", dataWriteLocation);
    checkTableProperties(schemaName, tableName, deleteMode, 8, expectedExtraProperties);
}

protected void checkTableProperties(String tableName, String deleteMode)
{
assertQuery(String.format("SHOW COLUMNS FROM test_schema.\"%s$properties\"", tableName),
checkTableProperties("test_schema", tableName, deleteMode, 7, ImmutableMap.of());
}

protected void checkTableProperties(String schemaName, String tableName, String deleteMode, int propertiesCount, Map<String, String> additionalValidateProperties)
{
assertQuery(String.format("SHOW COLUMNS FROM %s.\"%s$properties\"", schemaName, tableName),
"VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')");
assertQuery(String.format("SELECT COUNT(*) FROM test_schema.\"%s$properties\"", tableName), "VALUES 7");
assertQuery(String.format("SELECT COUNT(*) FROM %s.\"%s$properties\"", schemaName, tableName), "VALUES " + propertiesCount);
List<MaterializedRow> materializedRows = computeActual(getSession(),
String.format("SELECT * FROM test_schema.\"%s$properties\"", tableName)).getMaterializedRows();
String.format("SELECT * FROM %s.\"%s$properties\"", schemaName, tableName)).getMaterializedRows();

assertThat(materializedRows).hasSize(7);
assertThat(materializedRows).hasSize(propertiesCount);
assertThat(materializedRows)
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.delete.mode", deleteMode)))
Expand All @@ -278,6 +288,11 @@ protected void checkTableProperties(String tableName, String deleteMode)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.metrics.max-inferred-column-defaults", "100")));

additionalValidateProperties.entrySet().stream()
.forEach(entry -> assertThat(materializedRows)
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, entry.getKey(), entry.getValue()))));
}

protected void checkORCFormatTableProperties(String tableName, String deleteMode)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.testing.QueryRunner;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.file.Files;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;

/**
 * Variant of {@link TestIcebergSystemTables} whose query runner is created with the
 * {@code HADOOP} catalog type. Adds a check of the {@code $properties} system table for a
 * table created with the {@code write_data_path} property.
 */
public class TestIcebergSystemTablesHadoop
        extends TestIcebergSystemTables
{
    /**
     * Creates the Iceberg query runner for the {@code HADOOP} catalog type with no extra properties.
     */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), HADOOP, ImmutableMap.of());
    }

    /**
     * Creates a partitioned table with {@code write_data_path} pointing at a temporary directory
     * and checks its table properties via {@code checkTableProperties}.
     *
     * @throws IOException if the temporary directory cannot be created
     */
    @Test
    public void testPropertiesTableWithSpecifiedDataWriteLocation()
            throws IOException
    {
        String dataLocation = Files.createTempDirectory("test_table_with_write_data_location").toAbsolutePath().toString();
        assertUpdate("CREATE SCHEMA test_schema_temp");
        try {
            assertUpdate(String.format("CREATE TABLE test_schema_temp.test_table_with_write_data_location (_bigint BIGINT, _date DATE) WITH (partitioning = ARRAY['_date'], write_data_path = '%s')", dataLocation));
            checkTableProperties("test_schema_temp", "test_table_with_write_data_location", "merge-on-read", dataLocation);
        }
        finally {
            // IF EXISTS: when CREATE TABLE itself fails, a plain DROP TABLE would throw here,
            // hiding the original failure and skipping the schema cleanup below.
            assertUpdate("DROP TABLE IF EXISTS test_schema_temp.test_table_with_write_data_location");
            assertUpdate("DROP SCHEMA test_schema_temp");
        }
    }
}
Loading

0 comments on commit 57dc966

Please sign in to comment.