From a2165923ef31649908674426861a454f391f3b16 Mon Sep 17 00:00:00 2001
From: wangd
Date: Sat, 29 Nov 2025 17:39:50 +0800
Subject: [PATCH 1/3] docs: Add developer documentation for procedures

---
 presto-docs/src/main/sphinx/develop.rst            |   1 +
 .../src/main/sphinx/develop/procedures.rst         | 500 ++++++++++++++++++
 2 files changed, 501 insertions(+)
 create mode 100644 presto-docs/src/main/sphinx/develop/procedures.rst

diff --git a/presto-docs/src/main/sphinx/develop.rst b/presto-docs/src/main/sphinx/develop.rst
index 67e033cee8984..8f58863395ad8 100644
--- a/presto-docs/src/main/sphinx/develop.rst
+++ b/presto-docs/src/main/sphinx/develop.rst
@@ -13,6 +13,7 @@ This guide is intended for Presto contributors and plugin developers.
     develop/delete-and-update
     develop/types
     develop/functions
+    develop/procedures
     develop/system-access-control
     develop/password-authenticator
     develop/event-listener

diff --git a/presto-docs/src/main/sphinx/develop/procedures.rst b/presto-docs/src/main/sphinx/develop/procedures.rst
new file mode 100644
index 0000000000000..bac724f3c44b6
--- /dev/null
+++ b/presto-docs/src/main/sphinx/develop/procedures.rst
@@ -0,0 +1,500 @@
+==========
+Procedures
+==========
+
+PrestoDB's procedures allow users to perform data manipulation and management tasks. Unlike traditional databases where procedural objects are
+defined by users via SQL, the procedures in PrestoDB are a set of system routines provided by developers through Connectors. The overall type hierarchy
+is illustrated in the diagram below:
+
+.. code-block:: text
+
+    «abstract» BaseProcedure
+     |-- Procedure                                    // (Normal) Procedure
+     |-- «abstract» DistributedProcedure
+          |-- TableDataRewriteDistributedProcedure
+          |-- ......                                  // Other future subtypes
+
+PrestoDB supports two categories of procedures:
+
+1. **Normal Procedure**
+
+   These procedures are executed directly on the Coordinator node, and PrestoDB does not build distributed execution plans for them.
+   They are designed mainly for administrative tasks involving table/system metadata and cache management, such as ``sync_partition_metadata``
+   and ``invalidate_directory_list_cache`` in Hive Connector, or ``expire_snapshots`` and ``invalidate_statistics_file_cache`` in Iceberg Connector.
+
+2. **Distributed Procedure**
+
+   Procedures of this type are executed via a distributed execution plan constructed by PrestoDB, which utilizes the entire cluster of Worker nodes
+   for distributed computation. They are suitable for operations involving table data—such as data optimization, re-partitioning, sorting,
+   and pre-processing—as well as for administrative tasks that need to be executed across the Worker nodes, for instance, clearing caches on specific workers.
+
+The type hierarchy for Distributed Procedures is designed to be extensible. Different distributed tasks can have different invocation parameters and
+are planned into differently shaped execution plans; as such, they can be implemented as distinct subtypes of ``DistributedProcedure``. For example,
+for the kind of table data rewrite tasks, PrestoDB provides the ``TableDataRewriteDistributedProcedure`` subtype.
+Connector developers can leverage this subtype to implement specific data-rewrite distributed procedures—such as table data optimization, compression,
+repartitioning, or sorting—for their connectors. Within the PrestoDB engine, tasks of this subtype are uniformly planned into an execution plan tree with the
+following shape:
+
+.. code-block:: text
+
+    TableScanNode → FilterNode → CallDistributedProcedureNode → TableFinishNode → OutputNode
+
+In addition, developers can implement other kinds of distributed procedures by extending the type hierarchy—defining new subtypes that are mapped to
+execution plans of varying shapes.
+
+For further design details, see:
+
+- https://github.com/prestodb/rfcs/blob/main/RFC-0021-support-distributed-procedure.md
+
+Normal Procedure
+----------------
+
+To make a procedure callable, a connector must first expose it to the PrestoDB engine. PrestoDB leverages the Guice dependency injection framework to
+manage procedure registration and lifecycle. A specific procedure is implemented and bound as a ``Provider<Procedure>``, thus creating an instance
+only when it is actually needed for execution, enabling on-demand instantiation. The following steps will guide you on how to implement and provide
+a procedure in a connector.
+
+1. Procedure Provider Class
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+An implementation class must implement the ``Provider<Procedure>`` interface. Its constructor can use ``@Inject`` to receive any required dependencies
+that are managed by Guice. The class must then implement the ``get()`` method from the Provider interface, which is responsible for constructing and
+returning a new Procedure instance.
+
+2. Creation of a Procedure Instance
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+A Procedure object requires the following parameters upon creation:
+
+ * ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB)
+ * ``String name`` - The name of this procedure, for example, ``expire_snapshots``
+ * ``List<Argument> arguments`` - The parameter declarations list for this procedure
+ * ``MethodHandle methodHandle`` - PrestoDB abstracts procedure execution via ``MethodHandle``. A procedure provider implements the core logic in
+   a dedicated method and exposes it as a ``MethodHandle`` that is injected into the procedure instance.
+
+.. note::
+
+   * The Java method corresponding to the ``MethodHandle`` must have a correspondence with the procedure parameters. Its first parameter must be a session object
+     of type ``ConnectorSession.class``. The subsequent parameters must have a strict one-to-one correspondence in both type and order with the
+     procedure parameters declared in the arguments list.
+   * The method implementation for the ``MethodHandle`` must account for classloader isolation. Since PrestoDB employs a plugin isolation mechanism where each
+     connector has its own ClassLoader, the engine must temporarily switch to the connector's specific ClassLoader when invoking its procedure logic. This context
+     switch is critical to prevent ``ClassNotFoundException`` or ``NoClassDefFoundError`` issues.
+
+As an example, the following is the ``expire_snapshots`` procedure implemented in the Iceberg connector:
+
+.. code-block:: java
+
+    public class ExpireSnapshotsProcedure
+            implements Provider<Procedure>
+    {
+        private static final MethodHandle EXPIRE_SNAPSHOTS = methodHandle(
+                ExpireSnapshotsProcedure.class,
+                "expireSnapshots",
+                ConnectorSession.class,
+                String.class,
+                String.class,
+                SqlTimestamp.class,
+                Integer.class,
+                List.class);
+
+        private final IcebergMetadataFactory metadataFactory;
+
+        @Inject
+        public ExpireSnapshotsProcedure(IcebergMetadataFactory metadataFactory)
+        {
+            this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
+        }
+
+        @Override
+        public Procedure get()
+        {
+            return new Procedure(
+                    "system",
+                    "expire_snapshots",
+                    ImmutableList.of(
+                            new Argument("schema", VARCHAR),
+                            new Argument("table_name", VARCHAR),
+                            new Argument("older_than", TIMESTAMP, false, null),
+                            new Argument("retain_last", INTEGER, false, null),
+                            new Argument("snapshot_ids", "array(bigint)", false, null)),
+                    EXPIRE_SNAPSHOTS.bindTo(this));
+        }
+
+        public void expireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
+        {
+            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+                doExpireSnapshots(clientSession, schema, tableName, olderThan, retainLast, snapshotIds);
+            }
+        }
+
+        private void doExpireSnapshots(ConnectorSession clientSession, String schema, String tableName, SqlTimestamp olderThan, Integer retainLast, List<Long> snapshotIds)
+        {
+            // Execute the snapshot expiration for the target table using the Iceberg interface
+            ......
+        }
+    }
+
+3. Exposing the Procedure Provider to PrestoDB
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+In the Guice binding module of your target connector, add a binding for the procedure provider class defined above:
+
+.. code-block:: java
+
+    ......
+    Multibinder<Procedure> procedures = newSetBinder(binder, new TypeLiteral<Procedure>() {});
+    procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
+    ......
+
+As a result, during startup, the PrestoDB engine collects the procedure providers exposed by all connectors and maintains them within their
+respective namespaces (e.g., ``hive.system`` or ``iceberg.system``). Once startup is complete, users can invoke these procedures by specifying
+the corresponding connector namespace, for example:
+
+.. code-block:: sql
+
+    call iceberg.system.expire_snapshots('default', 'test_table');
+    call hive.system.invalidate_directory_list_cache();
+    ......
+
+Distributed Procedure
+---------------------
+
+PrestoDB supports building distributed execution plans for certain types of procedures, enabling them to leverage the computational resources of
+the entire cluster. Since different kinds of distributed procedures may correspond to distinct execution plan shapes, extending and implementing them
+should be approached at two levels:
+
+  * For a category of procedures that share the same execution plan shape, extend a subtype of ``DistributedProcedure``. (The currently supported
+    ``TableDataRewriteDistributedProcedure`` subtype is designed for table data rewrite operations)
+  * Implement a concrete distributed procedure in a connector by building upon a specific ``DistributedProcedure`` subtype. For instance,
+    ``rewrite_data_files`` in the Iceberg connector is built upon the ``TableDataRewriteDistributedProcedure`` subtype.
+
+.. important::
+   The ``DistributedProcedure`` class is abstract. Connector developers cannot implement it directly.
+   Instead, you must build your concrete distributed procedure upon a specific **subtype** (like ``TableDataRewriteDistributedProcedure``)
+   that the PrestoDB engine already knows how to analyze and plan.
+
+Extending a DistributedProcedure Subtype
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+1. Adding a DistributedProcedureType Enum Value
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Add a new value to the ``DistributedProcedure.DistributedProcedureType`` enum, for example: ``TABLE_DATA_REWRITE``; a sketch of this enum appears
+before the subtype example below.
+
+This enum value is important, as it is used during both the analysis and planning phases to distinguish between different ``DistributedProcedure`` subtypes
+and execute the corresponding branch logic.
+
+2. Creating a subclass of DistributedProcedure
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Create a new subclass of ``DistributedProcedure``, such as:
+
+.. code-block:: java
+
+    public class TableDataRewriteDistributedProcedure
+            extends DistributedProcedure
+
+* In the constructor, pass the corresponding ``DistributedProcedureType`` enum value (e.g., ``TABLE_DATA_REWRITE``) to the ``super(...)`` method.
+* In addition to the base parameters required by ``BaseProcedure`` (schema, name, and arguments, which are consistent with those in ``Procedure``),
+  a subtype can define and is responsible for processing and validating any additional parameters it requires.
+
+Additionally, the following three abstract methods defined by the base class ``DistributedProcedure`` should be implemented:
+
+.. code-block:: java
+
+    /**
+     * Creates a connector-specific, or even a distributed procedure subtype-specific, context object.
+     * In connectors that support distributed procedures, it is invoked at the start of a distributed procedure's execution.
+     * The generated procedure context is then bound to the current ConnectorMetadata to maintain all contextual information
+     * involved throughout the execution, which may be utilized when the procedure finishes.
+     */
+    public abstract ConnectorProcedureContext createContext();
+
+    /**
+     * Performs the preparatory work required when starting the execution of this distributed procedure.
+     */
+    public abstract ConnectorDistributedProcedureHandle begin(ConnectorSession session,
+            ConnectorProcedureContext procedureContext,
+            ConnectorTableLayoutHandle tableLayoutHandle,
+            Object[] arguments);
+
+    /**
+     * Performs the work required for the final centralized commit, after all distributed execution tasks have completed.
+     */
+    public abstract void finish(ConnectorProcedureContext procedureContext,
+            ConnectorDistributedProcedureHandle procedureHandle,
+            Collection<Slice> fragments);
+
+.. note::
+
+   At this architectural level, distributed procedure subtypes are designed to be decoupled from specific connectors. Therefore, when implementing
+   the three aforementioned abstract methods, it is recommended to focus solely on the common logic of the subtype. Connector-specific functionality
+   should be abstracted into method interfaces and delegated to the final, concrete distributed procedure implementations.
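+
+For reference, here is a minimal sketch of the enum extension described in step 1. The enum and value names are taken from the text above;
+the comments are illustrative only:
+
+.. code-block:: java
+
+    // Nested inside the abstract DistributedProcedure base class; the analysis and
+    // planning phases switch on this value to select the matching branch logic.
+    public enum DistributedProcedureType
+    {
+        // Planned as: TableScanNode → FilterNode → CallDistributedProcedureNode → TableFinishNode → OutputNode
+        TABLE_DATA_REWRITE
+        // Future subtypes that map to differently shaped execution plans are added here
+    }
+
+As an illustration, the ``TableDataRewriteDistributedProcedure`` subtype, which handles table data rewrite operations, is defined as follows:
+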
+.. code-block:: java
+
+    public class TableDataRewriteDistributedProcedure
+            extends DistributedProcedure
+    {
+        private final BeginCallDistributedProcedure beginCallDistributedProcedure;
+        private final FinishCallDistributedProcedure finishCallDistributedProcedure;
+        private final Supplier<ConnectorProcedureContext> contextSupplier;
+
+        public TableDataRewriteDistributedProcedure(String schema, String name,
+                List<Argument> arguments,
+                BeginCallDistributedProcedure beginCallDistributedProcedure,
+                FinishCallDistributedProcedure finishCallDistributedProcedure,
+                Supplier<ConnectorProcedureContext> contextSupplier)
+        {
+            super(TABLE_DATA_REWRITE, schema, name, arguments);
+            this.beginCallDistributedProcedure = requireNonNull(beginCallDistributedProcedure, "beginCallDistributedProcedure is null");
+            this.finishCallDistributedProcedure = requireNonNull(finishCallDistributedProcedure, "finishCallDistributedProcedure is null");
+            this.contextSupplier = requireNonNull(contextSupplier, "contextSupplier is null");
+
+            // Performs subtype-specific validation and processing logic on the parameters
+            ......
+        }
+
+        @Override
+        public ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments)
+        {
+            return this.beginCallDistributedProcedure.begin(session, procedureContext, tableLayoutHandle, arguments);
+        }
+
+        @Override
+        public void finish(ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
+        {
+            this.finishCallDistributedProcedure.finish(procedureContext, procedureHandle, fragments);
+        }
+
+        @Override
+        public ConnectorProcedureContext createContext()
+        {
+            return contextSupplier.get();
+        }
+
+        @FunctionalInterface
+        public interface BeginCallDistributedProcedure
+        {
+            ConnectorDistributedProcedureHandle begin(ConnectorSession session, ConnectorProcedureContext procedureContext, ConnectorTableLayoutHandle tableLayoutHandle, Object[] arguments);
+        }
+
+        @FunctionalInterface
+        public interface FinishCallDistributedProcedure
+        {
+            void finish(ConnectorProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments);
+        }
+    }
+
+3. Processing of Subtypes in the Analysis Phase
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the ``visitCall(...)`` method of ``StatementAnalyzer``, add a branch to handle the newly defined subtypes, such as ``TABLE_DATA_REWRITE``:
+
+.. code-block:: java
+
+    @Override
+    protected Scope visitCall(Call call, Optional<Scope> scope)
+    {
+        QualifiedObjectName procedureName = analysis.getProcedureName()
+                .orElse(createQualifiedObjectName(session, call, call.getName(), metadata));
+        ConnectorId connectorId = metadata.getCatalogHandle(session, procedureName.getCatalogName())
+                .orElseThrow(() -> new SemanticException(MISSING_CATALOG, call, "Catalog %s does not exist", procedureName.getCatalogName()));
+
+        if (!metadata.getProcedureRegistry().isDistributedProcedure(connectorId, toSchemaTableName(procedureName))) {
+            throw new SemanticException(PROCEDURE_NOT_FOUND, "Distributed procedure not registered: " + procedureName);
+        }
+        DistributedProcedure procedure = metadata.getProcedureRegistry().resolveDistributed(connectorId, toSchemaTableName(procedureName));
+        Object[] values = extractParameterValuesInOrder(call, procedure, metadata, session, analysis.getParameters());
+
+        analysis.setUpdateInfo(call.getUpdateInfo());
+        analysis.setDistributedProcedureType(Optional.of(procedure.getType()));
+        analysis.setProcedureArguments(Optional.of(values));
+        switch (procedure.getType()) {
+            case TABLE_DATA_REWRITE:
+                TableDataRewriteDistributedProcedure tableDataRewriteDistributedProcedure = (TableDataRewriteDistributedProcedure) procedure;
+
+                // Performs analysis on the tableDataRewriteDistributedProcedure
+                ......
+
+                break;
+            default:
+                throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported distributed procedure type: " + procedure.getType());
+        }
+        return createAndAssignScope(call, scope, Field.newUnqualified(Optional.empty(), "rows", BIGINT));
+    }
+
+4. Processing of Subtypes in the Planning Phase
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the ``planStatementWithoutOutput(...)`` method of ``LogicalPlanner``, when the statement type is ``Call``, add a branch to handle newly defined subtypes
+such as ``TABLE_DATA_REWRITE``:
+
+.. code-block:: java
+
+    private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement statement)
+    {
+        ......
+        else if (statement instanceof Call) {
+            checkState(analysis.getDistributedProcedureType().isPresent(), "Call distributed procedure analysis is missing");
+            switch (analysis.getDistributedProcedureType().get()) {
+                case TABLE_DATA_REWRITE:
+                    return createCallDistributedProcedurePlanForTableDataRewrite(analysis, (Call) statement);
+                default:
+                    throw new PrestoException(NOT_SUPPORTED, "Unsupported distributed procedure type: " + analysis.getDistributedProcedureType().get());
+            }
+        }
+        else {
+            throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type " + statement.getClass().getSimpleName());
+        }
+    }
+
+    private RelationPlan createCallDistributedProcedurePlanForTableDataRewrite(Analysis analysis, Call statement)
+    {
+        // Builds the logical plan for the table data rewrite procedure subtype from the analysis results:
+        // TableScanNode → FilterNode → CallDistributedProcedureNode → TableFinishNode → OutputNode
+        ......
+    }
+
+.. note::
+
+   If a custom plan node is required, it must subsequently be handled in the plan visitors, the optimizers, and the local execution planner. If a custom
+   local execution operator ultimately needs to be generated, it must be implemented within PrestoDB as well.
+   (This part is beyond the scope of this document and will not be elaborated further.)
+
+Implementing a Concrete Distributed Procedure in a Specific Connector
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Similar to normal procedures, PrestoDB uses the Guice dependency injection framework to manage the registration and lifecycle of distributed procedures,
+enabling connectors to dynamically provide these callable distributed procedures to the engine. A concrete distributed procedure is implemented and bound
+as a ``Provider<DistributedProcedure>``, which ensures an instance is created on-demand when a procedure needs to be executed. The following steps will
+guide you through implementing and supplying a distributed procedure in your connector.
+
+1. Procedure Provider Class
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+An implementation class must implement the ``Provider<DistributedProcedure>`` interface. In its constructor, it can use ``@Inject`` to receive any
+Guice-managed dependencies. The class is required to implement the ``get()`` method from the Provider interface, which is responsible for constructing and
+returning a specific subclass instance of ``DistributedProcedure``.
+
+2. Creation of a DistributedProcedure Instance
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The parameters required to create a ``DistributedProcedure`` subclass vary by subtype, but the following common parameters are mandatory and consistent with those
+described in the normal ``Procedure`` above.
+
+  * ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB)
+  * ``String name`` - The name of this procedure, for example, ``expire_snapshots``
+  * ``List<Argument> arguments`` - The parameter declarations list for this procedure
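+
+The ``rewrite_data_files`` example below threads state between ``begin`` and ``finish`` through an ``IcebergProcedureContext``. Such a context class
+is defined by the connector itself; the following is only a hypothetical sketch of its shape, inferred from the calls made in the example (the field
+and method names mirror that usage, everything else is illustrative):
+
+.. code-block:: java
+
+    // Hypothetical sketch of a connector-side ConnectorProcedureContext implementation.
+    // It is created by createContext() (see IcebergProcedureContext::new below), populated
+    // during begin(...) and distributed execution, and consumed in finish(...).
+    public class IcebergProcedureContext
+            implements ConnectorProcedureContext
+    {
+        private final Set<DataFile> scannedDataFiles = new HashSet<>();
+        private final Set<DeleteFile> fullyAppliedDeleteFiles = new HashSet<>();
+        private Optional<Table> table = Optional.empty();
+        private Transaction transaction;
+
+        public Optional<Table> getTable()
+        {
+            return table;
+        }
+
+        public Transaction getTransaction()
+        {
+            return transaction;
+        }
+
+        public Set<DataFile> getScannedDataFiles()
+        {
+            return scannedDataFiles;
+        }
+
+        public Set<DeleteFile> getFullyAppliedDeleteFiles()
+        {
+            return fullyAppliedDeleteFiles;
+        }
+
+        // Population logic (binding the Iceberg table, opening the transaction, and
+        // recording scanned or fully applied files) is elided here.
+    }
+
+The following code demonstrates how to implement ``rewrite_data_files`` for the Iceberg connector, based on the ``TableDataRewriteDistributedProcedure`` class:
+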
+.. code-block:: java
+
+    public class RewriteDataFilesProcedure
+            implements Provider<DistributedProcedure>
+    {
+        private final TypeManager typeManager;
+        private final JsonCodec<CommitTaskData> commitTaskCodec;
+
+        @Inject
+        public RewriteDataFilesProcedure(
+                TypeManager typeManager,
+                JsonCodec<CommitTaskData> commitTaskCodec)
+        {
+            this.typeManager = requireNonNull(typeManager, "typeManager is null");
+            this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
+        }
+
+        @Override
+        public DistributedProcedure get()
+        {
+            return new TableDataRewriteDistributedProcedure(
+                    "system",
+                    "rewrite_data_files",
+                    ImmutableList.of(
+                            new Argument(SCHEMA, VARCHAR),
+                            new Argument(TABLE_NAME, VARCHAR),
+                            new Argument("filter", VARCHAR, false, "TRUE"),
+                            new Argument("options", "map(varchar, varchar)", false, null)),
+                    (session, procedureContext, tableLayoutHandle, arguments) -> beginCallDistributedProcedure(session, (IcebergProcedureContext) procedureContext, (IcebergTableLayoutHandle) tableLayoutHandle, arguments),
+                    ((procedureContext, tableHandle, fragments) -> finishCallDistributedProcedure((IcebergProcedureContext) procedureContext, tableHandle, fragments)),
+                    IcebergProcedureContext::new);
+        }
+
+        private ConnectorDistributedProcedureHandle beginCallDistributedProcedure(ConnectorSession session, IcebergProcedureContext procedureContext, IcebergTableLayoutHandle layoutHandle, Object[] arguments)
+        {
+            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+                Table icebergTable = procedureContext.getTable().orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
+                IcebergTableHandle tableHandle = layoutHandle.getTable();
+
+                // Performs the preparatory work required when starting the execution of ``rewrite_data_files``,
+                // and encapsulate the necessary information and handling logic within the ``procedureContext``
+                ......
+
+                return new IcebergDistributedProcedureHandle(
+                        tableHandle.getSchemaName(),
+                        tableHandle.getIcebergTableName(),
+                        toPrestoSchema(icebergTable.schema(), typeManager),
+                        toPrestoPartitionSpec(icebergTable.spec(), typeManager),
+                        getColumns(icebergTable.schema(), icebergTable.spec(), typeManager),
+                        icebergTable.location(),
+                        getFileFormat(icebergTable),
+                        getCompressionCodec(session),
+                        icebergTable.properties());
+            }
+        }
+
+        private void finishCallDistributedProcedure(IcebergProcedureContext procedureContext, ConnectorDistributedProcedureHandle procedureHandle, Collection<Slice> fragments)
+        {
+            if (fragments.isEmpty() &&
+                    procedureContext.getScannedDataFiles().isEmpty() &&
+                    procedureContext.getFullyAppliedDeleteFiles().isEmpty()) {
+                return;
+            }
+
+            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
+                IcebergDistributedProcedureHandle handle = (IcebergDistributedProcedureHandle) procedureHandle;
+                Table icebergTable = procedureContext.getTransaction().table();
+
+                // Performs the final atomic commit by leveraging Iceberg's `RewriteFiles` API.
+                // This integrates the commit information from the distributed tasks (in `commitTasks`)
+                // with the file change tracking (e.g., `scannedDataFiles`, `fullyAppliedDeleteFiles`, `newFiles`)
+                // maintained within the `procedureContext`.
+                List<CommitTaskData> commitTasks = fragments.stream()
+                        .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))
+                        .collect(toImmutableList());
+                ......
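+
+                // (Hypothetical sketch of the elided step) Each CommitTaskData returned by
+                // the distributed tasks typically describes one newly written data file; the
+                // connector would rebuild Iceberg DataFile objects from them and gather the
+                // `newFiles` set consumed by RewriteFiles below.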
+                RewriteFiles rewriteFiles = procedureContext.getTransaction().newRewrite();
+                Set<DataFile> scannedDataFiles = procedureContext.getScannedDataFiles();
+                Set<DeleteFile> fullyAppliedDeleteFiles = procedureContext.getFullyAppliedDeleteFiles();
+                rewriteFiles.rewriteFiles(scannedDataFiles, fullyAppliedDeleteFiles, newFiles, ImmutableSet.of());
+
+                ......
+                rewriteFiles.commit();
+            }
+        }
+    }
+
+3. Exposing the DistributedProcedure Provider to PrestoDB
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the Guice binding module of your target connector, add a binding for the distributed procedure provider class defined above:
+
+.. code-block:: java
+
+    Multibinder<DistributedProcedure> procedures = newSetBinder(binder, new TypeLiteral<DistributedProcedure>() {});
+    procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
+    ......
+
+As a result, during startup, the PrestoDB engine collects the distributed procedure providers the same way as normal procedure providers exposed by
+all connectors and maintains them within their respective namespaces (e.g., ``hive.system`` or ``iceberg.system``). Once startup is complete, users
+can invoke these distributed procedures by specifying the corresponding connector namespace, for example:
+
+.. code-block:: sql
+
+    call iceberg.system.rewrite_data_files('default', 'test_table');
+    call iceberg.system.rewrite_data_files(table_name => 'test_table', schema => 'default', filter => 'c1 > 3');
\ No newline at end of file

From e415c908ee9b2742ce381b7e99f742a50b093958 Mon Sep 17 00:00:00 2001
From: wangd
Date: Tue, 2 Dec 2025 11:14:04 +0800
Subject: [PATCH 2/3] Address review comments

---
 .../src/main/sphinx/develop/procedures.rst | 90 ++++++++++---------
 1 file changed, 46 insertions(+), 44 deletions(-)

diff --git a/presto-docs/src/main/sphinx/develop/procedures.rst b/presto-docs/src/main/sphinx/develop/procedures.rst
index bac724f3c44b6..cdab0a028dcff 100644
--- a/presto-docs/src/main/sphinx/develop/procedures.rst
+++ b/presto-docs/src/main/sphinx/develop/procedures.rst
@@ -3,7 +3,7 @@ Procedures
 ==========
 
 PrestoDB's procedures allow users to perform data manipulation and management tasks. Unlike traditional databases where procedural objects are
-defined by users via SQL, the procedures in PrestoDB are a set of system routines provided by developers through Connectors. The overall type hierarchy
+defined by users by using SQL, the procedures in PrestoDB are a set of system routines provided by developers through Connectors. The overall type hierarchy
 is illustrated in the diagram below:
 
 .. code-block:: text
@@ -16,21 +16,22 @@ is illustrated in the diagram below:
 
 PrestoDB supports two categories of procedures:
 
-1. **Normal Procedure**
+* **Normal Procedure**
 
-   These procedures are executed directly on the Coordinator node, and PrestoDB does not build distributed execution plans for them.
-   They are designed mainly for administrative tasks involving table/system metadata and cache management, such as ``sync_partition_metadata``
-   and ``invalidate_directory_list_cache`` in Hive Connector, or ``expire_snapshots`` and ``invalidate_statistics_file_cache`` in Iceberg Connector.
+These procedures are executed directly on the Coordinator node, and PrestoDB does not build distributed execution plans for them.
+They are designed mainly for administrative tasks involving table or system metadata and cache management, such as ``sync_partition_metadata``
+and ``invalidate_directory_list_cache`` in Hive Connector, or ``expire_snapshots`` and ``invalidate_statistics_file_cache`` in Iceberg Connector.
 
-2. **Distributed Procedure**
+* **Distributed Procedure**
 
-   Procedures of this type are executed via a distributed execution plan constructed by PrestoDB, which utilizes the entire cluster of Worker nodes
-   for distributed computation. They are suitable for operations involving table data—such as data optimization, re-partitioning, sorting,
-   and pre-processing—as well as for administrative tasks that need to be executed across the Worker nodes, for instance, clearing caches on specific workers.
+Procedures of this type are executed via a distributed execution plan constructed by PrestoDB, which utilizes the entire cluster of Worker nodes
+for distributed computation. They are suitable for operations involving table data—such as data optimization, re-partitioning, sorting,
+and pre-processing—as well as for administrative tasks that need to be executed across the Worker nodes, for instance, clearing caches on specific workers.
 
 The type hierarchy for Distributed Procedures is designed to be extensible. Different distributed tasks can have different invocation parameters and
-are planned into differently shaped execution plans; as such, they can be implemented as distinct subtypes of ``DistributedProcedure``. For example,
-for the kind of table data rewrite tasks, PrestoDB provides the ``TableDataRewriteDistributedProcedure`` subtype.
+are planned into differently shaped execution plans; as such, they can be implemented as distinct subtypes of ``DistributedProcedure``.
+
+For example, for table data rewrite tasks, PrestoDB provides the ``TableDataRewriteDistributedProcedure`` subtype.
 Connector developers can leverage this subtype to implement specific data-rewrite distributed procedures—such as table data optimization, compression,
 repartitioning, or sorting—for their connectors. Within the PrestoDB engine, tasks of this subtype are uniformly planned into an execution plan tree with the
 following shape:
@@ -42,17 +43,15 @@ following shape:
 In addition, developers can implement other kinds of distributed procedures by extending the type hierarchy—defining new subtypes that are mapped to
 execution plans of varying shapes.
 
-For further design details, see:
-
-- https://github.com/prestodb/rfcs/blob/main/RFC-0021-support-distributed-procedure.md
+For further design details, see `RFC-0021 for Presto <https://github.com/prestodb/rfcs/blob/main/RFC-0021-support-distributed-procedure.md>`_.
 
 Normal Procedure
 ----------------
 
-To make a procedure callable, a connector must first expose it to the PrestoDB engine. PrestoDB leverages the Guice dependency injection framework to
-manage procedure registration and lifecycle. A specific procedure is implemented and bound as a ``Provider<Procedure>``, thus creating an instance
-only when it is actually needed for execution, enabling on-demand instantiation. The following steps will guide you on how to implement and provide
-a procedure in a connector.
+To make a procedure callable, a connector must first expose it to the PrestoDB engine. PrestoDB leverages the `Guice dependency injection framework <https://github.com/google/guice>`_
+to manage procedure registration and lifecycle. A specific procedure is implemented and bound as a ``Provider<Procedure>``,
+thus creating an instance only when it is actually needed for execution, enabling on-demand instantiation. The following steps will guide you on how to
+implement and provide a procedure in a connector.
 
 1. Procedure Provider Class
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -66,20 +65,23 @@ returning a new Procedure instance.
 
 A Procedure object requires the following parameters upon creation:
 
- * ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB)
- * ``String name`` - The name of this procedure, for example, ``expire_snapshots``
- * ``List<Argument> arguments`` - The parameter declarations list for this procedure
- * ``MethodHandle methodHandle`` - PrestoDB abstracts procedure execution via ``MethodHandle``. A procedure provider implements the core logic in
-   a dedicated method and exposes it as a ``MethodHandle`` that is injected into the procedure instance.
+* ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB).
+* ``String name`` - The name of this procedure, for example, ``expire_snapshots``.
+* ``List<Argument> arguments`` - The parameter declarations list for this procedure.
+* ``MethodHandle methodHandle`` - PrestoDB abstracts procedure execution via ``MethodHandle``. A procedure provider implements the core logic in
+  a dedicated method and exposes it as a ``MethodHandle`` that is injected into the procedure instance.
 
 .. note::
 
-   * The Java method corresponding to the ``MethodHandle`` must have a correspondence with the procedure parameters. Its first parameter must be a session object
-     of type ``ConnectorSession.class``. The subsequent parameters must have a strict one-to-one correspondence in both type and order with the
-     procedure parameters declared in the arguments list.
-   * The method implementation for the ``MethodHandle`` must account for classloader isolation. Since PrestoDB employs a plugin isolation mechanism where each
-     connector has its own ClassLoader, the engine must temporarily switch to the connector's specific ClassLoader when invoking its procedure logic. This context
-     switch is critical to prevent ``ClassNotFoundException`` or ``NoClassDefFoundError`` issues.
+   * The Java method corresponding to the ``MethodHandle`` must have a correspondence with the procedure parameters.
+
+     * Its first parameter must be a session object of type ``ConnectorSession.class``.
+     * The subsequent parameters must have a strict one-to-one correspondence in both type and order with the
+       procedure parameters declared in the arguments list.
+
+   * The method implementation for the ``MethodHandle`` must account for classloader isolation. Since PrestoDB employs a plugin isolation mechanism where each
+     connector has its own ClassLoader, the engine must temporarily switch to the connector's specific ClassLoader when invoking its procedure logic. This context
+     switch is critical to prevent ``ClassNotFoundException`` or ``NoClassDefFoundError`` issues.
 
 As an example, the following is the ``expire_snapshots`` procedure implemented in the Iceberg connector:
@@ -146,8 +148,8 @@ In the Guice binding module of your target connector, add a binding for the proc
     procedures.addBinding().toProvider(ExpireSnapshotsProcedure.class).in(Scopes.SINGLETON);
     ......
 
-As a result, during startup, the PrestoDB engine collects the procedure providers exposed by all connectors and maintains them within their
-respective namespaces (e.g., ``hive.system`` or ``iceberg.system``). Once startup is complete, users can invoke these procedures by specifying
+During startup, the PrestoDB engine collects the procedure providers exposed by all connectors and maintains them within their
+respective namespaces (for example, ``hive.system`` or ``iceberg.system``). Once startup is complete, users can invoke these procedures by specifying
 the corresponding connector namespace, for example:
 
 .. code-block:: sql
 
     call iceberg.system.expire_snapshots('default', 'test_table');
@@ -163,14 +165,14 @@ PrestoDB supports building distributed execution plans for certain types of proc
 the entire cluster. Since different kinds of distributed procedures may correspond to distinct execution plan shapes, extending and implementing them
 should be approached at two levels:
 
-  * For a category of procedures that share the same execution plan shape, extend a subtype of ``DistributedProcedure``. (The currently supported
-    ``TableDataRewriteDistributedProcedure`` subtype is designed for table data rewrite operations)
-  * Implement a concrete distributed procedure in a connector by building upon a specific ``DistributedProcedure`` subtype. For instance,
-    ``rewrite_data_files`` in the Iceberg connector is built upon the ``TableDataRewriteDistributedProcedure`` subtype.
+* For a category of procedures that share the same execution plan shape, extend a subtype of ``DistributedProcedure``. The currently supported
+  ``TableDataRewriteDistributedProcedure`` subtype is designed for table data rewrite operations.
+* Implement a concrete distributed procedure in a connector by building upon a specific ``DistributedProcedure`` subtype. For instance,
+  ``rewrite_data_files`` in the Iceberg connector is built upon the ``TableDataRewriteDistributedProcedure`` subtype.
 
 .. important::
    The ``DistributedProcedure`` class is abstract. Connector developers cannot implement it directly.
-   Instead, you must build your concrete distributed procedure upon a specific **subtype** (like ``TableDataRewriteDistributedProcedure``)
+   You must build your concrete distributed procedure upon a specific **subtype** (like ``TableDataRewriteDistributedProcedure``)
    that the PrestoDB engine already knows how to analyze and plan.
@@ -194,7 +196,7 @@ Create a new subclass of ``DistributedProcedure``, such as:
     public class TableDataRewriteDistributedProcedure
             extends DistributedProcedure
 
-* In the constructor, pass the corresponding ``DistributedProcedureType`` enum value (e.g., ``TABLE_DATA_REWRITE``) to the ``super(...)`` method.
+* In the constructor, pass the corresponding ``DistributedProcedureType`` enum value such as ``TABLE_DATA_REWRITE`` to the ``super(...)`` method.
 * In addition to the base parameters required by ``BaseProcedure`` (schema, name, and arguments, which are consistent with those in ``Procedure``),
   a subtype can define and is responsible for processing and validating any additional parameters it requires.
@@ -227,7 +229,7 @@ Additionally, the following three abstract methods defined by the base class ``D
 
 .. note::
 
-   At this architectural level, distributed procedure subtypes are designed to be decoupled from specific connectors. Therefore, when implementing
+   At this architectural level, distributed procedure subtypes are designed to be decoupled from specific connectors. When implementing
    the three aforementioned abstract methods, it is recommended to focus solely on the common logic of the subtype. Connector-specific functionality
 should be abstracted into method interfaces and delegated to the final, concrete distributed procedure implementations.
@@ -385,9 +387,9 @@ specific subclass instance of ``DistributedProcedure``.
 The parameters required to create a ``DistributedProcedure`` subclass vary by subtype, but the following common parameters are mandatory and consistent with those
 described in the normal ``Procedure`` above.
 
-  * ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB)
-  * ``String name`` - The name of this procedure, for example, ``expire_snapshots``
-  * ``List<Argument> arguments`` - The parameter declarations list for this procedure
+* ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB)
+* ``String name`` - The name of this procedure, for example, ``expire_snapshots``
+* ``List<Argument> arguments`` - The parameter declarations list for this procedure
 
 The following code demonstrates how to implement ``rewrite_data_files`` for the Iceberg connector, based on the ``TableDataRewriteDistributedProcedure`` class:
@@ -431,7 +433,7 @@ The following code demonstrates how to implement ``rewrite_data_files`` for the
                 IcebergTableHandle tableHandle = layoutHandle.getTable();
 
                 // Performs the preparatory work required when starting the execution of ``rewrite_data_files``,
-                // and encapsulate the necessary information and handling logic within the ``procedureContext``
+                // and encapsulates the necessary information and handling logic within the ``procedureContext``
                 ......
 
                 return new IcebergDistributedProcedureHandle(
@@ -490,8 +492,8 @@ In the Guice binding module of your target connector, add a binding for the dist
     procedures.addBinding().toProvider(RewriteDataFilesProcedure.class).in(Scopes.SINGLETON);
     ......
 
-As a result, during startup, the PrestoDB engine collects the distributed procedure providers the same way as normal procedure providers exposed by
-all connectors and maintains them within their respective namespaces (e.g., ``hive.system`` or ``iceberg.system``). Once startup is complete, users
+During startup, the PrestoDB engine collects the distributed procedure providers the same way as normal procedure providers exposed by
+all connectors and maintains them within their respective namespaces (for example, ``hive.system`` or ``iceberg.system``). Once startup is complete, users
 can invoke these distributed procedures by specifying the corresponding connector namespace, for example:
 
 .. code-block:: sql

From 0f74c37f1dc3297b21cb91c4266511bb6cb84ee1 Mon Sep 17 00:00:00 2001
From: wangd
Date: Thu, 4 Dec 2025 01:16:03 +0800
Subject: [PATCH 3/3] Address review comments

---
 presto-docs/src/main/sphinx/develop/procedures.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/presto-docs/src/main/sphinx/develop/procedures.rst b/presto-docs/src/main/sphinx/develop/procedures.rst
index cdab0a028dcff..47ba0782ef9e9 100644
--- a/presto-docs/src/main/sphinx/develop/procedures.rst
+++ b/presto-docs/src/main/sphinx/develop/procedures.rst
@@ -24,7 +24,7 @@ and ``invalidate_directory_list_cache`` in Hive Connector, or ``expire_snapshots
 
 * **Distributed Procedure**
 
-Procedures of this type are executed via a distributed execution plan constructed by PrestoDB, which utilizes the entire cluster of Worker nodes
+Procedures of this type are executed with a distributed execution plan constructed by PrestoDB, which utilizes the entire cluster of Worker nodes
 for distributed computation. They are suitable for operations involving table data—such as data optimization, re-partitioning, sorting,
 and pre-processing—as well as for administrative tasks that need to be executed across the Worker nodes, for instance, clearing caches on specific workers.
@@ -68,7 +68,7 @@ A Procedure object requires the following parameters upon creation:
 * ``String schema`` - The schema namespace to which this procedure belongs (typically ``system`` in PrestoDB).
 * ``String name`` - The name of this procedure, for example, ``expire_snapshots``.
 * ``List<Argument> arguments`` - The parameter declarations list for this procedure.
-* ``MethodHandle methodHandle`` - PrestoDB abstracts procedure execution via ``MethodHandle``. A procedure provider implements the core logic in
+* ``MethodHandle methodHandle`` - PrestoDB abstracts procedure execution through ``MethodHandle``. A procedure provider implements the core logic in
   a dedicated method and exposes it as a ``MethodHandle`` that is injected into the procedure instance.
 
 .. note::
@@ -463,7 +463,7 @@ The following code demonstrates how to implement ``rewrite_data_files`` for the
 
                 // Performs the final atomic commit by leveraging Iceberg's `RewriteFiles` API.
                 // This integrates the commit information from the distributed tasks (in `commitTasks`)
-                // with the file change tracking (e.g., `scannedDataFiles`, `fullyAppliedDeleteFiles`, `newFiles`)
+                // with the file change tracking (for example, `scannedDataFiles`, `fullyAppliedDeleteFiles`, `newFiles`)
                 // maintained within the `procedureContext`.
                 List<CommitTaskData> commitTasks = fragments.stream()
                         .map(slice -> commitTaskCodec.fromJson(slice.getBytes()))