diff --git a/crates/rollup-boost/src/client/rpc.rs b/crates/rollup-boost/src/client/rpc.rs index 8560496a..185a1bc4 100644 --- a/crates/rollup-boost/src/client/rpc.rs +++ b/crates/rollup-boost/src/client/rpc.rs @@ -474,7 +474,7 @@ pub mod tests { use assert_cmd::Command; use http::Uri; use jsonrpsee::core::client::ClientT; - use parking_lot::Mutex; + use std::sync::Mutex; use crate::payload::PayloadSource; use alloy_rpc_types_engine::JwtSecret; @@ -499,7 +499,12 @@ pub mod tests { LazyLock::new(|| Mutex::new(HashSet::new())); loop { let port: u16 = rand::random_range(1000..20000); - if TcpListener::bind(("127.0.0.1", port)).is_ok() && CLAIMED_PORTS.lock().insert(port) { + if TcpListener::bind(("127.0.0.1", port)).is_ok() + && CLAIMED_PORTS + .lock() + .expect("CLAIMED_PORTS poisoned") + .insert(port) + { return port; } } diff --git a/crates/rollup-boost/src/debug_api.rs b/crates/rollup-boost/src/debug_api.rs index 83328bd6..36c1c4c4 100644 --- a/crates/rollup-boost/src/debug_api.rs +++ b/crates/rollup-boost/src/debug_api.rs @@ -2,9 +2,9 @@ use jsonrpsee::core::{RpcResult, async_trait}; use jsonrpsee::http_client::HttpClient; use jsonrpsee::proc_macros::rpc; use jsonrpsee::server::Server; -use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use tokio::sync::Mutex; use crate::update_execution_mode_gauge; @@ -81,7 +81,7 @@ impl DebugServer { let server = Server::builder().build(debug_addr).await?; // Register the initial execution mode metric - let current_mode = self.execution_mode(); + let current_mode = self.execution_mode().await; update_execution_mode_gauge(current_mode); let handle = server.start(self.into_rpc()); @@ -95,12 +95,12 @@ impl DebugServer { Ok(()) } - pub fn execution_mode(&self) -> ExecutionMode { - *self.execution_mode.lock() + pub async fn execution_mode(&self) -> ExecutionMode { + *self.execution_mode.lock().await } - pub fn set_execution_mode(&self, mode: ExecutionMode) { - *self.execution_mode.lock() = mode; + pub async fn set_execution_mode(&self, mode: ExecutionMode) { + *self.execution_mode.lock().await = mode; update_execution_mode_gauge(mode); } } @@ -111,7 +111,7 @@ impl DebugApiServer for DebugServer { &self, request: SetExecutionModeRequest, ) -> RpcResult { - self.set_execution_mode(request.execution_mode); + self.set_execution_mode(request.execution_mode).await; tracing::info!("Set execution mode to {:?}", request.execution_mode); @@ -122,7 +122,7 @@ impl DebugApiServer for DebugServer { async fn get_execution_mode(&self) -> RpcResult { Ok(GetExecutionModeResponse { - execution_mode: self.execution_mode(), + execution_mode: self.execution_mode().await, }) } } diff --git a/crates/rollup-boost/src/flashblocks/service.rs b/crates/rollup-boost/src/flashblocks/service.rs index 209a3efc..47f216dc 100644 --- a/crates/rollup-boost/src/flashblocks/service.rs +++ b/crates/rollup-boost/src/flashblocks/service.rs @@ -422,7 +422,13 @@ mod tests { .await?; let get_payload_requests_builder = builder_mock.get_payload_requests.clone(); - assert_eq!(get_payload_requests_builder.lock().len(), 1); + assert_eq!( + get_payload_requests_builder + .lock() + .expect("get_payload_requests mutex poisoned") + .len(), + 1 + ); Ok(()) } @@ -455,7 +461,13 @@ mod tests { .await?; let get_payload_requests_builder = builder_mock.get_payload_requests.clone(); - assert_eq!(get_payload_requests_builder.lock().len(), 1); + assert_eq!( + get_payload_requests_builder + .lock() + .expect("get_payload_requests mutex poisoned") + .len(), + 1 + ); Ok(()) } diff --git a/crates/rollup-boost/src/health.rs b/crates/rollup-boost/src/health.rs index 045f958b..ed0b0632 100644 --- a/crates/rollup-boost/src/health.rs +++ b/crates/rollup-boost/src/health.rs @@ -4,7 +4,7 @@ use std::{ }; use alloy_rpc_types_eth::BlockNumberOrTag; -use parking_lot::Mutex; +use tokio::sync::Mutex; use tokio::{ task::JoinHandle, time::{Instant, sleep_until}, @@ -63,26 +63,26 @@ impl HealthHandle { .gt(&self.max_unsafe_interval) { warn!(target: "rollup_boost::health", curr_unix = %t, unsafe_unix = %block.header.timestamp, "L2 client - unsafe block timestamp is too old, updating health status to ServiceUnavailable"); - self.probes.set_health(Health::ServiceUnavailable); + self.probes.set_health(Health::ServiceUnavailable).await; sleep_until(Instant::now() + self.health_check_interval).await; continue; - } else if self.execution_mode.lock().is_disabled() - || self.execution_mode.lock().is_dry_run() + } else if self.execution_mode.lock().await.is_disabled() + || self.execution_mode.lock().await.is_dry_run() { - self.probes.set_health(Health::Healthy); + self.probes.set_health(Health::Healthy).await; sleep_until(Instant::now() + self.health_check_interval).await; continue; } } Err(e) => { warn!(target: "rollup_boost::health", "L2 client - Failed to get unsafe block {} - updating health status", e); - self.probes.set_health(Health::ServiceUnavailable); + self.probes.set_health(Health::ServiceUnavailable).await; sleep_until(Instant::now() + self.health_check_interval).await; continue; } }; - if self.execution_mode.lock().is_enabled() { + if self.execution_mode.lock().await.is_enabled() { // Only check builder client health if execution mode is enabled // If its unhealthy, set the health status to PartialContent match self @@ -95,14 +95,14 @@ impl HealthHandle { .gt(&self.max_unsafe_interval) { warn!(target: "rollup_boost::health", curr_unix = %t, unsafe_unix = %block.header.timestamp, "Builder client - unsafe block timestamp is too old updating health status"); - self.probes.set_health(Health::PartialContent); + self.probes.set_health(Health::PartialContent).await; } else { - self.probes.set_health(Health::Healthy); + self.probes.set_health(Health::Healthy).await; } } Err(e) => { warn!(target: "rollup_boost::health", "Builder client - Failed to get unsafe block {} - updating health status", e); - self.probes.set_health(Health::PartialContent); + self.probes.set_health(Health::PartialContent).await; } }; } @@ -327,7 +327,7 @@ mod tests { health_handle.spawn(); tokio::time::sleep(Duration::from_secs(2)).await; - assert!(matches!(probes.health(), Health::Healthy)); + assert!(matches!(probes.health().await, Health::Healthy)); Ok(()) } @@ -369,7 +369,7 @@ mod tests { health_handle.spawn(); tokio::time::sleep(Duration::from_secs(2)).await; - assert!(matches!(probes.health(), Health::PartialContent)); + assert!(matches!(probes.health().await, Health::PartialContent)); Ok(()) } @@ -411,7 +411,7 @@ mod tests { health_handle.spawn(); tokio::time::sleep(Duration::from_secs(2)).await; - assert!(matches!(probes.health(), Health::ServiceUnavailable)); + assert!(matches!(probes.health().await, Health::ServiceUnavailable)); Ok(()) } @@ -452,7 +452,7 @@ mod tests { health_handle.spawn(); tokio::time::sleep(Duration::from_secs(2)).await; - assert!(matches!(probes.health(), Health::Healthy)); + assert!(matches!(probes.health().await, Health::Healthy)); Ok(()) } @@ -493,7 +493,7 @@ mod tests { health_handle.spawn(); tokio::time::sleep(Duration::from_secs(2)).await; - assert!(matches!(probes.health(), Health::Healthy)); + assert!(matches!(probes.health().await, Health::Healthy)); Ok(()) } @@ -533,7 +533,7 @@ mod tests { health_handle.spawn(); tokio::time::sleep(Duration::from_secs(2)).await; - assert!(matches!(probes.health(), Health::PartialContent)); + assert!(matches!(probes.health().await, Health::PartialContent)); Ok(()) } @@ -574,7 +574,7 @@ mod tests { health_handle.spawn(); tokio::time::sleep(Duration::from_secs(2)).await; - assert!(matches!(probes.health(), Health::ServiceUnavailable)); + assert!(matches!(probes.health().await, Health::ServiceUnavailable)); Ok(()) } diff --git a/crates/rollup-boost/src/probe.rs b/crates/rollup-boost/src/probe.rs index f297e4d8..13874381 100644 --- a/crates/rollup-boost/src/probe.rs +++ b/crates/rollup-boost/src/probe.rs @@ -10,7 +10,7 @@ use jsonrpsee::{ http_client::{HttpRequest, HttpResponse}, server::HttpBody, }; -use parking_lot::Mutex; +use tokio::sync::Mutex; use tower::{Layer, Service}; use tracing::info; @@ -45,13 +45,13 @@ pub struct Probes { } impl Probes { - pub fn set_health(&self, value: Health) { + pub async fn set_health(&self, value: Health) { info!(target: "rollup_boost::probe", "Updating health probe to to {:?}", value); - *self.health.lock() = value; + *self.health.lock().await = value; } - pub fn health(&self) -> Health { - *self.health.lock() + pub async fn health(&self) -> Health { + *self.health.lock().await } } @@ -115,7 +115,7 @@ where async move { match request.uri().path() { // Return health status - "/healthz" => Ok(service.probes.health().into()), + "/healthz" => Ok(service.probes.health().await.into()), // Service is responding, and therefor ready "/readyz" => Ok(ok()), // Service is responding, and therefor live diff --git a/crates/rollup-boost/src/server.rs b/crates/rollup-boost/src/server.rs index aa68f5d8..49da7c20 100644 --- a/crates/rollup-boost/src/server.rs +++ b/crates/rollup-boost/src/server.rs @@ -35,11 +35,11 @@ use op_alloy_rpc_types_engine::{ OpPayloadAttributes, }; use opentelemetry::trace::SpanKind; -use parking_lot::Mutex; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::{debug, error, info, instrument}; @@ -201,7 +201,9 @@ impl RollupBoostServer { .await; // async call to builder to sync the builder node - if !self.execution_mode.lock().is_disabled() && !self.should_skip_unhealthy_builder() { + if !self.execution_mode.lock().await.is_disabled() + && !self.should_skip_unhealthy_builder().await + { let builder = self.builder_client.clone(); let new_payload_clone = new_payload.clone(); tokio::spawn(async move { builder.new_payload(new_payload_clone).await }); @@ -218,10 +220,10 @@ impl RollupBoostServer { // If execution mode is disabled, return the l2 payload without sending // the request to the builder - if self.execution_mode.lock().is_disabled() { + if self.execution_mode.lock().await.is_disabled() { return match l2_fut.await { Ok(payload) => { - self.probes.set_health(Health::Healthy); + self.probes.set_health(Health::Healthy).await; let context = PayloadSource::L2; tracing::Span::current().record("payload_source", context.to_string()); counter!("rpc.blocks_created", "source" => context.to_string()).increment(1); @@ -241,7 +243,7 @@ impl RollupBoostServer { } Err(e) => { - self.probes.set_health(Health::ServiceUnavailable); + self.probes.set_health(Health::ServiceUnavailable).await; Err(e.into()) } }; @@ -305,9 +307,14 @@ impl RollupBoostServer { // Evaluate the builder and l2 response and select the final payload let (payload, context) = { - let l2_payload = - l2_payload.inspect_err(|_| self.probes.set_health(Health::ServiceUnavailable))?; - self.probes.set_health(Health::Healthy); + let l2_payload = l2_payload.inspect_err(|_| { + // best-effort update; ignore await inside closure by spawning + let probes = self.probes.clone(); + tokio::spawn(async move { + probes.set_health(Health::ServiceUnavailable).await; + }); + })?; + self.probes.set_health(Health::Healthy).await; // Convert Result> to Option by extracting the inner Option. // If there's an error, log it and return None instead. @@ -334,7 +341,7 @@ impl RollupBoostServer { // If execution mode is set to DryRun, fallback to the l2_payload, // otherwise prefer the builder payload - if self.execution_mode.lock().is_dry_run() { + if self.execution_mode.lock().await.is_dry_run() { (l2_payload, PayloadSource::L2) } else if let Some(selection_policy) = &self.block_selection_policy { selection_policy.select_block(builder_payload, l2_payload) @@ -344,8 +351,8 @@ impl RollupBoostServer { } else { // Only update the health status if the builder payload fails // and execution mode is enabled - if self.execution_mode.lock().is_enabled() && builder_api_failed { - self.probes.set_health(Health::PartialContent); + if self.execution_mode.lock().await.is_enabled() && builder_api_failed { + self.probes.set_health(Health::PartialContent).await; } (l2_payload, PayloadSource::L2) } @@ -375,8 +382,8 @@ impl RollupBoostServer { Ok(payload) } - fn should_skip_unhealthy_builder(&self) -> bool { - self.ignore_unhealthy_builders && !matches!(self.probes.health(), Health::Healthy) + async fn should_skip_unhealthy_builder(&self) -> bool { + self.ignore_unhealthy_builders && !matches!(self.probes.health().await, Health::Healthy) } async fn calculate_external_state_root( @@ -522,12 +529,12 @@ impl EngineApiServer for RollupBoostServer { .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()); // If execution mode is disabled, return the l2 client response immediately - if self.execution_mode.lock().is_disabled() { + if self.execution_mode.lock().await.is_disabled() { return Ok(l2_fut.await?); } // If traffic to the unhealthy builder is not allowed and the builder is unhealthy, - if self.should_skip_unhealthy_builder() { + if self.should_skip_unhealthy_builder().await { info!(message = "builder is unhealthy, skipping FCU to builder"); return Ok(l2_fut.await?); } @@ -754,10 +761,10 @@ pub mod tests { use jsonrpsee::RpcModule; use jsonrpsee::http_client::HttpClient; use jsonrpsee::server::{Server, ServerBuilder, ServerHandle}; - use parking_lot::Mutex; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; + use std::sync::Mutex; use tokio::time::sleep; #[derive(Debug, Clone)] @@ -880,7 +887,7 @@ pub mod tests { let (probe_layer, probes) = ProbeLayer::new(); // For tests, set initial health to Healthy since we don't run health checks - probes.set_health(Health::Healthy); + probes.set_health(Health::Healthy).await; let rollup_boost = RollupBoostServer::new( l2_rpc_client, @@ -964,9 +971,11 @@ pub mod tests { assert!(fcu_response.is_ok()); let fcu_requests = test_harness.l2_mock.fcu_requests.clone(); { - let fcu_requests_mu = fcu_requests.lock(); + let fcu_requests_mu = fcu_requests.lock().expect("fcu_requests mutex poisoned"); let fcu_requests_builder = test_harness.builder_mock.fcu_requests.clone(); - let fcu_requests_builder_mu = fcu_requests_builder.lock(); + let fcu_requests_builder_mu = fcu_requests_builder + .lock() + .expect("fcu_requests_builder mutex poisoned"); assert_eq!(fcu_requests_mu.len(), 1); assert_eq!(fcu_requests_builder_mu.len(), 1); let req: &(ForkchoiceState, Option) = @@ -991,10 +1000,14 @@ pub mod tests { assert!(new_payload_response.is_ok()); let new_payload_requests = test_harness.l2_mock.new_payload_requests.clone(); { - let new_payload_requests_mu = new_payload_requests.lock(); + let new_payload_requests_mu = new_payload_requests + .lock() + .expect("new_payloads_requests mutex poisoned"); let new_payload_requests_builder = test_harness.builder_mock.new_payload_requests.clone(); - let new_payload_requests_builder_mu = new_payload_requests_builder.lock(); + let new_payload_requests_builder_mu = new_payload_requests_builder + .lock() + .expect("new_payloads_requests_builder mutex poisoned"); assert_eq!(new_payload_requests_mu.len(), 1); assert_eq!(new_payload_requests_builder_mu.len(), 1); let req: &(ExecutionPayloadV3, Vec>, B256) = @@ -1019,12 +1032,18 @@ pub mod tests { assert!(get_payload_response.is_ok()); let get_payload_requests = test_harness.l2_mock.get_payload_requests.clone(); { - let get_payload_requests_mu = get_payload_requests.lock(); + let get_payload_requests_mu = get_payload_requests + .lock() + .expect("get_payloads_requests mutex poisoned"); let get_payload_requests_builder = test_harness.builder_mock.get_payload_requests.clone(); - let get_payload_requests_builder_mu = get_payload_requests_builder.lock(); + let get_payload_requests_builder_mu = get_payload_requests_builder + .lock() + .expect("get_payloads_requests_builder mutex poisoned"); let new_payload_requests = test_harness.l2_mock.new_payload_requests.clone(); - let new_payload_requests_mu = new_payload_requests.lock(); + let new_payload_requests_mu = new_payload_requests + .lock() + .expect("new_payloads_requests mutex poisoned"); assert_eq!(get_payload_requests_builder_mu.len(), 0); assert_eq!(get_payload_requests_mu.len(), 1); assert_eq!(new_payload_requests_mu.len(), 1); @@ -1076,7 +1095,10 @@ pub mod tests { module .register_method("engine_forkchoiceUpdatedV3", move |params, _, _| { let params: (ForkchoiceState, Option) = params.parse()?; - let mut fcu_requests = mock_engine_server.fcu_requests.lock(); + let mut fcu_requests = mock_engine_server + .fcu_requests + .lock() + .expect("fcu_requests mutex poisoned"); fcu_requests.push(params); let mut response = mock_engine_server.fcu_response.clone(); @@ -1093,7 +1115,10 @@ pub mod tests { module .register_method("engine_getPayloadV3", move |params, _, _| { let params: (PayloadId,) = params.parse()?; - let mut get_payload_requests = mock_engine_server.get_payload_requests.lock(); + let mut get_payload_requests = mock_engine_server + .get_payload_requests + .lock() + .expect("get_payloads_requests mutex poisoned"); get_payload_requests.push(params.0); // Return the response based on the call index, or the last one if we exceed the list @@ -1120,7 +1145,10 @@ pub mod tests { module .register_method("engine_newPayloadV3", move |params, _, _| { let params: (ExecutionPayloadV3, Vec, B256) = params.parse()?; - let mut new_payload_requests = mock_engine_server.new_payload_requests.lock(); + let mut new_payload_requests = mock_engine_server + .new_payload_requests + .lock() + .expect("new_payloads_requests mutex poisoned"); new_payload_requests.push(params); mock_engine_server.new_payload_response.clone() @@ -1162,9 +1190,19 @@ pub mod tests { sleep(std::time::Duration::from_millis(100)).await; { - let builder_fcu_req = builder_mock.fcu_requests.lock(); + let builder_fcu_req = builder_mock + .fcu_requests + .lock() + .expect("fcu_requests mutex poisoned"); assert_eq!(builder_fcu_req.len(), 1); - assert_eq!(l2_mock.fcu_requests.lock().len(), 1); + assert_eq!( + l2_mock + .fcu_requests + .lock() + .expect("fcu_requests_mu mutex poisoned") + .len(), + 1 + ); } // Test getPayload call @@ -1175,12 +1213,18 @@ pub mod tests { sleep(std::time::Duration::from_millis(100)).await; { - let builder_gp_reqs = builder_mock.get_payload_requests.lock(); + let builder_gp_reqs = builder_mock + .get_payload_requests + .lock() + .expect("get_payload_requests mutex poisoned"); assert_eq!(builder_gp_reqs.len(), 0); } { - let local_gp_reqs = l2_mock.get_payload_requests.lock(); + let local_gp_reqs = l2_mock + .get_payload_requests + .lock() + .expect("get_payload_requests mutex poisoned"); assert_eq!(local_gp_reqs.len(), 1); assert_eq!(local_gp_reqs[0], same_id); } diff --git a/crates/rollup-boost/src/tests/common/mod.rs b/crates/rollup-boost/src/tests/common/mod.rs index 3b7feadb..c1959330 100644 --- a/crates/rollup-boost/src/tests/common/mod.rs +++ b/crates/rollup-boost/src/tests/common/mod.rs @@ -21,7 +21,6 @@ use jsonrpsee::http_client::{HttpClient, transport::HttpBackend}; use jsonrpsee::proc_macros::rpc; use op_alloy_consensus::TxDeposit; use op_alloy_rpc_types_engine::OpPayloadAttributes; -use parking_lot::Mutex; use proxy::{BuilderProxyHandler, start_proxy_server}; use serde_json::Value; use services::op_reth::{AUTH_RPC_PORT, OpRethConfig, OpRethImage, OpRethMehods, P2P_PORT}; @@ -30,6 +29,7 @@ use std::collections::HashSet; use std::net::TcpListener; use std::path::PathBuf; use std::str::FromStr; +use std::sync::Mutex; use std::sync::{Arc, LazyLock}; use std::time::{Duration, SystemTime}; use std::{fs::File, io::BufReader, time::UNIX_EPOCH}; @@ -643,7 +643,12 @@ pub fn get_available_port() -> u16 { LazyLock::new(|| Mutex::new(HashSet::new())); loop { let port: u16 = rand::random_range(1000..20000); - if TcpListener::bind(("127.0.0.1", port)).is_ok() && CLAIMED_PORTS.lock().insert(port) { + if TcpListener::bind(("127.0.0.1", port)).is_ok() + && CLAIMED_PORTS + .lock() + .expect("CLAIMED_PORTS poisoned") + .insert(port) + { return port; } }