Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 78 additions & 129 deletions bindings/python/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,34 +265,29 @@ impl TableScan {
let (projected_schema, projected_row_type) =
calculate_projected_types(&table_info, projection_indices)?;

let py_scanner = match scanner_type {
let scanner_kind = match scanner_type {
ScannerType::Record => {
let rust_scanner = table_scan.create_log_scanner().map_err(|e| {
let s = table_scan.create_log_scanner().map_err(|e| {
FlussError::new_err(format!("Failed to create log scanner: {e}"))
})?;
LogScanner::from_log_scanner(
rust_scanner,
admin,
table_info,
projected_schema,
projected_row_type,
)
ScannerKind::Record(s)
}
ScannerType::Batch => {
let rust_scanner =
table_scan.create_record_batch_log_scanner().map_err(|e| {
FlussError::new_err(format!("Failed to create batch scanner: {e}"))
})?;
LogScanner::from_batch_scanner(
rust_scanner,
admin,
table_info,
projected_schema,
projected_row_type,
)
let s = table_scan.create_record_batch_log_scanner().map_err(|e| {
FlussError::new_err(format!("Failed to create batch scanner: {e}"))
})?;
ScannerKind::Batch(s)
}
};

let py_scanner = LogScanner::new(
scanner_kind,
admin,
table_info,
projected_schema,
projected_row_type,
);

Python::attach(|py| Py::new(py, py_scanner))
})
}
Expand Down Expand Up @@ -1555,17 +1550,52 @@ fn get_type_name(value: &Bound<PyAny>) -> String {
.unwrap_or_else(|_| "unknown".to_string())
}

/// The underlying Fluss scanner held by the Python-facing `LogScanner`.
///
/// Wraps the two scanner variants so we never have an impossible state
/// (both None or both Some): a value of this type is always exactly one of
/// the two kinds.
enum ScannerKind {
    /// Record-based scanner; `as_record` hands out a reference to it.
    Record(fcore::client::LogScanner),
    /// Batch-based scanner; `as_batch` hands out a reference to it.
    Batch(fcore::client::RecordBatchLogScanner),
}

impl ScannerKind {
    /// Borrows the record-based scanner.
    ///
    /// # Errors
    ///
    /// Returns an error built with `FlussError::new_err` when this value is
    /// the `Batch` variant.
    fn as_record(&self) -> PyResult<&fcore::client::LogScanner> {
        match self {
            Self::Batch(_) => Err(FlussError::new_err(
                "poll() requires a record-based scanner. Use new_scan().create_log_scanner().",
            )),
            Self::Record(record_scanner) => Ok(record_scanner),
        }
    }

    /// Borrows the batch-based scanner.
    ///
    /// # Errors
    ///
    /// Returns an error built with `FlussError::new_err` when this value is
    /// the `Record` variant.
    fn as_batch(&self) -> PyResult<&fcore::client::RecordBatchLogScanner> {
        match self {
            Self::Record(_) => Err(FlussError::new_err(
                "This method requires a batch-based scanner. Use new_scan().create_batch_scanner().",
            )),
            Self::Batch(batch_scanner) => Ok(batch_scanner),
        }
    }
}

/// Forwards an async method call to whichever scanner a `ScannerKind` holds.
///
/// Expands to a `match` over both variants that invokes `$call` with the
/// given arguments on the inner scanner and `.await`s the result. Because of
/// the `.await`, it can only be used inside an async context, and the method
/// must exist on both `LogScanner` and `RecordBatchLogScanner` with results of
/// the same type (as the subscribe methods are used here).
macro_rules! with_scanner {
    ($kind:expr, $call:ident($($param:expr),*)) => {
        match $kind {
            ScannerKind::Batch(inner) => inner.$call($($param),*).await,
            ScannerKind::Record(inner) => inner.$call($($param),*).await,
        }
    };
}

/// Scanner for reading log data from a Fluss table.
///
/// This scanner supports two modes:
/// - Record-based scanning via `poll()` - returns individual records with metadata
/// - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns Arrow batches
#[pyclass]
pub struct LogScanner {
/// Record-based scanner for poll()
inner: Option<fcore::client::LogScanner>,
/// Batch-based scanner for poll_arrow/poll_batches
inner_batch: Option<fcore::client::RecordBatchLogScanner>,
scanner: ScannerKind,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
/// The projected Arrow schema to use for empty table creation
Expand All @@ -1586,19 +1616,8 @@ impl LogScanner {
fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
if let Some(ref inner) = self.inner {
inner
.subscribe(bucket_id, start_offset)
.await
.map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}")))
} else if let Some(ref inner_batch) = self.inner_batch {
inner_batch
.subscribe(bucket_id, start_offset)
.await
.map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}")))
} else {
Err(FlussError::new_err("No scanner available"))
}
with_scanner!(&self.scanner, subscribe(bucket_id, start_offset))
.map_err(|e| FlussError::new_err(e.to_string()))
})
})
}
Expand All @@ -1610,19 +1629,8 @@ impl LogScanner {
fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap<i32, i64>) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
if let Some(ref inner) = self.inner {
inner
.subscribe_buckets(&bucket_offsets)
.await
.map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}")))
} else if let Some(ref inner_batch) = self.inner_batch {
inner_batch
.subscribe_buckets(&bucket_offsets)
.await
.map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}")))
} else {
Err(FlussError::new_err("No scanner available"))
}
with_scanner!(&self.scanner, subscribe_buckets(&bucket_offsets))
.map_err(|e| FlussError::new_err(e.to_string()))
})
})
}
Expand All @@ -1642,23 +1650,11 @@ impl LogScanner {
) -> PyResult<()> {
py.detach(|| {
TOKIO_RUNTIME.block_on(async {
if let Some(ref inner) = self.inner {
inner
.subscribe_partition(partition_id, bucket_id, start_offset)
.await
.map_err(|e| {
FlussError::new_err(format!("Failed to subscribe partition: {e}"))
})
} else if let Some(ref inner_batch) = self.inner_batch {
inner_batch
.subscribe_partition(partition_id, bucket_id, start_offset)
.await
.map_err(|e| {
FlussError::new_err(format!("Failed to subscribe partition: {e}"))
})
} else {
Err(FlussError::new_err("No scanner available"))
}
with_scanner!(
&self.scanner,
subscribe_partition(partition_id, bucket_id, start_offset)
)
.map_err(|e| FlussError::new_err(e.to_string()))
})
})
}
Expand All @@ -1677,12 +1673,7 @@ impl LogScanner {
/// - Returns an empty list if no records are available
/// - When timeout expires, returns an empty list (NOT an error)
fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<ScanRecord>> {
let inner = self.inner.as_ref().ok_or_else(|| {
FlussError::new_err(
"Record-based scanner not available. Use new_scan().create_log_scanner() to create a scanner \
that supports poll().",
)
})?;
let scanner = self.scanner.as_record()?;

if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
Expand All @@ -1692,7 +1683,7 @@ impl LogScanner {

let timeout = Duration::from_millis(timeout_ms as u64);
let scan_records = py
.detach(|| TOKIO_RUNTIME.block_on(async { inner.poll(timeout).await }))
.detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
.map_err(|e| FlussError::new_err(e.to_string()))?;

// Convert ScanRecords to Python ScanRecord list
Expand Down Expand Up @@ -1724,12 +1715,7 @@ impl LogScanner {
/// - Returns an empty list if no batches are available
/// - When timeout expires, returns an empty list (NOT an error)
fn poll_batches(&self, py: Python, timeout_ms: i64) -> PyResult<Vec<RecordBatch>> {
let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
FlussError::new_err(
"Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
that supports poll_batches().",
)
})?;
let scanner = self.scanner.as_batch()?;

if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
Expand All @@ -1739,7 +1725,7 @@ impl LogScanner {

let timeout = Duration::from_millis(timeout_ms as u64);
let scan_batches = py
.detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await }))
.detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
.map_err(|e| FlussError::new_err(e.to_string()))?;

// Convert ScanBatch to RecordBatch with metadata
Expand All @@ -1764,12 +1750,7 @@ impl LogScanner {
/// - Returns an empty table (with correct schema) if no records are available
/// - When timeout expires, returns an empty table (NOT an error)
fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
FlussError::new_err(
"Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
that supports poll_arrow().",
)
})?;
let scanner = self.scanner.as_batch()?;

