Skip to content

Commit 9fcb77d

Browse files
authored
Merge branch 'apache:main' into iceberg-mirror-sa1
2 parents acf4dfe + 7d5c58d commit 9fcb77d

File tree

14 files changed

+642
-366
lines changed

14 files changed

+642
-366
lines changed

poetry.lock

Lines changed: 279 additions & 291 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/catalog/rest/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,9 @@ def _create_session(self) -> Session:
238238
"""Create a request session with provided catalog configuration."""
239239
session = Session()
240240

241+
# Set HTTP headers
242+
self._config_headers(session)
243+
241244
# Sets the client side and server side SSL cert verification, if provided as properties.
242245
if ssl_config := self.properties.get(SSL):
243246
if ssl_ca_bundle := ssl_config.get(CA_BUNDLE):
@@ -265,9 +268,6 @@ def _create_session(self) -> Session:
265268
else:
266269
session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session))
267270

268-
# Set HTTP headers
269-
self._config_headers(session)
270-
271271
# Configure SigV4 Request Signing
272272
if property_as_bool(self.properties, SIGV4, False):
273273
self._init_sigv4(session)

pyiceberg/expressions/__init__.py

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
)
3434
from typing import Literal as TypingLiteral
3535

36-
from pydantic import Field
36+
from pydantic import ConfigDict, Field
3737

