Skip to content

Commit f3c415e

Browse files
Fixed get_admin() to check before creating new admin
1 parent 847f5fe commit f3c415e

2 files changed

Lines changed: 15 additions & 2 deletions

File tree

crates/fluss/src/client/admin.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use std::collections::{HashMap, HashSet};
3636
use std::sync::Arc;
3737
use tokio::task::JoinHandle;
3838

39+
#[derive(Clone)]
3940
pub struct FlussAdmin {
4041
admin_gateway: ServerConnection,
4142
#[allow(dead_code)]

crates/fluss/src/client/connection.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::client::WriterClient;
18+
use crate::client::{WriterClient};
1919
use crate::client::admin::FlussAdmin;
2020
use crate::client::metadata::Metadata;
2121
use crate::client::table::FlussTable;
2222
use crate::config::Config;
2323
use crate::rpc::RpcClient;
2424
use parking_lot::RwLock;
2525
use std::sync::Arc;
26+
use tokio::sync::OnceCell;
2627

2728
use crate::error::{Error, FlussError, Result};
2829
use crate::metadata::TablePath;
@@ -32,6 +33,7 @@ pub struct FlussConnection {
3233
network_connects: Arc<RpcClient>,
3334
args: Config,
3435
writer_client: RwLock<Option<Arc<WriterClient>>>,
36+
admin_client: OnceCell<FlussAdmin>,
3537
}
3638

3739
impl FlussConnection {
@@ -44,6 +46,7 @@ impl FlussConnection {
4446
network_connects: connections.clone(),
4547
args: arg.clone(),
4648
writer_client: Default::default(),
49+
admin_client: OnceCell::new(),
4750
})
4851
}
4952

@@ -60,7 +63,16 @@ impl FlussConnection {
6063
}
6164

6265
pub async fn get_admin(&self) -> Result<FlussAdmin> {
63-
FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()).await
66+
// Lazily initialize and cache the FlussAdmin instance. The cached FlussAdmin
67+
// holds a reference to RpcClient, which manages connection reuse and re-acquisition
68+
// when a cached connection becomes poisoned. Subsequent calls clone cheaply —
69+
// all internal fields (ServerConnection, Arc<Metadata>, Arc<RpcClient>) are
70+
// Arc-backed so cloning is just a reference-count bump.
71+
let admin = self
72+
.admin_client
73+
.get_or_try_init(|| FlussAdmin::new(self.network_connects.clone(), self.metadata.clone()))
74+
.await?;
75+
Ok(admin.clone())
6476
}
6577

6678
pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {

0 commit comments

Comments
 (0)