2121 Union ,
2222)
2323
24- from pydantic import Field , field_validator
24+ from pydantic import ConfigDict , Field , field_validator
2525from requests import HTTPError , Session
2626from tenacity import RetryCallState , retry , retry_if_exception_type , stop_after_attempt
2727
7676 import pyarrow as pa
7777
7878
79+ class HttpMethod (str , Enum ):
80+ GET = "GET"
81+ HEAD = "HEAD"
82+ POST = "POST"
83+ DELETE = "DELETE"
84+
85+
86+ class Endpoint (IcebergBaseModel ):
87+ model_config = ConfigDict (frozen = True )
88+
89+ http_method : HttpMethod = Field ()
90+ path : str = Field ()
91+
92+ @field_validator ("path" , mode = "before" )
93+ @classmethod
94+ def _validate_path (cls , raw_path : str ) -> str :
95+ if not raw_path :
96+ raise ValueError ("Invalid path: empty" )
97+ raw_path = raw_path .strip ()
98+ if not raw_path :
99+ raise ValueError ("Invalid path: empty" )
100+ return raw_path
101+
102+ def __str__ (self ) -> str :
103+ """Return the string representation of the Endpoint class."""
104+ return f"{ self .http_method .value } { self .path } "
105+
106+ @classmethod
107+ def from_string (cls , endpoint : str | None ) -> "Endpoint" :
108+ if endpoint is None :
109+ raise ValueError ("Invalid endpoint (must consist of 'METHOD /path'): None" )
110+ elements = endpoint .split (None , 1 )
111+ if len (elements ) != 2 :
112+ raise ValueError (f"Invalid endpoint (must consist of two elements separated by a single space): { endpoint } " )
113+ return cls (http_method = HttpMethod (elements [0 ].upper ()), path = elements [1 ])
114+
115+
79116class Endpoints :
80117 get_config : str = "config"
81118 list_namespaces : str = "namespaces"
@@ -86,7 +123,7 @@ class Endpoints:
86123 namespace_exists : str = "namespaces/{namespace}"
87124 list_tables : str = "namespaces/{namespace}/tables"
88125 create_table : str = "namespaces/{namespace}/tables"
89- register_table = "namespaces/{namespace}/register"
126+ register_table : str = "namespaces/{namespace}/register"
90127 load_table : str = "namespaces/{namespace}/tables/{table}"
91128 update_table : str = "namespaces/{namespace}/tables/{table}"
92129 drop_table : str = "namespaces/{namespace}/tables/{table}"
@@ -100,6 +137,66 @@ class Endpoints:
100137 fetch_scan_tasks : str = "namespaces/{namespace}/tables/{table}/tasks"
101138
102139
140+ class Capability :
141+ V1_LIST_NAMESPACES = Endpoint (http_method = HttpMethod .GET , path = "/v1/{prefix}/namespaces" )
142+ V1_LOAD_NAMESPACE = Endpoint (http_method = HttpMethod .GET , path = "/v1/{prefix}/namespaces/{namespace}" )
143+ V1_NAMESPACE_EXISTS = Endpoint (http_method = HttpMethod .HEAD , path = "/v1/{prefix}/namespaces/{namespace}" )
144+ V1_UPDATE_NAMESPACE = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/properties" )
145+ V1_CREATE_NAMESPACE = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces" )
146+ V1_DELETE_NAMESPACE = Endpoint (http_method = HttpMethod .DELETE , path = "/v1/{prefix}/namespaces/{namespace}" )
147+
148+ V1_LIST_TABLES = Endpoint (http_method = HttpMethod .GET , path = "/v1/{prefix}/namespaces/{namespace}/tables" )
149+ V1_LOAD_TABLE = Endpoint (http_method = HttpMethod .GET , path = "/v1/{prefix}/namespaces/{namespace}/tables/{table}" )
150+ V1_TABLE_EXISTS = Endpoint (http_method = HttpMethod .HEAD , path = "/v1/{prefix}/namespaces/{namespace}/tables/{table}" )
151+ V1_CREATE_TABLE = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/tables" )
152+ V1_UPDATE_TABLE = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/tables/{table}" )
153+ V1_DELETE_TABLE = Endpoint (http_method = HttpMethod .DELETE , path = "/v1/{prefix}/namespaces/{namespace}/tables/{table}" )
154+ V1_RENAME_TABLE = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/tables/rename" )
155+ V1_REGISTER_TABLE = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/register" )
156+
157+ V1_LIST_VIEWS = Endpoint (http_method = HttpMethod .GET , path = "/v1/{prefix}/namespaces/{namespace}/views" )
158+ V1_LOAD_VIEW = Endpoint (http_method = HttpMethod .GET , path = "/v1/{prefix}/namespaces/{namespace}/views/{view}" )
159+ V1_VIEW_EXISTS = Endpoint (http_method = HttpMethod .HEAD , path = "/v1/{prefix}/namespaces/{namespace}/views/{view}" )
160+ V1_CREATE_VIEW = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/views" )
161+ V1_UPDATE_VIEW = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/views/{view}" )
162+ V1_DELETE_VIEW = Endpoint (http_method = HttpMethod .DELETE , path = "/v1/{prefix}/namespaces/{namespace}/views/{view}" )
163+ V1_RENAME_VIEW = Endpoint (http_method = HttpMethod .POST , path = "/v1/{prefix}/views/rename" )
164+ V1_SUBMIT_TABLE_SCAN_PLAN = Endpoint (
165+ http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"
166+ )
167+ V1_TABLE_SCAN_PLAN_TASKS = Endpoint (
168+ http_method = HttpMethod .POST , path = "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"
169+ )
170+
171+
172+ # Default endpoints for backwards compatibility with legacy servers that don't return endpoints
173+ # in ConfigResponse. Only includes namespace and table endpoints.
174+ DEFAULT_ENDPOINTS : frozenset [Endpoint ] = frozenset (
175+ (
176+ Capability .V1_LIST_NAMESPACES ,
177+ Capability .V1_LOAD_NAMESPACE ,
178+ Capability .V1_CREATE_NAMESPACE ,
179+ Capability .V1_UPDATE_NAMESPACE ,
180+ Capability .V1_DELETE_NAMESPACE ,
181+ Capability .V1_LIST_TABLES ,
182+ Capability .V1_LOAD_TABLE ,
183+ Capability .V1_CREATE_TABLE ,
184+ Capability .V1_UPDATE_TABLE ,
185+ Capability .V1_DELETE_TABLE ,
186+ Capability .V1_RENAME_TABLE ,
187+ Capability .V1_REGISTER_TABLE ,
188+ )
189+ )
190+
191+ # View endpoints conditionally added based on VIEW_ENDPOINTS_SUPPORTED property.
192+ VIEW_ENDPOINTS : frozenset [Endpoint ] = frozenset (
193+ (
194+ Capability .V1_LIST_VIEWS ,
195+ Capability .V1_DELETE_VIEW ,
196+ )
197+ )
198+
199+
103200class IdentifierKind (Enum ):
104201 TABLE = "table"
105202 VIEW = "view"
@@ -134,6 +231,8 @@ class IdentifierKind(Enum):
134231CUSTOM = "custom"
135232REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"
136233REST_SCAN_PLANNING_ENABLED_DEFAULT = False
234+ VIEW_ENDPOINTS_SUPPORTED = "view-endpoints-supported"
235+ VIEW_ENDPOINTS_SUPPORTED_DEFAULT = False
137236
138237NAMESPACE_SEPARATOR = b"\x1f " .decode (UTF8 )
139238
@@ -180,6 +279,14 @@ class RegisterTableRequest(IcebergBaseModel):
180279class ConfigResponse (IcebergBaseModel ):
181280 defaults : Properties | None = Field (default_factory = dict )
182281 overrides : Properties | None = Field (default_factory = dict )
282+ endpoints : set [Endpoint ] | None = Field (default = None )
283+
284+ @field_validator ("endpoints" , mode = "before" )
285+ @classmethod
286+ def _parse_endpoints (cls , v : list [str ] | None ) -> set [Endpoint ] | None :
287+ if v is None :
288+ return None
289+ return {Endpoint .from_string (s ) for s in v }
183290
184291
185292class ListNamespaceResponse (IcebergBaseModel ):
@@ -218,6 +325,7 @@ class ListViewsResponse(IcebergBaseModel):
218325class RestCatalog (Catalog ):
219326 uri : str
220327 _session : Session
328+ _supported_endpoints : set [Endpoint ]
221329
222330 def __init__ (self , name : str , ** properties : str ):
223331 """Rest Catalog.
@@ -279,7 +387,9 @@ def is_rest_scan_planning_enabled(self) -> bool:
279387 Returns:
280388 True if enabled, False otherwise.
281389 """
282- return property_as_bool (self .properties , REST_SCAN_PLANNING_ENABLED , REST_SCAN_PLANNING_ENABLED_DEFAULT )
390+ return Capability .V1_SUBMIT_TABLE_SCAN_PLAN in self ._supported_endpoints and property_as_bool (
391+ self .properties , REST_SCAN_PLANNING_ENABLED , REST_SCAN_PLANNING_ENABLED_DEFAULT
392+ )
283393
284394 def _create_legacy_oauth2_auth_manager (self , session : Session ) -> AuthManager :
285395 """Create the LegacyOAuth2AuthManager by fetching required properties.
@@ -327,6 +437,18 @@ def url(self, endpoint: str, prefixed: bool = True, **kwargs: Any) -> str:
327437
328438 return url + endpoint .format (** kwargs )
329439
440+ def _check_endpoint (self , endpoint : Endpoint ) -> None :
441+ """Check if an endpoint is supported by the server.
442+
443+ Args:
444+ endpoint: The endpoint to check against the set of supported endpoints
445+
446+ Raises:
447+ NotImplementedError: If the endpoint is not supported.
448+ """
449+ if endpoint not in self ._supported_endpoints :
450+ raise NotImplementedError (f"Server does not support endpoint: { endpoint } " )
451+
330452 @property
331453 def auth_url (self ) -> str :
332454 self ._warn_oauth_tokens_deprecation ()
@@ -384,6 +506,17 @@ def _fetch_config(self) -> None:
384506 # Update URI based on overrides
385507 self .uri = config [URI ]
386508
509+ # Determine supported endpoints
510+ endpoints = config_response .endpoints
511+ if endpoints :
512+ self ._supported_endpoints = set (endpoints )
513+ else :
514+ # Use default endpoints for legacy servers that don't return endpoints
515+ self ._supported_endpoints = set (DEFAULT_ENDPOINTS )
516+ # Conditionally add view endpoints based on config
517+ if property_as_bool (self .properties , VIEW_ENDPOINTS_SUPPORTED , VIEW_ENDPOINTS_SUPPORTED_DEFAULT ):
518+ self ._supported_endpoints .update (VIEW_ENDPOINTS )
519+
387520 def _identifier_to_validated_tuple (self , identifier : str | Identifier ) -> Identifier :
388521 identifier_tuple = self .identifier_to_tuple (identifier )
389522 if len (identifier_tuple ) <= 1 :
@@ -503,6 +636,7 @@ def _create_table(
503636 properties : Properties = EMPTY_DICT ,
504637 stage_create : bool = False ,
505638 ) -> TableResponse :
639+ self ._check_endpoint (Capability .V1_CREATE_TABLE )
506640 iceberg_schema = self ._convert_schema_if_needed (
507641 schema ,
508642 int (properties .get (TableProperties .FORMAT_VERSION , TableProperties .DEFAULT_FORMAT_VERSION )), # type: ignore
@@ -591,6 +725,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
591725 Raises:
592726 TableAlreadyExistsError: If the table already exists
593727 """
728+ self ._check_endpoint (Capability .V1_REGISTER_TABLE )
594729 namespace_and_table = self ._split_identifier_for_path (identifier )
595730 request = RegisterTableRequest (
596731 name = namespace_and_table ["table" ],
@@ -611,6 +746,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
611746
612747 @retry (** _RETRY_ARGS )
613748 def list_tables (self , namespace : str | Identifier ) -> list [Identifier ]:
749+ self ._check_endpoint (Capability .V1_LIST_TABLES )
614750 namespace_tuple = self ._check_valid_namespace_identifier (namespace )
615751 namespace_concat = NAMESPACE_SEPARATOR .join (namespace_tuple )
616752 response = self ._session .get (self .url (Endpoints .list_tables , namespace = namespace_concat ))
@@ -622,6 +758,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
622758
623759 @retry (** _RETRY_ARGS )
624760 def load_table (self , identifier : str | Identifier ) -> Table :
761+ self ._check_endpoint (Capability .V1_LOAD_TABLE )
625762 params = {}
626763 if mode := self .properties .get (SNAPSHOT_LOADING_MODE ):
627764 if mode in {"all" , "refs" }:
@@ -642,6 +779,7 @@ def load_table(self, identifier: str | Identifier) -> Table:
642779
643780 @retry (** _RETRY_ARGS )
644781 def drop_table (self , identifier : str | Identifier , purge_requested : bool = False ) -> None :
782+ self ._check_endpoint (Capability .V1_DELETE_TABLE )
645783 response = self ._session .delete (
646784 self .url (Endpoints .drop_table , prefixed = True , ** self ._split_identifier_for_path (identifier )),
647785 params = {"purgeRequested" : purge_requested },
@@ -657,6 +795,7 @@ def purge_table(self, identifier: str | Identifier) -> None:
657795
658796 @retry (** _RETRY_ARGS )
659797 def rename_table (self , from_identifier : str | Identifier , to_identifier : str | Identifier ) -> Table :
798+ self ._check_endpoint (Capability .V1_RENAME_TABLE )
660799 payload = {
661800 "source" : self ._split_identifier_for_json (from_identifier ),
662801 "destination" : self ._split_identifier_for_json (to_identifier ),
@@ -692,6 +831,8 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
692831
693832 @retry (** _RETRY_ARGS )
694833 def list_views (self , namespace : str | Identifier ) -> list [Identifier ]:
834+ if Capability .V1_LIST_VIEWS not in self ._supported_endpoints :
835+ return []
695836 namespace_tuple = self ._check_valid_namespace_identifier (namespace )
696837 namespace_concat = NAMESPACE_SEPARATOR .join (namespace_tuple )
697838 response = self ._session .get (self .url (Endpoints .list_views , namespace = namespace_concat ))
@@ -720,6 +861,7 @@ def commit_table(
720861 CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
721862 CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
722863 """
864+ self ._check_endpoint (Capability .V1_UPDATE_TABLE )
723865 identifier = table .name ()
724866 table_identifier = TableIdentifier (namespace = identifier [:- 1 ], name = identifier [- 1 ])
725867 table_request = CommitTableRequest (identifier = table_identifier , requirements = requirements , updates = updates )
@@ -749,6 +891,7 @@ def commit_table(
749891
750892 @retry (** _RETRY_ARGS )
751893 def create_namespace (self , namespace : str | Identifier , properties : Properties = EMPTY_DICT ) -> None :
894+ self ._check_endpoint (Capability .V1_CREATE_NAMESPACE )
752895 namespace_tuple = self ._check_valid_namespace_identifier (namespace )
753896 payload = {"namespace" : namespace_tuple , "properties" : properties }
754897 response = self ._session .post (self .url (Endpoints .create_namespace ), json = payload )
@@ -759,6 +902,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties =
759902
760903 @retry (** _RETRY_ARGS )
761904 def drop_namespace (self , namespace : str | Identifier ) -> None :
905+ self ._check_endpoint (Capability .V1_DELETE_NAMESPACE )
762906 namespace_tuple = self ._check_valid_namespace_identifier (namespace )
763907 namespace = NAMESPACE_SEPARATOR .join (namespace_tuple )
764908 response = self ._session .delete (self .url (Endpoints .drop_namespace , namespace = namespace ))
@@ -769,6 +913,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
769913
770914 @retry (** _RETRY_ARGS )
771915 def list_namespaces (self , namespace : str | Identifier = ()) -> list [Identifier ]:
916+ self ._check_endpoint (Capability .V1_LIST_NAMESPACES )
772917 namespace_tuple = self .identifier_to_tuple (namespace )
773918 response = self ._session .get (
774919 self .url (
@@ -786,6 +931,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
786931
787932 @retry (** _RETRY_ARGS )
788933 def load_namespace_properties (self , namespace : str | Identifier ) -> Properties :
934+ self ._check_endpoint (Capability .V1_LOAD_NAMESPACE )
789935 namespace_tuple = self ._check_valid_namespace_identifier (namespace )
790936 namespace = NAMESPACE_SEPARATOR .join (namespace_tuple )
791937 response = self ._session .get (self .url (Endpoints .load_namespace_metadata , namespace = namespace ))
@@ -800,6 +946,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
800946 def update_namespace_properties (
801947 self , namespace : str | Identifier , removals : set [str ] | None = None , updates : Properties = EMPTY_DICT
802948 ) -> PropertiesUpdateSummary :
949+ self ._check_endpoint (Capability .V1_UPDATE_NAMESPACE )
803950 namespace_tuple = self ._check_valid_namespace_identifier (namespace )
804951 namespace = NAMESPACE_SEPARATOR .join (namespace_tuple )
805952 payload = {"removals" : list (removals or []), "updates" : updates }
@@ -819,6 +966,14 @@ def update_namespace_properties(
819966 def namespace_exists (self , namespace : str | Identifier ) -> bool :
820967 namespace_tuple = self ._check_valid_namespace_identifier (namespace )
821968 namespace = NAMESPACE_SEPARATOR .join (namespace_tuple )
969+
970+ if Capability .V1_NAMESPACE_EXISTS not in self ._supported_endpoints :
971+ try :
972+ self .load_namespace_properties (namespace_tuple )
973+ return True
974+ except NoSuchNamespaceError :
975+ return False
976+
822977 response = self ._session .head (self .url (Endpoints .namespace_exists , namespace = namespace ))
823978
824979 if response .status_code == 404 :
@@ -843,6 +998,13 @@ def table_exists(self, identifier: str | Identifier) -> bool:
843998 Returns:
844999 bool: True if the table exists, False otherwise.
8451000 """
1001+ if Capability .V1_TABLE_EXISTS not in self ._supported_endpoints :
1002+ try :
1003+ self .load_table (identifier )
1004+ return True
1005+ except NoSuchTableError :
1006+ return False
1007+
8461008 response = self ._session .head (
8471009 self .url (Endpoints .load_table , prefixed = True , ** self ._split_identifier_for_path (identifier ))
8481010 )
@@ -886,6 +1048,7 @@ def view_exists(self, identifier: str | Identifier) -> bool:
8861048
8871049 @retry (** _RETRY_ARGS )
8881050 def drop_view (self , identifier : str ) -> None :
1051+ self ._check_endpoint (Capability .V1_DELETE_VIEW )
8891052 response = self ._session .delete (
8901053 self .url (Endpoints .drop_view , prefixed = True , ** self ._split_identifier_for_path (identifier , IdentifierKind .VIEW )),
8911054 )
0 commit comments