diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index ea1efc38..9061169d 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -18,7 +18,7 @@ use crate::client::metadata::Metadata; use crate::metadata::{ DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec, - TableBucket, TableDescriptor, TableInfo, TablePath, + PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath, }; use crate::rpc::message::{ CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, @@ -33,7 +33,6 @@ use crate::error::{Error, Result}; use crate::proto::GetTableInfoResponse; use crate::{BucketId, PartitionId, TableId}; use std::collections::{HashMap, HashSet}; -use std::slice::from_ref; use std::sync::Arc; use tokio::task::JoinHandle; @@ -294,23 +293,69 @@ impl FlussAdmin { buckets_id: &[BucketId], offset_spec: OffsetSpec, ) -> Result> { - self.metadata - .check_and_update_table_metadata(from_ref(table_path)) - .await?; + self.do_list_offsets(table_path, None, buckets_id, offset_spec) + .await + } + + /// List offset for the specified buckets in a partition. This operation enables to find + /// the beginning offset, end offset as well as the offset matching a timestamp in buckets. + pub async fn list_partition_offsets( + &self, + table_path: &TablePath, + partition_name: &str, + buckets_id: &[BucketId], + offset_spec: OffsetSpec, + ) -> Result> { + self.do_list_offsets(table_path, Some(partition_name), buckets_id, offset_spec) + .await + } + async fn do_list_offsets( + &self, + table_path: &TablePath, + partition_name: Option<&str>, + buckets_id: &[BucketId], + offset_spec: OffsetSpec, + ) -> Result> { if buckets_id.is_empty() { - return Err(Error::UnexpectedError { + return Err(Error::IllegalArgument { message: "Buckets are empty.".to_string(), - source: None, }); } + // force to update table metadata like java side + self.metadata.update_table_metadata(table_path).await?; + let cluster = self.metadata.get_cluster(); let table_id = cluster.get_table(table_path)?.table_id; + // Resolve partition_id from partition_name if provided + let partition_id = if let Some(name) = partition_name { + let physical_table_path = Arc::new(PhysicalTablePath::of_partitioned( + Arc::new(table_path.clone()), + Some(name.to_string()), + )); + + // Update partition metadata like java side + self.metadata + .update_physical_table_metadata(std::slice::from_ref(&physical_table_path)) + .await?; + + let cluster = self.metadata.get_cluster(); + Some( + cluster + .get_partition_id(&physical_table_path) + .ok_or_else(|| Error::PartitionNotExist { + message: format!("Partition '{name}' not found for table '{table_path}'"), + })?, + ) + } else { + None + }; + // Prepare requests let requests_by_server = - self.prepare_list_offsets_requests(table_id, None, buckets_id, offset_spec)?; + self.prepare_list_offsets_requests(table_id, partition_id, buckets_id, offset_spec)?; // Send Requests let response_futures = self.send_list_offsets_request(requests_by_server).await?; @@ -338,7 +383,7 @@ impl FlussAdmin { let mut node_for_bucket_list: HashMap> = HashMap::new(); for bucket_id in buckets { - let table_bucket = TableBucket::new(table_id, *bucket_id); + let table_bucket = TableBucket::new_with_partition(table_id, partition_id, *bucket_id); let leader = cluster.leader_for(&table_bucket).ok_or_else(|| { // todo: consider retry? Error::UnexpectedError { diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index 52ccd62e..c6244cd7 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -142,6 +142,27 @@ impl Metadata { .await } + pub async fn update_physical_table_metadata( + &self, + physical_table_paths: &[Arc], + ) -> Result<()> { + let mut update_table_paths = HashSet::new(); + let mut update_partition_paths = HashSet::new(); + for physical_table_path in physical_table_paths { + match physical_table_path.get_partition_name() { + Some(_) => { + update_partition_paths.insert(physical_table_path); + } + None => { + update_table_paths.insert(physical_table_path.get_table_path()); + } + } + } + + self.update_tables_metadata(&update_table_paths, &update_partition_paths, vec![]) + .await + } + pub async fn check_and_update_table_metadata(&self, table_paths: &[TablePath]) -> Result<()> { let cluster_binding = self.cluster.read().clone(); let need_update_table_paths: HashSet<&TablePath> = table_paths diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 64e6289c..27b4d831 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -1072,6 +1072,38 @@ mod table_test { .await .expect("Failed to flush batches"); + // Test list_offsets_for_partition + // US partition has 4 records: 2 from row append + 2 from batch append + let us_offsets = admin + .list_partition_offsets(&table_path, "US", &[0], OffsetSpec::Latest) + .await + .expect("Failed to list offsets for US partition"); + assert_eq!( + us_offsets.get(&0), + Some(&4), + "US partition should have 4 records" + ); + + // EU partition has 4 records: 2 from row append + 2 from batch append + let eu_offsets = admin + .list_partition_offsets(&table_path, "EU", &[0], OffsetSpec::Latest) + .await + .expect("Failed to list offsets for EU partition"); + assert_eq!( + eu_offsets.get(&0), + Some(&4), + "EU partition should have 4 records" + ); + + // test list a not exist partition should return error + let result = admin + .list_partition_offsets(&table_path, "NOT Exists", &[0], OffsetSpec::Latest) + .await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains( + "Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist." + )); + admin .drop_table(&table_path, false) .await