Skip to content

Commit ff879b9

Browse files
feat: Add unsubscribe_partition to python bindings (#277)
1 parent 26edb7a commit ff879b9

3 files changed

Lines changed: 38 additions & 0 deletions

File tree

bindings/python/example/example.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,19 @@ async def main():
717717
print(f"\nto_arrow() returned {partitioned_arrow.num_rows} records from partitioned table:")
718718
print(partitioned_arrow.to_pandas())
719719

720+
# Demo: unsubscribe_partition - unsubscribe from one partition, read remaining
721+
print("\n--- Testing unsubscribe_partition ---")
722+
partitioned_scanner3 = await partitioned_table.new_scan().create_batch_scanner()
723+
for p in partition_infos:
724+
partitioned_scanner3.subscribe_partition(p.partition_id, 0, fluss.EARLIEST_OFFSET)
725+
# Unsubscribe from the first partition
726+
first_partition = partition_infos[0]
727+
partitioned_scanner3.unsubscribe_partition(first_partition.partition_id, 0)
728+
print(f"Unsubscribed from partition {first_partition.partition_name} (id={first_partition.partition_id})")
729+
remaining_arrow = partitioned_scanner3.to_arrow()
730+
print(f"After unsubscribe, to_arrow() returned {remaining_arrow.num_rows} records (from remaining partitions):")
731+
print(remaining_arrow.to_pandas())
732+
720733
# Demo: to_pandas() also works for partitioned tables
721734
print("\n--- Testing to_pandas() on partitioned table ---")
722735
partitioned_scanner2 = await partitioned_table.new_scan().create_batch_scanner()

bindings/python/fluss/__init__.pyi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,14 @@ class LogScanner:
420420
start_offset: The offset to start reading from (use EARLIEST_OFFSET for beginning)
421421
"""
422422
...
423+
def unsubscribe_partition(self, partition_id: int, bucket_id: int) -> None:
424+
"""Unsubscribe from a specific partition bucket (partitioned tables only).
425+
426+
Args:
427+
partition_id: The partition ID to unsubscribe from
428+
bucket_id: The bucket ID within the partition
429+
"""
430+
...
423431
def poll(self, timeout_ms: int) -> List[ScanRecord]:
424432
"""Poll for individual records with metadata.
425433

bindings/python/src/table.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,23 @@ impl LogScanner {
16571657
})
16581658
}
16591659

1660+
/// Unsubscribe from a specific partition bucket (partitioned tables only).
1661+
///
1662+
/// Args:
1663+
/// partition_id: The partition ID to unsubscribe from
1664+
/// bucket_id: The bucket ID within the partition
1665+
fn unsubscribe_partition(&self, py: Python, partition_id: i64, bucket_id: i32) -> PyResult<()> {
1666+
py.detach(|| {
1667+
TOKIO_RUNTIME.block_on(async {
1668+
with_scanner!(
1669+
&self.scanner,
1670+
unsubscribe_partition(partition_id, bucket_id)
1671+
)
1672+
.map_err(|e| FlussError::new_err(e.to_string()))
1673+
})
1674+
})
1675+
}
1676+
16601677
/// Poll for individual records with metadata.
16611678
///
16621679
/// Args:

0 commit comments

Comments
 (0)