if timeout_ms < 0 {
return Err(FlussError::new_err(format!(
Expand All @@ -1779,7 +1760,7 @@ impl LogScanner {

let timeout = Duration::from_millis(timeout_ms as u64);
let scan_batches = py
.detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await }))
.detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await }))
.map_err(|e| FlussError::new_err(e.to_string()))?;

// Convert ScanBatch to Arrow batches
Expand Down Expand Up @@ -1822,14 +1803,8 @@ impl LogScanner {
/// Returns:
/// PyArrow Table containing all data from subscribed buckets
fn to_arrow(&self, py: Python) -> PyResult<Py<PyAny>> {
// 1. Get subscribed buckets from scanner (requires batch scanner for get_subscribed_buckets)
let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
FlussError::new_err(
"Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \
that supports to_arrow().",
)
})?;
let subscribed = inner_batch.get_subscribed_buckets();
let scanner = self.scanner.as_batch()?;
let subscribed = scanner.get_subscribed_buckets();
if subscribed.is_empty() {
return Err(FlussError::new_err(
"No buckets subscribed. Call subscribe(), subscribe_buckets(), or subscribe_partition() first.",
Expand Down Expand Up @@ -1866,36 +1841,15 @@ impl LogScanner {
}

impl LogScanner {
/// Create LogScanner for record-based scanning
pub fn from_log_scanner(
inner_scanner: fcore::client::LogScanner,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
projected_schema: SchemaRef,
projected_row_type: fcore::metadata::RowType,
) -> Self {
Self {
inner: Some(inner_scanner),
inner_batch: None,
admin,
table_info,
projected_schema,
projected_row_type,
partition_name_cache: std::sync::RwLock::new(None),
}
}

/// Create LogScanner for batch-based scanning
pub fn from_batch_scanner(
inner_batch_scanner: fcore::client::RecordBatchLogScanner,
fn new(
scanner: ScannerKind,
admin: fcore::client::FlussAdmin,
table_info: fcore::metadata::TableInfo,
projected_schema: SchemaRef,
projected_row_type: fcore::metadata::RowType,
) -> Self {
Self {
inner: None,
inner_batch: Some(inner_batch_scanner),
scanner,
admin,
table_info,
projected_schema,
Expand Down Expand Up @@ -1946,10 +1900,8 @@ impl LogScanner {
py: Python,
subscribed: &[(fcore::metadata::TableBucket, i64)],
) -> PyResult<HashMap<fcore::metadata::TableBucket, i64>> {
let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
FlussError::new_err("Batch-based scanner required for this operation")
})?;
let is_partitioned = inner_batch.is_partitioned();
let scanner = self.scanner.as_batch()?;
let is_partitioned = scanner.is_partitioned();
let table_path = &self.table_info.table_path;

if !is_partitioned {
Expand Down Expand Up @@ -2055,16 +2007,13 @@ impl LogScanner {
py: Python,
mut stopping_offsets: HashMap<fcore::metadata::TableBucket, i64>,
) -> PyResult<Py<PyAny>> {
let inner_batch = self.inner_batch.as_ref().ok_or_else(|| {
FlussError::new_err("Batch-based scanner required for this operation")
})?;
let scanner = self.scanner.as_batch()?;
let mut all_batches = Vec::new();

while !stopping_offsets.is_empty() {
let scan_batches = py
.detach(|| {
TOKIO_RUNTIME
.block_on(async { inner_batch.poll(Duration::from_millis(500)).await })
TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await })
})
.map_err(|e| FlussError::new_err(format!("Failed to poll: {e}")))?;

Expand Down
Loading