From a2ac3d7fa867eeb3b4dd0e3b408df8361ca312af Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sun, 24 May 2026 23:06:37 -0500 Subject: [PATCH 1/2] feat(python): bind table scan planning --- bindings/python/src/scan.rs | 129 ++++++++++++++++++++++++++++- bindings/python/tests/test_scan.py | 74 +++++++++++++++++ 2 files changed, 201 insertions(+), 2 deletions(-) diff --git a/bindings/python/src/scan.rs b/bindings/python/src/scan.rs index 61c1e42f40..434fb799b8 100644 --- a/bindings/python/src/scan.rs +++ b/bindings/python/src/scan.rs @@ -21,7 +21,7 @@ use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::pyarrow::IntoPyArrow; use arrow::record_batch::{RecordBatch, RecordBatchReader}; -use futures::{StreamExt, stream}; +use futures::{StreamExt, TryStreamExt, stream}; use iceberg::arrow::{ArrowReaderBuilder, arrow_schema_for_file_scan_task, schema_to_arrow_schema}; use iceberg::expr::Bind; use iceberg::metadata_columns::is_metadata_field; @@ -34,7 +34,7 @@ use iceberg::spec::{ }; use pyo3::exceptions::{PyTypeError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::{PyAny, PyBool, PyBytes, PyFloat, PyInt, PySequence, PyString}; +use pyo3::types::{PyAny, PyBool, PyBytes, PyFloat, PyInt, PySequence, PyString, PyType}; use serde_json::{Number as JsonNumber, Value as JsonValue}; use crate::expression::PyPredicate; @@ -686,11 +686,136 @@ impl PyArrowReader { } } +#[pyclass( + name = "Table", + module = "pyiceberg_core.scan", + skip_from_py_object +)] +#[derive(Clone)] +pub struct PyTable { + pub(crate) inner: iceberg::table::Table, +} + +#[pymethods] +impl PyTable { + #[classmethod] + #[pyo3(signature = ( + file_io, + identifier, + metadata_json, + *, + metadata_location = None, + disable_cache = false, + cache_size_bytes = None + ))] + fn from_metadata_json( + _cls: &Bound<'_, PyType>, + file_io: &PyFileIO, + identifier: Vec, + metadata_json: String, + metadata_location: Option, + disable_cache: bool, + cache_size_bytes: Option, + ) -> PyResult { + let ident = iceberg::TableIdent::from_strs(identifier).map_err(crate::error::to_py_err)?; + let metadata: iceberg::spec::TableMetadata = serde_json::from_str(&metadata_json) + .map_err(|e| PyValueError::new_err(format!("Failed to parse metadata JSON: {}", e)))?; + + let mut builder = iceberg::table::Table::builder() + .file_io(file_io.inner.clone()) + .metadata(metadata) + .identifier(ident) + .runtime(iceberg_runtime()) + .readonly(true); + + if disable_cache { + builder = builder.disable_cache(); + } + if let Some(size) = cache_size_bytes { + builder = builder.cache_size_bytes(size); + } + if let Some(loc) = metadata_location { + builder = builder.metadata_location(loc); + } + + let table = builder.build().map_err(crate::error::to_py_err)?; + Ok(Self { inner: table }) + } + + #[pyo3(signature = ( + *, + selected_fields = None, + predicate = None, + snapshot_id = None, + case_sensitive = true, + concurrency_limit = None, + manifest_entry_concurrency_limit = None + ))] + fn plan_files( + &self, + selected_fields: Option>, + predicate: Option<&Bound<'_, PyPredicate>>, + snapshot_id: Option, + case_sensitive: bool, + concurrency_limit: Option, + manifest_entry_concurrency_limit: Option, + ) -> PyResult> { + let mut scan_builder = self.inner.scan(); + + if let Some(fields) = selected_fields { + if fields.is_empty() { + scan_builder = scan_builder.select_empty(); + } else { + scan_builder = scan_builder.select(fields); + } + } else { + scan_builder = scan_builder.select_all(); + } + + if let Some(pred_bound) = predicate { + let pred = pred_bound.extract::>()?; + scan_builder = scan_builder.with_filter(pred.inner.clone()); + } + + if let Some(snap_id) = snapshot_id { + scan_builder = scan_builder.snapshot_id(snap_id); + } + + scan_builder = scan_builder.with_case_sensitive(case_sensitive); + + if let Some(limit) = concurrency_limit { + scan_builder = scan_builder.with_concurrency_limit(limit); + } + + if let Some(limit) = manifest_entry_concurrency_limit { + scan_builder = scan_builder.with_manifest_entry_concurrency_limit(limit); + } + + let scan = scan_builder.build().map_err(crate::error::to_py_err)?; + + let task_stream = runtime() + .block_on(async { scan.plan_files().await }) + .map_err(crate::error::to_py_err)?; + + let tasks = runtime() + .block_on(async { task_stream.try_collect::>().await }) + .map_err(crate::error::to_py_err)?; + + let py_tasks = tasks + .into_iter() + .map(|inner| PyFileScanTask { inner }) + .collect(); + + Ok(py_tasks) + } +} + pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> { let this = PyModule::new(py, "scan")?; this.add_class::()?; this.add_class::()?; this.add_class::()?; + this.add_class::()?; m.add_submodule(&this)?; py.import("sys")? .getattr("modules")? diff --git a/bindings/python/tests/test_scan.py b/bindings/python/tests/test_scan.py index 760acb0d13..3663a2b3b7 100644 --- a/bindings/python/tests/test_scan.py +++ b/bindings/python/tests/test_scan.py @@ -476,3 +476,77 @@ def test_arrow_reader_with_real_parquet_and_limits(tmp_path): batch_reader_limit_0 = reader.read(schema(), [task_limit_0], max_rows=0) res_table_limit_0 = batch_reader_limit_0.read_all() assert len(res_table_limit_0) == 0 + + +TABLE_METADATA_JSON = json.dumps( + { + "format-version": 2, + "table-uuid": "fb070e82-2d1f-4ef6-8ab6-c4d12c6ed490", + "location": "s3://bucket/table", + "last-sequence-number": 1, + "last-updated-ms": 1600000000000, + "last-column-id": 2, + "schemas": [ + { + "schema-id": 1, + "type": "struct", + "fields": [ + {"id": 1, "name": "id", "required": True, "type": "long"}, + {"id": 2, "name": "name", "required": False, "type": "string"}, + ], + } + ], + "current-schema-id": 1, + "partition-specs": [{"spec-id": 0, "fields": []}], + "default-spec-id": 0, + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], + } +) + + +def test_table_from_metadata_json_validation(): + from pyiceberg_core.scan import Table + + # 1. Test metadata JSON validation error + with pytest.raises(ValueError, match="Failed to parse metadata JSON"): + Table.from_metadata_json(FileIO.from_props({}), ["ns", "tbl"], "{invalid_json}") + + # 2. Test invalid identifier (empty sequence) + with pytest.raises(ValueError, match="Table identifier can't be empty"): + Table.from_metadata_json(FileIO.from_props({}), [], TABLE_METADATA_JSON) + + +def test_table_empty_table_planning(): + from pyiceberg_core.scan import Table + + # 3. Test successful parsing and empty table planning + table = Table.from_metadata_json( + FileIO.from_props({}), + ["ns", "tbl"], + TABLE_METADATA_JSON, + ) + + # 4. Test planning files on empty table returns empty list + tasks = table.plan_files() + assert isinstance(tasks, list) + assert len(tasks) == 0 + + # 5. Test selected field planning works (does not raise error) + tasks_projected = table.plan_files(selected_fields=["id"]) + assert len(tasks_projected) == 0 + + tasks_empty = table.plan_files(selected_fields=[]) + assert len(tasks_empty) == 0 + + # 6. Test predicate argument acceptance + from pyiceberg_core.expression import Reference + + tasks_pred = table.plan_files(predicate=Reference("id").eq(5)) + assert len(tasks_pred) == 0 From 717f0b0a94cfda9163e4e25b0a1b1dc5ec2e7708 Mon Sep 17 00:00:00 2001 From: Abanoub Doss Date: Sun, 24 May 2026 23:39:15 -0500 Subject: [PATCH 2/2] python: release gil while planning table scans --- bindings/python/src/scan.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/bindings/python/src/scan.rs b/bindings/python/src/scan.rs index 434fb799b8..a3998aea84 100644 --- a/bindings/python/src/scan.rs +++ b/bindings/python/src/scan.rs @@ -753,6 +753,7 @@ impl PyTable { ))] fn plan_files( &self, + py: Python<'_>, selected_fields: Option>, predicate: Option<&Bound<'_, PyPredicate>>, snapshot_id: Option, @@ -793,13 +794,15 @@ impl PyTable { let scan = scan_builder.build().map_err(crate::error::to_py_err)?; - let task_stream = runtime() - .block_on(async { scan.plan_files().await }) - .map_err(crate::error::to_py_err)?; + let tasks = py.detach(|| { + let task_stream = runtime() + .block_on(async { scan.plan_files().await }) + .map_err(crate::error::to_py_err)?; - let tasks = runtime() - .block_on(async { task_stream.try_collect::>().await }) - .map_err(crate::error::to_py_err)?; + runtime() + .block_on(async { task_stream.try_collect::>().await }) + .map_err(crate::error::to_py_err) + })?; let py_tasks = tasks .into_iter()