Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 62 additions & 50 deletions lib/runtime/src/transports/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,18 @@ use etcd_client::{
pub use etcd_client::{ConnectOptions, KeyValue, LeaseClient};
use tokio::time::{Duration, interval};

mod connector;
mod lease;
mod lock;
mod path;

use connector::Connector;
use lease::*;
pub use lock::*;
pub use path::*;

use super::utils::build_in_runtime;

/// ETCD Client
#[derive(Clone)]
pub struct Client {
client: etcd_client::Client,
primary_lease: u64,
runtime: Runtime,
rt: Arc<tokio::runtime::Runtime>,
}

impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "etcd::Client primary_lease={}", self.primary_lease)
}
}

#[derive(Debug, Clone)]
pub struct Lease {
/// ETCD lease ID
Expand Down Expand Up @@ -86,6 +73,21 @@ impl Lease {
}
}

/// ETCD Client
#[derive(Clone)]
pub struct Client {
connector: Arc<Connector>,
primary_lease: u64,
runtime: Runtime,
rt: Arc<tokio::runtime::Runtime>,
}

impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "etcd::Client primary_lease={}", self.primary_lease)
}
}

impl Client {
pub fn builder() -> ClientOptionsBuilder {
ClientOptionsBuilder::default()
Expand All @@ -102,24 +104,23 @@ impl Client {
pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
let token = runtime.primary_token();

let ((client, lease_id), rt) = build_in_runtime(
let ((connector, lease_id), rt) = build_in_runtime(
async move {
let client = etcd_client::Client::connect(
config.etcd_url.clone(),
config.etcd_connect_options,
)
.await
.with_context(|| {
format!(
"Unable to connect to etcd server at {}. Check etcd server status",
config.etcd_url.join(", ")
)
})?;
let etcd_urls = config.etcd_url.clone();
let connect_options = config.etcd_connect_options.clone();

// Create the connector
let connector = Connector::new(etcd_urls, connect_options)
.await
.with_context(|| {
format!(
"Unable to connect to etcd server at {}. Check etcd server status",
config.etcd_url.join(", ")
)
})?;

let lease_id = if config.attach_lease {
let lease_client = client.lease_client();

let lease = create_lease(lease_client, 10, token)
let lease = create_lease(connector.clone(), 10, token)
.await
.with_context(|| {
format!(
Expand All @@ -133,23 +134,24 @@ impl Client {
0
};

Ok((client, lease_id))
Ok((connector, lease_id))
},
1,
)
.await?;

Ok(Client {
client,
connector,
primary_lease: lease_id,
rt,
runtime,
})
}

/// Get a reference to the underlying [`etcd_client::Client`] instance.
pub(crate) fn etcd_client(&self) -> &etcd_client::Client {
&self.client
/// Get a clone of the underlying [`etcd_client::Client`] instance.
/// This returns a clone since the client is behind an RwLock.
pub fn etcd_client(&self) -> etcd_client::Client {
self.connector.get_client()
}

/// Get the primary lease ID.
Expand All @@ -169,16 +171,16 @@ impl Client {
/// This [`Lease`] will be tied to the [`Runtime`], specifically a child [`CancellationToken`].
pub async fn create_lease(&self, ttl: u64) -> Result<Lease> {
let token = self.runtime.child_token();
let lease_client = self.client.lease_client();
self.rt
.spawn(create_lease(lease_client, ttl, token))
.spawn(create_lease(self.connector.clone(), ttl, token))
.await?
}

// Revoke an etcd lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke
pub async fn revoke_lease(&self, lease_id: u64) -> Result<()> {
let lease_client = self.client.lease_client();
self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
self.rt
.spawn(revoke_lease(self.connector.clone(), lease_id))
.await?
}

pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<u64>) -> Result<()> {
Expand All @@ -193,7 +195,7 @@ impl Client {
]);

// Execute the transaction
let result = self.client.kv_client().txn(txn).await?;
let result = self.connector.get_client().kv_client().txn(txn).await?;

if result.succeeded() {
Ok(())
Expand Down Expand Up @@ -232,7 +234,7 @@ impl Client {
]);

// Execute the transaction
let result = self.client.kv_client().txn(txn).await?;
let result = self.connector.get_client().kv_client().txn(txn).await?;

// We have to enumerate the response paths to determine if the transaction succeeded
if result.succeeded() {
Expand Down Expand Up @@ -266,7 +268,8 @@ impl Client {
let id = lease_id.unwrap_or(self.lease_id());
let put_options = PutOptions::new().with_lease(id as i64);
let _ = self
.client
.connector
.get_client()
.kv_client()
.put(key.as_ref(), value.as_ref(), Some(put_options))
.await?;
Expand All @@ -282,7 +285,8 @@ impl Client {
let options = options
.unwrap_or_default()
.with_lease(self.primary_lease().id() as i64);
self.client
self.connector
.get_client()
.kv_client()
.put(key.as_ref(), value.as_ref(), Some(options))
.await
Expand All @@ -294,7 +298,12 @@ impl Client {
key: impl Into<Vec<u8>>,
options: Option<GetOptions>,
) -> Result<Vec<KeyValue>> {
let mut get_response = self.client.kv_client().get(key, options).await?;
let mut get_response = self
.connector
.get_client()
.kv_client()
.get(key, options)
.await?;
Ok(get_response.take_kvs())
}

Expand All @@ -303,7 +312,8 @@ impl Client {
key: impl Into<Vec<u8>>,
options: Option<DeleteOptions>,
) -> Result<u64> {
self.client
self.connector
.get_client()
.kv_client()
.delete(key, options)
.await
Expand All @@ -313,7 +323,8 @@ impl Client {

pub async fn kv_get_prefix(&self, prefix: impl AsRef<str>) -> Result<Vec<KeyValue>> {
let mut get_response = self
.client
.connector
.get_client()
.kv_client()
.get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
.await?;
Expand All @@ -328,7 +339,7 @@ impl Client {
key: impl Into<Vec<u8>>,
lease_id: Option<u64>,
) -> Result<LockResponse> {
let mut lock_client = self.client.lock_client();
let mut lock_client = self.connector.get_client().lock_client();
let id = lease_id.unwrap_or(self.lease_id());
let options = LockOptions::new().with_lease(id as i64);
lock_client
Expand All @@ -339,7 +350,7 @@ impl Client {

/// Release a distributed lock using the key from the LockResponse
pub async fn unlock(&self, lock_key: impl Into<Vec<u8>>) -> Result<()> {
let mut lock_client = self.client.lock_client();
let mut lock_client = self.connector.get_client().lock_client();
lock_client
.unlock(lock_key)
.await
Expand Down Expand Up @@ -367,8 +378,9 @@ impl Client {
prefix: impl AsRef<str> + std::fmt::Display,
include_existing: bool,
) -> Result<PrefixWatcher> {
let mut kv_client = self.client.kv_client();
let mut watch_client = self.client.watch_client();
let client = self.connector.get_client();
let mut kv_client = client.kv_client();
let mut watch_client = client.watch_client();

let mut get_response = kv_client
.get(prefix.as_ref(), Some(GetOptions::new().with_prefix()))
Expand Down
Loading
Loading