Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@

<dependencies>

<!-- https://mvnrepository.com/artifact/io.github.jbellis/jvector -->
<dependency>
<groupId>io.github.jbellis</groupId>
<artifactId>jvector</artifactId>
<version>4.0.0-rc.4</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>concurrent</artifactId>
Expand Down Expand Up @@ -752,11 +763,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -771,6 +777,7 @@
<ignoredNonTestScopedDependency>org.apache.httpcomponents.core5:httpcore5</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>com.amazonaws:aws-java-sdk-s3</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>com.amazonaws:aws-java-sdk-core</ignoredNonTestScopedDependency>
<ignoredNonTestScopedDependency>org.apache.commons:commons-math3</ignoredNonTestScopedDependency>
</ignoredNonTestScopedDependencies>
<!-- <ignoredUsedUndeclaredDependencies>-->
<!-- &lt;!&ndash; Ignore these because they are picked up as false-positives when configuring logging in the IcebergQueryRunner &ndash;&gt;-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.iceberg.changelog.ChangelogOperation;
import com.facebook.presto.iceberg.changelog.ChangelogUtil;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.iceberg.tvf.ApproxNearestNeighborsFunction;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
Expand Down Expand Up @@ -60,7 +61,9 @@
import com.facebook.presto.spi.connector.ConnectorTableVersion;
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionOperator;
import com.facebook.presto.spi.connector.ConnectorTableVersion.VersionType;
import com.facebook.presto.spi.connector.TableFunctionApplicationResult;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.function.table.ConnectorTableFunctionHandle;
import com.facebook.presto.spi.plan.FilterStatsCalculatorService;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.RowExpressionService;
Expand Down Expand Up @@ -259,6 +262,7 @@ public abstract class IcebergAbstractMetadata
protected Transaction transaction;
protected final StatisticsFileCache statisticsFileCache;
protected final IcebergTableProperties tableProperties;
protected final IcebergConfig icebergConfig;

private final StandardFunctionResolution functionResolution;
private final ConcurrentMap<SchemaTableName, Table> icebergTables = new ConcurrentHashMap<>();
Expand All @@ -272,7 +276,8 @@ public IcebergAbstractMetadata(
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
StatisticsFileCache statisticsFileCache,
IcebergTableProperties tableProperties)
IcebergTableProperties tableProperties,
IcebergConfig icebergConfig)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
Expand All @@ -283,6 +288,7 @@ public IcebergAbstractMetadata(
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
this.icebergConfig = requireNonNull(icebergConfig, "icebergConfig is null");
}

protected final Table getIcebergTable(ConnectorSession session, SchemaTableName schemaTableName)
Expand Down Expand Up @@ -1424,6 +1430,11 @@ protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableNam
return Optional.empty();
}

public TypeManager getTypeManager()
{
return typeManager;
}

