Skip to content

Conversation

@bibith4
Copy link
Contributor

@bibith4 bibith4 commented Nov 29, 2025

  • build indexes and mappings
  • store indexes and mappings files in s3
  • map node id to row id
  • enable TVF Top ANN K search support

Description

Motivation and Context

Impact

Test Plan

Contributor checklist

  • Please make sure your submission complies with our contributing guide, in particular code style and commit standards.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.
  • If adding new dependencies, verified they have an OpenSSF Scorecard score of 5.0 or higher (or obtained explicit TSC approval for lower scores).

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* ... 
* ... 

Hive Connector Changes
* ... 
* ... 

If release note is NOT required, use:

== NO RELEASE NOTE ==

Dilli Babu Godari and others added 4 commits November 29, 2025 15:39
 - build indexes and mappings
 - store indexes and mappings files in s3
 - map node id to row id
 - enable TVF Top ANN K search support

Co-authored-by: Nivin C S <[email protected]>
Co-authored-by: Shijin K  <[email protected]>
@prestodb-ci prestodb-ci added the from:IBM PR from IBM label Nov 29, 2025
@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Nov 29, 2025

CLA Missing ID CLA Not Signed

@sourcery-ai
Copy link
Contributor

sourcery-ai bot commented Nov 29, 2025

Reviewer's Guide

Adds approximate nearest neighbor (ANN) similarity search capability to the Iceberg connector using the JVector library, including index building, S3 storage of index and node→row-id mappings, a TVF-based query interface, and connector wiring guarded by a new configuration flag.

Sequence diagram for ANN table function query execution

sequenceDiagram
    actor User
    participant PrestoEngine
    participant IcebergConnector
    participant IcebergMetadata as IcebergAbstractMetadata
    participant SplitManager as IcebergSplitManager
    participant PageSourceProvider as IcebergPageSourceProvider
    participant ANNPageSource
    participant JVectorIndex

    User->>PrestoEngine: SELECT * FROM TABLE(approx_nearest_neighbors(ARRAY[...], 'schema.table.column', 10))
    PrestoEngine->>IcebergConnector: resolve table function approx_nearest_neighbors
    IcebergConnector->>IcebergConnector: getTableFunctions()
    IcebergConnector-->>PrestoEngine: ApproxNearestNeighborsFunction

    PrestoEngine->>ApproxNearestNeighborsFunction: analyze(arguments)
    ApproxNearestNeighborsFunction-->>PrestoEngine: IcebergAnnTableFunctionHandle

    PrestoEngine->>IcebergMetadata: applyTableFunction(handle)
    IcebergMetadata->>IcebergMetadata: check icebergConfig.isSimilaritySearchEnabled()
    IcebergMetadata-->>PrestoEngine: TableFunctionApplicationResult(IcebergAnnTableHandle, columnHandles)

    PrestoEngine->>SplitManager: getSplits(IcebergAnnTableHandle)
    SplitManager->>SplitManager: check icebergConfig.isSimilaritySearchEnabled()
    SplitManager->>SplitManager: create IcebergSplit(ann=true, queryVector, topN)
    SplitManager-->>PrestoEngine: FixedSplitSource(IcebergSplit)

    PrestoEngine->>PageSourceProvider: createPageSource(split with ann=true)
    PageSourceProvider->>PageSourceProvider: if similaritySearchEnabled and split.isAnn()
    PageSourceProvider->>ANNPageSource: new ANNPageSource(FixedPageSource, queryVector, topN, tableLocation, HdfsFileIO)
    PageSourceProvider-->>PrestoEngine: ANNPageSource

    loop scan results
        PrestoEngine->>ANNPageSource: getNextPage()
        ANNPageSource->>JVectorIndex: load index and NodeRowIdMapping from S3 via HdfsFileIO
        ANNPageSource->>JVectorIndex: search topN using queryVector
        JVectorIndex-->>ANNPageSource: nodeIds with scores
        ANNPageSource->>ANNPageSource: map nodeIds to rowIds
        ANNPageSource-->>PrestoEngine: Page(row_id)
    end

    PrestoEngine-->>User: result rows with nearest neighbor row_id values
