Skip to content

Expose ref_name parameter for table scans #1765

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1394,6 +1394,12 @@ The low level API `plan_files` methods returns a set of tasks that provide the f

In this case it is up to the engine itself to filter the files. The `to_arrow()` and `to_duckdb()` methods described below already do this for you.

A scan can also run against any named branch or tag.

```python
table.scan(ref_name='v1')
```

### Apache Arrow

<!-- prettier-ignore-start -->
Expand Down
25 changes: 18 additions & 7 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ def scan(
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
ref_name: Optional[str] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
) -> DataScan:
Expand All @@ -940,10 +941,13 @@ def scan(
A tuple of strings representing the column names
to return in the output dataframe.
case_sensitive:
If True column matching is case sensitive
If True column matching is case-sensitive.
snapshot_id:
Optional Snapshot ID to time travel to. If None,
scans the table as of the current snapshot ID.
ref_name:
Optional name of a branch or tag to read from.
If None, defaults to the main branch.
options:
Additional Table properties as a dictionary of
string key value pairs to use for this scan.
Expand All @@ -962,6 +966,7 @@ def scan(
selected_fields=selected_fields,
case_sensitive=case_sensitive,
snapshot_id=snapshot_id,
ref_name=ref_name,
options=options,
limit=limit,
)
Expand Down Expand Up @@ -1397,6 +1402,7 @@ def scan(
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
ref_name: Optional[str] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
) -> DataScan:
Expand Down Expand Up @@ -1429,6 +1435,7 @@ class TableScan(ABC):
selected_fields: Tuple[str, ...]
case_sensitive: bool
snapshot_id: Optional[int]
ref_name: Optional[str]
options: Properties
limit: Optional[int]

Expand All @@ -1440,6 +1447,7 @@ def __init__(
selected_fields: Tuple[str, ...] = ("*",),
case_sensitive: bool = True,
snapshot_id: Optional[int] = None,
ref_name: Optional[str] = None,
options: Properties = EMPTY_DICT,
limit: Optional[int] = None,
):
Expand All @@ -1449,12 +1457,20 @@ def __init__(
self.selected_fields = selected_fields
self.case_sensitive = case_sensitive
self.snapshot_id = snapshot_id
self.ref_name = ref_name
self.options = options
self.limit = limit

def snapshot(self) -> Optional[Snapshot]:
    """Resolve the snapshot this scan should read.

    Resolution order: an explicit ``snapshot_id`` is looked up first,
    then a named ref (``ref_name``), and otherwise the table metadata's
    current snapshot is used.

    Returns:
        The result of the matching table-metadata lookup; may be
        ``None``, as the return annotation allows.

    Raises:
        ValueError: If both ``snapshot_id`` and ``ref_name`` are set,
            or if ``ref_name`` does not resolve to a snapshot.
    """
    # Compare against None instead of relying on truthiness, so that a
    # snapshot ID of 0 is honoured rather than silently ignored.
    has_snapshot_id = self.snapshot_id is not None
    has_ref_name = self.ref_name is not None

    if has_snapshot_id and has_ref_name:
        raise ValueError("Cannot specify both snapshot_id and ref_name.")
    if has_snapshot_id:
        return self.table_metadata.snapshot_by_id(self.snapshot_id)
    if has_ref_name:
        if snapshot := self.table_metadata.snapshot_by_name(self.ref_name):
            return snapshot
        raise ValueError(f"Cannot scan unknown ref={self.ref_name}")
    return self.table_metadata.current_snapshot()

def projection(self) -> Schema:
Expand Down Expand Up @@ -1494,12 +1510,7 @@ def update(self: S, **overrides: Any) -> S:
return type(self)(**{**self.__dict__, **overrides})

def use_ref(self: S, name: str) -> S:
if self.snapshot_id:
raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}")
if snapshot := self.table_metadata.snapshot_by_name(name):
return self.update(snapshot_id=snapshot.snapshot_id)

raise ValueError(f"Cannot scan unknown ref={name}")
return self.update(ref_name=name)

def select(self: S, *field_names: str) -> S:
if "*" in self.selected_fields:
Expand Down
6 changes: 6 additions & 0 deletions tests/integration/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,9 @@ def test_scan_tag(catalog: Catalog) -> None:
arrow_table = test_positional_mor_deletes.scan().use_ref("tag_12").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

arrow_table = test_positional_mor_deletes.scan(ref_name="tag_12").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
Expand All @@ -608,6 +611,9 @@ def test_scan_branch(catalog: Catalog) -> None:
arrow_table = test_positional_mor_deletes.scan().use_ref("without_5").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]

arrow_table = test_positional_mor_deletes.scan(ref_name="without_5").to_arrow()
assert arrow_table["number"].to_pylist() == [1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12]


@pytest.mark.integration
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
Expand Down
13 changes: 11 additions & 2 deletions tests/table/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,23 @@ def test_table_scan_row_filter(table_v2: Table) -> None:

def test_table_scan_ref(table_v2: Table) -> None:
scan = table_v2.scan()
assert scan.use_ref("test").snapshot_id == 3051729675574597004
assert scan.use_ref("test").ref_name == "test"


def test_table_scan_ref_and_snapshot_id(table_v2: Table) -> None:
    """A scan given both a snapshot ID and a ref name must fail on resolution."""
    scan_with_id = table_v2.scan(snapshot_id=123)

    # The conflict surfaces when the snapshot is resolved; the whole
    # chain stays inside the context manager, as in the original test.
    with pytest.raises(ValueError, match="Cannot specify both snapshot_id and ref_name"):
        scan_with_id.use_ref("test").snapshot()


def test_table_scan_ref_does_not_exists(table_v2: Table) -> None:
scan = table_v2.scan()

with pytest.raises(ValueError) as exc_info:
_ = scan.use_ref("boom")
_ = scan.use_ref("boom").snapshot()

assert "Cannot scan unknown ref=boom" in str(exc_info.value)

Expand Down