Skip to content

Commit e45eb63

Browse files
Support replace_table_transaction for RTAS workflows
1 parent d339391 commit e45eb63

12 files changed

Lines changed: 1314 additions & 9 deletions

File tree

mkdocs/docs/api.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,21 @@ with catalog.create_table_transaction(identifier="docs_example.bids", schema=sch
185185
txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
186186
```
187187

188+
## Replace a table
189+
190+
Atomically replace an existing table's schema, partition spec, sort order, location, and properties via `replace_table_transaction`. The table UUID and history (snapshots, schemas, specs, sort orders, metadata log) are preserved; the current snapshot is cleared (the `main` branch ref is removed). Open the transaction with the new definition, stage any additional changes (writes, property updates, schema evolution), and commit — for example, an RTAS (replace-table-as-select) that swaps the schema and writes the new data atomically:
191+
192+
```python
193+
with catalog.replace_table_transaction(identifier="docs_example.bids", schema=df.schema) as txn:
194+
txn.append(df)
195+
```
196+
197+
Field IDs are reused by name from the previous schema; new columns get fresh IDs above `last-column-id`.
198+
199+
Table properties are *merged* on replace: properties you don't pass are preserved on the table. To remove a property, drop it explicitly within the transaction.
200+
201+
Pass `format-version` in `properties` to upgrade the table's format version as part of the replace.
202+
188203
## Register a table
189204

190205
To register a table using existing metadata:

pyiceberg/catalog/__init__.py

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,20 +42,25 @@
4242
)
4343
from pyiceberg.io import FileIO, load_file_io
4444
from pyiceberg.manifest import ManifestFile
45-
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
46-
from pyiceberg.schema import Schema
45+
from pyiceberg.partitioning import (
46+
UNPARTITIONED_PARTITION_SPEC,
47+
PartitionSpec,
48+
assign_fresh_partition_spec_ids_for_replace,
49+
)
50+
from pyiceberg.schema import Schema, assign_fresh_schema_ids_for_replace
4751
from pyiceberg.serializers import ToOutputFile
4852
from pyiceberg.table import (
4953
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
5054
CommitTableResponse,
5155
CreateTableTransaction,
56+
ReplaceTableTransaction,
5257
StagedTable,
5358
Table,
5459
TableProperties,
5560
)
5661
from pyiceberg.table.locations import load_location_provider
5762
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
58-
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
63+
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
5964
from pyiceberg.table.update import (
6065
TableRequirement,
6166
TableUpdate,
@@ -444,6 +449,90 @@ def create_table_if_not_exists(
444449
except TableAlreadyExistsError:
445450
return self.load_table(identifier)
446451

452+
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Stage the replacement of an existing table and return the open transaction.

    The current table is loaded, the target format version is resolved, and the
    new schema, partition spec and sort order are re-numbered against the existing
    metadata. Nothing is committed here: the returned transaction can be used to
    stage further changes (schema evolution, partition evolution, etc.) before
    committing.

    Args:
        identifier (str | Identifier): Table identifier.
        schema (Schema | pa.Schema): New table schema.
        location (str | None): New table location; trailing slashes are stripped.
            Defaults to the existing location.
        partition_spec (PartitionSpec): New partition spec.
        sort_order (SortOrder): New sort order.
        properties (Properties): Properties to apply. Merged on top of the existing
            table properties: keys present here override existing values; existing
            keys not present here are preserved, so a property must be removed
            explicitly. A `format-version` entry requests a format-version upgrade.

    Returns:
        ReplaceTableTransaction: A transaction for the replace operation.

    Raises:
        NoSuchTableError: If the table does not exist.
        ValueError: If the `format-version` property is not an integer, is lower
            than the table's current format version, or if the resolved table
            location is empty.
    """
    current_table = self.load_table(identifier)
    current_metadata = current_table.metadata
    current_version = current_metadata.format_version

    # Resolve the format version the replacement targets: an explicit property wins,
    # otherwise the table keeps its current version. Downgrades are rejected.
    version_property = properties.get(TableProperties.FORMAT_VERSION)
    if version_property is None:
        target_version = current_version
    else:
        try:
            target_version = int(version_property)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Invalid format-version property: {version_property!r}") from exc
        if target_version < current_version:
            raise ValueError(f"Cannot downgrade format-version from {current_version} to {target_version}")

    typed_version = cast(TableVersion, target_version)
    converted_schema = self._convert_schema_if_needed(schema, typed_version)
    converted_schema.check_format_version_compatibility(typed_version)

    # Only the re-numbered objects are needed; the "last assigned id" halves of the
    # returned tuples are discarded.
    new_schema, _ = assign_fresh_schema_ids_for_replace(
        converted_schema,
        current_metadata.schema(),
        current_metadata.last_column_id,
    )
    # Partition field IDs are assigned under the table's *existing* format version,
    # not the requested one.
    new_spec, _ = assign_fresh_partition_spec_ids_for_replace(
        partition_spec,
        converted_schema,
        new_schema,
        current_metadata.partition_specs,
        current_metadata.last_partition_id,
        format_version=current_metadata.format_version,
        current_spec=current_metadata.spec(),
    )
    new_sort_order = assign_fresh_sort_order_ids(sort_order, converted_schema, new_schema)

    if location:
        target_location = location.rstrip("/")
    else:
        target_location = current_metadata.location
    if not target_location:
        raise ValueError("Resolved table location must not be empty")

    # The staged table still carries the *existing* metadata; the replacement
    # definition is handed to the transaction separately.
    staged = StagedTable(
        identifier=current_table.name(),
        metadata=current_metadata,
        metadata_location=current_table.metadata_location,
        io=current_table.io,
        catalog=self,
    )
    return ReplaceTableTransaction(
        table=staged,
        new_schema=new_schema,
        new_spec=new_spec,
        new_sort_order=new_sort_order,
        new_location=target_location,
        new_properties=properties,
    )
535+
447536
@abstractmethod
448537
def load_table(self, identifier: str | Identifier) -> Table:
449538
"""Load the table's metadata and returns the table instance.

pyiceberg/catalog/noop.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from pyiceberg.table import (
2929
CommitTableResponse,
3030
CreateTableTransaction,
31+
ReplaceTableTransaction,
3132
Table,
3233
)
3334
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -68,6 +69,18 @@ def create_table_transaction(
6869
) -> CreateTableTransaction:
6970
raise NotImplementedError
7071

72+
@override
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Reject the operation: this catalog does not implement table replacement.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
83+
7184
@override
7285
def load_table(self, identifier: str | Identifier) -> Table:
7386
raise NotImplementedError

pyiceberg/catalog/rest/__init__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,18 @@
6868
FileIO,
6969
load_file_io,
7070
)
71-
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
71+
from pyiceberg.partitioning import (
72+
UNPARTITIONED_PARTITION_SPEC,
73+
PartitionSpec,
74+
assign_fresh_partition_spec_ids,
75+
)
7276
from pyiceberg.schema import Schema, assign_fresh_schema_ids
7377
from pyiceberg.table import (
7478
CommitTableRequest,
7579
CommitTableResponse,
7680
CreateTableTransaction,
7781
FileScanTask,
82+
ReplaceTableTransaction,
7883
StagedTable,
7984
Table,
8085
TableIdentifier,
@@ -957,6 +962,19 @@ def create_table_transaction(
957962
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
958963
return CreateTableTransaction(staged_table)
959964

965+
@override
@retry(**_RETRY_ARGS)
def replace_table_transaction(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    location: str | None = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> ReplaceTableTransaction:
    """Create a ReplaceTableTransaction by delegating to the parent implementation.

    This override adds no logic of its own: every argument is forwarded unchanged.
    Its only effect is to wrap the call in `@retry(**_RETRY_ARGS)`.
    NOTE(review): the retry conditions are defined by `_RETRY_ARGS`, which is not
    shown here - confirm what is retried.

    Returns:
        ReplaceTableTransaction: The transaction produced by the parent class.
    """
    return super().replace_table_transaction(
        identifier=identifier,
        schema=schema,
        location=location,
        partition_spec=partition_spec,
        sort_order=sort_order,
        properties=properties,
    )
977+
960978
@override
961979
@retry(**_RETRY_ARGS)
962980
def create_view(

pyiceberg/partitioning.py

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,175 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre
335335
return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
336336

337337

338+
def assign_fresh_partition_spec_ids_for_replace(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    existing_specs: list[PartitionSpec],
    last_partition_id: int | None,
    format_version: int = 2,
    current_spec: PartitionSpec | None = None,
) -> tuple[PartitionSpec, int]:
    """Re-number a partition spec for a table replace, reusing existing field IDs.

    - v2+: a partition field whose `(source_id, transform)` already appears in any
      existing spec keeps that partition field ID (the highest one when several
      specs disagree). All other fields are numbered from `last_partition_id + 1`.
    - v1: delegated to `_assign_fresh_partition_spec_ids_for_replace_v1`, which keeps
      every field of the current spec (v1 specs are append-only), voiding the ones
      that are absent from the new spec.

    Args:
        spec: The new partition spec to assign IDs to. Its `source_id`s reference `old_schema`.
        old_schema: The schema that the new spec's `source_id`s reference.
        fresh_schema: The schema with freshly assigned field IDs.
        existing_specs: All partition specs from the existing table metadata.
        last_partition_id: The current table's `last_partition_id`. `None` is treated
            as `PARTITION_FIELD_ID_START - 1`, i.e. no partition ID assigned yet.
        format_version: Table format version. Values `<= 1` select the v1 carry-forward path.
        current_spec: The current default partition spec. Required when `format_version <= 1`.

    Returns:
        A tuple of `(fresh_spec, new_last_partition_id)`. The spec is built with
        `INITIAL_PARTITION_SPEC_ID` as its spec ID.

    Raises:
        ValueError: If `current_spec` is missing on the v1 path, or if a partition
            field's source column cannot be resolved in `old_schema` / `fresh_schema`.
    """
    if last_partition_id is None:
        base_partition_id = PARTITION_FIELD_ID_START - 1
    else:
        base_partition_id = last_partition_id

    if format_version <= 1:
        if current_spec is None:
            raise ValueError("current_spec is required for v1 replace_table")
        return _assign_fresh_partition_spec_ids_for_replace_v1(
            spec, old_schema, fresh_schema, current_spec, base_partition_id
        )

    # v2+: index every (source_id, transform) seen in any existing spec, keeping the
    # highest partition field ID when the same pair occurs more than once.
    known_ids: dict[tuple[int, str], int] = {}
    for prior_spec in existing_specs:
        for prior_field in prior_spec.fields:
            lookup = (prior_field.source_id, str(prior_field.transform))
            known_ids[lookup] = max(prior_field.field_id, known_ids.get(lookup, prior_field.field_id))

    last_assigned = base_partition_id
    rebuilt_fields = []
    for requested in spec.fields:
        # Resolve the source column by name: old schema ID -> name -> fresh schema field.
        column_name = old_schema.find_column_name(requested.source_id)
        if column_name is None:
            raise ValueError(f"Could not find in old schema: {requested}")
        target_column = fresh_schema.find_field(column_name)
        if target_column is None:
            raise ValueError(f"Could not find field in fresh schema: {column_name}")

        validate_partition_name(requested.name, requested.transform, target_column.field_id, fresh_schema, set())

        lookup = (target_column.field_id, str(requested.transform))
        assigned_id = known_ids.get(lookup)
        if assigned_id is None:
            # Unknown pair: take the next free ID and remember it for later fields.
            last_assigned += 1
            assigned_id = last_assigned
            known_ids[lookup] = assigned_id

        rebuilt_fields.append(
            PartitionField(
                name=requested.name,
                source_id=target_column.field_id,
                field_id=assigned_id,
                transform=requested.transform,
            )
        )

    # `last_assigned` started at the previous last partition ID and only ever grew,
    # so it is the new last partition ID.
    return PartitionSpec(*rebuilt_fields, spec_id=INITIAL_PARTITION_SPEC_ID), last_assigned
417+
418+
419+
def _assign_fresh_partition_spec_ids_for_replace_v1(
    spec: PartitionSpec,
    old_schema: Schema,
    fresh_schema: Schema,
    current_spec: PartitionSpec,
    effective_last_partition_id: int,
) -> tuple[PartitionSpec, int]:
    """v1 branch of `assign_fresh_partition_spec_ids_for_replace`.

    Every field of `current_spec` is kept, in order, with its partition field ID:
    a field matched by `(source_id, transform)` in the new spec takes the new
    field's name and transform, an unmatched one is carried forward as a
    `VoidTransform` under a non-colliding name. New fields without a match are
    appended afterwards with IDs starting at `effective_last_partition_id + 1`.

    Returns:
        A tuple of `(fresh_spec, new_last_partition_id)`.

    Raises:
        ValueError: If a new field's source column cannot be resolved in
            `old_schema` / `fresh_schema`.
    """
    # (fresh_source_id, transform) -> (requested field, fresh_source_id). A dict keeps
    # insertion order, so unmatched fields are later appended in declared order.
    pending: dict[tuple[int, str], tuple[PartitionField, int]] = {}
    taken_names: set[str] = set()
    for requested in spec.fields:
        column_name = old_schema.find_column_name(requested.source_id)
        if column_name is None:
            raise ValueError(f"Could not find in old schema: {requested}")
        target_column = fresh_schema.find_field(column_name)
        if target_column is None:
            raise ValueError(f"Could not find field in fresh schema: {column_name}")
        validate_partition_name(requested.name, requested.transform, target_column.field_id, fresh_schema, set())
        pending[(target_column.field_id, str(requested.transform))] = (requested, target_column.field_id)
        taken_names.add(requested.name)

    rebuilt_fields = []
    for carried in current_spec.fields:
        matched = pending.pop((carried.source_id, str(carried.transform)), None)
        if matched is None:
            # Dropped by the new spec: keep the slot, but void it under a name that
            # cannot clash with any requested field or earlier void field.
            void_name = _unique_void_name(carried.name, carried.field_id, taken_names)
            taken_names.add(void_name)
            rebuilt_fields.append(
                PartitionField(
                    name=void_name,
                    source_id=carried.source_id,
                    field_id=carried.field_id,
                    transform=VoidTransform(),
                )
            )
            continue

        requested, fresh_source_id = matched
        rebuilt_fields.append(
            PartitionField(
                name=requested.name,
                source_id=fresh_source_id,
                field_id=carried.field_id,
                transform=requested.transform,
            )
        )

    # Whatever is still pending is genuinely new: append it with fresh partition IDs.
    # When nothing is pending the last partition ID is unchanged.
    last_assigned = effective_last_partition_id
    for last_assigned, (requested, fresh_source_id) in enumerate(
        pending.values(), start=effective_last_partition_id + 1
    ):
        rebuilt_fields.append(
            PartitionField(
                name=requested.name,
                source_id=fresh_source_id,
                field_id=last_assigned,
                transform=requested.transform,
            )
        )

    return PartitionSpec(*rebuilt_fields, spec_id=INITIAL_PARTITION_SPEC_ID), last_assigned
489+
490+
491+
def _unique_void_name(base_name: str, field_id: int, used_names: set[str]) -> str:
492+
"""Pick a void-transform name that does not collide with already-used names.
493+
494+
First tries `base_name`; if taken, tries `base_name_{field_id}`; if still taken,
495+
appends `_2`, `_3`, ... until unique.
496+
"""
497+
if base_name not in used_names:
498+
return base_name
499+
candidate = f"{base_name}_{field_id}"
500+
suffix = 2
501+
while candidate in used_names:
502+
candidate = f"{base_name}_{field_id}_{suffix}"
503+
suffix += 1
504+
return candidate
505+
506+
338507
T = TypeVar("T")
339508

340509

0 commit comments

Comments
 (0)