Loading

Sequence diagram for building a vector index via procedure

sequenceDiagram
    actor User
    participant PrestoEngine
    participant BuildVecProc as BuildVectorIndexProcedure
    participant TxManager as IcebergTransactionManager
    participant MetadataFactory as IcebergMetadataFactory
    participant Metadata as ConnectorMetadata
    participant PageSourceProvider as ConnectorPageSourceProvider
    participant VectorBuilder as IcebergVectorIndexBuilder
    participant JVector
    participant HdfsFileIO
    participant S3

    User->>PrestoEngine: CALL system.CREATE_VEC_INDEX('catalog.schema.table.column')
    PrestoEngine->>BuildVecProc: invoke buildVectorIndex(session, columnPath)

    BuildVecProc->>BuildVecProc: parse catalog, schema, table, column
    BuildVecProc->>MetadataFactory: create()
    MetadataFactory-->>BuildVecProc: ConnectorMetadata

    BuildVecProc->>TxManager: put(transactionHandle, metadata)

    BuildVecProc->>VectorBuilder: buildAndSaveVectorIndex(metadata, pageSourceProvider, transactionHandle, session, schemaTableName, columnName, indexName, catalogName, similarityFunction, m, efConstruction)

    VectorBuilder->>Metadata: getTableHandle(schemaTableName)
    VectorBuilder->>Metadata: getTypeManager()
    VectorBuilder->>Metadata: getIcebergTable(schemaTableName)
    VectorBuilder->>HdfsFileIO: resolve tableLocation and FileIO

    loop scan data files
        VectorBuilder->>PageSourceProvider: createPageSource(split, layout, [vectorColumn, row_id])
        PageSourceProvider-->>VectorBuilder: ConnectorPageSource
        loop pages
            VectorBuilder->>VectorBuilder: read vectors and rowIds into memory
        end
    end

    VectorBuilder->>JVector: normalize vectors and build ImmutableGraphIndex using GraphIndexBuilder
    VectorBuilder->>VectorBuilder: create NodeRowIdMapping(rowIds)

    VectorBuilder->>HdfsFileIO: open temp local files for index and mapping
    VectorBuilder->>VectorBuilder: write OnDiskGraphIndex and mapping to temp files
    VectorBuilder->>S3: upload index and mapping to tableLocation/.vector_index via HdfsOutputFile
    VectorBuilder-->>BuildVecProc: Path to index file

    BuildVecProc->>TxManager: remove(transactionHandle)
    BuildVecProc-->>PrestoEngine: success
    PrestoEngine-->>User: procedure completed
Loading

Updated class diagram for ANN similarity search and index support

