Skip to content

Commit

Permalink
implement meta and apply observability to meta (#2823)
Browse files Browse the repository at this point in the history
* feat: implemented get and new

* feat: implement delete and set

* feat: add init and init tracer

* feat: add observability config

* feat: downgrade jeager and add client test

* fix: modify package name of test_client

* refactor: format some files

* feat: add tracer

* feat: add tracer

* otel migration

* move test client

* update rust version

---------

Co-authored-by: smorihira <[email protected]>
Co-authored-by: Kiichiro YUKAWA <[email protected]>
  • Loading branch information
3 people authored Feb 19, 2025
1 parent db7898c commit 13f0769
Show file tree
Hide file tree
Showing 18 changed files with 660 additions and 432 deletions.
587 changes: 248 additions & 339 deletions rust/Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions rust/bin/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ edition = "2021"
[dependencies]
algorithm = { version = "0.1.0", path = "../../libs/algorithm" }
qbg = { version = "0.1.0", path = "../../libs/algorithms/qbg" }
anyhow = "1.0.88"
anyhow = "1.0.95"
cargo = "0.81.0"
config = "0.14.0"
flexi_logger = "0.29"
log = "0.4"
prost = "0.13.2"
prost-types = "0.13.2"
prost = "0.13.4"
prost-types = "0.13.4"
proto = { version = "0.1.0", path = "../../libs/proto" }
tokio = { version = "1.40.0", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["full"] }
tonic = "0.12.2"
tonic-types = "0.12.2"
tokio = { version = "1.43.0", features = ["full"] }
tokio-stream = { version = "0.1.17", features = ["full"] }
tonic = "0.12.3"
tonic-types = "0.12.3"
15 changes: 13 additions & 2 deletions rust/bin/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ version = "0.1.0"
edition = "2021"

[dependencies]
kv = "0.24.0"
opentelemetry = "0.27.1"
prost-types = "0.13.4"
proto = { version = "0.1.0", path = "../../libs/proto" }
tokio = { version = "1.40.0", features = ["full"] }
tonic = "0.12.2"
sled = "0.34.7"
tokio = { version = "1.43.0", features = ["full"] }
tonic = "0.12.3"
observability = { path = "../../libs/observability" }
defer = "0.2.1"

[[bin]]
name = "test_client"
path = "src/test_client.rs"
doc = false
20 changes: 18 additions & 2 deletions rust/bin/meta/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,23 @@
//
mod meta;

#[derive(Default, Debug)]
use std::sync::Arc;
use kv::*;

pub struct Meta {

store: Arc<Store>,
bucket: Bucket<'static, Raw, Raw>,
}

impl Meta {
pub fn new(cfg_path: &str) -> Result<Self, kv::Error> {
let cfg = Config::new(cfg_path);
let store = Arc::new(Store::new(cfg)?);
let bucket = store.bucket::<Raw, Raw>(Some("meta_bucket"))?;

Ok(Meta { store, bucket })
}
}



104 changes: 97 additions & 7 deletions rust/bin/meta/src/handler/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,117 @@
// limitations under the License.
//

use kv::*;
use defer::defer;
use opentelemetry::{trace::{Tracer, TraceContextExt}, KeyValue, Context};
use observability::{ctx_span, tracer};
use proto::{meta::v1::meta_server, payload::v1::{meta, Empty}};

#[tonic::async_trait]
impl meta_server::Meta for super::Meta {
async fn get(
&self,
_request: tonic::Request<meta::Key>,
request: tonic::Request<meta::Key>,
) -> std::result::Result<tonic::Response<meta::Value>, tonic::Status> {
todo!()
let parent_ctx = request.extensions().get::<Context>().cloned().unwrap_or_else(Context::new);
let ctx = ctx_span!(&parent_ctx, "Meta::get");
defer!(ctx.span().end());

let key = request.into_inner().key;
let raw_key = Raw::from(key.as_bytes());

match self.bucket.get(&raw_key) {
Ok(Some(value_bytes)) => {
ctx.span().add_event("Key found", vec![KeyValue::new("key", key.clone())]);

let any_value = prost_types::Any {
type_url: "type.googleapis.com/your.package.MessageType".to_string(),
value: value_bytes.to_vec(),
};
let response = meta::Value {
value: Some(any_value),
};

Ok(tonic::Response::new(response))
},
Ok(None) => {
ctx.span().add_event("Key not found", vec![KeyValue::new("key", key)]);
Err(tonic::Status::not_found("Key not found"))
}
Err(e) => {
ctx.span().add_event("Database error", vec![KeyValue::new("error", e.to_string())]);
Err(tonic::Status::internal(format!("Database error: {}", e)))
}
}
}

async fn set(
&self,
_request: tonic::Request<meta::KeyValue>,
request: tonic::Request<meta::KeyValue>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
todo!()
let parent_ctx = request.extensions().get::<Context>().cloned().unwrap_or_else(Context::new);
let ctx = ctx_span!(&parent_ctx, "Meta::set");
defer!(ctx.span().end());

let key_value = request.into_inner();

let key = match key_value.key {
Some(k) => k.key,
None => {
ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Key is missing")]);
return Err(tonic::Status::invalid_argument("Key is missing"));
}
};

let value = match key_value.value {
Some(v) => match v.value {
Some(any_value) => any_value.value,
None => {
ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Value is missing")]);
return Err(tonic::Status::invalid_argument("Value is missing"));
}
},
None => {
ctx.span().add_event("Invalid argument", vec![KeyValue::new("error", "Value is missing")]);
return Err(tonic::Status::invalid_argument("Value is missing"));
}
};

let raw_key = Raw::from(key.as_bytes());
let raw_value = sled::IVec::from(value);

match self.bucket.set(&raw_key, &raw_value) {
Ok(_) => {
ctx.span().add_event("Value set successfully", vec![KeyValue::new("key", key)]);
Ok(tonic::Response::new(Empty {}))
},
Err(e) => {
ctx.span().add_event("Failed to set value", vec![KeyValue::new("error", e.to_string())]);
Err(tonic::Status::internal(format!("Failed to set value: {}", e)))
}
}
}

async fn delete(
&self,
_request: tonic::Request<meta::Key>,
request: tonic::Request<meta::Key>,
) -> std::result::Result<tonic::Response<Empty>, tonic::Status> {
todo!()
let parent_ctx = request.extensions().get::<Context>().cloned().unwrap_or_else(Context::new);
let ctx = ctx_span!(&parent_ctx, "Meta::delete");
defer!(ctx.span().end());

let key = request.into_inner().key;
let raw_key = Raw::from(key.as_bytes());

match self.bucket.remove(&raw_key) {
Ok(_) => {
ctx.span().add_event("Key deleted successfully", vec![KeyValue::new("key", key)]);
Ok(tonic::Response::new(Empty {}))
},
Err(e) => {
ctx.span().add_event("Failed to delete key", vec![KeyValue::new("error", e.to_string())]);
Err(tonic::Status::internal(format!("Failed to delete key: {}", e)))
}
}
}
}
76 changes: 72 additions & 4 deletions rust/bin/meta/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,83 @@

mod handler;

use observability::{
config::{Config, Tracer}, observability::{Observability, ObservabilityImpl, SERVICE_NAME}
};
use opentelemetry::global;
use opentelemetry::propagation::Extractor;
use tonic::transport::Server;
use tonic::Request;

struct MetadataMap<'a>(&'a tonic::metadata::MetadataMap);

