Skip to content
Merged
6 changes: 5 additions & 1 deletion crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ path = "src/example_table.rs"

[[example]]
name = "example-upsert-lookup"
path = "src/example_kv_table.rs"
path = "src/example_kv_table.rs"

[[example]]
name = "example-partitioned-upsert-lookup"
path = "src/example_partitioned_kv_table.rs"
153 changes: 153 additions & 0 deletions crates/examples/src/example_partitioned_kv_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use clap::Parser;
use fluss::client::{FlussAdmin, FlussConnection, UpsertWriter};
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, PartitionSpec, Schema, TableDescriptor, TablePath};
use fluss::row::{GenericRow, InternalRow};
use std::collections::HashMap;

/// End-to-end example for a partitioned primary-key (KV) table.
///
/// The example:
/// 1. creates the table `fluss.partitioned_kv_example`, partitioned by
///    `region` and `zone`;
/// 2. creates three partitions;
/// 3. upserts one row per partition and looks each one up again;
/// 4. updates the `score` of one row and verifies the new value;
/// 5. deletes one row and verifies that it is gone.
///
/// # Errors
///
/// Returns the first error reported by the Fluss client. Partition creation
/// is delegated to `create_partition`, which panics instead of returning an
/// error.
#[tokio::main]
// `main` is unused when this file is pulled in as a module of another example
// binary (see `mod example_partitioned_kv_table;` in `example_table.rs`).
#[allow(dead_code)]
pub async fn main() -> Result<()> {
    let mut config = Config::parse();
    config.bootstrap_server = Some("127.0.0.1:9123".to_string());

    let conn = FlussConnection::new(config).await?;

    // `region` and `zone` are both part of the primary key and the partition keys.
    let table_descriptor = TableDescriptor::builder()
        .schema(
            Schema::builder()
                .column("id", DataTypes::int())
                .column("region", DataTypes::string())
                .column("zone", DataTypes::bigint())
                .column("score", DataTypes::bigint())
                .primary_key(vec![
                    "id".to_string(),
                    "region".to_string(),
                    "zone".to_string(),
                ])
                .build()?,
        )
        .partitioned_by(vec!["region".to_string(), "zone".to_string()])
        .build()?;

    let table_path = TablePath::new("fluss".to_owned(), "partitioned_kv_example".to_owned());

    let mut admin = conn.get_admin().await?;
    admin
        .create_table(&table_path, &table_descriptor, true)
        .await?;
    println!(
        "Created KV Table:\n {}\n",
        admin.get_table(&table_path).await?
    );

    // One partition per (region, zone) pair written below.
    create_partition(&table_path, &mut admin, "APAC", 1).await;
    create_partition(&table_path, &mut admin, "EMEA", 2).await;
    create_partition(&table_path, &mut admin, "US", 3).await;

    let table = conn.get_table(&table_path).await?;
    let table_upsert = table.new_upsert()?;
    let mut upsert_writer = table_upsert.create_writer()?;

    println!("\n=== Upserting ===");
    for (id, region, zone, score) in [
        (1001, "APAC", 1i64, 1234i64),
        (1002, "EMEA", 2, 2234),
        (1003, "US", 3, 3234),
    ] {
        let mut row = GenericRow::new(4);
        row.set_field(0, id);
        row.set_field(1, region);
        row.set_field(2, zone);
        row.set_field(3, score);
        upsert_writer.upsert(&row).await?;
        println!("Upserted: {row:?}");
    }

    println!("\n=== Looking up ===");
    let mut lookuper = table.new_lookup()?.create_lookuper()?;

    for (id, region, zone) in [(1001, "APAC", 1i64), (1002, "EMEA", 2), (1003, "US", 3)] {
        // Propagate lookup failures like every other fallible call here
        // instead of panicking.
        let result = lookuper.lookup(&make_key(id, region, zone)).await?;
        let row = result
            .get_single_row()?
            .expect("row was upserted above, so the lookup must find it");
        println!(
            "Found id={id}: region={}, zone={}, score={}",
            row.get_string(1),
            row.get_long(2),
            row.get_long(3)
        );
    }

    println!("\n=== Updating ===");
    let mut row = GenericRow::new(4);
    row.set_field(0, 1001);
    row.set_field(1, "APAC");
    row.set_field(2, 1i64);
    row.set_field(3, 4321i64);
    upsert_writer.upsert(&row).await?;
    println!("Updated: {row:?}");

    let result = lookuper.lookup(&make_key(1001, "APAC", 1)).await?;
    let row = result
        .get_single_row()?
        .expect("row was just updated, so the lookup must find it");
    // `score` is the only column the update changed, so print it as well.
    println!(
        "Verified update: region={}, zone={}, score={}",
        row.get_string(1),
        row.get_long(2),
        row.get_long(3)
    );

    println!("\n=== Deleting ===");
    // Only the primary-key columns are set; `score` (index 3) is left unset.
    let mut row = GenericRow::new(4);
    row.set_field(0, 1002);
    row.set_field(1, "EMEA");
    row.set_field(2, 2i64);
    upsert_writer.delete(&row).await?;
    println!("Deleted: {row:?}");

    let result = lookuper.lookup(&make_key(1002, "EMEA", 2)).await?;
    if result.get_single_row()?.is_none() {
        println!("Verified deletion");
    }

    Ok(())
}

