Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 130 additions & 2 deletions bindings/python/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::pyarrow::IntoPyArrow;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use futures::{StreamExt, stream};
use futures::{StreamExt, TryStreamExt, stream};
use iceberg::arrow::{ArrowReaderBuilder, arrow_schema_for_file_scan_task, schema_to_arrow_schema};
use iceberg::expr::Bind;
use iceberg::metadata_columns::is_metadata_field;
Expand All @@ -34,7 +34,7 @@ use iceberg::spec::{
};
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyAny, PyBool, PyBytes, PyFloat, PyInt, PySequence, PyString};
use pyo3::types::{PyAny, PyBool, PyBytes, PyFloat, PyInt, PySequence, PyString, PyType};
use serde_json::{Number as JsonNumber, Value as JsonValue};

use crate::expression::PyPredicate;
Expand Down Expand Up @@ -686,11 +686,139 @@ impl PyArrowReader {
}
}

#[pyclass(
name = "Table",
module = "pyiceberg_core.scan",
skip_from_py_object
)]
#[derive(Clone)]
pub struct PyTable {
pub(crate) inner: iceberg::table::Table,
}

#[pymethods]
impl PyTable {
#[classmethod]
#[pyo3(signature = (
file_io,
identifier,
metadata_json,
*,
metadata_location = None,
disable_cache = false,
cache_size_bytes = None
))]
fn from_metadata_json(
_cls: &Bound<'_, PyType>,
file_io: &PyFileIO,
identifier: Vec<String>,
metadata_json: String,
metadata_location: Option<String>,
disable_cache: bool,
cache_size_bytes: Option<u64>,
) -> PyResult<Self> {
let ident = iceberg::TableIdent::from_strs(identifier).map_err(crate::error::to_py_err)?;
let metadata: iceberg::spec::TableMetadata = serde_json::from_str(&metadata_json)
.map_err(|e| PyValueError::new_err(format!("Failed to parse metadata JSON: {}", e)))?;

let mut builder = iceberg::table::Table::builder()
.file_io(file_io.inner.clone())
.metadata(metadata)
.identifier(ident)
.runtime(iceberg_runtime())
.readonly(true);

if disable_cache {
builder = builder.disable_cache();
}
if let Some(size) = cache_size_bytes {
builder = builder.cache_size_bytes(size);
}
if let Some(loc) = metadata_location {
builder = builder.metadata_location(loc);
}

let table = builder.build().map_err(crate::error::to_py_err)?;
Ok(Self { inner: table })
}

#[pyo3(signature = (
*,
selected_fields = None,
predicate = None,
snapshot_id = None,
case_sensitive = true,
concurrency_limit = None,
manifest_entry_concurrency_limit = None
))]
fn plan_files(
&self,
py: Python<'_>,
selected_fields: Option<Vec<String>>,
predicate: Option<&Bound<'_, PyPredicate>>,
snapshot_id: Option<i64>,
case_sensitive: bool,
concurrency_limit: Option<usize>,
manifest_entry_concurrency_limit: Option<usize>,
) -> PyResult<Vec<PyFileScanTask>> {
let mut scan_builder = self.inner.scan();

if let Some(fields) = selected_fields {
if fields.is_empty() {
scan_builder = scan_builder.select_empty();
} else {
scan_builder = scan_builder.select(fields);
}
} else {
scan_builder = scan_builder.select_all();
}

if let Some(pred_bound) = predicate {
let pred = pred_bound.extract::<PyRef<'_, PyPredicate>>()?;
scan_builder = scan_builder.with_filter(pred.inner.clone());
}

if let Some(snap_id) = snapshot_id {
scan_builder = scan_builder.snapshot_id(snap_id);
}

scan_builder = scan_builder.with_case_sensitive(case_sensitive);

if let Some(limit) = concurrency_limit {
scan_builder = scan_builder.with_concurrency_limit(limit);
}

if let Some(limit) = manifest_entry_concurrency_limit {
scan_builder = scan_builder.with_manifest_entry_concurrency_limit(limit);
}

let scan = scan_builder.build().map_err(crate::error::to_py_err)?;

let tasks = py.detach(|| {
let task_stream = runtime()
.block_on(async { scan.plan_files().await })
.map_err(crate::error::to_py_err)?;

runtime()
.block_on(async { task_stream.try_collect::<Vec<FileScanTask>>().await })
.map_err(crate::error::to_py_err)
})?;

let py_tasks = tasks
.into_iter()
.map(|inner| PyFileScanTask { inner })
.collect();

Ok(py_tasks)
}
}

pub fn register_module(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
let this = PyModule::new(py, "scan")?;
this.add_class::<PyDeleteFile>()?;
this.add_class::<PyFileScanTask>()?;
this.add_class::<PyArrowReader>()?;
this.add_class::<PyTable>()?;
m.add_submodule(&this)?;
py.import("sys")?
.getattr("modules")?
Expand Down
74 changes: 74 additions & 0 deletions bindings/python/tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,3 +476,77 @@ def test_arrow_reader_with_real_parquet_and_limits(tmp_path):
batch_reader_limit_0 = reader.read(schema(), [task_limit_0], max_rows=0)
res_table_limit_0 = batch_reader_limit_0.read_all()
assert len(res_table_limit_0) == 0


TABLE_METADATA_JSON = json.dumps(
{
"format-version": 2,
"table-uuid": "fb070e82-2d1f-4ef6-8ab6-c4d12c6ed490",
"location": "s3://bucket/table",
"last-sequence-number": 1,
"last-updated-ms": 1600000000000,
"last-column-id": 2,
"schemas": [
{
"schema-id": 1,
"type": "struct",
"fields": [
{"id": 1, "name": "id", "required": True, "type": "long"},
{"id": 2, "name": "name", "required": False, "type": "string"},
],
}
],
"current-schema-id": 1,
"partition-specs": [{"spec-id": 0, "fields": []}],
"default-spec-id": 0,
"last-partition-id": 1000,
"default-sort-order-id": 0,
"sort-orders": [{"order-id": 0, "fields": []}],
"properties": {},
"current-snapshot-id": -1,
"snapshots": [],
"snapshot-log": [],
"metadata-log": [],
}
)


def test_table_from_metadata_json_validation():
from pyiceberg_core.scan import Table

# 1. Test metadata JSON validation error
with pytest.raises(ValueError, match="Failed to parse metadata JSON"):
Table.from_metadata_json(FileIO.from_props({}), ["ns", "tbl"], "{invalid_json}")

# 2. Test invalid identifier (empty sequence)
with pytest.raises(ValueError, match="Table identifier can't be empty"):
Table.from_metadata_json(FileIO.from_props({}), [], TABLE_METADATA_JSON)


def test_table_empty_table_planning():
from pyiceberg_core.scan import Table

# 3. Test successful parsing and empty table planning
table = Table.from_metadata_json(
FileIO.from_props({}),
["ns", "tbl"],
TABLE_METADATA_JSON,
)

# 4. Test planning files on empty table returns empty list
tasks = table.plan_files()
assert isinstance(tasks, list)
assert len(tasks) == 0

# 5. Test selected field planning works (does not raise error)
tasks_projected = table.plan_files(selected_fields=["id"])
assert len(tasks_projected) == 0

tasks_empty = table.plan_files(selected_fields=[])
assert len(tasks_empty) == 0

# 6. Test predicate argument acceptance
from pyiceberg_core.expression import Reference

tasks_pred = table.plan_files(predicate=Reference("id").eq(5))
assert len(tasks_pred) == 0