Skip to content

Commit

Permalink
Trace grpc metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Singer committed Nov 14, 2024
1 parent b6cf659 commit 643aa0c
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nativelink-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ tonic = { version = "0.12.3", features = ["transport", "tls"], default-features
tower = { version = "0.5.1", default-features = false }
tracing = { version = "0.1.40", default-features = false }
uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] }
base64 = "0.21"

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
Expand Down
82 changes: 81 additions & 1 deletion nativelink-service/src/bep_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::borrow::Cow;
use std::pin::Pin;
use std::sync::Arc;

use bytes::BytesMut;
use futures::stream::unfold;
Expand All @@ -30,7 +31,8 @@ use nativelink_store::store_manager::StoreManager;
use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike};
use prost::Message;
use tonic::{Request, Response, Result, Status, Streaming};
use tracing::{instrument, Level};
use tracing::{instrument, Level, event};
use nativelink_util::background_spawn;

pub struct BepServer {
store: Store,
Expand All @@ -40,10 +42,88 @@ impl BepServer {
pub fn new(
config: &nativelink_config::cas_server::BepConfig,
store_manager: &StoreManager,
metadata_rx: Arc<tokio::sync::Mutex<tokio::sync::mpsc::UnboundedReceiver<nativelink_util::request_metadata_tracer::MetadataEvent>>>
) -> Result<Self, Error> {
// Clone the store here and put that into a new thread for publishing via store api
let store = store_manager
.get_store(&config.store)
.err_tip(|| format!("Expected store {} to exist in store manager", &config.store))?;
let metadata_store = store.clone();

background_spawn!("bep_metadata_event_flusher", async move {
let mut metadata_rx = metadata_rx.lock().await;
loop {
tokio::select! {
metadata_event = metadata_rx.recv() => {
if let Some(me) = metadata_event {
let md = me.request_metadata_tracer.get_metadata();
match md {
Ok(rmd) => {
let mut buf = BytesMut::new();
let name = &me.name;
let tool_invocation_id = &rmd.tool_invocation_id;
let action_id = &rmd.action_id;
let correlated_invocations_id = &rmd.correlated_invocations_id;
let action_mnemonic = &rmd.action_mnemonic;
let target_id = &rmd.target_id;
let configuration_id = &rmd.configuration_id;

// TODO: assert strings are not empty.
let store_key = StoreKey::Str(Cow::Owned(format!(
"RequestMetadata:{}:{}:{}:{}:{}:{}:{}",
tool_invocation_id,
correlated_invocations_id,
action_id,
name,
target_id,
configuration_id,
action_mnemonic
)));

let encoding_result = rmd.encode(&mut buf);

match encoding_result {
Ok(_) => {
let store_result = metadata_store.update_oneshot(
store_key,
buf.freeze()
).await;

match store_result {
Ok(_) => {}
Err(err) => {
// Barf
let store_key = StoreKey::Str(Cow::Owned(format!(
"RequestMetadata:{}:{}:{}:{}:{}:{}:{}",
tool_invocation_id,
correlated_invocations_id,
action_id,
name,
target_id,
configuration_id,
action_mnemonic
)));
event!(Level::ERROR, ?err, ?store_key, "Failed to store result");
}
}
},
Err(err) => {
event!(Level::ERROR, ?err, ?rmd, "Failed to encode RequestMetadata buffer");
}
}
},
Err(err) => {
event!(Level::ERROR, ?err, ?me, "Failed to deserialize metadata");
}
}

} else {
event!(Level::TRACE, "metadata_event is empty");
}
}
}
}
});

Ok(Self { store })
}
Expand Down
55 changes: 44 additions & 11 deletions nativelink-service/src/capabilities_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,27 @@ use nativelink_proto::build::bazel::remote::execution::v2::{
ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities,
GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities,
};
use nativelink_util::request_metadata_tracer;
use nativelink_proto::build::bazel::semver::SemVer;
use nativelink_util::digest_hasher::default_digest_hasher_func;
use nativelink_util::operation_state_manager::ClientStateManager;
use tonic::{Request, Response, Status};
use tracing::{event, instrument, Level};
use tracing::error_span;

const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024;

#[derive(Debug, Default)]
pub struct CapabilitiesServer {
supported_node_properties_for_instance: HashMap<InstanceName, Vec<String>>,
metadata_tx: Option<tokio::sync::mpsc::UnboundedSender<nativelink_util::request_metadata_tracer::MetadataEvent>>
}

impl CapabilitiesServer {
pub async fn new(
config: &HashMap<InstanceName, CapabilitiesConfig>,
scheduler_map: &HashMap<String, Arc<dyn ClientStateManager>>,
metadata_tx: tokio::sync::mpsc::UnboundedSender<nativelink_util::request_metadata_tracer::MetadataEvent>
) -> Result<Self, Error> {
let mut supported_node_properties_for_instance = HashMap::new();
for (instance_name, cfg) in config {
Expand Down Expand Up @@ -80,25 +84,16 @@ impl CapabilitiesServer {
}
Ok(CapabilitiesServer {
supported_node_properties_for_instance,
metadata_tx: Some(metadata_tx.clone())
})
}

pub fn into_service(self) -> Server<CapabilitiesServer> {
Server::new(self)
}
}

#[tonic::async_trait]
impl Capabilities for CapabilitiesServer {
#[allow(clippy::blocks_in_conditions)]
#[instrument(
err,
ret(level = Level::INFO),
level = Level::ERROR,
skip_all,
fields(request = ?grpc_request.get_ref())
)]
async fn get_capabilities(
async fn inner_get_capabilities(
&self,
grpc_request: Request<GetCapabilitiesRequest>,
) -> Result<Response<ServerCapabilities>, Status> {
Expand Down Expand Up @@ -156,3 +151,41 @@ impl Capabilities for CapabilitiesServer {
Ok(Response::new(resp))
}
}

#[tonic::async_trait]
impl Capabilities for CapabilitiesServer {
#[allow(clippy::blocks_in_conditions)]
#[instrument(
err,
ret(level = Level::INFO),
level = Level::ERROR,
skip_all,
fields(request = ?grpc_request.get_ref())
)]
async fn get_capabilities(
&self,
grpc_request: Request<GetCapabilitiesRequest>,
) -> Result<Response<ServerCapabilities>, Status> {
let metadata_bin = request_metadata_tracer::extract_request_metadata_bin(&grpc_request);
let result = match metadata_bin {
Some(
request_metadata_tracer
) => {
let metadata_bin_str = request_metadata_tracer.metadata;
let context = request_metadata_tracer::make_ctx_request_metadata_tracer(&metadata_bin_str, &self.metadata_tx.as_ref().unwrap()).err_tip(|| "Unable to parse request metadata")?;
context.wrap_async(
error_span!("get_capabilities"),
CapabilitiesServer::inner_get_capabilities(self, grpc_request)
).await
.map_err(Into::into)
},
_ => {
CapabilitiesServer::inner_get_capabilities(self, grpc_request).await
}
};

// Grab from origin and emit event
request_metadata_tracer::emit_metadata_event(String::from("get_capabilities"));
return result
}
}
2 changes: 2 additions & 0 deletions nativelink-util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ rust_library(
"src/origin_context.rs",
"src/platform_properties.rs",
"src/proto_stream_utils.rs",
"src/request_metadata_tracer.rs",
"src/resource_info.rs",
"src/retry.rs",
"src/store_trait.rs",
Expand All @@ -45,6 +46,7 @@ rust_library(
"//nativelink-metric",
"//nativelink-proto",
"@crates//:async-lock",
"@crates//:base64",
"@crates//:bitflags",
"@crates//:blake3",
"@crates//:bytes",
Expand Down
1 change: 1 addition & 0 deletions nativelink-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tracing = { version = "0.1.40", default-features = false }
tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter", "json"], default-features = false }
uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] }
mock_instant = "0.5.1"
base64 = "0.21"

[dev-dependencies]
nativelink-macro = { path = "../nativelink-macro" }
Expand Down
1 change: 1 addition & 0 deletions nativelink-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod operation_state_manager;
pub mod origin_context;
pub mod platform_properties;
pub mod proto_stream_utils;
pub mod request_metadata_tracer;
pub mod resource_info;
pub mod retry;
pub mod store_trait;
Expand Down
131 changes: 131 additions & 0 deletions nativelink-util/src/request_metadata_tracer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2024 The NativeLink Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use nativelink_error::ResultExt;
use crate::origin_context::{ActiveOriginContext, OriginContext};
use crate::make_symbol;
use nativelink_error::{make_err, Code, Error};
use std::sync::Arc;
use tonic::Request;
use base64::Engine;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata;
use prost::Message;
use bytes;
use tracing::{event, Level};
use tokio::sync::mpsc::UnboundedSender;

// type RequestMetadataTraceFnType = dyn Fn(String) + Send + Sync;
type RequestMetadataTraceFnType = Box<dyn Fn(String) + Send + Sync + 'static>;
make_symbol!(REQUEST_METADATA_TRACE, RequestMetadataTraceFnType);

const BAZEL_METADATA_KEY: &'static str = "build.bazel.remote.execution.v2.requestmetadata-bin";

#[derive(Clone, Debug)]
pub struct RequestMetadataTracer {
pub metadata: String,
}

#[derive(Clone, Debug)]
pub struct MetadataEvent {
pub request_metadata_tracer: RequestMetadataTracer,
pub name: String // Name value of who emitted the event
}



pub fn make_ctx_request_metadata_tracer(metadata_bin: &str, metadata_tx: &UnboundedSender<MetadataEvent>) -> Result<Arc<OriginContext>, Error> {
// let tx = ...;
// let sender = |data| {
// tx.send(Event {
// metadata,
// data, // # rust enum / simple string, the name of where thing is being called.
// })
// };
let mut new_ctx = ActiveOriginContext::fork().err_tip(|| "Must be in a ActiveOriginContext")?;
let metadata = RequestMetadataTracer {
metadata: metadata_bin.to_string()
};

let metadata_tx = metadata_tx.to_owned();

let sender: Box<dyn Fn(String) + Send + Sync + 'static> = Box::new(move |name: String| {
let metadata_event = MetadataEvent {
request_metadata_tracer: metadata.clone(),
name: name
};
let _ = metadata_tx.send(metadata_event);
});

