Skip to content

Commit

Permalink
use relayChains for determining channel size
Browse files Browse the repository at this point in the history
 - fix nits
 - add comments to functions
 - remove thread::spawn
  • Loading branch information
kamiyaa committed Jan 5, 2025
1 parent a32c8dd commit 4ff2a77
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 28 deletions.
12 changes: 4 additions & 8 deletions rust/main/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ impl OpQueue {
let matched_requests: Vec<_> = message_retry_requests
.iter_mut()
.filter_map(|(retry_req, retry_response)| {
let match_res = retry_req.pattern.op_matches(&op);
// update retry metrics
if match_res {
if retry_req.pattern.op_matches(&op) {
debug!(uuid = retry_req.uuid, "Matched request");
retry_response.matched += 1;
Some(retry_req.uuid.clone())
} else {
Expand All @@ -111,17 +111,13 @@ impl OpQueue {
})
.collect();

let matches = !matched_requests.is_empty();
if matches {
if !matched_requests.is_empty() {
info!(
operation = %op,
queue_label = %self.queue_metrics_label,
"Retrying OpQueue operation"
);
op.reset_attempts();
for matched_req in matched_requests {
info!(uuid = matched_req, "Matched request");
}
}
Reverse(op)
})
Expand All @@ -136,7 +132,7 @@ impl OpQueue {
"Sending relayer retry response back"
);
if let Err(err) = retry_req.transmitter.send(retry_response).await {
tracing::error!(err = err.to_string(), "Failed to send retry response");
tracing::error!(?err, "Failed to send retry response");
}
}
queue.append(&mut reprioritized_queue);
Expand Down
2 changes: 1 addition & 1 deletion rust/main/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl BaseAgent for Relayer {
);
}
// run server
let custom_routes = relayer_server::Server::new()
let custom_routes = relayer_server::Server::new(self.origin_chains.len())
.with_op_retry(sender.clone())
.with_message_queue(prep_queues)
.routes();
Expand Down
18 changes: 6 additions & 12 deletions rust/main/agents/relayer/src/server/message_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@ use tokio::sync::{broadcast::Sender, mpsc};

const MESSAGE_RETRY_API_BASE: &str = "/message_retry";

#[derive(new)]
#[derive(Clone, Debug, new)]
pub struct MessageRetryApi {
retry_request_transmitter: Sender<MessageRetryRequest>,
}

#[derive(Clone, Debug)]
pub struct MessageRetryApiState {
pub retry_request_transmitter: Sender<MessageRetryRequest>,
relayer_chains: usize,
}

#[derive(Clone, Debug)]
Expand All @@ -36,7 +32,7 @@ pub struct MessageRetryResponse {
}

async fn retry_message(
State(state): State<MessageRetryApiState>,
State(state): State<MessageRetryApi>,
Json(retry_req_payload): Json<MatchingList>,
) -> Result<Json<MessageRetryResponse>, String> {
let uuid = uuid::Uuid::new_v4();
Expand All @@ -47,7 +43,7 @@ async fn retry_message(
// This channel is only created to service this single
// retry request so we're expecting a single response
// from each transmitter end, hence we are using a channel of size 1
let (transmitter, mut receiver) = mpsc::channel(1);
let (transmitter, mut receiver) = mpsc::channel(state.relayer_chains);
state
.retry_request_transmitter
.send(MessageRetryRequest {
Expand Down Expand Up @@ -87,9 +83,7 @@ impl MessageRetryApi {
pub fn router(&self) -> Router {
Router::new()
.route("/", routing::post(retry_message))
.with_state(MessageRetryApiState {
retry_request_transmitter: self.retry_request_transmitter.clone(),
})
.with_state(self.clone())
}

pub fn get_route(&self) -> (&'static str, Router) {
Expand Down Expand Up @@ -118,7 +112,7 @@ mod tests {
fn setup_test_server() -> TestServerSetup {
let broadcast_tx = Sender::new(ENDPOINT_MESSAGES_QUEUE_SIZE);

let message_retry_api = MessageRetryApi::new(broadcast_tx.clone());
let message_retry_api = MessageRetryApi::new(broadcast_tx.clone(), 10);
let (path, retry_router) = message_retry_api.get_route();

let app = Router::new().nest(path, retry_router);
Expand Down
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mod message_retry;

#[derive(new)]
pub struct Server {
relayer_chains: usize,
#[new(default)]
retry_transmitter: Option<Sender<MessageRetryRequest>>,
#[new(default)]
Expand All @@ -37,7 +38,7 @@ impl Server {
pub fn routes(self) -> Vec<(&'static str, Router)> {
let mut routes = vec![];
if let Some(tx) = self.retry_transmitter {
routes.push(MessageRetryApi::new(tx).get_route());
routes.push(MessageRetryApi::new(tx, self.relayer_chains).get_route());
}
if let Some(op_queues) = self.op_queues {
routes.push(ListOperationsApi::new(op_queues).get_route());
Expand Down
7 changes: 1 addition & 6 deletions rust/main/utils/run-locally/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,7 @@ fn main() -> ExitCode {
sleep(Duration::from_secs(10));

// test retry request
let retry_req = std::thread::spawn(server::run_retry_request);
// check retry request matched at least 1 operation
let resp = retry_req
.join()
.expect("Failed to join retry request thread")
.expect("Failed to process retry request");
let resp = server::run_retry_request().expect("Failed to process retry request");
assert!(resp.matched > 0);

if !post_startup_invariants(&checkpoints_dirs) {
Expand Down
4 changes: 4 additions & 0 deletions rust/main/utils/run-locally/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub struct MessageRetryResponse {
pub matched: u64,
}

/// create tokio runtime to send a retry request to
/// relayer to retry all existing messages in the queues
pub fn run_retry_request() -> io::Result<MessageRetryResponse> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
Expand All @@ -25,6 +27,8 @@ pub fn run_retry_request() -> io::Result<MessageRetryResponse> {
.block_on(async { call_retry_request().await })
}

/// sends a request to relayer to retry all existing messages
/// in the queues
async fn call_retry_request() -> io::Result<MessageRetryResponse> {
let client = reqwest::Client::new();

Expand Down

0 comments on commit 4ff2a77

Please sign in to comment.