3838
from pyiceberg.expressions.literals import (
3939
AboveMax,
@@ -302,12 +302,19 @@ def __getnewargs__(self) -> Tuple[BooleanExpression, BooleanExpression]:
302302
return (self.left, self.right)
303303

304304

305-
class Or(BooleanExpression):
305+
class Or(IcebergBaseModel, BooleanExpression):
306306
"""OR operation expression - logical disjunction."""
307307

308+
model_config = ConfigDict(arbitrary_types_allowed=True)
309+
310+
type: TypingLiteral["or"] = Field(default="or", alias="type")
308311
left: BooleanExpression
309312
right: BooleanExpression
310313

314+
def __init__(self, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> None:
315+
if isinstance(self, Or) and not hasattr(self, "left") and not hasattr(self, "right"):
316+
super().__init__(left=left, right=right)
317+
311318
def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: BooleanExpression) -> BooleanExpression: # type: ignore
312319
if rest:
313320
return _build_balanced_tree(Or, (left, right, *rest))
@@ -319,10 +326,12 @@ def __new__(cls, left: BooleanExpression, right: BooleanExpression, *rest: Boole
319326
return left
320327
else:
321328
obj = super().__new__(cls)
322-
obj.left = left
323-
obj.right = right
324329
return obj
325330

331+
def __str__(self) -> str:
332+
"""Return the string representation of the Or class."""
333+
return f"{str(self.__class__.__name__)}(left={repr(self.left)}, right={repr(self.right)})"
334+
326335
def __eq__(self, other: Any) -> bool:
327336
"""Return the equality of two instances of the Or class."""
328337
return self.left == other.left and self.right == other.right if isinstance(other, Or) else False
@@ -341,22 +350,31 @@ def __getnewargs__(self) -> Tuple[BooleanExpression, BooleanExpression]:
341350
return (self.left, self.right)
342351

343352

344-
class Not(BooleanExpression):
353+
class Not(IcebergBaseModel, BooleanExpression):
345354
"""NOT operation expression - logical negation."""
346355

347-
child: BooleanExpression
356+
model_config = ConfigDict(arbitrary_types_allowed=True)
357+
358+
type: TypingLiteral["not"] = Field(default="not")
359+
child: BooleanExpression = Field()
360+
361+
def __init__(self, child: BooleanExpression, **_: Any) -> None:
362+
super().__init__(child=child)
348363

349-
def __new__(cls, child: BooleanExpression) -> BooleanExpression: # type: ignore
364+
def __new__(cls, child: BooleanExpression, **_: Any) -> BooleanExpression: # type: ignore
350365
if child is AlwaysTrue():
351366
return AlwaysFalse()
352367
elif child is AlwaysFalse():
353368
return AlwaysTrue()
354369
elif isinstance(child, Not):
355370
return child.child
356371
obj = super().__new__(cls)
357-
obj.child = child
358372
return obj
359373

374+
def __str__(self) -> str:
375+
"""Return the string representation of the Not class."""
376+
return f"Not(child={self.child})"
377+
360378
def __repr__(self) -> str:
361379
"""Return the string representation of the Not class."""
362380
return f"Not(child={repr(self.child)})"
@@ -373,8 +391,6 @@ def __getnewargs__(self) -> Tuple[BooleanExpression]:
373391
"""Pickle the Not class."""
374392
return (self.child,)
375393

376-
"""TRUE expression."""
377-
378394

379395
class AlwaysTrue(BooleanExpression, Singleton, IcebergRootModel[str]):
380396
"""TRUE expression."""
@@ -447,7 +463,20 @@ def bind(self, schema: Schema, case_sensitive: bool = True) -> BooleanExpression
447463
def as_bound(self) -> Type[BoundPredicate[L]]: ...
448464

449465

450-
class UnaryPredicate(UnboundPredicate[Any], ABC):
466+
class UnaryPredicate(IcebergBaseModel, UnboundPredicate[Any], ABC):
467+
type: str
468+
469+
model_config = {"arbitrary_types_allowed": True}
470+
471+
def __init__(self, term: Union[str, UnboundTerm[Any]]):
472+
unbound = _to_unbound_term(term)
473+
super().__init__(term=unbound)
474+
475+
def __str__(self) -> str:
476+
"""Return the string representation of the UnaryPredicate class."""
477+
# Sort to make it deterministic
478+
return f"{str(self.__class__.__name__)}(term={str(self.term)})"
479+
451480
def bind(self, schema: Schema, case_sensitive: bool = True) -> BoundUnaryPredicate[Any]:
452481
bound_term = self.term.bind(schema, case_sensitive)
453482
return self.as_bound(bound_term)
@@ -506,6 +535,8 @@ def as_unbound(self) -> Type[NotNull]:
506535

507536

508537
class IsNull(UnaryPredicate):
538+
type: str = "is-null"
539+
509540
def __invert__(self) -> NotNull:
510541
"""Transform the Expression into its negated version."""
511542
return NotNull(self.term)
@@ -516,6 +547,8 @@ def as_bound(self) -> Type[BoundIsNull[L]]:
516547

517548

518549
class NotNull(UnaryPredicate):
550+
type: str = "not-null"
551+
519552
def __invert__(self) -> IsNull:
520553
"""Transform the Expression into its negated version."""
521554
return IsNull(self.term)
@@ -558,6 +591,8 @@ def as_unbound(self) -> Type[NotNaN]:
558591

559592

560593
class IsNaN(UnaryPredicate):
594+
type: str = "is-nan"
595+
561596
def __invert__(self) -> NotNaN:
562597
"""Transform the Expression into its negated version."""
563598
return NotNaN(self.term)
@@ -568,6 +603,8 @@ def as_bound(self) -> Type[BoundIsNaN[L]]:
568603

569604

570605
class NotNaN(UnaryPredicate):
606+
type: str = "not-nan"
607+
571608
def __invert__(self) -> IsNaN:
572609
"""Transform the Expression into its negated version."""
573610
return IsNaN(self.term)

pyiceberg/table/snapshots.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,12 @@ class Snapshot(IcebergBaseModel):
244244
manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file")
245245
summary: Optional[Summary] = Field(default=None)
246246
schema_id: Optional[int] = Field(alias="schema-id", default=None)
247+
first_row_id: Optional[int] = Field(
248+
alias="first-row-id", default=None, description="assigned to the first row in the first data file in the first manifest"
249+
)
250+
added_rows: Optional[int] = Field(
251+
alias="added-rows", default=None, description="The upper bound of the number of rows with assigned row IDs"
252+
)
247253

248254
def __str__(self) -> str:
249255
"""Return the string representation of the Snapshot class."""
@@ -253,6 +259,22 @@ def __str__(self) -> str:
253259
result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
254260
return result_str
255261

262+
def __repr__(self) -> str:
263+
"""Return the string representation of the Snapshot class."""
264+
fields = [
265+
f"snapshot_id={self.snapshot_id}",
266+
f"parent_snapshot_id={self.parent_snapshot_id}",
267+
f"sequence_number={self.sequence_number}",
268+
f"timestamp_ms={self.timestamp_ms}",
269+
f"manifest_list='{self.manifest_list}'",
270+
f"summary={repr(self.summary)}" if self.summary else None,
271+
f"schema_id={self.schema_id}" if self.schema_id is not None else None,
272+
f"first_row_id={self.first_row_id}" if self.first_row_id is not None else None,
273+
f"added_rows={self.added_rows}" if self.added_rows is not None else None,
274+
]
275+
filtered_fields = [field for field in fields if field is not None]
276+
return f"Snapshot({', '.join(filtered_fields)})"
277+
256278
def manifests(self, io: FileIO) -> List[ManifestFile]:
257279
"""Return the manifests for the given snapshot."""
258280
return list(_manifests(io, self.manifest_list))

pyiceberg/table/update/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,13 +437,29 @@ def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMe
437437
f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} "
438438
f"older than last sequence number {base_metadata.last_sequence_number}"
439439
)
440+
elif base_metadata.format_version >= 3 and update.snapshot.first_row_id is None:
441+
raise ValueError("Cannot add snapshot without first row id")
442+
elif (
443+
base_metadata.format_version >= 3
444+
and update.snapshot.first_row_id is not None
445+
and base_metadata.next_row_id is not None
446+
and update.snapshot.first_row_id < base_metadata.next_row_id
447+
):
448+
raise ValueError(
449+
f"Cannot add a snapshot with first row id smaller than the table's next-row-id {update.snapshot.first_row_id} < {base_metadata.next_row_id}"
450+
)
440451

441452
context.add_update(update)
442453
return base_metadata.model_copy(
443454
update={
444455
"last_updated_ms": update.snapshot.timestamp_ms,
445456
"last_sequence_number": update.snapshot.sequence_number,
446457
"snapshots": base_metadata.snapshots + [update.snapshot],
458+
"next_row_id": base_metadata.next_row_id + update.snapshot.added_rows
459+
if base_metadata.format_version >= 3
460+
and base_metadata.next_row_id is not None
461+
and update.snapshot.added_rows is not None
462+
else None,
447463
}
448464
)
449465

pyiceberg/table/update/snapshot.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,19 @@ def delete_data_file(self, data_file: DataFile) -> _SnapshotProducer[U]:
157157
self._deleted_data_files.add(data_file)
158158
return self
159159

160+
def _calculate_added_rows(self, manifests: List[ManifestFile]) -> int:
161+
"""Calculate the number of added rows from a list of manifest files."""
162+
added_rows = 0
163+
for manifest in manifests:
164+
if manifest.added_snapshot_id is None or manifest.added_snapshot_id == self._snapshot_id:
165+
if manifest.added_rows_count is None:
166+
raise ValueError(
167+
"Cannot determine number of added rows in snapshot because "
168+
f"the entry for manifest {manifest.manifest_path} is missing the field `added-rows-count`"
169+
)
170+
added_rows += manifest.added_rows_count
171+
return added_rows
172+
160173
@abstractmethod
161174
def _deleted_entries(self) -> List[ManifestEntry]: ...
162175

@@ -284,13 +297,19 @@ def _commit(self) -> UpdatesAndRequirements:
284297
) as writer:
285298
writer.add_manifests(new_manifests)
286299

300+
first_row_id: Optional[int] = None
301+
302+
if self._transaction.table_metadata.format_version >= 3:
303+
first_row_id = self._transaction.table_metadata.next_row_id
304+
287305
snapshot = Snapshot(
288306
snapshot_id=self._snapshot_id,
289307
parent_snapshot_id=self._parent_snapshot_id,
290308
manifest_list=manifest_list_file_path,
291309
sequence_number=next_sequence_number,
292310
summary=summary,
293311
schema_id=self._transaction.table_metadata.current_schema_id,
312+
first_row_id=first_row_id,
294313
)
295314

296315
add_snapshot_update = AddSnapshotUpdate(snapshot=snapshot)

pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ moto = { version = "^5.0.2", extras = ["server"] }
102102
typing-extensions = "4.15.0"
103103
pytest-mock = "3.15.1"
104104
pyspark = { version = "4.0.1", extras = ["connect"] }
105-
protobuf = "5.29.5" # match Spark Connect's gencode
106-
cython = "3.1.4"
105+
protobuf = "6.33.0" # match Spark Connect's gencode
106+
cython = "3.1.5"
107107
deptry = ">=0.14,<0.24"
108108
docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520
109109
mypy-boto3-glue = ">=1.28.18"
@@ -119,7 +119,7 @@ mkdocstrings-python = "1.18.2"
119119
mkdocs-literate-nav = "0.6.2"
120120
mkdocs-autorefs = "1.4.3"
121121
mkdocs-gen-files = "0.5.0"
122-
mkdocs-material = "9.6.21"
122+
mkdocs-material = "9.6.22"
123123
mkdocs-material-extensions = "1.3.1"
124124
mkdocs-section-index = "0.3.10"
125125

tests/catalog/test_rest.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1899,6 +1899,28 @@ def test_rest_catalog_with_google_credentials_path(
18991899
assert actual_headers["Authorization"] == expected_auth_header
19001900

19011901

1902+
@pytest.mark.filterwarnings(
1903+
"ignore:Deprecated in 0.8.0, will be removed in 1.0.0. Iceberg REST client is missing the OAuth2 server URI:DeprecationWarning"
1904+
)
1905+
def test_auth_header(rest_mock: Mocker) -> None:
1906+
mock_request = rest_mock.post(
1907+
f"{TEST_URI}v1/oauth/tokens",
1908+
json={
1909+
"access_token": TEST_TOKEN,
1910+
"token_type": "Bearer",
1911+
"expires_in": 86400,
1912+
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
1913+
"scope": "openid offline",
1914+
"refresh_token": "refresh_token",
1915+
},
1916+
status_code=200,
1917+
request_headers={**OAUTH_TEST_HEADERS, "Custom": "Value"},
1918+
)
1919+
1920+
RestCatalog("rest", uri=TEST_URI, credential=TEST_CREDENTIALS, audience="", resource="", **{"header.Custom": "Value"})
1921+
assert mock_request.last_request.text == "grant_type=client_credentials&client_id=client&client_secret=secret&scope=catalog"
1922+
1923+
19021924
class TestRestCatalogClose:
19031925
"""Tests RestCatalog close functionality"""
19041926

tests/conftest.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
from pyiceberg.schema import Accessor, Schema
7373
from pyiceberg.serializers import ToOutputFile
7474
from pyiceberg.table import FileScanTask, Table
75-
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2
75+
from pyiceberg.table.metadata import TableMetadataV1, TableMetadataV2, TableMetadataV3
7676
from pyiceberg.transforms import DayTransform, IdentityTransform
7777
from pyiceberg.types import (
7878
BinaryType,
@@ -920,6 +920,7 @@ def generate_snapshot(
920920
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
921921
"location": "s3://bucket/test/location",
922922
"last-sequence-number": 34,
923+
"next-row-id": 1,
923924
"last-updated-ms": 1602638573590,
924925
"last-column-id": 3,
925926
"current-schema-id": 1,
@@ -2489,6 +2490,18 @@ def table_v2(example_table_metadata_v2: Dict[str, Any]) -> Table:
24892490
)
24902491

24912492

2493+
@pytest.fixture
2494+
def table_v3(example_table_metadata_v3: Dict[str, Any]) -> Table:
2495+
table_metadata = TableMetadataV3(**example_table_metadata_v3)
2496+
return Table(
2497+
identifier=("database", "table"),
2498+
metadata=table_metadata,
2499+
metadata_location=f"{table_metadata.location}/uuid.metadata.json",
2500+
io=load_file_io(),
2501+
catalog=NoopCatalog("NoopCatalog"),
2502+
)
2503+
2504+
24922505
@pytest.fixture
24932506
def table_v2_orc(example_table_metadata_v2: Dict[str, Any]) -> Table:
24942507
import copy

0 commit comments

Comments
 (0)