Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::client::metadata::Metadata;
use crate::metadata::{
DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec,
TableBucket, TableDescriptor, TableInfo, TablePath,
PhysicalTablePath, TableBucket, TableDescriptor, TableInfo, TablePath,
};
use crate::rpc::message::{
CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest,
Expand All @@ -33,7 +33,6 @@ use crate::error::{Error, Result};
use crate::proto::GetTableInfoResponse;
use crate::{BucketId, PartitionId, TableId};
use std::collections::{HashMap, HashSet};
use std::slice::from_ref;
use std::sync::Arc;
use tokio::task::JoinHandle;

Expand Down Expand Up @@ -294,23 +293,69 @@ impl FlussAdmin {
buckets_id: &[BucketId],
offset_spec: OffsetSpec,
) -> Result<HashMap<i32, i64>> {
self.metadata
.check_and_update_table_metadata(from_ref(table_path))
.await?;
self.do_list_offsets(table_path, None, buckets_id, offset_spec)
.await
}

/// Lists offsets for the given buckets of a single partition of `table_path`.
///
/// Depending on `offset_spec`, this can be used to find the beginning offset, the end
/// offset, or the offset matching a timestamp in each bucket.
///
/// This is a thin entry point: it forwards to `do_list_offsets`, passing
/// `partition_name` as the partition to resolve.
///
/// Returns a map from bucket id to the offset found for that bucket.
pub async fn list_partition_offsets(
    &self,
    table_path: &TablePath,
    partition_name: &str,
    buckets_id: &[BucketId],
    offset_spec: OffsetSpec,
) -> Result<HashMap<i32, i64>> {
    // The non-partitioned entry point passes `None` here instead.
    let partition = Some(partition_name);
    self.do_list_offsets(table_path, partition, buckets_id, offset_spec)
        .await
}

async fn do_list_offsets(
&self,
table_path: &TablePath,
partition_name: Option<&str>,
buckets_id: &[BucketId],
offset_spec: OffsetSpec,
) -> Result<HashMap<i32, i64>> {
if buckets_id.is_empty() {
return Err(Error::UnexpectedError {
return Err(Error::IllegalArgument {
message: "Buckets are empty.".to_string(),
source: None,
});
}

// Always force a table metadata update, as the Java side does
self.metadata.update_table_metadata(table_path).await?;

let cluster = self.metadata.get_cluster();
let table_id = cluster.get_table(table_path)?.table_id;

// Resolve partition_id from partition_name if provided
let partition_id = if let Some(name) = partition_name {
let physical_table_path = Arc::new(PhysicalTablePath::of_partitioned(
Arc::new(table_path.clone()),
Some(name.to_string()),
));

// Update partition metadata like java side
self.metadata
.update_physical_table_metadata(std::slice::from_ref(&physical_table_path))
.await?;

Comment thread
luoyuxia marked this conversation as resolved.
let cluster = self.metadata.get_cluster();
Some(
cluster
.get_partition_id(&physical_table_path)
.ok_or_else(|| Error::PartitionNotExist {
message: format!("Partition '{name}' not found for table '{table_path}'"),
})?,
)
} else {
None
};

// Prepare requests
let requests_by_server =
self.prepare_list_offsets_requests(table_id, None, buckets_id, offset_spec)?;
self.prepare_list_offsets_requests(table_id, partition_id, buckets_id, offset_spec)?;

// Send Requests
let response_futures = self.send_list_offsets_request(requests_by_server).await?;
Expand Down Expand Up @@ -338,7 +383,7 @@ impl FlussAdmin {
let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> = HashMap::new();

for bucket_id in buckets {
let table_bucket = TableBucket::new(table_id, *bucket_id);
let table_bucket = TableBucket::new_with_partition(table_id, partition_id, *bucket_id);
let leader = cluster.leader_for(&table_bucket).ok_or_else(|| {
// todo: consider retry?
Error::UnexpectedError {
Expand Down
21 changes: 21 additions & 0 deletions crates/fluss/src/client/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,27 @@ impl Metadata {
.await
}

/// Updates metadata for a batch of physical table paths.
///
/// Each path is routed by whether it carries a partition name:
/// - paths with a partition name are collected as partition paths;
/// - paths without one contribute their table path instead.
///
/// Both collections are sets, so repeated entries are collapsed before they are
/// handed to `update_tables_metadata` (together with an empty list as its third
/// argument).
pub async fn update_physical_table_metadata(
    &self,
    physical_table_paths: &[Arc<PhysicalTablePath>],
) -> Result<()> {
    let mut table_paths = HashSet::new();
    let mut partition_paths = HashSet::new();

    for path in physical_table_paths {
        if path.get_partition_name().is_some() {
            partition_paths.insert(path);
        } else {
            table_paths.insert(path.get_table_path());
        }
    }

    self.update_tables_metadata(&table_paths, &partition_paths, vec![])
        .await
}

pub async fn check_and_update_table_metadata(&self, table_paths: &[TablePath]) -> Result<()> {
let cluster_binding = self.cluster.read().clone();
let need_update_table_paths: HashSet<&TablePath> = table_paths
Expand Down
32 changes: 32 additions & 0 deletions crates/fluss/tests/integration/log_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,38 @@ mod table_test {
.await
.expect("Failed to flush batches");

// Test list_partition_offsets
Comment thread
luoyuxia marked this conversation as resolved.
// US partition has 4 records: 2 from row append + 2 from batch append
let us_offsets = admin
.list_partition_offsets(&table_path, "US", &[0], OffsetSpec::Latest)
.await
.expect("Failed to list offsets for US partition");
assert_eq!(
us_offsets.get(&0),
Some(&4),
"US partition should have 4 records"
);

// EU partition has 4 records: 2 from row append + 2 from batch append
let eu_offsets = admin
Comment thread
luoyuxia marked this conversation as resolved.
.list_partition_offsets(&table_path, "EU", &[0], OffsetSpec::Latest)
.await
.expect("Failed to list offsets for EU partition");
assert_eq!(
eu_offsets.get(&0),
Some(&4),
"EU partition should have 4 records"
);

// Listing offsets for a non-existent partition should return an error
let result = admin
.list_partition_offsets(&table_path, "NOT Exists", &[0], OffsetSpec::Latest)
.await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains(
"Table partition 'fluss.test_partitioned_log_append(p=NOT Exists)' does not exist."
));

admin
.drop_table(&table_path, false)
.await
Expand Down
Loading