new_ctx.set_value(&REQUEST_METADATA_TRACE, Arc::new(sender));
Ok(Arc::new(new_ctx))
}

pub fn emit_metadata_event(name: String) {
let sender_result = ActiveOriginContext::get_value(&REQUEST_METADATA_TRACE);
match sender_result {
Ok(Some(sender)) => {
sender(name)
},
_ => {
event!(Level::WARN, "No event emitted");
}
}
}

pub fn extract_request_metadata_bin<T>(request: &Request<T>) -> Option<RequestMetadataTracer> {
let headers = request.metadata().clone().into_headers();
let metadata_opt: Option<&hyper::header::HeaderValue> = headers.get(BAZEL_METADATA_KEY);

if let Some(header_value) = metadata_opt {
match header_value.to_str() {
Ok(metadata_str) => return Some(RequestMetadataTracer {
metadata: metadata_str.to_string()
}),
Err(err) => {
event!(
Level::ERROR,
?err,
"Unable to extract metadata from headers",
);
return None
}
}
}
return None
}

impl RequestMetadataTracer {
pub fn get_metadata(&self) -> Result<RequestMetadata, Error> {
let decoded = match STANDARD_NO_PAD.decode(self.metadata.clone()) {
Ok(decoded) => {
decoded
},
Err(err) => {
return Err(make_err!(Code::Internal, "Could not convert request data from base64: {err}"));
}
};

let buf = bytes::BytesMut::from(decoded.as_slice());

let request_metadata = match RequestMetadata::decode(buf) {
Ok(request_metadata) => request_metadata,
Err(err) => {
return Err(make_err!(Code::Internal, "Could not convert grpc request from binary data: {err}"));
}
};

return Ok(request_metadata)
}
}
Loading

0 comments on commit 643aa0c

Please sign in to comment.