classDiagram
    class IcebergConfig {
        - boolean similaritySearchEnabled
        + boolean isSimilaritySearchEnabled()
        + IcebergConfig setSimilaritySearchEnabled(boolean similaritySearchEnabled)
    }

    class IcebergConnector {
        - Set~ConnectorTableFunction~ connectorTableFunctions
        + Set~ConnectorTableFunction~ getTableFunctions()
    }

    class IcebergAbstractMetadata {
        - IcebergConfig icebergConfig
        + TypeManager getTypeManager()
        + Optional~TableFunctionApplicationResult~ applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
    }

    class IcebergSplit {
        - boolean ann
        - List~Float~ queryVector
        - int topN
        + boolean isAnn()
        + List~Float~ getQueryVector()
        + int getTopN()
    }

    class IcebergSplitManager {
        - IcebergConfig icebergConfig
        + ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, SplitSchedulingContext splitSchedulingContext)
    }

    class IcebergTableLayoutHandle {
        - Optional~String~ tableLocation
        + Optional~String~ getTableLocation()
    }

    class IcebergTableLayoutHandle_Builder {
        - Optional~String~ tableLocation
        + IcebergTableLayoutHandle_Builder setTableLocation(Optional~String~ tableLocation)
        + IcebergTableLayoutHandle build()
    }

    class ApproxNearestNeighborsFunction {
        <<table_function_provider>>
        + ConnectorTableFunction get()
    }

    class ApproxNearestNeighborsFunction_QueryFunction {
        + TableFunctionAnalysis analyze(ConnectorSession session, ConnectorTransactionHandle transaction, Map~String,Argument~ arguments)
    }

    class ApproxNearestNeighborsFunction_QualifiedNameParts {
        - String schema
        - String table
        - String column
        + String getSchema()
        + String getTable()
        + String getColumn()
    }

    class ApproxNearestNeighborsFunction_IcebergAnnTableHandle {
        - List~Float~ queryVector
        - int limit
        + ApproxNearestNeighborsFunction_IcebergAnnTableHandle(List~Float~ queryVector, int limit, String schema, String table)
        + List~Float~ getInputVector()
        + int getLimit()
    }

    class ApproxNearestNeighborsFunction_IcebergAnnTableFunctionHandle {
        - List~Float~ queryVector
        - int limit
        - ConnectorTableHandle tableHandle
        - List~ColumnHandle~ columnHandles
        + ApproxNearestNeighborsFunction_IcebergAnnTableFunctionHandle(String schema, String table, ScalarArgument inputVector, ScalarArgument limit, List~ColumnHandle~ columnHandles)
        + List~Float~ getInputVector()
        + int getLimit()
        + ConnectorTableHandle getTableHandle()
        + List~ColumnHandle~ getColumnHandles()
    }

    class ANNPageSource {
        - ConnectorPageSource delegate
        - List~Float~ queryVector
        - int topN
        - boolean finished
        - String tableLocation
        - HdfsFileIO hdfsFileIO
        + ANNPageSource(ConnectorPageSource delegate, List~Float~ queryVector, int topN, String tableLocation, HdfsFileIO hdfsFileIO)
        + Page getNextPage()
        + long getCompletedBytes()
        + long getCompletedPositions()
        + long getReadTimeNanos()
        + boolean isFinished()
        + long getSystemMemoryUsage()
        + void close()
    }

    class IcebergVectorIndexBuilder {
        + static Path buildAndSaveVectorIndex(ConnectorMetadata metadata, ConnectorPageSourceProvider pageSourceProvider, ConnectorTransactionHandle transactionHandle, ConnectorSession session, SchemaTableName schemaTableName, String columnName, String indexName, String catalogName, String similarityFunction, int m, int efConstruction)
        - static VectorData readVectorsFromTable(ConnectorMetadata metadata, ConnectorPageSourceProvider pageSourceProvider, ConnectorTransactionHandle transactionHandle, ConnectorSession session, SchemaTableName schemaTableName, String columnName)
        - static IcebergTableLayoutHandle createTableLayoutHandle(IcebergTableHandle tableHandle, List~IcebergColumnHandle~ columns)
        - static VectorSimilarityFunction getVectorSimilarityFunction(String similarityFunction)
    }

    class IcebergVectorIndexBuilder_VectorData {
        - List~float[]~ vectors
        - List~Long~ rowIds
    }

    class ListRandomAccessVectorValues {
        - List~float[]~ vectors
        - int dimension
        - Constructor arrayVectorFloatConstructor
        + ListRandomAccessVectorValues(List~float[]~ vectors, int dimension)
        + int size()
        + int dimension()
        + VectorFloat getVector(int ord)
        + boolean isValueShared()
        + RandomAccessVectorValues copy()
    }

    class CustomVectorFloat {
        - float[] values
        + CustomVectorFloat(float[] values)
        + VectorFloat toArrayVectorFloat()
        + CustomVectorFloat get()
        + float get(int index)
        + void set(int index, float value)
        + int length()
        + void copyFrom(VectorFloat src, int srcOffset, int destOffset, int length)
        + void zero()
        + int getHashCode()
        + long ramBytesUsed()
        + float[] vectorValue()
        + CustomVectorFloat copy()
        + float[] getFloatArray()
    }

    class NodeRowIdMapping {
        - long[] nodeToRowId
        + NodeRowIdMapping(List~Long~ rowIds)
        + long getRowId(int nodeId)
        + int size()
        + void save(OutputStream out)
        + static NodeRowIdMapping load(InputStream in)
    }

    class BuildVectorIndexProcedure {
        - IcebergTransactionManager transactionManager
        - IcebergMetadataFactory metadataFactory
        - ConnectorPageSourceProvider pageSourceProvider
        + BuildVectorIndexProcedure(IcebergTransactionManager transactionManager, IcebergMetadataFactory metadataFactory, ConnectorPageSourceProvider pageSourceProvider)
        + Procedure get()
        + void buildVectorIndex(ConnectorSession session, String columnPath)
    }

    IcebergConfig <|.. IcebergAbstractMetadata
    IcebergConfig <|.. IcebergSplitManager
    IcebergConfig <|.. IcebergConnector

    IcebergConnector o--> ConnectorTableFunction
    IcebergConnector --> ApproxNearestNeighborsFunction

    IcebergAbstractMetadata --> ApproxNearestNeighborsFunction_IcebergAnnTableFunctionHandle : uses
    IcebergAbstractMetadata --> IcebergTableLayoutHandle : setsTableLocation

    IcebergSplitManager --> ApproxNearestNeighborsFunction_IcebergAnnTableHandle : checksInstance
    IcebergSplitManager --> IcebergSplit : createsAnnSplit

    IcebergTableLayoutHandle_Builder --> IcebergTableLayoutHandle : builds

    ApproxNearestNeighborsFunction_QueryFunction --> ApproxNearestNeighborsFunction_IcebergAnnTableFunctionHandle : creates
    ApproxNearestNeighborsFunction_IcebergAnnTableFunctionHandle --> ApproxNearestNeighborsFunction_IcebergAnnTableHandle : wraps

    ANNPageSource --> NodeRowIdMapping : uses
    ANNPageSource --> IcebergVectorIndexBuilder : consumesIndex

    IcebergVectorIndexBuilder --> IcebergVectorIndexBuilder_VectorData : uses
    IcebergVectorIndexBuilder --> NodeRowIdMapping : creates
    IcebergVectorIndexBuilder --> ListRandomAccessVectorValues : uses
    IcebergVectorIndexBuilder --> CustomVectorFloat : uses

    BuildVectorIndexProcedure --> IcebergVectorIndexBuilder : calls
    BuildVectorIndexProcedure --> IcebergTransactionManager : uses
    BuildVectorIndexProcedure --> IcebergMetadataFactory : uses