/// Creates the partition identified by `region` and `zone` on `table_path`.
///
/// Both partition values are passed to the admin client as strings, keyed by
/// the partition column names `"region"` and `"zone"`.
///
/// # Panics
///
/// Panics if the admin request returns an error.
async fn create_partition(table_path: &TablePath, admin: &mut FlussAdmin, region: &str, zone: i64) {
    let spec = PartitionSpec::new(HashMap::from([
        ("region".to_string(), region.to_string()),
        ("zone".to_string(), zone.to_string()),
    ]));

    // NOTE(review): the `true` flag presumably means "ignore if the partition
    // already exists" — confirm against `FlussAdmin::create_partition`.
    let outcome = admin.create_partition(table_path, &spec, true).await;
    outcome.unwrap();
}

/// Builds the lookup key row for the given `id`, `region` and `zone`.
///
/// The three values are stored at field indices 0, 1 and 2. `region` is
/// copied into an owned `String` so the returned row does not borrow from the
/// caller.
fn make_key(id: i32, region: &str, zone: i64) -> GenericRow<'static> {
    // NOTE(review): the row is sized for four fields although only the three
    // key fields are set — confirm the lookuper expects this width.
    let mut key = GenericRow::new(4);
    key.set_field(0, id);
    key.set_field(1, region.to_owned());
    key.set_field(2, zone);
    key
}
1 change: 1 addition & 0 deletions crates/examples/src/example_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

mod example_kv_table;
mod example_partitioned_kv_table;

use clap::Parser;
use fluss::client::FlussConnection;
Expand Down
1 change: 0 additions & 1 deletion crates/fluss/src/bucketing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub trait BucketingFunction: Sync + Send {
fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32>;
}

#[allow(dead_code)]
impl dyn BucketingFunction {
/// Provides the bucketing function for a given [DataLakeFormat]
///
Expand Down
8 changes: 7 additions & 1 deletion crates/fluss/src/client/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::rpc::{RpcClient, ServerConnection};
use crate::error::{Error, Result};
use crate::proto::GetTableInfoResponse;
use crate::{BucketId, PartitionId, TableId};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::slice::from_ref;
use std::sync::Arc;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -111,6 +111,12 @@ impl FlussAdmin {
.admin_gateway
.request(GetTableRequest::new(table_path))
.await?;

// force update to avoid stale data in cache
self.metadata
.update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![])
.await?;

let GetTableInfoResponse {
table_id,
schema_id,
Expand Down
5 changes: 0 additions & 5 deletions crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ impl FlussConnection {
pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
self.metadata.update_table_metadata(table_path).await?;
let table_info = self.metadata.get_cluster().get_table(table_path)?.clone();
if table_info.is_partitioned() {
return Err(crate::error::Error::UnsupportedOperation {
message: "Partitioned tables are not supported".to_string(),
});
}
Ok(FlussTable::new(self, self.metadata.clone(), table_info))
}
}
81 changes: 70 additions & 11 deletions crates/fluss/src/client/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ impl Metadata {
ServerType::CoordinatorServer,
);
let con = connections.get_connection(&server_node).await?;
let response = con.request(UpdateMetadataRequest::new(&[])).await?;

let response = con
.request(UpdateMetadataRequest::new(
&HashSet::default(),
&HashSet::new(),
vec![],
))
.await?;
Cluster::from_metadata_response(response, None)
}