@Override
public Optional<Object> getInfo(ConnectorTableLayoutHandle tableHandle)
{
Expand Down Expand Up @@ -1765,4 +1776,28 @@ private boolean viewExists(ConnectorSession session, ConnectorTableMetadata view
return false;
}
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
if (!icebergConfig.isSimilaritySearchEnabled()) {
return Optional.empty();
}
if (handle instanceof ApproxNearestNeighborsFunction.IcebergAnnTableFunctionHandle) {
ApproxNearestNeighborsFunction.IcebergAnnTableFunctionHandle annTableFunctionHandle = (ApproxNearestNeighborsFunction.IcebergAnnTableFunctionHandle) handle;
ApproxNearestNeighborsFunction.IcebergAnnTableHandle originalHandle = (ApproxNearestNeighborsFunction.IcebergAnnTableHandle) annTableFunctionHandle.getTableHandle();
SchemaTableName schemaTableName = new SchemaTableName(originalHandle.getSchemaName(), originalHandle.getTableName());
Table icebergTable = getIcebergTable(session, schemaTableName);
Optional<String> tableLocation = tryGetLocation(icebergTable);
ApproxNearestNeighborsFunction.IcebergAnnTableHandle updatedHandle = new ApproxNearestNeighborsFunction.IcebergAnnTableHandle(
originalHandle.getInputVector(),
originalHandle.getLimit(),
originalHandle.getSchemaName(),
originalHandle.getTableName(),
tableLocation);
return Optional.of(new TableFunctionApplicationResult<>(updatedHandle, annTableFunctionHandle.getColumnHandles()));
}

throw new IllegalArgumentException("Unsupported function handle: " + handle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.hive.gcs.HiveGcsConfigurationInitializer;
import com.facebook.presto.iceberg.nessie.IcebergNessieConfig;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizerProvider;
import com.facebook.presto.iceberg.procedure.BuildVectorIndexProcedure;
import com.facebook.presto.iceberg.procedure.ExpireSnapshotsProcedure;
import com.facebook.presto.iceberg.procedure.FastForwardBranchProcedure;
import com.facebook.presto.iceberg.procedure.ManifestFileCacheInvalidationProcedure;
Expand All @@ -56,6 +57,7 @@
import com.facebook.presto.iceberg.procedure.UnregisterTableProcedure;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.iceberg.statistics.StatisticsFileCacheKey;
import com.facebook.presto.iceberg.tvf.ApproxNearestNeighborsFunction;
import com.facebook.presto.orc.CachingStripeMetadataSource;
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
import com.facebook.presto.orc.EncryptionLibrary;
Expand Down Expand Up @@ -83,6 +85,7 @@
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.procedure.BaseProcedure;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.google.common.cache.Cache;
Expand Down Expand Up @@ -188,6 +191,9 @@ protected void setup(Binder binder)
procedures.addBinding().toProvider(SetTablePropertyProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(StatisticsFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(ManifestFileCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
if (icebergConfig.isSimilaritySearchEnabled()) {
procedures.addBinding().toProvider(BuildVectorIndexProcedure.class).in(Scopes.SINGLETON);
}

// for orc
binder.bind(EncryptionLibrary.class).annotatedWith(HiveDwrfEncryptionProvider.ForCryptoService.class).to(UnsupportedEncryptionLibrary.class).in(Scopes.SINGLETON);
Expand All @@ -198,7 +204,9 @@ protected void setup(Binder binder)
configBinder(binder).bindConfig(OrcFileWriterConfig.class);

configBinder(binder).bindConfig(ParquetCacheConfig.class, connectorId);

if (icebergConfig.isSimilaritySearchEnabled()) {
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(ApproxNearestNeighborsFunction.class).in(Scopes.SINGLETON);
}
binder.bind(ConnectorPlanOptimizerProvider.class).to(IcebergPlanOptimizerProvider.class).in(Scopes.SINGLETON);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class IcebergConfig
private boolean mergeOnReadModeEnabled = true;
private double statisticSnapshotRecordDifferenceWeight;
private boolean pushdownFilterEnabled;
private boolean similaritySearchEnabled;
private boolean deleteAsJoinRewriteEnabled = true;
private int deleteAsJoinRewriteMaxDeleteColumns = 400;
private int rowsForMetadataOptimizationThreshold = 1000;
Expand Down Expand Up @@ -496,4 +497,17 @@ public IcebergConfig setMaterializedViewStoragePrefix(String materializedViewSto
this.materializedViewStoragePrefix = materializedViewStoragePrefix;
return this;
}

public boolean isSimilaritySearchEnabled()
{
return similaritySearchEnabled;
}

@Config("iceberg.similarity-search-enabled")
@ConfigDescription("Enable filter for similarity search")
public IcebergConfig setSimilaritySearchEnabled(boolean similaritySearchEnabled)
{
this.similaritySearchEnabled = similaritySearchEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.facebook.presto.spi.function.table.ConnectorTableFunction;
import com.facebook.presto.spi.procedure.BaseProcedure;
import com.facebook.presto.spi.procedure.DistributedProcedure;
import com.facebook.presto.spi.procedure.Procedure;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class IcebergConnector
private final ConnectorAccessControl accessControl;
private final Set<BaseProcedure<?>> procedures;
private final ConnectorPlanOptimizerProvider planOptimizerProvider;
private final Set<ConnectorTableFunction> connectorTableFunctions;

public IcebergConnector(
LifeCycleManager lifeCycleManager,
Expand All @@ -84,7 +86,8 @@ public IcebergConnector(
List<PropertyMetadata<?>> columnProperties,
ConnectorAccessControl accessControl,
Set<BaseProcedure<?>> procedures,
ConnectorPlanOptimizerProvider planOptimizerProvider)
ConnectorPlanOptimizerProvider planOptimizerProvider,
Set<ConnectorTableFunction> connectorTableFunctions)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
Expand All @@ -101,6 +104,7 @@ public IcebergConnector(
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.procedures = requireNonNull(procedures, "procedures is null");
this.planOptimizerProvider = requireNonNull(planOptimizerProvider, "planOptimizerProvider is null");
this.connectorTableFunctions = connectorTableFunctions;
}

@Override
Expand Down Expand Up @@ -246,4 +250,9 @@ private <T extends BaseProcedure<?>> Set<T> getProcedures(Class<T> clazz)
.map(clazz::cast)
.collect(Collectors.toSet());
}

public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,10 @@ public IcebergHiveMetadata(
StatisticsFileCache statisticsFileCache,
ManifestFileCache manifestFileCache,
IcebergTableProperties tableProperties,
ConnectorSystemConfig connectorSystemConfig)
ConnectorSystemConfig connectorSystemConfig,
IcebergConfig icebergConfig)
{
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties, icebergConfig);
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class IcebergHiveMetadataFactory
final ManifestFileCache manifestFileCache;
final IcebergTableProperties tableProperties;
final ConnectorSystemConfig connectorSystemConfig;
final IcebergConfig icebergConfig;

@Inject
public IcebergHiveMetadataFactory(
Expand All @@ -66,7 +67,8 @@ public IcebergHiveMetadataFactory(
StatisticsFileCache statisticsFileCache,
ManifestFileCache manifestFileCache,
IcebergTableProperties tableProperties,
ConnectorSystemConfig connectorSystemConfig)
ConnectorSystemConfig connectorSystemConfig,
IcebergConfig icebergConfig)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null");
this.metastore = requireNonNull(metastore, "metastore is null");
Expand All @@ -83,6 +85,7 @@ public IcebergHiveMetadataFactory(
this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
this.tableProperties = requireNonNull(tableProperties, "icebergTableProperties is null");
this.connectorSystemConfig = requireNonNull(connectorSystemConfig, "connectorSystemConfig is null");
this.icebergConfig = requireNonNull(icebergConfig, "icebergConfig is null");
}

public ConnectorMetadata create()
Expand All @@ -102,6 +105,7 @@ public ConnectorMetadata create()
statisticsFileCache,
manifestFileCache,
tableProperties,
connectorSystemConfig);
connectorSystemConfig,
icebergConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ public IcebergNativeMetadata(
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
StatisticsFileCache statisticsFileCache,
IcebergTableProperties tableProperties)
IcebergTableProperties tableProperties,
IcebergConfig icebergConfig)
{
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties, icebergConfig);
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.catalogType = requireNonNull(catalogType, "catalogType is null");
this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class IcebergNativeMetadataFactory
final FilterStatsCalculatorService filterStatsCalculatorService;
final StatisticsFileCache statisticsFileCache;
final IcebergTableProperties tableProperties;
final IcebergConfig icebergConfig;

@Inject
public IcebergNativeMetadataFactory(
Expand All @@ -55,7 +56,8 @@ public IcebergNativeMetadataFactory(
NodeVersion nodeVersion,
FilterStatsCalculatorService filterStatsCalculatorService,
StatisticsFileCache statisticsFileCache,
IcebergTableProperties tableProperties)
IcebergTableProperties tableProperties,
IcebergConfig icebergConfig)
{
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
Expand All @@ -68,10 +70,11 @@ public IcebergNativeMetadataFactory(
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
this.statisticsFileCache = requireNonNull(statisticsFileCache, "statisticsFileCache is null");
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null");
this.icebergConfig = requireNonNull(icebergConfig, "icebergConfig is null");
}

public ConnectorMetadata create()
{
return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, catalogType, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties);
return new IcebergNativeMetadata(catalogFactory, typeManager, functionResolution, rowExpressionService, commitTaskCodec, columnMappingsCodec, catalogType, nodeVersion, filterStatsCalculatorService, statisticsFileCache, tableProperties, icebergConfig);
}
}
Loading
Loading