diff --git a/Cargo.lock b/Cargo.lock index c9609016a..d2e29aa9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1887,6 +1887,7 @@ dependencies = [ "async-lock", "async-trait", "axum", + "base64 0.21.7", "bytes", "futures", "http-body 1.0.1", @@ -1974,6 +1975,7 @@ version = "0.5.3" dependencies = [ "async-lock", "async-trait", + "base64 0.21.7", "bitflags", "blake3", "bytes", diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 92b9e6b51..004ee4d4e 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -25,6 +25,7 @@ tonic = { version = "0.12.3", features = ["transport", "tls"], default-features tower = { version = "0.5.1", default-features = false } tracing = { version = "0.1.40", default-features = false } uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] } +base64 = "0.21" [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } diff --git a/nativelink-service/src/bep_server.rs b/nativelink-service/src/bep_server.rs index 7cfa9172f..9b68a2770 100644 --- a/nativelink-service/src/bep_server.rs +++ b/nativelink-service/src/bep_server.rs @@ -14,6 +14,7 @@ use std::borrow::Cow; use std::pin::Pin; +use std::sync::Arc; use bytes::BytesMut; use futures::stream::unfold; @@ -30,7 +31,8 @@ use nativelink_store::store_manager::StoreManager; use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike}; use prost::Message; use tonic::{Request, Response, Result, Status, Streaming}; -use tracing::{instrument, Level}; +use tracing::{instrument, Level, event}; +use nativelink_util::background_spawn; pub struct BepServer { store: Store, @@ -40,10 +42,88 @@ impl BepServer { pub fn new( config: &nativelink_config::cas_server::BepConfig, store_manager: &StoreManager, + metadata_rx: Arc>> ) -> Result { + // Clone the store here and put that into a new thread for publishing via store api let store = store_manager .get_store(&config.store) .err_tip(|| format!("Expected store {} to exist in store manager", &config.store))?; + let metadata_store = store.clone(); + + background_spawn!("bep_metadata_event_flusher", async move { + let mut metadata_rx = metadata_rx.lock().await; + loop { + tokio::select! { + metadata_event = metadata_rx.recv() => { + if let Some(me) = metadata_event { + let md = me.request_metadata_tracer.get_metadata(); + match md { + Ok(rmd) => { + let mut buf = BytesMut::new(); + let name = &me.name; + let tool_invocation_id = &rmd.tool_invocation_id; + let action_id = &rmd.action_id; + let correlated_invocations_id = &rmd.correlated_invocations_id; + let action_mnemonic = &rmd.action_mnemonic; + let target_id = &rmd.target_id; + let configuration_id = &rmd.configuration_id; + + // TODO: assert strings are not empty. + let store_key = StoreKey::Str(Cow::Owned(format!( + "RequestMetadata:{}:{}:{}:{}:{}:{}:{}", + tool_invocation_id, + correlated_invocations_id, + action_id, + name, + target_id, + configuration_id, + action_mnemonic + ))); + + let encoding_result = rmd.encode(&mut buf); + + match encoding_result { + Ok(_) => { + let store_result = metadata_store.update_oneshot( + store_key, + buf.freeze() + ).await; + + match store_result { + Ok(_) => {} + Err(err) => { + // Barf + let store_key = StoreKey::Str(Cow::Owned(format!( + "RequestMetadata:{}:{}:{}:{}:{}:{}:{}", + tool_invocation_id, + correlated_invocations_id, + action_id, + name, + target_id, + configuration_id, + action_mnemonic + ))); + event!(Level::ERROR, ?err, ?store_key, "Failed to store result"); + } + } + }, + Err(err) => { + event!(Level::ERROR, ?err, ?rmd, "Failed to encode RequestMetadata buffer"); + } + } + }, + Err(err) => { + event!(Level::ERROR, ?err, ?me, "Failed to deserialize metadata"); + } + } + + } else { + event!(Level::TRACE, "metadata_event is empty"); + } + } + } + } + }); Ok(Self { store }) } diff --git a/nativelink-service/src/capabilities_server.rs b/nativelink-service/src/capabilities_server.rs index 99b14931f..feab750d8 100644 --- a/nativelink-service/src/capabilities_server.rs +++ b/nativelink-service/src/capabilities_server.rs @@ -27,23 +27,27 @@ use nativelink_proto::build::bazel::remote::execution::v2::{ ActionCacheUpdateCapabilities, CacheCapabilities, ExecutionCapabilities, GetCapabilitiesRequest, PriorityCapabilities, ServerCapabilities, }; +use nativelink_util::request_metadata_tracer; use nativelink_proto::build::bazel::semver::SemVer; use nativelink_util::digest_hasher::default_digest_hasher_func; use nativelink_util::operation_state_manager::ClientStateManager; use tonic::{Request, Response, Status}; use tracing::{event, instrument, Level}; +use tracing::error_span; const MAX_BATCH_TOTAL_SIZE: i64 = 64 * 1024; #[derive(Debug, Default)] pub struct CapabilitiesServer { supported_node_properties_for_instance: HashMap>, + metadata_tx: Option> } impl CapabilitiesServer { pub async fn new( config: &HashMap, scheduler_map: &HashMap>, + metadata_tx: tokio::sync::mpsc::UnboundedSender ) -> Result { let mut supported_node_properties_for_instance = HashMap::new(); for (instance_name, cfg) in config { @@ -80,25 +84,16 @@ impl CapabilitiesServer { } Ok(CapabilitiesServer { supported_node_properties_for_instance, + metadata_tx: Some(metadata_tx.clone()) }) } pub fn into_service(self) -> Server { Server::new(self) } -} -#[tonic::async_trait] -impl Capabilities for CapabilitiesServer { #[allow(clippy::blocks_in_conditions)] - #[instrument( - err, - ret(level = Level::INFO), - level = Level::ERROR, - skip_all, - fields(request = ?grpc_request.get_ref()) - )] - async fn get_capabilities( + async fn inner_get_capabilities( &self, grpc_request: Request, ) -> Result, Status> { @@ -156,3 +151,41 @@ impl Capabilities for CapabilitiesServer { Ok(Response::new(resp)) } } + +#[tonic::async_trait] +impl Capabilities for CapabilitiesServer { + #[allow(clippy::blocks_in_conditions)] + #[instrument( + err, + ret(level = Level::INFO), + level = Level::ERROR, + skip_all, + fields(request = ?grpc_request.get_ref()) + )] + async fn get_capabilities( + &self, + grpc_request: Request, + ) -> Result, Status> { + let metadata_bin = request_metadata_tracer::extract_request_metadata_bin(&grpc_request); + let result = match metadata_bin { + Some( + request_metadata_tracer + ) => { + let metadata_bin_str = request_metadata_tracer.metadata; + let context = request_metadata_tracer::make_ctx_request_metadata_tracer(&metadata_bin_str, &self.metadata_tx.as_ref().unwrap()).err_tip(|| "Unable to parse request metadata")?; + context.wrap_async( + error_span!("get_capabilities"), + CapabilitiesServer::inner_get_capabilities(self, grpc_request) + ).await + .map_err(Into::into) + }, + _ => { + CapabilitiesServer::inner_get_capabilities(self, grpc_request).await + } + }; + + // Grab from origin and emit event + request_metadata_tracer::emit_metadata_event(String::from("get_capabilities")); + return result + } +} diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel index ac17063f1..fbe3dc3a4 100644 --- a/nativelink-util/BUILD.bazel +++ b/nativelink-util/BUILD.bazel @@ -28,6 +28,7 @@ rust_library( "src/origin_context.rs", "src/platform_properties.rs", "src/proto_stream_utils.rs", + "src/request_metadata_tracer.rs", "src/resource_info.rs", "src/retry.rs", "src/store_trait.rs", @@ -45,6 +46,7 @@ rust_library( "//nativelink-metric", "//nativelink-proto", "@crates//:async-lock", + "@crates//:base64", "@crates//:bitflags", "@crates//:blake3", "@crates//:bytes", diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml index fb635af60..7fc2c03a0 100644 --- a/nativelink-util/Cargo.toml +++ b/nativelink-util/Cargo.toml @@ -42,6 +42,7 @@ tracing = { version = "0.1.40", default-features = false } tracing-subscriber = { version = "0.3.18", features = ["ansi", "env-filter", "json"], default-features = false } uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] } mock_instant = "0.5.1" +base64 = "0.21" [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 17edbf700..7867a21a7 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -30,6 +30,7 @@ pub mod operation_state_manager; pub mod origin_context; pub mod platform_properties; pub mod proto_stream_utils; +pub mod request_metadata_tracer; pub mod resource_info; pub mod retry; pub mod store_trait; diff --git a/nativelink-util/src/request_metadata_tracer.rs b/nativelink-util/src/request_metadata_tracer.rs new file mode 100644 index 000000000..5bce9e025 --- /dev/null +++ b/nativelink-util/src/request_metadata_tracer.rs @@ -0,0 +1,131 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use nativelink_error::ResultExt; +use crate::origin_context::{ActiveOriginContext, OriginContext}; +use crate::make_symbol; +use nativelink_error::{make_err, Code, Error}; +use std::sync::Arc; +use tonic::Request; +use base64::Engine; +use base64::engine::general_purpose::STANDARD_NO_PAD; +use nativelink_proto::build::bazel::remote::execution::v2::RequestMetadata; +use prost::Message; +use bytes; +use tracing::{event, Level}; +use tokio::sync::mpsc::UnboundedSender; + +// type RequestMetadataTraceFnType = dyn Fn(String) + Send + Sync; +type RequestMetadataTraceFnType = Box; +make_symbol!(REQUEST_METADATA_TRACE, RequestMetadataTraceFnType); + +const BAZEL_METADATA_KEY: &'static str = "build.bazel.remote.execution.v2.requestmetadata-bin"; + +#[derive(Clone, Debug)] +pub struct RequestMetadataTracer { + pub metadata: String, +} + +#[derive(Clone, Debug)] +pub struct MetadataEvent { + pub request_metadata_tracer: RequestMetadataTracer, + pub name: String // Name value of who emitted the event +} + + + +pub fn make_ctx_request_metadata_tracer(metadata_bin: &str, metadata_tx: &UnboundedSender) -> Result, Error> { + // let tx = ...; + // let sender = |data| { + // tx.send(Event { + // metadata, + // data, // # rust enum / simple string, the name of where thing is being called. + // }) + // }; + let mut new_ctx = ActiveOriginContext::fork().err_tip(|| "Must be in a ActiveOriginContext")?; + let metadata = RequestMetadataTracer { + metadata: metadata_bin.to_string() + }; + + let metadata_tx = metadata_tx.to_owned(); + + let sender: Box = Box::new(move |name: String| { + let metadata_event = MetadataEvent { + request_metadata_tracer: metadata.clone(), + name: name + }; + let _ = metadata_tx.send(metadata_event); + }); + + new_ctx.set_value(&REQUEST_METADATA_TRACE, Arc::new(sender)); + Ok(Arc::new(new_ctx)) +} + +pub fn emit_metadata_event(name: String) { + let sender_result = ActiveOriginContext::get_value(&REQUEST_METADATA_TRACE); + match sender_result { + Ok(Some(sender)) => { + sender(name) + }, + _ => { + event!(Level::WARN, "No event emitted"); + } + } +} + +pub fn extract_request_metadata_bin(request: &Request) -> Option { + let headers = request.metadata().clone().into_headers(); + let metadata_opt: Option<&hyper::header::HeaderValue> = headers.get(BAZEL_METADATA_KEY); + + if let Some(header_value) = metadata_opt { + match header_value.to_str() { + Ok(metadata_str) => return Some(RequestMetadataTracer { + metadata: metadata_str.to_string() + }), + Err(err) => { + event!( + Level::ERROR, + ?err, + "Unable to extract metadata from headers", + ); + return None + } + } + } + return None +} + +impl RequestMetadataTracer { + pub fn get_metadata(&self) -> Result { + let decoded = match STANDARD_NO_PAD.decode(self.metadata.clone()) { + Ok(decoded) => { + decoded + }, + Err(err) => { + return Err(make_err!(Code::Internal, "Could not convert request data from base64: {err}")); + } + }; + + let buf = bytes::BytesMut::from(decoded.as_slice()); + + let request_metadata = match RequestMetadata::decode(buf) { + Ok(request_metadata) => request_metadata, + Err(err) => { + return Err(make_err!(Code::Internal, "Could not convert grpc request from binary data: {err}")); + } + }; + + return Ok(request_metadata) + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 7e2276a63..d57722aa0 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -168,6 +168,9 @@ async fn inner_main( server_start_timestamp: u64, shutdown_tx: broadcast::Sender>>, ) -> Result<(), Box> { + // rx/tx, with the rx we spawn / pool for information and flush into redis + // bep service, pack data into bep. + fn into_encoding(from: HttpCompressionAlgorithm) -> Option { match from { HttpCompressionAlgorithm::gzip => Some(CompressionEncoding::Gzip), @@ -243,6 +246,20 @@ async fn inner_main( schedulers: action_schedulers.clone(), })); + let (metadata_tx, metadata_rx) = tokio::sync::mpsc::unbounded_channel::< + nativelink_util::request_metadata_tracer::MetadataEvent, + >(); + + // let metadata_rx: Arc>> = Arc::new(Mutex::new(metadata_rx)); + + let metadata_rx: Arc< + tokio::sync::Mutex< + tokio::sync::mpsc::UnboundedReceiver< + nativelink_util::request_metadata_tracer::MetadataEvent, + >, + >, + > = Arc::new(tokio::sync::Mutex::new(metadata_rx)); + for (server_cfg, connected_clients_mux) in servers_and_clients { let services = server_cfg.services.ok_or("'services' must be configured")?; @@ -364,6 +381,7 @@ async fn inner_main( CapabilitiesServer::new( services.capabilities.as_ref().unwrap(), &action_schedulers, + metadata_tx.clone(), ) }), ) @@ -422,7 +440,8 @@ async fn inner_main( services .experimental_bep .map_or(Ok(None), |cfg| { - BepServer::new(&cfg, &store_manager).map(|v| { + // Pass the rx into this service. + BepServer::new(&cfg, &store_manager, metadata_rx.clone()).map(|v| { let mut service = v.into_service(); let send_algo = &http_config.compression.send_compression_algorithm; if let Some(encoding) =