Loading

File-Level Changes

Change Details Files
Enable ANN query splits and page sources in the Iceberg connector and plumb table location into layouts.
  • Extend IcebergPageSourceProvider to inject metastore-, table- and IO-related dependencies and a similaritySearchEnabled flag from IcebergConfig.
  • Short-circuit createPageSource for ANN splits to return a specialized ANNPageSource that runs vector search over a JVector index stored under the table location.
  • Augment IcebergSplit with ANN-related fields (flag, query vector, topN) plus JSON serialization accessors, and ensure all split producers populate default ANN values for non-ANN flows.
  • Extend IcebergTableLayoutHandle to carry an Optional tableLocation with full JSON support and thread it through filter pushdown and plan optimizer rewrites so the page source can access the table location.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplit.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/changelog/ChangelogSplitSource.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/equalitydeletes/EqualityDeletesSplitSource.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java
Introduce a table function + page source path for ANN queries that returns row_ids from a vector index.
  • Add ApproxNearestNeighborsFunction TVF that takes a REAL array query vector, a fully-qualified column name, and a limit, and analyzes into an IcebergAnnTableFunctionHandle that wraps an IcebergAnnTableHandle with query vector and limit.
  • Implement ANNPageSource to load an on-disk JVector HNSW index and a NodeRowIdMapping from the table’s .vector_index directory (via HdfsFileIO), execute a top-K search, translate node IDs to row IDs, and return a single BIGINT column page.
  • Wire the TVF into the Iceberg connector by exposing ConnectorTableFunction instances from IcebergConnector, registering them in IcebergCommonModule when similarity search is enabled, and handling the TVF in IcebergAbstractMetadata.applyTableFunction by returning the underlying IcebergAnnTableHandle and column handles.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/tvf/ApproxNearestNeighborsFunction.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/tvf/ANNPageSource.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConnector.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
