diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index c285f25c..7184c8d2 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -265,34 +265,29 @@ impl TableScan { let (projected_schema, projected_row_type) = calculate_projected_types(&table_info, projection_indices)?; - let py_scanner = match scanner_type { + let scanner_kind = match scanner_type { ScannerType::Record => { - let rust_scanner = table_scan.create_log_scanner().map_err(|e| { + let s = table_scan.create_log_scanner().map_err(|e| { FlussError::new_err(format!("Failed to create log scanner: {e}")) })?; - LogScanner::from_log_scanner( - rust_scanner, - admin, - table_info, - projected_schema, - projected_row_type, - ) + ScannerKind::Record(s) } ScannerType::Batch => { - let rust_scanner = - table_scan.create_record_batch_log_scanner().map_err(|e| { - FlussError::new_err(format!("Failed to create batch scanner: {e}")) - })?; - LogScanner::from_batch_scanner( - rust_scanner, - admin, - table_info, - projected_schema, - projected_row_type, - ) + let s = table_scan.create_record_batch_log_scanner().map_err(|e| { + FlussError::new_err(format!("Failed to create batch scanner: {e}")) + })?; + ScannerKind::Batch(s) } }; + let py_scanner = LogScanner::new( + scanner_kind, + admin, + table_info, + projected_schema, + projected_row_type, + ); + Python::attach(|py| Py::new(py, py_scanner)) }) } @@ -1555,6 +1550,44 @@ fn get_type_name(value: &Bound) -> String { .unwrap_or_else(|_| "unknown".to_string()) } +/// Wraps the two scanner variants so we never have an impossible state +/// (both None or both Some). +enum ScannerKind { + Record(fcore::client::LogScanner), + Batch(fcore::client::RecordBatchLogScanner), +} + +impl ScannerKind { + fn as_record(&self) -> PyResult<&fcore::client::LogScanner> { + match self { + Self::Record(s) => Ok(s), + Self::Batch(_) => Err(FlussError::new_err( + "poll() requires a record-based scanner. Use new_scan().create_log_scanner().", + )), + } + } + + fn as_batch(&self) -> PyResult<&fcore::client::RecordBatchLogScanner> { + match self { + Self::Batch(s) => Ok(s), + Self::Record(_) => Err(FlussError::new_err( + "This method requires a batch-based scanner. Use new_scan().create_batch_scanner().", + )), + } + } +} + +/// Dispatch a method call to whichever scanner variant is active. +/// Both `LogScanner` and `RecordBatchLogScanner` share the same subscribe interface. +macro_rules! with_scanner { + ($scanner:expr, $method:ident($($arg:expr),*)) => { + match $scanner { + ScannerKind::Record(s) => s.$method($($arg),*).await, + ScannerKind::Batch(s) => s.$method($($arg),*).await, + } + }; +} + /// Scanner for reading log data from a Fluss table. /// /// This scanner supports two modes: @@ -1562,10 +1595,7 @@ fn get_type_name(value: &Bound) -> String { /// - Batch-based scanning via `poll_arrow()` / `poll_batches()` - returns Arrow batches #[pyclass] pub struct LogScanner { - /// Record-based scanner for poll() - inner: Option, - /// Batch-based scanner for poll_arrow/poll_batches - inner_batch: Option, + scanner: ScannerKind, admin: fcore::client::FlussAdmin, table_info: fcore::metadata::TableInfo, /// The projected Arrow schema to use for empty table creation @@ -1586,19 +1616,8 @@ impl LogScanner { fn subscribe(&self, py: Python, bucket_id: i32, start_offset: i64) -> PyResult<()> { py.detach(|| { TOKIO_RUNTIME.block_on(async { - if let Some(ref inner) = self.inner { - inner - .subscribe(bucket_id, start_offset) - .await - .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}"))) - } else if let Some(ref inner_batch) = self.inner_batch { - inner_batch - .subscribe(bucket_id, start_offset) - .await - .map_err(|e| FlussError::new_err(format!("Failed to subscribe: {e}"))) - } else { - Err(FlussError::new_err("No scanner available")) - } + with_scanner!(&self.scanner, subscribe(bucket_id, start_offset)) + .map_err(|e| FlussError::new_err(e.to_string())) }) }) } @@ -1610,19 +1629,8 @@ impl LogScanner { fn subscribe_buckets(&self, py: Python, bucket_offsets: HashMap) -> PyResult<()> { py.detach(|| { TOKIO_RUNTIME.block_on(async { - if let Some(ref inner) = self.inner { - inner - .subscribe_buckets(&bucket_offsets) - .await - .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}"))) - } else if let Some(ref inner_batch) = self.inner_batch { - inner_batch - .subscribe_buckets(&bucket_offsets) - .await - .map_err(|e| FlussError::new_err(format!("Failed to subscribe batch: {e}"))) - } else { - Err(FlussError::new_err("No scanner available")) - } + with_scanner!(&self.scanner, subscribe_buckets(&bucket_offsets)) + .map_err(|e| FlussError::new_err(e.to_string())) }) }) } @@ -1642,23 +1650,11 @@ impl LogScanner { ) -> PyResult<()> { py.detach(|| { TOKIO_RUNTIME.block_on(async { - if let Some(ref inner) = self.inner { - inner - .subscribe_partition(partition_id, bucket_id, start_offset) - .await - .map_err(|e| { - FlussError::new_err(format!("Failed to subscribe partition: {e}")) - }) - } else if let Some(ref inner_batch) = self.inner_batch { - inner_batch - .subscribe_partition(partition_id, bucket_id, start_offset) - .await - .map_err(|e| { - FlussError::new_err(format!("Failed to subscribe partition: {e}")) - }) - } else { - Err(FlussError::new_err("No scanner available")) - } + with_scanner!( + &self.scanner, + subscribe_partition(partition_id, bucket_id, start_offset) + ) + .map_err(|e| FlussError::new_err(e.to_string())) }) }) } @@ -1677,12 +1673,7 @@ impl LogScanner { /// - Returns an empty list if no records are available /// - When timeout expires, returns an empty list (NOT an error) fn poll(&self, py: Python, timeout_ms: i64) -> PyResult> { - let inner = self.inner.as_ref().ok_or_else(|| { - FlussError::new_err( - "Record-based scanner not available. Use new_scan().create_log_scanner() to create a scanner \ - that supports poll().", - ) - })?; + let scanner = self.scanner.as_record()?; if timeout_ms < 0 { return Err(FlussError::new_err(format!( @@ -1692,7 +1683,7 @@ impl LogScanner { let timeout = Duration::from_millis(timeout_ms as u64); let scan_records = py - .detach(|| TOKIO_RUNTIME.block_on(async { inner.poll(timeout).await })) + .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await })) .map_err(|e| FlussError::new_err(e.to_string()))?; // Convert ScanRecords to Python ScanRecord list @@ -1724,12 +1715,7 @@ impl LogScanner { /// - Returns an empty list if no batches are available /// - When timeout expires, returns an empty list (NOT an error) fn poll_batches(&self, py: Python, timeout_ms: i64) -> PyResult> { - let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { - FlussError::new_err( - "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \ - that supports poll_batches().", - ) - })?; + let scanner = self.scanner.as_batch()?; if timeout_ms < 0 { return Err(FlussError::new_err(format!( @@ -1739,7 +1725,7 @@ impl LogScanner { let timeout = Duration::from_millis(timeout_ms as u64); let scan_batches = py - .detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await })) + .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await })) .map_err(|e| FlussError::new_err(e.to_string()))?; // Convert ScanBatch to RecordBatch with metadata @@ -1764,12 +1750,7 @@ impl LogScanner { /// - Returns an empty table (with correct schema) if no records are available /// - When timeout expires, returns an empty table (NOT an error) fn poll_arrow(&self, py: Python, timeout_ms: i64) -> PyResult> { - let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { - FlussError::new_err( - "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \ - that supports poll_arrow().", - ) - })?; + let scanner = self.scanner.as_batch()?; if timeout_ms < 0 { return Err(FlussError::new_err(format!( @@ -1779,7 +1760,7 @@ impl LogScanner { let timeout = Duration::from_millis(timeout_ms as u64); let scan_batches = py - .detach(|| TOKIO_RUNTIME.block_on(async { inner_batch.poll(timeout).await })) + .detach(|| TOKIO_RUNTIME.block_on(async { scanner.poll(timeout).await })) .map_err(|e| FlussError::new_err(e.to_string()))?; // Convert ScanBatch to Arrow batches @@ -1822,14 +1803,8 @@ impl LogScanner { /// Returns: /// PyArrow Table containing all data from subscribed buckets fn to_arrow(&self, py: Python) -> PyResult> { - // 1. Get subscribed buckets from scanner (requires batch scanner for get_subscribed_buckets) - let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { - FlussError::new_err( - "Batch-based scanner not available. Use new_scan().create_batch_scanner() to create a scanner \ - that supports to_arrow().", - ) - })?; - let subscribed = inner_batch.get_subscribed_buckets(); + let scanner = self.scanner.as_batch()?; + let subscribed = scanner.get_subscribed_buckets(); if subscribed.is_empty() { return Err(FlussError::new_err( "No buckets subscribed. Call subscribe(), subscribe_buckets(), or subscribe_partition() first.", @@ -1866,36 +1841,15 @@ impl LogScanner { } impl LogScanner { - /// Create LogScanner for record-based scanning - pub fn from_log_scanner( - inner_scanner: fcore::client::LogScanner, - admin: fcore::client::FlussAdmin, - table_info: fcore::metadata::TableInfo, - projected_schema: SchemaRef, - projected_row_type: fcore::metadata::RowType, - ) -> Self { - Self { - inner: Some(inner_scanner), - inner_batch: None, - admin, - table_info, - projected_schema, - projected_row_type, - partition_name_cache: std::sync::RwLock::new(None), - } - } - - /// Create LogScanner for batch-based scanning - pub fn from_batch_scanner( - inner_batch_scanner: fcore::client::RecordBatchLogScanner, + fn new( + scanner: ScannerKind, admin: fcore::client::FlussAdmin, table_info: fcore::metadata::TableInfo, projected_schema: SchemaRef, projected_row_type: fcore::metadata::RowType, ) -> Self { Self { - inner: None, - inner_batch: Some(inner_batch_scanner), + scanner, admin, table_info, projected_schema, @@ -1946,10 +1900,8 @@ impl LogScanner { py: Python, subscribed: &[(fcore::metadata::TableBucket, i64)], ) -> PyResult> { - let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { - FlussError::new_err("Batch-based scanner required for this operation") - })?; - let is_partitioned = inner_batch.is_partitioned(); + let scanner = self.scanner.as_batch()?; + let is_partitioned = scanner.is_partitioned(); let table_path = &self.table_info.table_path; if !is_partitioned { @@ -2055,16 +2007,13 @@ impl LogScanner { py: Python, mut stopping_offsets: HashMap, ) -> PyResult> { - let inner_batch = self.inner_batch.as_ref().ok_or_else(|| { - FlussError::new_err("Batch-based scanner required for this operation") - })?; + let scanner = self.scanner.as_batch()?; let mut all_batches = Vec::new(); while !stopping_offsets.is_empty() { let scan_batches = py .detach(|| { - TOKIO_RUNTIME - .block_on(async { inner_batch.poll(Duration::from_millis(500)).await }) + TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await }) }) .map_err(|e| FlussError::new_err(format!("Failed to poll: {e}")))?;