Expand Down Expand Up @@ -95,7 +102,12 @@ impl Metadata {
Ok(())
}

pub async fn update_tables_metadata(&self, table_paths: &HashSet<&TablePath>) -> Result<()> {
pub async fn update_tables_metadata(
&self,
table_paths: &HashSet<&TablePath>,
physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
partition_ids: Vec<i64>,
) -> Result<()> {
let maybe_server = {
let guard = self.cluster.read();
guard.get_one_available_server().cloned()
Expand All @@ -114,16 +126,19 @@ impl Metadata {

let conn = self.connections.get_connection(&server).await?;

let update_table_paths: Vec<&TablePath> = table_paths.iter().copied().collect();
let response = conn
.request(UpdateMetadataRequest::new(update_table_paths.as_slice()))
.request(UpdateMetadataRequest::new(
table_paths,
physical_table_paths,
partition_ids,
))
.await?;
self.update(response).await?;
Ok(())
}

pub async fn update_table_metadata(&self, table_path: &TablePath) -> Result<()> {
self.update_tables_metadata(&HashSet::from([table_path]))
self.update_tables_metadata(&HashSet::from([table_path]), &HashSet::new(), vec![])
.await
}

Expand All @@ -133,8 +148,9 @@ impl Metadata {
.iter()
.filter(|table_path| cluster_binding.opt_get_table(table_path).is_none())
.collect();

if !need_update_table_paths.is_empty() {
self.update_tables_metadata(&need_update_table_paths)
self.update_tables_metadata(&need_update_table_paths, &HashSet::new(), vec![])
.await?;
}
Ok(())
Expand All @@ -150,7 +166,48 @@ impl Metadata {
guard.clone()
}

pub fn leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode> {
const MAX_RETRY_TIMES: u8 = 3;

/// Returns the leader server of `table_bucket`, refreshing metadata on a miss.
///
/// If the cached cluster already knows a leader, it is returned immediately.
/// Otherwise the metadata of `table_path` is refreshed up to
/// `MAX_RETRY_TIMES` times, and the cache is re-checked after each refresh.
/// When the bucket carries a partition id, that id is included in the
/// refresh request.
///
/// Returns `Ok(None)` if no leader is known after all retries.
///
/// # Errors
///
/// Propagates any error returned by a metadata refresh.
pub async fn leader_for(
    &self,
    table_path: &TablePath,
    table_bucket: &TableBucket,
) -> Result<Option<ServerNode>> {
    // Fast path: the leader is already in the cached cluster.
    if let Some(cached) = self.get_leader_for(table_bucket) {
        return Ok(Some(cached));
    }

    for _ in 0..Self::MAX_RETRY_TIMES {
        // Request the bucket's partition as well when it has one.
        let partition_ids = match table_bucket.partition_id() {
            Some(partition_id) => vec![partition_id],
            None => vec![],
        };
        self.update_tables_metadata(
            &HashSet::from([table_path]),
            &HashSet::new(),
            partition_ids,
        )
        .await?;

        // Re-check the cache now that it has been refreshed.
        if let Some(refreshed) = self.get_leader_for(table_bucket) {
            return Ok(Some(refreshed));
        }
    }

    Ok(None)
}

/// Looks up the leader of `table_bucket` in the currently cached cluster.
///
/// Returns an owned copy of the server node, or `None` if the cached cluster
/// has no leader for the bucket. No metadata refresh is performed.
fn get_leader_for(&self, table_bucket: &TableBucket) -> Option<ServerNode> {
    self.cluster.read().leader_for(table_bucket).cloned()
}
Expand All @@ -173,14 +230,16 @@ mod tests {
use crate::metadata::{TableBucket, TablePath};
use crate::test_utils::build_cluster_arc;

#[test]
fn leader_for_returns_server() {
#[tokio::test]
async fn leader_for_returns_server() {
let table_path = TablePath::new("db".to_string(), "tbl".to_string());
let cluster = build_cluster_arc(&table_path, 1, 1);
let metadata = Metadata::new_for_test(cluster);
let leader = metadata
.leader_for(&TableBucket::new(1, 0))
.expect("leader");
.leader_for(&table_path, &TableBucket::new(1, 0))
.await
.expect("leader request should be Ok")
.expect("leader should exist");
assert_eq!(leader.id(), 1);
}

Expand Down
Loading
Loading