diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index a28ff562bd..7866f5e8bd 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -21,7 +21,7 @@ Union, ) -from pydantic import Field, field_validator +from pydantic import ConfigDict, Field, field_validator from requests import HTTPError, Session from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt @@ -76,6 +76,39 @@ import pyarrow as pa +class HttpMethod(str, Enum): + GET = "GET" + HEAD = "HEAD" + POST = "POST" + DELETE = "DELETE" + + +class Endpoint(IcebergBaseModel): + model_config = ConfigDict(frozen=True) + + http_method: HttpMethod = Field() + path: str = Field() + + @field_validator("path", mode="before") + @classmethod + def _validate_path(cls, raw_path: str) -> str: + raw_path = raw_path.strip() + if not raw_path: + raise ValueError("Invalid path: empty") + return raw_path + + def __str__(self) -> str: + """Return the string representation of the Endpoint class.""" + return f"{self.http_method.value} {self.path}" + + @classmethod + def from_string(cls, endpoint: str) -> "Endpoint": + elements = endpoint.strip().split(None, 1) + if len(elements) != 2: + raise ValueError(f"Invalid endpoint (must consist of two elements separated by a single space): {endpoint}") + return cls(http_method=HttpMethod(elements[0].upper()), path=elements[1]) + + class Endpoints: get_config: str = "config" list_namespaces: str = "namespaces" @@ -86,7 +119,7 @@ class Endpoints: namespace_exists: str = "namespaces/{namespace}" list_tables: str = "namespaces/{namespace}/tables" create_table: str = "namespaces/{namespace}/tables" - register_table = "namespaces/{namespace}/register" + register_table: str = "namespaces/{namespace}/register" load_table: str = "namespaces/{namespace}/tables/{table}" update_table: str = "namespaces/{namespace}/tables/{table}" drop_table: str = "namespaces/{namespace}/tables/{table}" @@ -100,6 +133,61 @@ class Endpoints: fetch_scan_tasks: str = "namespaces/{namespace}/tables/{table}/tasks" +API_PREFIX = "/v1/{prefix}" + + +class Capability: + V1_LIST_NAMESPACES = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_namespaces}") + V1_LOAD_NAMESPACE = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_namespace_metadata}") + V1_NAMESPACE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path=f"{API_PREFIX}/{Endpoints.namespace_exists}") + V1_UPDATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.update_namespace_properties}") + V1_CREATE_NAMESPACE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.create_namespace}") + V1_DELETE_NAMESPACE = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_namespace}") + + V1_LIST_TABLES = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_tables}") + V1_LOAD_TABLE = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.load_table}") + V1_TABLE_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path=f"{API_PREFIX}/{Endpoints.table_exists}") + V1_CREATE_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.create_table}") + V1_UPDATE_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.update_table}") + V1_DELETE_TABLE = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_table}") + V1_RENAME_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.rename_table}") + V1_REGISTER_TABLE = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.register_table}") + + V1_LIST_VIEWS = Endpoint(http_method=HttpMethod.GET, path=f"{API_PREFIX}/{Endpoints.list_views}") + V1_VIEW_EXISTS = Endpoint(http_method=HttpMethod.HEAD, path=f"{API_PREFIX}/{Endpoints.view_exists}") + V1_DELETE_VIEW = Endpoint(http_method=HttpMethod.DELETE, path=f"{API_PREFIX}/{Endpoints.drop_view}") + V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.plan_table_scan}") + V1_TABLE_SCAN_PLAN_TASKS = Endpoint(http_method=HttpMethod.POST, path=f"{API_PREFIX}/{Endpoints.fetch_scan_tasks}") + + +# Default endpoints for backwards compatibility with legacy servers that don't return endpoints +# in ConfigResponse. Only includes namespace and table endpoints. +DEFAULT_ENDPOINTS: frozenset[Endpoint] = frozenset( + ( + Capability.V1_LIST_NAMESPACES, + Capability.V1_LOAD_NAMESPACE, + Capability.V1_CREATE_NAMESPACE, + Capability.V1_UPDATE_NAMESPACE, + Capability.V1_DELETE_NAMESPACE, + Capability.V1_LIST_TABLES, + Capability.V1_LOAD_TABLE, + Capability.V1_CREATE_TABLE, + Capability.V1_UPDATE_TABLE, + Capability.V1_DELETE_TABLE, + Capability.V1_RENAME_TABLE, + Capability.V1_REGISTER_TABLE, + ) +) + +# View endpoints conditionally added based on VIEW_ENDPOINTS_SUPPORTED property. +VIEW_ENDPOINTS: frozenset[Endpoint] = frozenset( + ( + Capability.V1_LIST_VIEWS, + Capability.V1_DELETE_VIEW, + ) +) + + class IdentifierKind(Enum): TABLE = "table" VIEW = "view" @@ -134,6 +222,10 @@ class IdentifierKind(Enum): CUSTOM = "custom" REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled" REST_SCAN_PLANNING_ENABLED_DEFAULT = False +# for backwards compatibility with older REST servers where it can be assumed that a particular +# server supports view endpoints but doesn't send the "endpoints" field in the ConfigResponse +VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported" +VIEW_ENDPOINTS_SUPPORTED_DEFAULT = False NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8) @@ -180,6 +272,14 @@ class RegisterTableRequest(IcebergBaseModel): class ConfigResponse(IcebergBaseModel): defaults: Properties | None = Field(default_factory=dict) overrides: Properties | None = Field(default_factory=dict) + endpoints: set[Endpoint] | None = Field(default=None) + + @field_validator("endpoints", mode="before") + @classmethod + def _parse_endpoints(cls, v: list[str] | None) -> set[Endpoint] | None: + if v is None: + return None + return {Endpoint.from_string(s) for s in v} class ListNamespaceResponse(IcebergBaseModel): @@ -218,6 +318,7 @@ class ListViewsResponse(IcebergBaseModel): class RestCatalog(Catalog): uri: str _session: Session + _supported_endpoints: set[Endpoint] def __init__(self, name: str, **properties: str): """Rest Catalog. @@ -279,7 +380,9 @@ def is_rest_scan_planning_enabled(self) -> bool: Returns: True if enabled, False otherwise. """ - return property_as_bool(self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT) + return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and property_as_bool( + self.properties, REST_SCAN_PLANNING_ENABLED, REST_SCAN_PLANNING_ENABLED_DEFAULT + ) def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager: """Create the LegacyOAuth2AuthManager by fetching required properties. @@ -327,6 +430,18 @@ def url(self, endpoint: str, prefixed: bool = True, **kwargs: Any) -> str: return url + endpoint.format(**kwargs) + def _check_endpoint(self, endpoint: Endpoint) -> None: + """Check if an endpoint is supported by the server. + + Args: + endpoint: The endpoint to check against the set of supported endpoints + + Raises: + NotImplementedError: If the endpoint is not supported. + """ + if endpoint not in self._supported_endpoints: + raise NotImplementedError(f"Server does not support endpoint: {endpoint}") + @property def auth_url(self) -> str: self._warn_oauth_tokens_deprecation() @@ -384,6 +499,17 @@ def _fetch_config(self) -> None: # Update URI based on overrides self.uri = config[URI] + # Determine supported endpoints + endpoints = config_response.endpoints + if endpoints: + self._supported_endpoints = set(endpoints) + else: + # Use default endpoints for legacy servers that don't return endpoints + self._supported_endpoints = set(DEFAULT_ENDPOINTS) + # Conditionally add view endpoints based on config + if property_as_bool(self.properties, VIEW_ENDPOINTS_SUPPORTED, VIEW_ENDPOINTS_SUPPORTED_DEFAULT): + self._supported_endpoints.update(VIEW_ENDPOINTS) + def _identifier_to_validated_tuple(self, identifier: str | Identifier) -> Identifier: identifier_tuple = self.identifier_to_tuple(identifier) if len(identifier_tuple) <= 1: @@ -503,6 +629,7 @@ def _create_table( properties: Properties = EMPTY_DICT, stage_create: bool = False, ) -> TableResponse: + self._check_endpoint(Capability.V1_CREATE_TABLE) iceberg_schema = self._convert_schema_if_needed( schema, int(properties.get(TableProperties.FORMAT_VERSION, TableProperties.DEFAULT_FORMAT_VERSION)), # type: ignore @@ -591,6 +718,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) - Raises: TableAlreadyExistsError: If the table already exists """ + self._check_endpoint(Capability.V1_REGISTER_TABLE) namespace_and_table = self._split_identifier_for_path(identifier) request = RegisterTableRequest( name=namespace_and_table["table"], @@ -611,6 +739,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) - @retry(**_RETRY_ARGS) def list_tables(self, namespace: str | Identifier) -> list[Identifier]: + self._check_endpoint(Capability.V1_LIST_TABLES) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat)) @@ -622,6 +751,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: @retry(**_RETRY_ARGS) def load_table(self, identifier: str | Identifier) -> Table: + self._check_endpoint(Capability.V1_LOAD_TABLE) params = {} if mode := self.properties.get(SNAPSHOT_LOADING_MODE): if mode in {"all", "refs"}: @@ -642,6 +772,7 @@ def load_table(self, identifier: str | Identifier) -> Table: @retry(**_RETRY_ARGS) def drop_table(self, identifier: str | Identifier, purge_requested: bool = False) -> None: + self._check_endpoint(Capability.V1_DELETE_TABLE) response = self._session.delete( self.url(Endpoints.drop_table, prefixed=True, **self._split_identifier_for_path(identifier)), params={"purgeRequested": purge_requested}, @@ -657,6 +788,7 @@ def purge_table(self, identifier: str | Identifier) -> None: @retry(**_RETRY_ARGS) def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table: + self._check_endpoint(Capability.V1_RENAME_TABLE) payload = { "source": self._split_identifier_for_json(from_identifier), "destination": self._split_identifier_for_json(to_identifier), @@ -692,6 +824,8 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm @retry(**_RETRY_ARGS) def list_views(self, namespace: str | Identifier) -> list[Identifier]: + if Capability.V1_LIST_VIEWS not in self._supported_endpoints: + return [] namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) @@ -720,6 +854,7 @@ def commit_table( CommitFailedException: Requirement not met, or a conflict with a concurrent commit. CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ + self._check_endpoint(Capability.V1_UPDATE_TABLE) identifier = table.name() table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1]) table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates) @@ -749,6 +884,7 @@ def commit_table( @retry(**_RETRY_ARGS) def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None: + self._check_endpoint(Capability.V1_CREATE_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) payload = {"namespace": namespace_tuple, "properties": properties} response = self._session.post(self.url(Endpoints.create_namespace), json=payload) @@ -759,6 +895,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties = @retry(**_RETRY_ARGS) def drop_namespace(self, namespace: str | Identifier) -> None: + self._check_endpoint(Capability.V1_DELETE_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.delete(self.url(Endpoints.drop_namespace, namespace=namespace)) @@ -769,6 +906,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: @retry(**_RETRY_ARGS) def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: + self._check_endpoint(Capability.V1_LIST_NAMESPACES) namespace_tuple = self.identifier_to_tuple(namespace) response = self._session.get( self.url( @@ -786,6 +924,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: @retry(**_RETRY_ARGS) def load_namespace_properties(self, namespace: str | Identifier) -> Properties: + self._check_endpoint(Capability.V1_LOAD_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) response = self._session.get(self.url(Endpoints.load_namespace_metadata, namespace=namespace)) @@ -800,6 +939,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties: def update_namespace_properties( self, namespace: str | Identifier, removals: set[str] | None = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: + self._check_endpoint(Capability.V1_UPDATE_NAMESPACE) namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) payload = {"removals": list(removals or []), "updates": updates} @@ -819,6 +959,14 @@ def update_namespace_properties( def namespace_exists(self, namespace: str | Identifier) -> bool: namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace = NAMESPACE_SEPARATOR.join(namespace_tuple) + # fallback in order to work with older rest catalog implementations + if Capability.V1_NAMESPACE_EXISTS not in self._supported_endpoints: + try: + self.load_namespace_properties(namespace_tuple) + return True + except NoSuchNamespaceError: + return False + response = self._session.head(self.url(Endpoints.namespace_exists, namespace=namespace)) if response.status_code == 404: @@ -843,6 +991,14 @@ def table_exists(self, identifier: str | Identifier) -> bool: Returns: bool: True if the table exists, False otherwise. """ + # fallback in order to work with older rest catalog implementations + if Capability.V1_TABLE_EXISTS not in self._supported_endpoints: + try: + self.load_table(identifier) + return True + except NoSuchTableError: + return False + response = self._session.head( self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)) ) @@ -886,6 +1042,7 @@ def view_exists(self, identifier: str | Identifier) -> bool: @retry(**_RETRY_ARGS) def drop_view(self, identifier: str) -> None: + self._check_endpoint(Capability.V1_DELETE_VIEW) response = self._session.delete( self.url(Endpoints.drop_view, prefixed=True, **self._split_identifier_for_path(identifier, IdentifierKind.VIEW)), ) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 464314f3be..a9719c2c93 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -27,7 +27,7 @@ import pyiceberg from pyiceberg.catalog import PropertiesUpdateSummary, load_catalog -from pyiceberg.catalog.rest import OAUTH2_SERVER_URI, SNAPSHOT_LOADING_MODE, RestCatalog +from pyiceberg.catalog.rest import DEFAULT_ENDPOINTS, OAUTH2_SERVER_URI, SNAPSHOT_LOADING_MODE, Capability, RestCatalog from pyiceberg.exceptions import ( AuthorizationExpiredError, NamespaceAlreadyExistsError, @@ -68,6 +68,28 @@ "Content-type": "application/x-www-form-urlencoded", } +TEST_SUPPORTED_ENDPOINTS = [ + Capability.V1_LIST_NAMESPACES, + Capability.V1_LOAD_NAMESPACE, + Capability.V1_NAMESPACE_EXISTS, + Capability.V1_UPDATE_NAMESPACE, + Capability.V1_CREATE_NAMESPACE, + Capability.V1_DELETE_NAMESPACE, + Capability.V1_LIST_TABLES, + Capability.V1_LOAD_TABLE, + Capability.V1_TABLE_EXISTS, + Capability.V1_CREATE_TABLE, + Capability.V1_UPDATE_TABLE, + Capability.V1_DELETE_TABLE, + Capability.V1_RENAME_TABLE, + Capability.V1_REGISTER_TABLE, + Capability.V1_LIST_VIEWS, + Capability.V1_VIEW_EXISTS, + Capability.V1_DELETE_VIEW, + Capability.V1_SUBMIT_TABLE_SCAN_PLAN, + Capability.V1_TABLE_SCAN_PLAN_TASKS, +] + @pytest.fixture def example_table_metadata_with_snapshot_v1_rest_json(example_table_metadata_with_snapshot_v1: dict[str, Any]) -> dict[str, Any]: @@ -112,7 +134,7 @@ def rest_mock(requests_mock: Mocker) -> Mocker: """ requests_mock.get( f"{TEST_URI}v1/config", - json={"defaults": {}, "overrides": {}}, + json={"defaults": {}, "overrides": {}, "endpoints": [str(endpoint) for endpoint in TEST_SUPPORTED_ENDPOINTS]}, status_code=200, ) return requests_mock @@ -1995,21 +2017,11 @@ def test_rest_catalog_context_manager_with_exception_sigv4(self, rest_mock: Mock assert len(catalog._session.adapters) == self.EXPECTED_ADAPTERS_SIGV4 def test_rest_scan_planning_disabled_by_default(self, rest_mock: Mocker) -> None: - rest_mock.get( - f"{TEST_URI}v1/config", - json={"defaults": {}, "overrides": {}}, - status_code=200, - ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) assert catalog.is_rest_scan_planning_enabled() is False def test_rest_scan_planning_enabled_by_property(self, rest_mock: Mocker) -> None: - rest_mock.get( - f"{TEST_URI}v1/config", - json={"defaults": {}, "overrides": {}}, - status_code=200, - ) catalog = RestCatalog( "rest", uri=TEST_URI, @@ -2019,12 +2031,22 @@ def test_rest_scan_planning_enabled_by_property(self, rest_mock: Mocker) -> None assert catalog.is_rest_scan_planning_enabled() is True - def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None: - rest_mock.get( + def test_rest_scan_planning_disabled_when_endpoint_unsupported(self, requests_mock: Mocker) -> None: + requests_mock.get( f"{TEST_URI}v1/config", json={"defaults": {}, "overrides": {}}, status_code=200, ) + catalog = RestCatalog( + "rest", + uri=TEST_URI, + token=TEST_TOKEN, + **{"rest-scan-planning-enabled": "true"}, + ) + + assert catalog.is_rest_scan_planning_enabled() is False + + def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None: catalog = RestCatalog( "rest", uri=TEST_URI, @@ -2037,9 +2059,98 @@ def test_rest_scan_planning_explicitly_disabled(self, rest_mock: Mocker) -> None def test_rest_scan_planning_enabled_from_server_config(self, rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/config", - json={"defaults": {"rest-scan-planning-enabled": "true"}, "overrides": {}}, + json={ + "defaults": {"rest-scan-planning-enabled": "true"}, + "overrides": {}, + "endpoints": ["POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"], + }, status_code=200, ) catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) assert catalog.is_rest_scan_planning_enabled() is True + + def test_supported_endpoint(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["GET /v1/{prefix}/namespaces", "GET /v1/{prefix}/namespaces/{namespace}/tables"], + }, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + # Should not raise since these endpoints are in the supported set + catalog._check_endpoint(Capability.V1_LIST_NAMESPACES) + catalog._check_endpoint(Capability.V1_LIST_TABLES) + + def test_unsupported_endpoint(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["GET /v1/{prefix}/namespaces"], + }, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + with pytest.raises(NotImplementedError, match="Server does not support endpoint"): + catalog._check_endpoint(Capability.V1_LIST_TABLES) + + def test_config_returns_invalid_endpoint(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={ + "defaults": {}, + "overrides": {}, + "endpoints": ["INVALID_ENDPOINT"], + }, + status_code=200, + ) + + with pytest.raises(ValueError, match="Invalid endpoint"): + RestCatalog("rest", uri=TEST_URI, token="token") + + def test_default_endpoints_used_when_none_returned(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={"defaults": {}, "overrides": {}}, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + # Should not raise for default endpoints + for endpoint in DEFAULT_ENDPOINTS: + catalog._check_endpoint(endpoint) + + def test_view_endpoints_not_included_by_default(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={"defaults": {}, "overrides": {}}, + status_code=200, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token="token") + + with pytest.raises(NotImplementedError, match="Server does not support endpoint"): + catalog._check_endpoint(Capability.V1_LIST_VIEWS) + + def test_view_endpoints_enabled_with_config(self, requests_mock: Mocker) -> None: + requests_mock.get( + f"{TEST_URI}v1/config", + json={"defaults": {}, "overrides": {}}, + status_code=200, + ) + catalog = RestCatalog( + "rest", + uri=TEST_URI, + token="token", + **{"view-endpoints-supported": "true"}, + ) + + # View endpoints should be supported when enabled + catalog._check_endpoint(Capability.V1_LIST_VIEWS) + catalog._check_endpoint(Capability.V1_DELETE_VIEW)