Skip to content

Commit 6710786

Browse files
author
Somasundaram Sekar
committed
feat: Add snapshot_properties support to upsert operation
Add snapshot_properties parameter to both Transaction.upsert() and Table.upsert() methods, allowing custom properties to be added to the snapshot summary during upsert operations. The snapshot_properties are passed to both the underlying overwrite() and append() operations, so they are applied to all snapshots created by the upsert. Closes #2659
1 parent e07296e commit 6710786

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

pyiceberg/table/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,7 @@ def upsert(
734734
when_not_matched_insert_all: bool = True,
735735
case_sensitive: bool = True,
736736
branch: str | None = MAIN_BRANCH,
737+
snapshot_properties: dict[str, str] = EMPTY_DICT,
737738
) -> UpsertResult:
738739
"""Shorthand API for performing an upsert to an iceberg table.
739740
@@ -745,6 +746,7 @@ def upsert(
745746
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
746747
case_sensitive: Bool indicating if the match should be case-sensitive
747748
branch: Branch Reference to run the upsert operation
749+
snapshot_properties: Custom properties to be added to the snapshot summary
748750
749751
To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids
750752
@@ -861,12 +863,13 @@ def upsert(
861863
rows_to_update,
862864
overwrite_filter=Or(*overwrite_predicates) if len(overwrite_predicates) > 1 else overwrite_predicates[0],
863865
branch=branch,
866+
snapshot_properties=snapshot_properties,
864867
)
865868

866869
if when_not_matched_insert_all:
867870
insert_row_cnt = len(rows_to_insert)
868871
if rows_to_insert:
869-
self.append(rows_to_insert, branch=branch)
872+
self.append(rows_to_insert, branch=branch, snapshot_properties=snapshot_properties)
870873

871874
return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
872875

@@ -1327,6 +1330,7 @@ def upsert(
13271330
when_not_matched_insert_all: bool = True,
13281331
case_sensitive: bool = True,
13291332
branch: str | None = MAIN_BRANCH,
1333+
snapshot_properties: dict[str, str] = EMPTY_DICT,
13301334
) -> UpsertResult:
13311335
"""Shorthand API for performing an upsert to an iceberg table.
13321336
@@ -1338,6 +1342,7 @@ def upsert(
13381342
when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
13391343
case_sensitive: Bool indicating if the match should be case-sensitive
13401344
branch: Branch Reference to run the upsert operation
1345+
snapshot_properties: Custom properties to be added to the snapshot summary
13411346
13421347
To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids
13431348
@@ -1371,6 +1376,7 @@ def upsert(
13711376
when_not_matched_insert_all=when_not_matched_insert_all,
13721377
case_sensitive=case_sensitive,
13731378
branch=branch,
1379+
snapshot_properties=snapshot_properties,
13741380
)
13751381

13761382
def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:

tests/table/test_upsert.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,3 +834,54 @@ def test_stage_only_upsert(catalog: Catalog) -> None:
834834
assert operations == ["append", "append", "append"]
835835
# both subsequent parent id should be the first snapshot id
836836
assert parent_snapshot_id == [None, current_snapshot, current_snapshot]
837+
838+
839+
def test_upsert_snapshot_properties(catalog: Catalog) -> None:
840+
"""Test that snapshot_properties are applied to snapshots created by upsert."""
841+
identifier = "default.test_upsert_snapshot_properties"
842+
_drop_table(catalog, identifier)
843+
844+
schema = Schema(
845+
NestedField(1, "city", StringType(), required=True),
846+
NestedField(2, "population", IntegerType(), required=True),
847+
identifier_field_ids=[1],
848+
)
849+
850+
tbl = catalog.create_table(identifier, schema=schema)
851+
arrow_schema = pa.schema(
852+
[
853+
pa.field("city", pa.string(), nullable=False),
854+
pa.field("population", pa.int32(), nullable=False),
855+
]
856+
)
857+
858+
# Initial data
859+
df = pa.Table.from_pylist(
860+
[{"city": "Amsterdam", "population": 921402}],
861+
schema=arrow_schema,
862+
)
863+
tbl.append(df)
864+
initial_snapshot_count = len(list(tbl.snapshots()))
865+
866+
# Upsert with snapshot_properties (both update and insert)
867+
df = pa.Table.from_pylist(
868+
[
869+
{"city": "Amsterdam", "population": 950000}, # Update
870+
{"city": "Berlin", "population": 3432000}, # Insert
871+
],
872+
schema=arrow_schema,
873+
)
874+
result = tbl.upsert(df, snapshot_properties={"test_prop": "test_value"})
875+
876+
assert result.rows_updated == 1
877+
assert result.rows_inserted == 1
878+
879+
# Verify properties are on the snapshots created by upsert
880+
snapshots = list(tbl.snapshots())
881+
# Upsert should have created additional snapshots (overwrite + append)
882+
assert len(snapshots) > initial_snapshot_count
883+
884+
# Check that all new snapshots have the snapshot_properties
885+
for snapshot in snapshots[initial_snapshot_count:]:
886+
assert snapshot.summary is not None
887+
assert snapshot.summary.additional_properties.get("test_prop") == "test_value"

0 commit comments

Comments
 (0)