Add infrastructure to build, persist, and load JVector HNSW vector indexes along with node-to-row-id mappings.
  • Create IcebergVectorIndexBuilder to scan the target Iceberg table column + row_id via normal splits, normalize vectors, build a JVector ImmutableGraphIndex using a ListRandomAccessVectorValues wrapper and chosen similarity function, construct a NodeRowIdMapping, and write both index and mapping to the table’s .vector_index directory using HdfsFileIO with retry and temp-file semantics.
  • Implement NodeRowIdMapping as a compact, versioned, binary-serializable mapping from node IDs to row IDs using bulk ByteBuffer-based IO and expose getRowId/size APIs.
  • Provide CustomVectorFloat and ListRandomAccessVectorValues adapters around float[] vectors to satisfy JVector’s VectorFloat and RandomAccessVectorValues contracts while working around non-public ArrayVectorFloat constructors.
  • Add BuildVectorIndexProcedure (CREATE_VEC_INDEX system procedure) that parses a [catalog.]schema.table.column path, instantiates metadata via IcebergMetadataFactory, and invokes IcebergVectorIndexBuilder.buildAndSaveVectorIndex inside a managed transaction, using default index parameters and COSINE similarity.
  • Update IcebergHiveMetadataFactory and IcebergNativeMetadataFactory construction to pass IcebergConfig into IcebergAbstractMetadata so the metadata layer can participate in similarity search features.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/vectors/IcebergVectorIndexBuilder.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/vectors/ListRandomAccessVectorValues.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/vectors/CustomVectorFloat.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/vectors/NodeRowIdMapping.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/procedure/BuildVectorIndexProcedure.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java
Guard similarity search behavior behind a new IcebergConfig flag and add required dependencies.
  • Extend IcebergConfig with a boolean similaritySearchEnabled property backed by iceberg.similarity-search-enabled and accessor methods.
  • Inject IcebergConfig into IcebergSplitManager to emit a synthetic ANN split when the table handle is an IcebergAnnTableHandle, into IcebergAbstractMetadata to gate applyTableFunction behavior, and into metadata factories and common module wiring to conditionally register the BuildVectorIndexProcedure and ApproxNearestNeighborsFunction TVF.
  • Update InternalIcebergConnectorFactory to construct the connector’s table-function set based on the config and expose them via IcebergConnector.getTableFunctions.
  • Add the JVector and non-test-scoped commons-math3 dependencies to presto-iceberg/pom.xml, and mark commons-math3 as an ignored non-test-scoped dependency for the dependency plugin; move commons-math3 from test to main scope.
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadataFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/InternalIcebergConnectorFactory.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConnector.java
presto-iceberg/pom.xml
presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@bibith4 bibith4 changed the title feat(connector): Add similiarity search capabilties using Jvector lib [Do not Review]feat(connector): Add similiarity search capabilties using Jvector lib Nov 29, 2025
@bibith4 bibith4 force-pushed the jvector-presto-integration-tech-preview branch from be49301 to 47ebd99 Compare November 29, 2025 12:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

from:IBM PR from IBM

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants