Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 202 additions & 6 deletions bindings/python/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use iceberg::expr::Bind;
use iceberg::scan::{FileScanTask, FileScanTaskDeleteFile};
use iceberg::spec::{DataContentType, DataFileFormat};
use iceberg::spec::{
DataContentType, DataFileFormat, Literal, NameMapping, PartitionSpec, Struct,
UnboundPartitionSpec,
};
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyAny, PySequence};
use pyo3::types::{PyAny, PyBool, PyBytes, PyFloat, PyInt, PySequence, PyString};
use serde_json::{Number as JsonNumber, Value as JsonValue};

use crate::expression::PyPredicate;
use crate::schema::PySchema;
Expand Down Expand Up @@ -86,6 +92,170 @@ fn py_deletes_to_rust(values: Option<&Bound<'_, PyAny>>) -> PyResult<Vec<FileSca
Ok(out)
}

fn parse_partition_spec(
value: Option<&str>,
schema: &PySchema,
) -> PyResult<Option<Arc<PartitionSpec>>> {
value
.map(|value| {
let spec: UnboundPartitionSpec = serde_json::from_str(value).map_err(|e| {
PyValueError::new_err(format!("Failed to parse partition_spec JSON: {e}"))
})?;
spec.bind(schema.inner.clone())
.map(Arc::new)
.map_err(crate::error::to_py_err)
})
.transpose()
}

/// Parses an optional JSON-encoded name mapping.
///
/// Returns `Ok(None)` when no mapping string is supplied.
///
/// # Errors
///
/// `PyValueError` if the string cannot be deserialized as a `NameMapping`.
fn parse_name_mapping(value: Option<&str>) -> PyResult<Option<Arc<NameMapping>>> {
    let Some(json) = value else {
        return Ok(None);
    };

    match serde_json::from_str::<NameMapping>(json) {
        Ok(mapping) => Ok(Some(Arc::new(mapping))),
        Err(e) => Err(PyValueError::new_err(format!(
            "Failed to parse name_mapping JSON: {e}"
        ))),
    }
}

/// Encodes `bytes` as a lowercase hexadecimal string, two digits per input byte.
///
/// An empty slice yields an empty string.
fn bytes_to_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    hex.extend(bytes.iter().flat_map(|&b| {
        // High nibble first, then low nibble.
        [DIGITS[usize::from(b >> 4)], DIGITS[usize::from(b & 0x0f)]].map(char::from)
    }));
    hex
}

/// Converts a single Python partition value into a `serde_json` value.
///
/// The checks run in this order:
///
/// * `None` becomes JSON `null`.
/// * `bool` becomes a JSON boolean. It is tested before `int` so that booleans
///   are not emitted as numbers.
/// * `int` becomes a JSON number; it must be extractable as `i64`.
/// * `float` becomes a JSON number; it must be representable by
///   `serde_json::Number::from_f64`.
/// * anything extractable as `String` becomes a JSON string.
/// * `bytes` becomes a JSON string holding the lowercase hex encoding.
///
/// # Errors
///
/// * `PyValueError` for an integer that does not fit in `i64`, or a float that
///   `from_f64` rejects.
/// * `PyTypeError` for any other Python type.
fn py_to_json_value(value: &Bound<'_, PyAny>) -> PyResult<JsonValue> {
    let json = if value.is_none() {
        JsonValue::Null
    } else if value.is_instance_of::<PyBool>() {
        JsonValue::Bool(value.extract()?)
    } else if value.is_instance_of::<PyInt>() {
        let int = match value.extract::<i64>() {
            Ok(int) => int,
            Err(_) => {
                // Best-effort rendering of the offending integer for the message.
                let shown = value
                    .str()
                    .and_then(|s| s.extract::<String>())
                    .unwrap_or_else(|_| "<unprintable>".to_string());
                return Err(PyValueError::new_err(format!(
                    "integer {} exceeds i64 range; partition values require JSON-compatible integers",
                    shown,
                )));
            }
        };
        JsonValue::Number(int.into())
    } else if value.is_instance_of::<PyFloat>() {
        let float = value.extract::<f64>()?;
        match JsonNumber::from_f64(float) {
            Some(number) => JsonValue::Number(number),
            None => {
                return Err(PyValueError::new_err(
                    "partition float values must be finite",
                ));
            }
        }
    } else if let Ok(text) = value.extract::<String>() {
        JsonValue::String(text)
    } else if let Ok(raw) = value.cast::<PyBytes>() {
        JsonValue::String(bytes_to_hex(raw.as_bytes()))
    } else {
        return Err(PyTypeError::new_err(format!(
            "Cannot convert partition value to Iceberg JSON value: {}",
            value.repr()?.to_str()?
        )));
    };

    Ok(json)
}

/// Normalizes the `partition_data` argument into a list of JSON values.
///
/// Two input forms are accepted:
///
/// * a Python `str` containing a JSON array, whose elements are returned as-is;
/// * a Python sequence, whose items are converted one by one with
///   `py_to_json_value`.
///
/// # Errors
///
/// * `PyValueError` if a string input is not valid JSON.
/// * `PyTypeError` if a string input parses to something other than an array,
///   if the input is `bytes`, or if it is not a sequence at all.
/// * Any error raised while converting an individual item.
fn partition_values_to_json_array(values: &Bound<'_, PyAny>) -> PyResult<Vec<JsonValue>> {
    if values.is_instance_of::<PyString>() {
        let text = values.extract::<String>()?;
        let parsed = serde_json::from_str::<JsonValue>(&text).map_err(|e| {
            PyValueError::new_err(format!("Failed to parse partition_data JSON: {e}"))
        })?;
        return match parsed {
            JsonValue::Array(items) => Ok(items),
            _ => Err(PyTypeError::new_err(
                "partition_data JSON string must contain an array",
            )),
        };
    }

    // `bytes` is rejected explicitly, before the generic sequence handling below.
    if values.is_instance_of::<PyBytes>() {
        return Err(PyTypeError::new_err(
            "partition_data must be a sequence of values or a JSON array string, not bytes",
        ));
    }

    let Ok(sequence) = values.cast::<PySequence>() else {
        return Err(PyTypeError::new_err(
            "partition_data must be a sequence of values or a JSON array string",
        ));
    };

    let count = sequence.len()?;
    (0..count)
        .map(|index| -> PyResult<JsonValue> {
            py_to_json_value(&sequence.get_item(index)?)
        })
        .collect()
}

/// Builds the partition `Struct` for a scan task from user-supplied values.
///
/// Returns `Ok(None)` when `value` is absent, whether or not a partition spec
/// was given. Otherwise the values are matched positionally against the fields
/// of the partition type derived from `partition_spec` and `schema`, and each
/// one is parsed with `Literal::try_from_json` using that field's type.
///
/// # Errors
///
/// * `PyValueError` if `value` is given without a `partition_spec`, or if the
///   number of values differs from the number of partition fields.
/// * Errors from `partition_values_to_json_array`.
/// * Errors from deriving the partition type or parsing a literal, converted
///   with `crate::error::to_py_err`.
fn parse_partition_data(
    value: Option<&Bound<'_, PyAny>>,
    partition_spec: Option<&Arc<PartitionSpec>>,
    schema: &PySchema,
) -> PyResult<Option<Struct>> {
    // No partition data: nothing to build, regardless of the spec.
    let Some(value) = value else {
        return Ok(None);
    };
    let Some(partition_spec) = partition_spec else {
        return Err(PyValueError::new_err(
            "partition_spec is required when partition_data is provided",
        ));
    };

    let values = partition_values_to_json_array(value)?;
    let partition_type = partition_spec
        .partition_type(schema.inner.as_ref())
        .map_err(crate::error::to_py_err)?;
    let fields = partition_type.fields();

    let (given, expected) = (values.len(), fields.len());
    if given != expected {
        return Err(PyValueError::new_err(format!(
            "partition_data length {} does not match partition_spec field count {}",
            given, expected
        )));
    }

    let mut literals = Vec::with_capacity(expected);
    for (json, field) in values.into_iter().zip(fields.iter()) {
        let literal = Literal::try_from_json(json, &field.field_type)
            .map_err(crate::error::to_py_err)?;
        literals.push(literal);
    }
    Ok(Some(Struct::from_iter(literals)))
}

/// Validates a byte range within a file and returns the effective length.
///
/// When `length` is `None` it defaults to the bytes remaining after `start`,
/// i.e. `file_size_in_bytes - start`.
///
/// # Errors
///
/// `PyValueError` if `start` lies beyond the end of the file, if
/// `start + length` overflows `u64`, or if `start + length` exceeds
/// `file_size_in_bytes`.
fn validate_scan_range(file_size_in_bytes: u64, start: u64, length: Option<u64>) -> PyResult<u64> {
    // `checked_sub` is `None` exactly when `start > file_size_in_bytes`.
    let Some(remaining) = file_size_in_bytes.checked_sub(start) else {
        return Err(PyValueError::new_err(format!(
            "start ({start}) must be less than or equal to file_size_in_bytes ({file_size_in_bytes})"
        )));
    };

    let length = length.unwrap_or(remaining);
    match start.checked_add(length) {
        None => Err(PyValueError::new_err(format!(
            "start ({start}) + length ({length}) overflows u64"
        ))),
        Some(end) if end > file_size_in_bytes => Err(PyValueError::new_err(format!(
            "start ({start}) + length ({length}) must be less than or equal to file_size_in_bytes ({file_size_in_bytes})"
        ))),
        Some(_) => Ok(length),
    }
}

#[pyclass(
name = "DeleteFile",
module = "pyiceberg_core.scan",
Expand Down Expand Up @@ -185,6 +355,9 @@ impl PyFileScanTask {
data_file_format = "parquet",
predicate = None,
deletes = None,
partition_data = None,
partition_spec = None,
name_mapping = None,
case_sensitive = true
))]
fn new(
Expand All @@ -198,13 +371,21 @@ impl PyFileScanTask {
data_file_format: &str,
predicate: Option<&PyPredicate>,
deletes: Option<&Bound<'_, PyAny>>,
partition_data: Option<&Bound<'_, PyAny>>,
partition_spec: Option<&str>,
name_mapping: Option<&str>,
case_sensitive: bool,
) -> PyResult<Self> {
let partition_spec = parse_partition_spec(partition_spec, schema)?;
let partition = parse_partition_data(partition_data, partition_spec.as_ref(), schema)?;
let name_mapping = parse_name_mapping(name_mapping)?;
let length = validate_scan_range(file_size_in_bytes, start, length)?;

Ok(Self {
inner: FileScanTask {
file_size_in_bytes,
start,
length: length.unwrap_or(file_size_in_bytes),
length,
record_count,
data_file_path,
data_file_format: parse_data_file_format(data_file_format)?,
Expand All @@ -215,9 +396,9 @@ impl PyFileScanTask {
.transpose()
.map_err(crate::error::to_py_err)?,
deletes: py_deletes_to_rust(deletes)?,
partition: None,
partition_spec: None,
name_mapping: None,
partition,
partition_spec,
name_mapping,
case_sensitive,
},
})
Expand Down Expand Up @@ -268,6 +449,21 @@ impl PyFileScanTask {
self.inner.predicate.is_some()
}

/// Whether this task carries partition values, i.e. `inner.partition` is `Some`.
#[getter]
fn has_partition_data(&self) -> bool {
self.inner.partition.is_some()
}

/// Whether this task carries a partition spec, i.e. `inner.partition_spec` is `Some`.
#[getter]
fn has_partition_spec(&self) -> bool {
self.inner.partition_spec.is_some()
}

/// Whether this task carries a name mapping, i.e. `inner.name_mapping` is `Some`.
#[getter]
fn has_name_mapping(&self) -> bool {
self.inner.name_mapping.is_some()
}

#[getter]
fn case_sensitive(&self) -> bool {
self.inner.case_sensitive
Expand Down
Loading