impl<'a> Extractor for MetadataMap<'a> {
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
}

fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|key| match key {
tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
tonic::metadata::KeyRef::Binary(v) => v.as_str(),
})
.collect::<Vec<_>>()
}
}

fn intercept(mut req: Request<()>) -> Result<Request<()>, tonic::Status> {
let parent_cx =
global::get_text_map_propagator(|prop| prop.extract(&MetadataMap(req.metadata())));
req.extensions_mut().insert(parent_cx);
Ok(req)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:8081".parse()?;
let meta = handler::Meta::default();
// TODO: yaml is given from outside
// let config_yaml = r#"enabled: false
// endpoint: ""
// attributes: []
// tracer:
// enabled: false
// meter:
// enabled: false
// export_duration:
// secs: 60
// nanos: 0
// export_timeout_duration:
// secs: 30
// nanos: 0
// "#;
//
// decode config yaml
// let observability_cfg = serde_yaml::from_str(config_yaml).unwrap();
let observability_cfg = Config::new()
.enabled(true)
.attribute(SERVICE_NAME, "vald-lb-gateway")
.attribute("target_pod", "target_pod")
.attribute("target_node", "target_node")
.attribute("exported_kubernetes_namaspace", "default")
.attribute("kubernetes_name", "vald-lb-gateway")
.endpoint("http://127.0.0.1:4318")
.tracer(Tracer::new().enabled(true));
let mut observability = ObservabilityImpl::new(observability_cfg)?;

tonic::transport::Server::builder()
.add_service(proto::meta::v1::meta_server::MetaServer::new(meta))
let addr = "[::1]:8095".parse()?;
let cfg_path = "/tmp/meta/database"; // TODO: set the appropriate path
let meta = handler::Meta::new(cfg_path)?;

// the interceptor given here is implicitly executed for each request
Server::builder()
.add_service(proto::meta::v1::meta_server::MetaServer::with_interceptor(
meta, intercept,
))
.serve(addr)
.await?;

observability.shutdown()?;
Ok(())
}

Loading

0 comments on commit 13f0769

Please sign in to comment.