Commit 5273b95

Merge branch 'apache:main' into fix/hive-client-does-not-update-table-properties

2 parents: c22470c + ea57cbb

22 files changed: +2329 −1767 lines

.gitignore
Lines changed: 1 addition & 0 deletions

@@ -35,6 +35,7 @@ coverage.xml
 .project
 .settings
 bin/
+.vscode/

 # Hive/metastore files
 metastore_db/

dev/docker-compose-integration.yml
Lines changed: 1 addition & 1 deletion

@@ -81,7 +81,7 @@ services:
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
-     until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+     until (/usr/bin/mc alias set minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null

dev/docker-compose.yml
Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ services:
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
-     until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
+     until (/usr/bin/mc alias set minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
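Both compose files keep the same MinIO credentials (admin/password) and warehouse bucket; only the deprecated `mc config host add` invocation is replaced by `mc alias set`. As a quick sanity check of that setup, a minimal sketch in Python with boto3, assuming the MinIO API port is published to the host as localhost:9000:

import boto3  # MinIO speaks the S3 API, so a plain S3 client works

# Credentials and bucket name come from the compose files above;
# the localhost:9000 endpoint is an assumption about how the port is published.
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="admin",
    aws_secret_access_key="password",
    region_name="us-east-1",
)

# The entrypoint should have created the "warehouse" bucket.
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])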

mkdocs/docs/configuration.md
Lines changed: 1 addition & 0 deletions

@@ -155,6 +155,7 @@ For the FileIO there are several configuration options available:
 | adls.tenant-id | ad667be4-b811-11ed-afa1-0242ac120002 | The tenant-id |
 | adls.client-id | ad667be4-b811-11ed-afa1-0242ac120002 | The client-id |
 | adls.client-secret | oCA3R6P\*ka#oa1Sms2J74z... | The client-secret |
+| adls.account-host | accountname1.blob.core.windows.net | The storage account host. See [AzureBlobFileSystem](https://github.com/fsspec/adlfs/blob/adb9c53b74a0d420625b86dd00fbe615b43201d2/adlfs/spec.py#L125) for reference |

 <!-- markdown-link-check-enable-->
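For reference, FileIO properties such as the newly documented adls.account-host are passed when loading a catalog. A minimal sketch with placeholder values (catalog name, account, and credentials are illustrative only):

from pyiceberg.catalog import load_catalog

# Placeholder catalog name and credential values; adls.account-host is the
# newly documented option and mirrors the table entry above.
catalog = load_catalog(
    "default",
    **{
        "adls.account-name": "accountname1",
        "adls.account-host": "accountname1.blob.core.windows.net",
        "adls.tenant-id": "ad667be4-b811-11ed-afa1-0242ac120002",
        "adls.client-id": "ad667be4-b811-11ed-afa1-0242ac120002",
        "adls.client-secret": "client-secret-value",
    },
)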

poetry.lock
Lines changed: 1823 additions & 1548 deletions

Generated file; diff not rendered.

pyiceberg/catalog/dynamodb.py
Lines changed: 25 additions & 11 deletions

@@ -66,6 +66,8 @@

if TYPE_CHECKING:
    import pyarrow as pa
+    from mypy_boto3_dynamodb.client import DynamoDBClient
+

DYNAMODB_CLIENT = "dynamodb"

@@ -94,18 +96,28 @@

class DynamoDbCatalog(MetastoreCatalog):
-    def __init__(self, name: str, **properties: str):
+    def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **properties: str):
+        """Dynamodb catalog.
+
+        Args:
+            name: Name to identify the catalog.
+            client: An optional boto3 dynamodb client.
+            properties: Properties for dynamodb client construction and configuration.
+        """
        super().__init__(name, **properties)
+        if client is not None:
+            self.dynamodb = client
+        else:
+            session = boto3.Session(
+                profile_name=properties.get(DYNAMODB_PROFILE_NAME),
+                region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
+                botocore_session=properties.get(BOTOCORE_SESSION),
+                aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+                aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+                aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
+            )
+            self.dynamodb = session.client(DYNAMODB_CLIENT)

-        session = boto3.Session(
-            profile_name=properties.get(DYNAMODB_PROFILE_NAME),
-            region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
-            botocore_session=properties.get(BOTOCORE_SESSION),
-            aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
-            aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
-            aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
-        )
-        self.dynamodb = session.client(DYNAMODB_CLIENT)
        self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
        self._ensure_catalog_table_exists_or_create()

@@ -824,7 +836,9 @@ def _convert_dynamo_item_to_regular_dict(dynamo_json: Dict[str, Any]) -> Dict[str, Any]:
            raise ValueError("Only S and N data types are supported.")

        values = list(val_dict.values())
-        assert len(values) == 1
+        if len(values) != 1:
+            raise ValueError(f"Expecting only 1 value: {values}")
+
        column_value = values[0]
        regular_json[column_name] = column_value
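With this change, callers can inject a pre-configured boto3 DynamoDB client (for example one pointed at a local DynamoDB instance in tests) instead of having the catalog build its own session. A minimal sketch, with placeholder region, endpoint, and warehouse location:

import boto3

from pyiceberg.catalog.dynamodb import DynamoDbCatalog

# Placeholder endpoint/region, e.g. a local DynamoDB instance used in tests.
dynamodb = boto3.client("dynamodb", region_name="us-east-1", endpoint_url="http://localhost:8000")

# The injected client is used as-is; the constructor still verifies (or creates)
# the catalog table, so the client must be able to reach DynamoDB.
catalog = DynamoDbCatalog("default", client=dynamodb, **{"warehouse": "s3://my-bucket/warehouse"})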

pyiceberg/catalog/glue.py
Lines changed: 54 additions & 41 deletions

@@ -30,15 +30,6 @@

import boto3
from botocore.config import Config
-from mypy_boto3_glue.client import GlueClient
-from mypy_boto3_glue.type_defs import (
-    ColumnTypeDef,
-    DatabaseInputTypeDef,
-    DatabaseTypeDef,
-    StorageDescriptorTypeDef,
-    TableInputTypeDef,
-    TableTypeDef,
-)

from pyiceberg.catalog import (
    BOTOCORE_SESSION,

@@ -101,6 +92,15 @@

if TYPE_CHECKING:
    import pyarrow as pa
+    from mypy_boto3_glue.client import GlueClient
+    from mypy_boto3_glue.type_defs import (
+        ColumnTypeDef,
+        DatabaseInputTypeDef,
+        DatabaseTypeDef,
+        StorageDescriptorTypeDef,
+        TableInputTypeDef,
+        TableTypeDef,
+    )


# There is a unique Glue metastore in each AWS account and each AWS region. By default, GlueCatalog chooses the Glue

@@ -140,7 +140,7 @@

def _construct_parameters(
-    metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
+    metadata_location: str, glue_table: Optional["TableTypeDef"] = None, prev_metadata_location: Optional[str] = None
) -> Properties:
    new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
    new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})

@@ -190,15 +190,15 @@ def primitive(self, primitive: PrimitiveType) -> str:
        return GLUE_PRIMITIVE_TYPES[primitive_type]


-def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]:
-    results: Dict[str, ColumnTypeDef] = {}
+def _to_columns(metadata: TableMetadata) -> List["ColumnTypeDef"]:
+    results: Dict[str, "ColumnTypeDef"] = {}

    def _append_to_results(field: NestedField, is_current: bool) -> None:
        if field.name in results:
            return

        results[field.name] = cast(
-            ColumnTypeDef,
+            "ColumnTypeDef",
            {
                "Name": field.name,
                "Type": visit(field.field_type, _IcebergSchemaToGlueType()),

@@ -230,10 +230,10 @@ def _construct_table_input(
    metadata_location: str,
    properties: Properties,
    metadata: TableMetadata,
-    glue_table: Optional[TableTypeDef] = None,
+    glue_table: Optional["TableTypeDef"] = None,
    prev_metadata_location: Optional[str] = None,
-) -> TableInputTypeDef:
-    table_input: TableInputTypeDef = {
+) -> "TableInputTypeDef":
+    table_input: "TableInputTypeDef" = {
        "Name": table_name,
        "TableType": EXTERNAL_TABLE,
        "Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),

@@ -249,10 +249,12 @@
    return table_input


-def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef) -> TableInputTypeDef:
-    rename_table_input: TableInputTypeDef = {"Name": to_table_name}
+def _construct_rename_table_input(to_table_name: str, glue_table: "TableTypeDef") -> "TableInputTypeDef":
+    rename_table_input: "TableInputTypeDef" = {"Name": to_table_name}
    # use the same Glue info to create the new table, pointing to the old metadata
-    assert glue_table["TableType"]
+    if not glue_table["TableType"]:
+        raise ValueError("Glue table type is missing, cannot rename table")
+
    rename_table_input["TableType"] = glue_table["TableType"]
    if "Owner" in glue_table:
        rename_table_input["Owner"] = glue_table["Owner"]

@@ -264,16 +266,16 @@ def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef) -> TableInputTypeDef:
    # It turns out the output of StorageDescriptor is not the same as the input type
    # because the Column can have a different type, but for now it seems to work, so
    # silence the type error.
-    rename_table_input["StorageDescriptor"] = cast(StorageDescriptorTypeDef, glue_table["StorageDescriptor"])
+    rename_table_input["StorageDescriptor"] = cast("StorageDescriptorTypeDef", glue_table["StorageDescriptor"])

    if "Description" in glue_table:
        rename_table_input["Description"] = glue_table["Description"]

    return rename_table_input


-def _construct_database_input(database_name: str, properties: Properties) -> DatabaseInputTypeDef:
-    database_input: DatabaseInputTypeDef = {"Name": database_name}
+def _construct_database_input(database_name: str, properties: Properties) -> "DatabaseInputTypeDef":
+    database_input: "DatabaseInputTypeDef" = {"Name": database_name}
    parameters = {}
    for k, v in properties.items():
        if k == "Description":

@@ -286,7 +288,7 @@ def _construct_database_input(database_name: str, properties: Properties) -> DatabaseInputTypeDef:
    return database_input


-def _register_glue_catalog_id_with_glue_client(glue: GlueClient, glue_catalog_id: str) -> None:
+def _register_glue_catalog_id_with_glue_client(glue: "GlueClient", glue_catalog_id: str) -> None:
    """
    Register the Glue Catalog ID (AWS Account ID) as a parameter on all Glue client methods.

@@ -303,9 +305,9 @@ def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:

class GlueCatalog(MetastoreCatalog):
-    glue: GlueClient
+    glue: "GlueClient"

-    def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any):
+    def __init__(self, name: str, client: Optional["GlueClient"] = None, **properties: Any):
        """Glue Catalog.

        You either need to provide a boto3 glue client, or one will be constructed from the properties.

@@ -317,7 +319,7 @@ def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any):
        """
        super().__init__(name, **properties)

-        if client:
+        if client is not None:
            self.glue = client
        else:
            retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE)

@@ -344,12 +346,17 @@ def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any):
        if glue_catalog_id := properties.get(GLUE_ID):
            _register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)

-    def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
+    def _convert_glue_to_iceberg(self, glue_table: "TableTypeDef") -> Table:
        properties: Properties = glue_table["Parameters"]

-        assert glue_table["DatabaseName"]
-        assert glue_table["Parameters"]
-        database_name = glue_table["DatabaseName"]
+        database_name = glue_table.get("DatabaseName", None)
+        if database_name is None:
+            raise ValueError("Glue table is missing DatabaseName property")
+
+        parameters = glue_table.get("Parameters", None)
+        if parameters is None:
+            raise ValueError("Glue table is missing Parameters property")
+
        table_name = glue_table["Name"]

        if TABLE_TYPE not in properties:

@@ -380,15 +387,15 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
            catalog=self,
        )

-    def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None:
+    def _create_glue_table(self, database_name: str, table_name: str, table_input: "TableInputTypeDef") -> None:
        try:
            self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
        except self.glue.exceptions.AlreadyExistsException as e:
            raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e

-    def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
+    def _update_glue_table(self, database_name: str, table_name: str, table_input: "TableInputTypeDef", version_id: str) -> None:
        try:
            self.glue.update_table(
                DatabaseName=database_name,

@@ -403,7 +410,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
                f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
            ) from e

-    def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
+    def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef":
        try:
            load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
            return load_table_response["Table"]

@@ -496,7 +503,7 @@ def commit_table(
        table_identifier = table.name()
        database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)

-        current_glue_table: Optional[TableTypeDef]
+        current_glue_table: Optional["TableTypeDef"]
        glue_table_version_id: Optional[str]
        current_table: Optional[Table]
        try:

@@ -680,13 +687,19 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
        try:
-            table_list = self.list_tables(namespace=database_name)
-        except NoSuchNamespaceError as e:
+            table_list_response = self.glue.get_tables(DatabaseName=database_name)
+            table_list = table_list_response["TableList"]
+        except self.glue.exceptions.EntityNotFoundException as e:
            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e

        if len(table_list) > 0:
-            raise NamespaceNotEmptyError(f"Database {database_name} is not empty")
-
+            first_table = table_list[0]
+            if self.__is_iceberg_table(first_table):
+                raise NamespaceNotEmptyError(f"Cannot drop namespace {database_name} because it still contains Iceberg tables")
+            else:
+                raise NamespaceNotEmptyError(
+                    f"Cannot drop namespace {database_name} because it still contains non-Iceberg tables"
+                )
        self.glue.delete_database(Name=database_name)

    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:

@@ -702,7 +715,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
        """
        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
-        table_list: List[TableTypeDef] = []
+        table_list: List["TableTypeDef"] = []
        next_token: Optional[str] = None
        try:
            while True:

@@ -730,7 +743,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
        if namespace:
            return []

-        database_list: List[DatabaseTypeDef] = []
+        database_list: List["DatabaseTypeDef"] = []
        next_token: Optional[str] = None

        while True:

@@ -806,5 +819,5 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool:
        raise NotImplementedError

    @staticmethod
-    def __is_iceberg_table(table: TableTypeDef) -> bool:
+    def __is_iceberg_table(table: "TableTypeDef") -> bool:
        return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG
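The glue.py changes follow the usual pattern for optional typing dependencies: the mypy-boto3-glue imports move under TYPE_CHECKING and every annotation that uses them becomes a quoted forward reference, so the module imports cleanly when the stubs are not installed. A generic sketch of that pattern (function and names here are illustrative, not pyiceberg's code), using the same get_tables call that appears in the diff above:

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    # Only evaluated by type checkers; mypy-boto3-glue is not required at runtime.
    from mypy_boto3_glue.client import GlueClient


def table_count(client: Optional["GlueClient"] = None, database: str = "default") -> int:
    # The quoted annotation keeps this module importable without the stubs installed.
    if client is None:
        return 0
    response = client.get_tables(DatabaseName=database)
    return len(response["TableList"])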

pyiceberg/catalog/rest/__init__.py
Lines changed: 12 additions & 4 deletions

@@ -32,6 +32,7 @@

from pyiceberg import __version__
from pyiceberg.catalog import (
+    BOTOCORE_SESSION,
    TOKEN,
    URI,
    WAREHOUSE_LOCATION,

@@ -53,6 +54,7 @@
    TableAlreadyExistsError,
    UnauthorizedError,
)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
from pyiceberg.schema import Schema, assign_fresh_schema_ids
from pyiceberg.table import (

@@ -72,7 +74,7 @@
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.utils.deprecated import deprecation_message
-from pyiceberg.utils.properties import get_header_properties, property_as_bool
+from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool

if TYPE_CHECKING:
    import pyarrow as pa

@@ -390,11 +392,17 @@ class SigV4Adapter(HTTPAdapter):
    def __init__(self, **properties: str):
        super().__init__()
        self._properties = properties
+        self._boto_session = boto3.Session(
+            region_name=get_first_property_value(self._properties, AWS_REGION),
+            botocore_session=self._properties.get(BOTOCORE_SESSION),
+            aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID),
+            aws_secret_access_key=get_first_property_value(self._properties, AWS_SECRET_ACCESS_KEY),
+            aws_session_token=get_first_property_value(self._properties, AWS_SESSION_TOKEN),
+        )

    def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None:  # pylint: disable=W0613
-        boto_session = boto3.Session()
-        credentials = boto_session.get_credentials().get_frozen_credentials()
-        region = self._properties.get(SIGV4_REGION, boto_session.region_name)
+        credentials = self._boto_session.get_credentials().get_frozen_credentials()
+        region = self._properties.get(SIGV4_REGION, self._boto_session.region_name)
        service = self._properties.get(SIGV4_SERVICE, "execute-api")

        url = str(request.url).split("?")[0]
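With the session built once in __init__, the SigV4 signer now honors explicit client.* credential and region properties instead of resolving the default boto3 credential chain on every request. A minimal sketch of configuring a SigV4-signed REST catalog; the URI and credential values are placeholders, and the exact property keys are an assumption to verify against the pyiceberg configuration docs:

from pyiceberg.catalog import load_catalog

# Placeholder URI and credentials; the client.* keys are assumed to map to the
# properties read by the SigV4Adapter above, and rest.sigv4-enabled turns the signer on.
catalog = load_catalog(
    "rest",
    **{
        "uri": "https://example-rest-catalog.example.com",
        "rest.sigv4-enabled": "true",
        "rest.signing-region": "us-east-1",
        "rest.signing-name": "execute-api",
        "client.access-key-id": "placeholder-access-key",
        "client.secret-access-key": "placeholder-secret-key",
    },
)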
