Pull request overview
This PR pivots the “database/table discovery” layer from Hive Metastore–centric Delta Lake metadata to Polaris-backed Iceberg catalogs, updating Spark session configuration and API/tests/docs to use Iceberg’s catalog.namespace.table naming.
Changes:
- Reworks `data_store` and the standalone subprocess operations to list Iceberg catalogs/namespaces/tables/schemas via Spark SQL (and returns databases as `catalog.namespace`).
- Adds Polaris catalog settings and Spark session config generation for Polaris REST catalogs, including Spark Connect immutability filtering.
- Updates FastAPI models/routes/tests/docs to remove HMS/namespace-filter parameters and adopt `catalog.namespace` inputs.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| tests/service/test_standalone_operations.py | Updates standalone subprocess tests to match removed HMS parameters. |
| tests/routes/test_delta.py | Updates route tests to use Iceberg-style database identifiers and removes token-extraction tests. |
| tests/delta_lake/test_setup_spark_session.py | Adds coverage for _get_catalog_conf and updated SQL extensions. |
| tests/delta_lake/test_delta_service.py | Updates expected SQL quoting to support catalog.namespace databases. |
| tests/delta_lake/test_data_store.py | Replaces HMS/governance filtering tests with Iceberg catalog/namespace listing tests. |
| src/settings.py | Introduces Polaris configuration settings (URI/credential/personal+tenant catalogs). |
| src/service/standalone_operations.py | Removes HMS-related subprocess parameters; routes listing calls through Iceberg data_store. |
| src/service/models.py | Removes request fields for HMS/filtering; updates descriptions for Iceberg namespaces. |
| src/routes/delta.py | Simplifies endpoints to Iceberg namespace semantics; removes auth-token extraction path. |
| src/delta_lake/setup_spark_session.py | Adds Polaris catalog conf generation and catalog immutability filtering for Spark Connect. |
| src/delta_lake/delta_service.py | Allows dotted database identifiers; updates quoting and existence checks to require SparkSession. |
| src/delta_lake/data_store.py | Major rewrite: catalog discovery via SET, namespace/table/schema listing via Spark SQL. |
| scripts/init-polaris-db.sh | Adds init script to create a polaris database in Postgres. |
| docs/guide/user_guide.md | Updates guide language/examples toward Iceberg + Polaris usage. |
Comments suppressed due to low confidence (1)
src/delta_lake/delta_service.py:667
- `_validate_identifier()` now allows dots for `identifier_type == "database"`, but the raised error message still claims identifiers may contain only alphanumerics/underscores. For database validation failures this is misleading (dots are allowed between components). Consider tailoring the message when `identifier_type == "database"` so it documents the `catalog.namespace` format explicitly.
```python
pattern = (
    VALID_DATABASE_PATTERN
    if identifier_type == "database"
    else VALID_IDENTIFIER_PATTERN
)
if not name or not pattern.match(name):
    raise SparkQueryError(
        f"Invalid {identifier_type}: '{name}'. "
        "Identifiers must start with a letter or underscore and contain "
        "only alphanumeric characters and underscores."
    )
```
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
```python
try:
    rows = spark.sql(f"SHOW TABLES IN {database}").collect()
    tables = sorted(row["tableName"] for row in rows)
except Exception:
    tables = []
```
src/service/models.py (outdated)
```diff
-        description="Whether to filter databases by user/tenant namespace prefixes"
-    ),
-] = True
+    pass
```
Resolve conflicts between Trino engine additions (main) and Polaris catalog simplifications (polaris). Keep Trino engine dispatch paths with Polaris's simplified data_store API. Remove namespace filtering from trino_data_store since Polaris removed those functions.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (1)
src/delta_lake/delta_service.py:667
- The error message in `_validate_identifier()` still says identifiers may contain only alphanumerics/underscores, but database identifiers now explicitly allow dots (`catalog.namespace`). Consider tailoring the message when `identifier_type == "database"` so it doesn't incorrectly suggest dots are invalid.
```python
pattern = (
    VALID_DATABASE_PATTERN
    if identifier_type == "database"
    else VALID_IDENTIFIER_PATTERN
)
if not name or not pattern.match(name):
    raise SparkQueryError(
        f"Invalid {identifier_type}: '{name}'. "
        "Identifiers must start with a letter or underscore and contain "
        "only alphanumeric characters and underscores."
    )
```
```diff
 _validate_identifier(join.database, "join database")
 _validate_identifier(join.table, "join table")
 _validate_identifier(join.on_left_column, "join left column")
 _validate_identifier(join.on_right_column, "join right column")

-join_table = f"`{join.database}`.`{join.table}`"
+join_table = f"{join.database}.`{join.table}`"
 join_type = join.join_type
```
`_validate_identifier()` now allows dots for database identifiers, but `_build_join_clause()` validates `join.database` using `identifier_type="join database"`. That still uses `VALID_IDENTIFIER_PATTERN` (no dots), so joins against Iceberg `catalog.namespace` databases (e.g., `my.demo`) will be rejected. Validate `join.database` with `identifier_type="database"` (or otherwise allow dotted database identifiers here) to match the new naming scheme.
| summary="List all Iceberg namespaces", | ||
| description="Lists all accessible Iceberg namespaces across all catalogs. Returns namespaces in catalog.namespace format.", | ||
| operation_id="list_databases", | ||
| ) | ||
| def list_databases( | ||
| body: DatabaseListRequest, | ||
| http_request: Request, | ||
| ctx: Annotated[SparkContext, Depends(get_spark_context)], | ||
| auth=Depends(auth), | ||
| ) -> DatabaseListResponse: | ||
| auth_token = None | ||
| if body.filter_by_namespace: | ||
| auth_token = _extract_token_from_request(http_request) | ||
| if not auth_token: | ||
| raise ValueError("Authorization token required for namespace filtering") | ||
|
|
||
| engine = resolve_engine() | ||
|
|
||
| if engine == QueryEngine.TRINO: | ||
| with _make_trino_ctx(http_request) as trino_ctx: | ||
| databases = trino_data_store.get_databases_trino( | ||
| conn=trino_ctx.connection, | ||
| use_hms=body.use_hms, | ||
| filter_by_namespace=body.filter_by_namespace, | ||
| auth_token=auth_token, | ||
| ) | ||
| elif ctx.is_standalone_subprocess: |
For `QueryEngine.TRINO`, `list_databases()` calls `get_databases_trino()` with default `use_hms=True`, which returns HMS database names (not `catalog.namespace` Iceberg namespaces). This makes the endpoint's contract inconsistent across engines and will likely break clients expecting Iceberg namespaces. Either force the Spark/Iceberg implementation for metadata endpoints when TRINO is selected, or update the Trino metadata implementation to return the same `catalog.namespace` format.
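One way the Trino path could emit the same `catalog.namespace` identifiers, sketched against a generic DB-API cursor. The function name and the `information_schema` filter are illustrative, not the project's API:

```python
def get_databases_iceberg_style(conn, catalogs: list[str]) -> list[str]:
    """Return catalog.namespace names for the given Trino catalogs.

    Assumes `conn` is a DB-API connection (e.g. the trino package's
    Connection) and that each listed catalog is an Iceberg catalog.
    """
    databases = []
    cur = conn.cursor()
    for catalog in catalogs:
        # SHOW SCHEMAS returns one schema name per row.
        cur.execute(f"SHOW SCHEMAS FROM {catalog}")
        for (schema,) in cur.fetchall():
            if schema == "information_schema":
                continue  # system schema, not a user namespace
            databases.append(f"{catalog}.{schema}")
    return sorted(databases)
```

This would make `/databases/list` return identical identifiers regardless of the selected engine.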
```python
def list_databases(
    body: DatabaseListRequest,
    http_request: Request,
    ctx: Annotated[SparkContext, Depends(get_spark_context)],
    auth=Depends(auth),
) -> DatabaseListResponse:
    auth_token = None
    if body.filter_by_namespace:
        auth_token = _extract_token_from_request(http_request)
        if not auth_token:
            raise ValueError("Authorization token required for namespace filtering")

    engine = resolve_engine()
```
`DatabaseListRequest` is now an empty model, but `list_databases()` still requires it as a body parameter. FastAPI will treat the request body as required (clients must send `{}`), which is an avoidable breaking change for a no-arg endpoint. Consider removing the body parameter entirely or providing a default (e.g., `Body(default_factory=DatabaseListRequest)`) so the request body is optional.
````diff
 #### Get Sample Data
 ```bash
 curl -X 'POST' \
-  'http://localhost:8000/apis/mcp/delta/tables/sample' \
+  'http://localhost:8000/apis/mcp/delta/databases/tables/sample' \
   -H 'accept: application/json' \
   -H 'Content-Type: application/json' \
   -H 'Authorization: Bearer your-kbase-token-here' \
-  -d '{"database": "default", "table": "products", "limit": 10}'
+  -d '{"database": "my.test", "table": "products", "limit": 10}'
 ```

 #### Count Table Rows
 ```bash
 curl -X 'POST' \
-  'http://localhost:8000/apis/mcp/delta/tables/count' \
+  'http://localhost:8000/apis/mcp/delta/databases/tables/count' \
   -H 'accept: application/json' \
   -H 'Content-Type: application/json' \
   -H 'Authorization: Bearer your-kbase-token-here' \
-  -d '{"database": "default", "table": "products"}'
+  -d '{"database": "my.test", "table": "products"}'
 ```

 #### Query Table
 ```bash
 curl -X 'POST' \
-  'http://localhost:8000/apis/mcp/delta/tables/query' \
+  'http://localhost:8000/apis/mcp/delta/databases/tables/query' \
   -H 'accept: application/json' \
   -H 'Content-Type: application/json' \
   -H 'Authorization: Bearer your-kbase-token-here' \
-  -d '{"query": "SELECT * FROM test.products WHERE price > 100 LIMIT 5"}'
+  -d '{"query": "SELECT * FROM my.test.products WHERE price > 100 LIMIT 5"}'
 ```
````
The docs were updated to call `/apis/mcp/delta/databases/tables/sample|count|query`, but the actual routes in `src/routes/delta.py` are still `/delta/tables/sample`, `/delta/tables/count`, and `/delta/tables/query`. Update the guide to match the real endpoint paths (or rename the routes if the docs are correct).
```diff
@@ -49,22 +39,6 @@ def get_databases_trino(
     except Exception as e:
         raise TrinoOperationError(f"Failed to list databases via Trino: {e}") from e

-    if filter_by_namespace:
-        try:
-            prefixes = _get_user_namespace_prefixes(auth_token)
-            owned = (
-                [db for db in databases if db.startswith(tuple(prefixes))]
-                if prefixes
-                else []
-            )
-            accessible_paths = _get_accessible_paths(auth_token)
-            shared = _extract_databases_from_paths(accessible_paths)
-            all_accessible = set(owned) | set(shared)
-            databases = sorted(db for db in databases if db in all_accessible)
-        except Exception:
-            logger.error("Error filtering databases by namespace", exc_info=True)
-            raise
-
     return databases
```
get_databases_trino() still defaults to use_hms=True and returns Hive Metastore database names, but the rest of the PR shifts the API contract to Iceberg catalog.namespace identifiers. Unless TRINO mode is intentionally HMS-only, consider updating the Trino metadata layer (and/or defaults) so TRINO returns the same identifier format as Spark mode.
```diff
 def _check_exists(database: str, table: str, spark: SparkSession) -> bool:
     """
     Check if a table exists in a database.
     """
-    if not database_exists(database):
+    if not database_exists(database, spark=spark):
         raise DeltaDatabaseNotFoundError(f"Database [{database}] not found")
-    if not table_exists(database, table):
+    if not table_exists(database, table, spark=spark):
         raise DeltaTableNotFoundError(
```
Changing _check_exists() to require a spark parameter breaks existing Trino code paths: src/trino_engine/trino_service.py imports _check_exists from this module and still calls it with only (database, table) (e.g., around trino_service.py:326/380/430). Either update the Trino service to use a Trino-specific existence check, or keep a backward-compatible wrapper here so TRINO engine requests don’t fail with a TypeError.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
src/delta_lake/delta_service.py:667
- `_validate_identifier`'s error message says identifiers may contain only alphanumerics/underscores, but database identifiers now also allow dot separators (`catalog.namespace`). Consider tailoring the message when `identifier_type == "database"` so users understand that dots are allowed between valid components.
```python
pattern = (
    VALID_DATABASE_PATTERN
    if identifier_type == "database"
    else VALID_IDENTIFIER_PATTERN
)
if not name or not pattern.match(name):
    raise SparkQueryError(
        f"Invalid {identifier_type}: '{name}'. "
        "Identifiers must start with a letter or underscore and contain "
        "only alphanumeric characters and underscores."
    )
```
```python
def _get_catalog_conf(settings: BERDLSettings) -> dict[str, str]:
    """Get Iceberg catalog configuration for Polaris REST catalog."""
    if not settings.POLARIS_CATALOG_URI:
        return {}

    polaris_uri = str(settings.POLARIS_CATALOG_URI).rstrip("/")

    # S3/MinIO properties for Iceberg's S3FileIO (used by executors to read/write data files).
    # Iceberg does NOT use Spark's spark.hadoop.fs.s3a.* — it has its own AWS SDK S3 client.
    s3_endpoint = settings.MINIO_ENDPOINT_URL
    if not s3_endpoint.startswith("http"):
        s3_endpoint = f"http://{s3_endpoint}"
    s3_props = {
        "s3.endpoint": s3_endpoint,
        "s3.access-key-id": settings.MINIO_ACCESS_KEY,
        "s3.secret-access-key": settings.MINIO_SECRET_KEY,
        "s3.path-style-access": "true",
        "s3.region": "us-east-1",
    }

    def _catalog_props(prefix: str, warehouse: str) -> dict[str, str]:
        props = {
            f"{prefix}": "org.apache.iceberg.spark.SparkCatalog",
            f"{prefix}.type": "rest",
            f"{prefix}.uri": polaris_uri,
            f"{prefix}.credential": settings.POLARIS_CREDENTIAL or "",
            f"{prefix}.warehouse": warehouse,
            f"{prefix}.scope": "PRINCIPAL_ROLE:ALL",
            f"{prefix}.token-refresh-enabled": "false",
            f"{prefix}.client.region": "us-east-1",
```
_get_catalog_conf sets spark.sql.catalog.<name>.credential to an empty string when POLARIS_CATALOG_URI is set but POLARIS_CREDENTIAL is missing. This is likely an invalid configuration and will fail later in Spark with a less actionable error. Consider validating required Polaris settings up front (e.g., raise ValueError or skip catalog config unless credential + at least one warehouse name is provided).
```python
if settings.POLARIS_TENANT_CATALOGS:
    for tenant_catalog in settings.POLARIS_TENANT_CATALOGS.split(","):
        tenant_catalog = tenant_catalog.strip()
        alias = tenant_catalog.replace("tenant_", "")
```
Tenant catalog alias derivation uses tenant_catalog.replace("tenant_", ""), which will remove occurrences in the middle of a name as well (not just a prefix). Use removeprefix("tenant_") (or an explicit startswith check) to avoid unexpected aliasing.
```diff
-        alias = tenant_catalog.replace("tenant_", "")
+        alias = tenant_catalog
+        if alias.startswith("tenant_"):
+            alias = alias[len("tenant_") :]
```
```python
def _list_iceberg_catalogs(spark: SparkSession) -> List[str]:
    """List all Iceberg catalogs (excluding spark_catalog).

    In Spark 4.0 with Spark Connect, ``SHOW CATALOGS`` only returns catalogs
    registered in the client session's CatalogManager. Catalogs configured
    server-side (via ``spark-defaults.conf``) are accessible for queries but
    invisible to ``SHOW CATALOGS``.

    This function discovers catalogs by inspecting the Spark SQL configuration
    via the ``SET`` command, which returns all server-side configs through the
    Spark Connect gRPC channel. It looks for top-level
    ``spark.sql.catalog.<name>`` keys to identify registered catalogs.
    """
    rows = spark.sql("SET").collect()
    catalog_names = set()
    for row in rows:
        match = _CATALOG_KEY_PATTERN.match(row["key"])
        if match:
            catalog_names.add(match.group(1))
    logger.info(
        f"Discovered {len(catalog_names)} catalog(s) from Spark config: "
        f"{sorted(catalog_names)}"
    )
    return sorted(c for c in catalog_names if c not in _EXCLUDED_CATALOGS)
```
_list_iceberg_catalogs runs spark.sql("SET").collect() on every call to get_databases, which can be large and may become a bottleneck under frequent metadata requests. Consider caching discovered catalog names for the lifetime of the SparkSession (or until configs change) to avoid repeated full SET scans.
```diff
     engine = resolve_engine()

     if engine == QueryEngine.TRINO:
         with _make_trino_ctx(http_request) as trino_ctx:
             databases = trino_data_store.get_databases_trino(
                 conn=trino_ctx.connection,
                 use_hms=body.use_hms,
                 filter_by_namespace=body.filter_by_namespace,
                 auth_token=auth_token,
             )
     elif ctx.is_standalone_subprocess:
         databases = run_in_spark_process(
             list_databases_subprocess,
             ctx.settings_dict,
             use_hms=body.use_hms,
             filter_by_namespace=body.filter_by_namespace,
             auth_token=auth_token,
             app_name=ctx.app_name,
             operation_name="list_databases",
         )
     else:
         databases = cast(
             list[str],
             data_store.get_databases(
                 spark=ctx.spark,
                 use_hms=body.use_hms,
                 return_json=False,
                 filter_by_namespace=body.filter_by_namespace,
                 auth_token=auth_token,
             ),
         )

     return DatabaseListResponse(databases=databases)


 @router.post(
     "/databases/tables/list",
     response_model=TableListResponse,
     status_code=status.HTTP_200_OK,
-    summary="List tables in a database",
-    description="Lists all tables in a specific database, optionally using PostgreSQL for faster retrieval.",
+    summary="List tables in an Iceberg namespace",
+    description="Lists all tables in a specific Iceberg namespace (catalog.namespace format).",
     operation_id="list_database_tables",
 )
 def list_database_tables(
     request: TableListRequest,
     http_request: Request,
     ctx: Annotated[SparkContext, Depends(get_spark_context)],
     auth=Depends(auth),
 ) -> TableListResponse:
     engine = resolve_engine()

     if engine == QueryEngine.TRINO:
         with _make_trino_ctx(http_request) as trino_ctx:
             tables = trino_data_store.get_tables_trino(
                 conn=trino_ctx.connection,
                 database=request.database,
                 use_hms=request.use_hms,
             )
     elif ctx.is_standalone_subprocess:
```
The metadata routes now document/expect Iceberg namespaces in catalog.namespace format, but the engine == QueryEngine.TRINO branch still calls trino_data_store functions that default to HMS (use_hms=True) and operate on single-level HMS database names. With QUERY_ENGINE=trino, /databases/list and /databases/tables/list will return incompatible identifiers (and my.demo inputs likely won’t work). Either disable TRINO for these endpoints or update the Trino path to list Iceberg namespaces/tables in the same format.
Resolve conflicts by keeping Iceberg-based architecture (no use_hms) while porting filter_by_namespace and auth_token parameters from main for API compatibility. All 877 tests pass.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
```python
    auth_token = None
    if body.filter_by_namespace:
        auth_token = get_token_from_request(http_request)
        if not auth_token:
            raise MissingTokenError(
                "Authorization token required for namespace filtering"
            )

    engine = resolve_engine()

    if engine == QueryEngine.TRINO:
        with _make_trino_ctx(http_request) as trino_ctx:
            databases = trino_data_store.get_databases_trino(
                conn=trino_ctx.connection,
                use_hms=body.use_hms,
                filter_by_namespace=body.filter_by_namespace,
                auth_token=auth_token,
            )
    elif ctx.is_standalone_subprocess:
```
filter_by_namespace currently triggers an Authorization-token requirement (default True in the request model), but the flag is not actually passed to or honored by any backend in this handler (Spark path calls data_store.get_databases(...) which has no filtering, and Trino path calls get_databases_trino(...) without filter/token). This makes the API behavior misleading and can unnecessarily 401 callers. Either implement filtering end-to-end (including passing auth_token) or remove/disable filter_by_namespace + token check for this endpoint.
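If filtering were implemented end-to-end, the core set logic might look like this sketch. The prefix and shared-access inputs are assumed to come from helpers like the `_get_user_namespace_prefixes`/`_get_accessible_paths` pair removed in the Trino diff:

```python
def filter_databases(
    databases: list[str], prefixes: list[str], shared: set[str]
) -> list[str]:
    """Keep databases the caller owns (by prefix) or has shared access to.

    `databases` are catalog.namespace names; `prefixes` come from the
    user's namespace prefixes and `shared` from accessible paths
    (both hypothetical auth-derived inputs here).
    """
    owned = (
        {db for db in databases if db.startswith(tuple(prefixes))}
        if prefixes
        else set()
    )
    return sorted(db for db in databases if db in owned | shared)
```

Wiring this into the Spark path (and passing `auth_token` through) would make the token requirement meaningful again; otherwise the flag and token check should be dropped.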
```python
    auth_token = None
    if request.filter_by_namespace:
        auth_token = get_token_from_request(http_request)
        if not auth_token:
            raise MissingTokenError(
                "Authorization token required for namespace filtering"
            )

    engine = resolve_engine()

    if engine == QueryEngine.TRINO:
        with _make_trino_ctx(http_request) as trino_ctx:
            structure = trino_data_store.get_db_structure_trino(
                conn=trino_ctx.connection,
                with_schema=request.with_schema,
                use_hms=request.use_hms,
                filter_by_namespace=request.filter_by_namespace,
                auth_token=auth_token,
            )
    elif ctx.is_standalone_subprocess:
        structure = run_in_spark_process(
            get_db_structure_subprocess,
            ctx.settings_dict,
            with_schema=request.with_schema,
            use_hms=request.use_hms,
            filter_by_namespace=request.filter_by_namespace,
            auth_token=auth_token,
            app_name=ctx.app_name,
            operation_name="get_db_structure",
        )
    else:
        settings = get_settings()
        structure = cast(
            dict[str, list[str] | dict[str, list[str]]],
            data_store.get_db_structure(
                spark=ctx.spark,
                with_schema=request.with_schema,
                use_hms=request.use_hms,
                return_json=False,
                filter_by_namespace=request.filter_by_namespace,
                auth_token=auth_token,
                settings=settings,
            ),
        )
```
The Spark implementation of get_db_structure no longer uses filter_by_namespace/auth_token (see data_store.get_db_structure), but this route still enforces a token and forwards the flag/token. This means callers may get 401s for a feature that has no effect in Spark mode. Align behavior by either re-adding namespace filtering in data_store.get_db_structure/get_databases or removing the flag/token requirement for Spark mode.
```diff
 def get_db_structure(
     spark: Optional[SparkSession] = None,
     with_schema: bool = False,
-    use_hms: bool = True,
     return_json: bool = True,
     filter_by_namespace: bool = False,
     auth_token: Optional[str] = None,
-    settings: Optional[BERDLSettings] = None,
+    settings: Optional[Any] = None,
 ) -> Union[str, Dict]:
-    """Get the structure of databases in the Hive metastore.
+    """
+    Get the structure of all accessible Iceberg namespaces.

     Args:
-        spark: Optional SparkSession to use for operations
-        with_schema: Whether to include table schemas
-        use_hms: Whether to use Hive Metastore client for metadata retrieval
+        spark: SparkSession to use (required in MCP server context)
+        with_schema: Whether to include table column names
         return_json: Whether to return the result as a JSON string
         filter_by_namespace: Whether to filter databases by user/group ownership
             and shared access (delegates to get_databases)
         auth_token: KBase auth token (required if filter_by_namespace is True)
-        settings: BERDLSettings instance (required if use_hms is True)
+        settings: BERDLSettings instance (unused in Iceberg mode, kept for API compat)

     Returns:
-        Database structure as either JSON string or dictionary:
-        {
-            "database_name": ["table1", "table2"] or
-            "database_name": {
-                "table1": ["column1", "column2"],
-                "table2": ["column1", "column2"]
-            }
-        }
-    """
+        Dictionary mapping ``catalog.namespace`` to table lists or schema dicts::
+
+            {
+                "my.demo": ["table1", "table2"],
+                "globalusers.shared": ["dataset"]
+            }
+
+        Or with ``with_schema=True``::
+
+            {
+                "my.demo": {
+                    "table1": ["col1", "col2"],
+                    "table2": ["col1", "col2"]
+                }
+            }

-    def _get_structure(
-        session: SparkSession,
-    ) -> Dict[str, Union[List[str], Dict[str, List[str]]]]:
-        db_structure = {}
-        databases = get_databases(
-            spark=session,
-            use_hms=use_hms,
-            return_json=False,
-            filter_by_namespace=filter_by_namespace,
-            auth_token=auth_token,
-            settings=settings,
-        )
-        for db in databases:
-            tables = get_tables(
-                database=db,
-                spark=session,
-                use_hms=use_hms,
-                return_json=False,
-                settings=settings,
-            )
-            if with_schema and isinstance(tables, list):
-                db_structure[db] = _get_tables_with_schemas(db, tables, session)
-            else:
-                db_structure[db] = tables
-        return db_structure
-
-    if use_hms:
-        if settings is None:
-            settings = get_settings()
-        db_structure = {}
-        databases = get_databases(
-            spark=spark,
-            use_hms=True,
-            return_json=False,
-            filter_by_namespace=filter_by_namespace,
-            auth_token=auth_token,
-            settings=settings,
-        )
-        for db in databases:
-            tables = hive_metastore.get_tables(database=db, settings=settings)
-            if with_schema and isinstance(tables, list):
-                if spark is None:
-                    raise ValueError(
-                        "SparkSession must be provided for schema retrieval. "
-                        "In MCP server context, use FastAPI dependency injection."
-                    )
-                db_structure[db] = _get_tables_with_schemas(db, tables, spark)
-            else:
-                db_structure[db] = tables
-    else:
-        db_structure = _execute_with_spark(_get_structure, spark)
+    Raises:
+        ValueError: If spark is not provided
+    """
+    if spark is None:
+        raise ValueError(
+            "SparkSession must be provided. In MCP server context, use FastAPI dependency injection."
+        )
+
+    databases = get_databases(spark=spark, return_json=False)
+
+    db_structure: Dict[str, Any] = {}
+    for db in databases:
+        tables = get_tables(database=db, spark=spark, return_json=False)
+        if with_schema:
+            db_structure[db] = {
+                tbl: get_table_schema(
+                    database=db, table=tbl, spark=spark, return_json=False
+                )
+                for tbl in tables
+            }
+        else:
```
get_db_structure() accepts filter_by_namespace/auth_token but ignores them (it always calls get_databases(spark=..., return_json=False) with no filtering). This is misleading for callers and conflicts with the route layer which enforces a token when filter_by_namespace=True. Either implement filtering, or remove these parameters (and corresponding request fields) to avoid a no-op security gate.
```python
    spark = None
    try:
        spark = _create_spark_session(settings_dict, app_name)
        settings = _reconstruct_settings(settings_dict)

        result = data_store.get_databases(
            spark=spark,
            use_hms=use_hms,
            return_json=False,
            filter_by_namespace=filter_by_namespace,
            auth_token=auth_token,
            settings=settings,
        )
        return list(result)
```
After removing the settings = _reconstruct_settings(...) usage in the subprocess helpers, _reconstruct_settings() is now unused in this module (no other references found). Consider deleting it to avoid dead code, or reusing it from _create_spark_session to keep a single code path for settings reconstruction.