From a53738fb3583a5345b53ade915b909fdf054c738 Mon Sep 17 00:00:00 2001 From: kayibal Date: Fri, 5 Jun 2026 11:01:36 +0100 Subject: [PATCH 1/3] feat(fynd-core): expose BlockStepController for gated block ingestion MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Threads tycho-simulation's new BlockStepController (propeller-heads/tycho#1067) through the Fynd solver pipeline so callers can pause block processing for deterministic testing and controlled ingestion. New API: - `FyndBuilder::build_with_step_controller()` — async method returning `(Solver, BlockStepController)`. The feed runs in step-controlled mode: each block is buffered until the caller calls `trigger_next_block()`. - `BlockStepController` is re-exported from `fynd_core` alongside `PendingBlockProcessor`. - New `SolverBuildError::StepControllerChannelClosed` error variant. Blocked on next tycho-simulation release after 0.302.0 that includes BlockStepController. Once published, bump the workspace constraint and run `cargo check --workspace` to update the lockfile. Co-Authored-By: Claude Sonnet 4.6 --- fynd-core/src/feed/tycho_feed.rs | 192 ++++++++++++++++++++++++++++++- fynd-core/src/lib.rs | 3 + fynd-core/src/solver.rs | 71 +++++++++++- 3 files changed, 263 insertions(+), 3 deletions(-) diff --git a/fynd-core/src/feed/tycho_feed.rs b/fynd-core/src/feed/tycho_feed.rs index 0290ebf9..27712608 100644 --- a/fynd-core/src/feed/tycho_feed.rs +++ b/fynd-core/src/feed/tycho_feed.rs @@ -14,7 +14,10 @@ use tokio::{ use tokio_stream::StreamExt; use tracing::{debug, info, instrument, span, trace, Instrument, Level}; use tycho_simulation::{ - evm::{pending::PendingBlockProcessor, stream::ProtocolStreamBuilder}, + evm::{ + pending::PendingBlockProcessor, + stream::{BlockStepController, ProtocolStreamBuilder}, + }, protocol::models::Update, rfq::stream::RFQStreamBuilder, tycho_client::feed::{component_tracker::ComponentFilter, SynchronizerState}, @@ -437,6 +440,193 @@ impl TychoFeed { Ok(()) } + /// Like [`run`](Self::run) but gates each block behind a [`BlockStepController`]. + /// + /// Delivers the controller (or an error string) via `controller_tx` once the stream is + /// built and before the first block is processed. The caller must call + /// [`BlockStepController::trigger_next_block`] for each block to be processed. + /// + /// Only valid when at least one non-RFQ protocol is configured. Returns + /// [`DataFeedError::Config`] if all protocols are RFQ. + pub(crate) async fn run_with_step_controller( + self, + controller_tx: oneshot::Sender>, + ) -> Result<(), DataFeedError> { + info!( + tycho_url = %self.config.tycho_url, + protocols = ?self.config.protocols, + "Starting Data Feed (with step controller)..." + ); + + if self + .config + .protocols + .iter() + .all(|p| p.starts_with("rfq:")) + { + let msg = "step controller requires at least one non-RFQ protocol".to_string(); + let _ = controller_tx.send(Err(msg.clone())); + return Err(DataFeedError::Config(msg)); + } + + let tycho_api_key = self + .config + .tycho_api_key + .clone() + .or_else(|| std::env::var("TYCHO_API_KEY").ok()); + + let all_tokens = match load_all_tokens( + self.config.tycho_url.as_str(), + !self.config.use_tls, + tycho_api_key.as_deref(), + true, + self.config.chain, + Some(self.config.min_token_quality), + self.config.traded_n_days_ago, + ) + .await + { + Ok(t) => t, + Err(e) => { + let e = DataFeedError::StreamError(e.to_string()); + let _ = controller_tx.send(Err(e.to_string())); + return Err(e); + } + }; + + debug!("Loaded {} tokens from Tycho", all_tokens.len()); + + let tvl_filter = ComponentFilter::with_tvl_range( + self.config.min_tvl / self.config.tvl_buffer_ratio, + self.config.min_tvl, + ) + .blocklist( + self.config + .blocklisted_components + .clone(), + ); + + let mut stream_builder = match register_exchanges( + ProtocolStreamBuilder::new(&self.config.tycho_url, self.config.chain) + .skip_state_decode_failures(true), + tvl_filter, + &self.config.protocols, + ) { + Ok(sb) => sb, + Err(e) => { + let _ = controller_tx.send(Err(e.to_string())); + return Err(e); + } + } + .auth_key(self.config.tycho_api_key.clone()) + .skip_state_decode_failures(true) + .min_token_quality(self.config.min_token_quality as u32); + + if self.config.partial_blocks { + stream_builder = stream_builder.enable_partial_blocks(); + } + + let stream_builder = stream_builder + .set_tokens(all_tokens.clone()) + .await; + let (stream_builder, controller) = stream_builder.with_step_controller(); + + let mut protocol_stream = match stream_builder.build().await { + Ok(stream) => { + let _ = controller_tx.send(Ok(controller)); + stream + } + Err(e) => { + let msg = e.to_string(); + let _ = controller_tx.send(Err(msg.clone())); + return Err(DataFeedError::StreamError(msg)); + } + }; + + // Spawn rfq stream (same as run()). + let (mut rfq_rx, mut rfq_handle) = if self + .config + .protocols + .iter() + .any(|p| p.starts_with("rfq:")) + { + let rfq_tokens: HashSet = all_tokens.keys().cloned().collect(); + let rfq_stream_builder = register_rfq( + RFQStreamBuilder::new() + .set_tokens(all_tokens) + .await, + self.config.chain, + self.config.min_tvl, + &self.config.protocols, + rfq_tokens, + )?; + let (rfq_tx, rfq_rx) = tokio::sync::mpsc::channel(64); + let rfq_handle: JoinHandle> = tokio::spawn(async move { + rfq_stream_builder + .build(rfq_tx) + .await + .map_err(|e| DataFeedError::StreamError(e.to_string()))?; + Ok(()) + }); + (Some(rfq_rx), Some(rfq_handle)) + } else { + (None, None) + }; + + loop { + tokio::select! { + msg = protocol_stream.next() => { + match msg { + Some(msg) => { + trace!("Received message from protocol stream: {:?}", msg); + let msg = msg.map_err(|e| DataFeedError::StreamError(e.to_string()))?; + self.handle_tycho_message(msg).await?; + } + None => { + info!("Protocol stream ended"); + break; + } + } + } + msg = async { + if let Some(rx) = &mut rfq_rx { rx.recv().await } + else { std::future::pending().await } + } => { + match msg { + Some(msg) => { + trace!("Received message from RFQ stream: {:?}", msg); + self.handle_tycho_message(msg).await?; + } + None => { + info!("RFQ stream ended"); + break; + } + } + } + rfq_result = async { + if let Some(handle) = &mut rfq_handle { handle.await } + else { std::future::pending().await } + } => { + match rfq_result { + Ok(Ok(())) => { + return Err(DataFeedError::StreamError( + "RFQ stream task ended unexpectedly".to_string(), + )); + } + Ok(Err(e)) => { + return Err(DataFeedError::StreamError(format!("RFQ stream error: {e}"))); + } + Err(e) => { + return Err(DataFeedError::StreamError(format!("RFQ task panicked: {e}"))); + } + } + } + } + } + + Ok(()) + } + /// Handles a message from Tycho stream. #[instrument(skip(self, msg))] async fn handle_tycho_message(&self, msg: Update) -> Result<(), DataFeedError> { diff --git a/fynd-core/src/lib.rs b/fynd-core/src/lib.rs index 5481995a..11a1d200 100644 --- a/fynd-core/src/lib.rs +++ b/fynd-core/src/lib.rs @@ -62,6 +62,9 @@ pub use tycho_simulation::evm::pending::PendingBlockProcessor; pub use tycho_simulation::evm::pending::PendingError; /// A pending transaction bundle passed to [`PendingBlockProcessor`] for simulation. pub use tycho_simulation::evm::pending::PendingUpdate; +/// Handle returned by [`FyndBuilder::build_with_step_controller`] that controls when each +/// buffered block is released for decoding. See [`tycho_simulation`] for the full API. +pub use tycho_simulation::evm::stream::BlockStepController; /// Implement this trait and register it via /// [`FyndBuilder::with_pending_indexer`](solver::FyndBuilder::with_pending_indexer) /// to receive raw transaction deltas during pending-block simulation. diff --git a/fynd-core/src/solver.rs b/fynd-core/src/solver.rs index 6d9af5cc..e76725ee 100644 --- a/fynd-core/src/solver.rs +++ b/fynd-core/src/solver.rs @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; use tokio::{sync::broadcast, task::JoinHandle}; use tycho_execution::encoding::evm::swap_encoder::swap_encoder_registry::SwapEncoderRegistry; use tycho_simulation::{ - evm::pending::PendingBlockProcessor, + evm::{pending::PendingBlockProcessor, stream::BlockStepController}, tycho_common::{models::Chain, traits::TxDeltaIndexer, Bytes}, tycho_core::models::Address, tycho_ethereum::rpc::EthereumRpcClient, @@ -290,6 +290,10 @@ pub enum SolverBuildError { /// panicked rather than returning an error through the channel. #[error("pending processor channel closed before processor was delivered")] PendingChannelClosed, + /// The step-controller oneshot closed without delivering a value, meaning the feed task + /// panicked rather than returning an error through the channel. + #[error("step controller channel closed before controller was delivered")] + StepControllerChannelClosed, } /// Internal pool entry — either a built-in algorithm (by name) or a custom one. @@ -879,7 +883,70 @@ impl FyndBuilder { pending, )) } -} + + /// Assembles and starts all solver components, also returning a [`BlockStepController`] + /// that lets the caller control when each buffered block is released for processing. + /// + /// Intended for deterministic testing: call [`BlockStepController::trigger_next_block`] to + /// step through blocks one at a time, and [`BlockStepController::peek_next_block`] to inspect + /// a block before it is decoded. Dropping the controller ungates the stream so it runs to its + /// natural end. + /// + /// Only valid when at least one non-RFQ protocol is configured. + /// + /// # Errors + /// + /// Returns [`SolverBuildError`] if any component fails to initialize, all protocols are RFQ, + /// or the step-controller channel closes before the controller is delivered. + pub async fn build_with_step_controller( + self, + ) -> Result<(Solver, BlockStepController), SolverBuildError> { + let mut c = self.assemble_components()?; + + let (controller_tx, controller_rx) = + tokio::sync::oneshot::channel::>(); + + let feed_handle = tokio::spawn(async move { + if let Err(e) = c + .tycho_feed + .run_with_step_controller(controller_tx) + .await + { + tracing::error!(error = %e, "tycho feed error"); + } + }); + let gas_price_handle = tokio::spawn(async move { + c.gas_price_fetcher.run().await; + }); + let computation_handle = tokio::spawn(async move { + c.computation_manager + .run(c.computation_event_rx, c.computation_shutdown_rx) + .await; + }); + + let controller = controller_rx + .await + .map_err(|_| SolverBuildError::StepControllerChannelClosed)? + .map_err(SolverBuildError::FeedSetup)?; + + Ok(( + Solver { + router: c.router, + worker_pools: c.worker_pools, + market_data: c.market_data, + derived_data: c.derived_data, + feed_handle, + gas_price_handle, + computation_handle, + computation_shutdown_tx: c.computation_shutdown_tx, + chain: c.chain, + router_address: c.router_address, + market_event_tx: c.market_event_tx, + }, + controller, + )) + } +} // impl FyndBuilder /// A running solver assembled by [`FyndBuilder`]. pub struct Solver { From 9d48b7c3992274a0098aad2d1873c67c3ea22ff7 Mon Sep 17 00:00:00 2001 From: kayibal Date: Fri, 5 Jun 2026 11:50:47 +0100 Subject: [PATCH 2/3] feat(fynd-core): update tycho to 0.305.0 with BlockStepController support tycho-simulation 0.305.0 and tycho-execution 0.305.0 are now published with BlockStepController available in the evm::stream module. The 0.305.x release changed the protocol stream return type to be !Unpin; fix all three run* methods in TychoFeed to Box::pin the stream at construction so that tokio::select! futures remain Unpin. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 38 ++++++++++++++++---------------- Cargo.toml | 4 ++-- fynd-core/src/feed/tycho_feed.rs | 9 ++++---- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26187ed7..6852399c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2965,7 +2965,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3868,7 +3868,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.3", "system-configuration", "tokio", "tower-service", @@ -4626,7 +4626,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5365,7 +5365,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -5403,7 +5403,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.3", "tracing", "windows-sys 0.59.0", ] @@ -6127,7 +6127,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6186,7 +6186,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6842,7 +6842,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -7493,9 +7493,9 @@ dependencies = [ [[package]] name = "tycho-client" -version = "0.302.0" +version = "0.305.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f84175b69d11e2f73f4e02852c3909b823d22d693c9cca36c604321eb837c4be" +checksum = "5b39e011e03d6907998e38850bf9f311dd4b98889f6a87d3a3d9fdd1983a0eb6" dependencies = [ "async-trait", "backoff", @@ -7523,9 +7523,9 @@ dependencies = [ [[package]] name = "tycho-common" -version = "0.302.5" +version = "0.305.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef62d9f916904fdbe1ad584fbf67f362146c770b89d0492cd8a556d8c21f0b6" +checksum = "47d9e1daf2157d0a9c9a974d306608f18e0a1dbcab58451ab1bf86bbe6bd3cfc" dependencies = [ "anyhow", "async-trait", @@ -7550,9 +7550,9 @@ dependencies = [ [[package]] name = "tycho-ethereum" -version = "0.302.0" +version = "0.305.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19b6f6b1a24d44a7203e76ef120234d08fe5447cf33768a438affa5c650efcb" +checksum = "bd66a83a05de1a45375f8df88f8156e9edb91b088e197680390f8b91da45aeeb" dependencies = [ "alloy", "async-trait", @@ -7572,9 +7572,9 @@ dependencies = [ [[package]] name = "tycho-execution" -version = "0.302.5" +version = "0.305.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c79cbdd3f339697d04e4c9e10cd9505e626cb79d0cbb31d4876d5ba422c47d1" +checksum = "dc680a9f1f4ca4abef1e00233c24fe450591724c4dc6f94cedf6fe5beb0a80ff" dependencies = [ "alloy", "chrono", @@ -7593,9 +7593,9 @@ dependencies = [ [[package]] name = "tycho-simulation" -version = "0.302.0" +version = "0.305.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69b75565b6125195e8100c3ad2cc9d1100248c11ea9e31f49fb0608fd8f0376c" +checksum = "56b7d28d0a70c1a5d3a0441c0f4b18695d8eaa249f99aee8bfb0b8bde0c51254" dependencies = [ "alloy", "alloy-chains", @@ -8111,7 +8111,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 359ddb29..4e65f074 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -135,8 +135,8 @@ metrics = "0.24" metrics-exporter-prometheus = "0.16" # Tycho -tycho-execution = ">=0.302.0" -tycho-simulation = ">=0.302.0" +tycho-execution = "0.305.0" +tycho-simulation = "0.305.0" typetag = "0.2" # Graph algorithms diff --git a/fynd-core/src/feed/tycho_feed.rs b/fynd-core/src/feed/tycho_feed.rs index 27712608..c53b61f0 100644 --- a/fynd-core/src/feed/tycho_feed.rs +++ b/fynd-core/src/feed/tycho_feed.rs @@ -138,14 +138,14 @@ impl TychoFeed { stream_builder = stream_builder.enable_partial_blocks(); } - Some( + Some(Box::pin( stream_builder .set_tokens(all_tokens.clone()) .await .build() .await .map_err(|e| DataFeedError::StreamError(e.to_string()))?, - ) + )) } else { None }; @@ -337,7 +337,7 @@ impl TychoFeed { }; } - let (mut protocol_stream, pending) = match stream_builder + let (protocol_stream, pending) = match stream_builder .build_with_pending() .await { @@ -348,6 +348,7 @@ impl TychoFeed { return Err(e); } }; + let mut protocol_stream = Box::pin(protocol_stream); if pending_tx.send(Ok(pending)).is_err() { tracing::warn!( @@ -534,7 +535,7 @@ impl TychoFeed { let mut protocol_stream = match stream_builder.build().await { Ok(stream) => { let _ = controller_tx.send(Ok(controller)); - stream + Box::pin(stream) } Err(e) => { let msg = e.to_string(); From 3744b3e5560bab74116897ddf3eab81f285d8040 Mon Sep 17 00:00:00 2001 From: kayibal Date: Fri, 5 Jun 2026 12:22:18 +0100 Subject: [PATCH 3/3] feat(fynd-core): add build_with_pending_and_step_controller Combines PendingBlockProcessor and BlockStepController into a single build method for callers that need both. Adds the corresponding run_with_pending_and_step_controller feed method which calls with_step_controller() on the builder before build_with_pending(). Co-Authored-By: Claude Sonnet 4.6 --- fynd-core/src/feed/tycho_feed.rs | 204 +++++++++++++++++++++++++++++++ fynd-core/src/lib.rs | 5 +- fynd-core/src/solver.rs | 68 +++++++++++ 3 files changed, 275 insertions(+), 2 deletions(-) diff --git a/fynd-core/src/feed/tycho_feed.rs b/fynd-core/src/feed/tycho_feed.rs index c53b61f0..90a32eff 100644 --- a/fynd-core/src/feed/tycho_feed.rs +++ b/fynd-core/src/feed/tycho_feed.rs @@ -441,6 +441,210 @@ impl TychoFeed { Ok(()) } + /// Like [`run_with_pending`](Self::run_with_pending) but also gates each block behind a + /// [`BlockStepController`]. + /// + /// Both the [`PendingBlockProcessor`] and the [`BlockStepController`] are delivered before + /// the first block is processed. Only valid when at least one non-RFQ protocol is configured. + pub(crate) async fn run_with_pending_and_step_controller( + self, + pending_tx: oneshot::Sender>, + controller_tx: oneshot::Sender>, + pending_indexers: Vec<(String, Box)>, + ) -> Result<(), DataFeedError> { + info!( + tycho_url = %self.config.tycho_url, + protocols = ?self.config.protocols, + "Starting Data Feed (with pending + step controller)..." + ); + + if self + .config + .protocols + .iter() + .all(|p| p.starts_with("rfq:")) + { + let msg = "step controller requires at least one non-RFQ protocol".to_string(); + let _ = pending_tx.send(Err(msg.clone())); + let _ = controller_tx.send(Err(msg.clone())); + return Err(DataFeedError::Config(msg)); + } + + let tycho_api_key = self + .config + .tycho_api_key + .clone() + .or_else(|| std::env::var("TYCHO_API_KEY").ok()); + + let all_tokens = match load_all_tokens( + self.config.tycho_url.as_str(), + !self.config.use_tls, + tycho_api_key.as_deref(), + true, + self.config.chain, + Some(self.config.min_token_quality), + self.config.traded_n_days_ago, + ) + .await + { + Ok(t) => t, + Err(e) => { + let e = DataFeedError::StreamError(e.to_string()); + let _ = pending_tx.send(Err(e.to_string())); + let _ = controller_tx.send(Err(e.to_string())); + return Err(e); + } + }; + + debug!("Loaded {} tokens from Tycho", all_tokens.len()); + + let mut stream_builder = match register_exchanges( + ProtocolStreamBuilder::new(&self.config.tycho_url, self.config.chain) + .skip_state_decode_failures(true), + ComponentFilter::with_tvl_range( + self.config.min_tvl / self.config.tvl_buffer_ratio, + self.config.min_tvl, + ) + .blocklist( + self.config + .blocklisted_components + .clone(), + ), + &self.config.protocols, + ) { + Ok(sb) => sb, + Err(e) => { + let _ = pending_tx.send(Err(e.to_string())); + let _ = controller_tx.send(Err(e.to_string())); + return Err(e); + } + } + .auth_key(self.config.tycho_api_key.clone()) + .skip_state_decode_failures(true) + .min_token_quality(self.config.min_token_quality as u32) + .set_tokens(all_tokens.clone()) + .await; + + for (extractor, indexer) in pending_indexers { + stream_builder = match stream_builder.with_pending_indexer(&extractor, indexer) { + Ok(sb) => sb, + Err(e) => { + let e = DataFeedError::StreamError(e.to_string()); + let _ = pending_tx.send(Err(e.to_string())); + let _ = controller_tx.send(Err(e.to_string())); + return Err(e); + } + }; + } + + let (stream_builder, controller) = stream_builder.with_step_controller(); + + let mut protocol_stream = match stream_builder + .build_with_pending() + .await + { + Ok((stream, pending)) => { + if pending_tx.send(Ok(pending)).is_err() { + tracing::warn!( + "PendingBlockProcessor receiver dropped before send; continuing without \ + pending updates" + ); + } + let _ = controller_tx.send(Ok(controller)); + Box::pin(stream) + } + Err(e) => { + let msg = e.to_string(); + let _ = pending_tx.send(Err(msg.clone())); + let _ = controller_tx.send(Err(msg.clone())); + return Err(DataFeedError::StreamError(msg)); + } + }; + + // Spawn RFQ stream (same as run_with_pending()). + let (mut rfq_rx, mut rfq_handle) = if self + .config + .protocols + .iter() + .any(|p| p.starts_with("rfq:")) + { + let rfq_tokens: HashSet = all_tokens.keys().cloned().collect(); + let rfq_stream_builder = register_rfq( + RFQStreamBuilder::new() + .set_tokens(all_tokens) + .await, + self.config.chain, + self.config.min_tvl, + &self.config.protocols, + rfq_tokens, + )?; + let (rfq_tx, rfq_rx) = tokio::sync::mpsc::channel(64); + let rfq_handle: JoinHandle> = tokio::spawn(async move { + rfq_stream_builder + .build(rfq_tx) + .await + .map_err(|e| DataFeedError::StreamError(e.to_string()))?; + Ok(()) + }); + (Some(rfq_rx), Some(rfq_handle)) + } else { + (None, None) + }; + + loop { + tokio::select! { + msg = protocol_stream.next() => { + match msg { + Some(msg) => { + trace!("Received message from protocol stream: {:?}", msg); + let msg = msg.map_err(|e| DataFeedError::StreamError(e.to_string()))?; + self.handle_tycho_message(msg).await?; + } + None => { + info!("Protocol stream ended"); + break; + } + } + } + msg = async { + if let Some(rx) = &mut rfq_rx { rx.recv().await } + else { std::future::pending().await } + } => { + match msg { + Some(msg) => { + trace!("Received message from RFQ stream: {:?}", msg); + self.handle_tycho_message(msg).await?; + } + None => { + info!("RFQ stream ended"); + break; + } + } + } + rfq_result = async { + if let Some(handle) = &mut rfq_handle { handle.await } + else { std::future::pending().await } + } => { + match rfq_result { + Ok(Ok(())) => { + return Err(DataFeedError::StreamError( + "RFQ stream task ended unexpectedly".to_string(), + )); + } + Ok(Err(e)) => { + return Err(DataFeedError::StreamError(format!("RFQ stream error: {e}"))); + } + Err(e) => { + return Err(DataFeedError::StreamError(format!("RFQ task panicked: {e}"))); + } + } + } + } + } + + Ok(()) + } + /// Like [`run`](Self::run) but gates each block behind a [`BlockStepController`]. /// /// Delivers the controller (or an error string) via `controller_tx` once the stream is diff --git a/fynd-core/src/lib.rs b/fynd-core/src/lib.rs index 11a1d200..1a3a30c2 100644 --- a/fynd-core/src/lib.rs +++ b/fynd-core/src/lib.rs @@ -62,8 +62,9 @@ pub use tycho_simulation::evm::pending::PendingBlockProcessor; pub use tycho_simulation::evm::pending::PendingError; /// A pending transaction bundle passed to [`PendingBlockProcessor`] for simulation. pub use tycho_simulation::evm::pending::PendingUpdate; -/// Handle returned by [`FyndBuilder::build_with_step_controller`] that controls when each -/// buffered block is released for decoding. See [`tycho_simulation`] for the full API. +/// Handle returned by [`FyndBuilder::build_with_step_controller`] or +/// [`FyndBuilder::build_with_pending_and_step_controller`] that controls when each buffered +/// block is released for decoding. See [`tycho_simulation`] for the full API. pub use tycho_simulation::evm::stream::BlockStepController; /// Implement this trait and register it via /// [`FyndBuilder::with_pending_indexer`](solver::FyndBuilder::with_pending_indexer) diff --git a/fynd-core/src/solver.rs b/fynd-core/src/solver.rs index e76725ee..de83f084 100644 --- a/fynd-core/src/solver.rs +++ b/fynd-core/src/solver.rs @@ -946,6 +946,74 @@ impl FyndBuilder { controller, )) } + + /// Assembles and starts all solver components, returning both a [`PendingBlockProcessor`] + /// and a [`BlockStepController`]. + /// + /// Combines the capabilities of [`build_with_pending`](Self::build_with_pending) and + /// [`build_with_step_controller`](Self::build_with_step_controller). Only valid when at + /// least one non-RFQ protocol is configured. + /// + /// # Errors + /// + /// Returns [`SolverBuildError`] if any component fails to initialize, all protocols are RFQ, + /// or either delivery channel closes before its value is received. + pub async fn build_with_pending_and_step_controller( + self, + ) -> Result<(Solver, PendingBlockProcessor, BlockStepController), SolverBuildError> { + let mut c = self.assemble_components()?; + + let (pending_tx, pending_rx) = + tokio::sync::oneshot::channel::>(); + let (controller_tx, controller_rx) = + tokio::sync::oneshot::channel::>(); + + let pending_indexers = c.pending_indexers; + let feed_handle = tokio::spawn(async move { + if let Err(e) = c + .tycho_feed + .run_with_pending_and_step_controller(pending_tx, controller_tx, pending_indexers) + .await + { + tracing::error!(error = %e, "tycho feed error"); + } + }); + let gas_price_handle = tokio::spawn(async move { + c.gas_price_fetcher.run().await; + }); + let computation_handle = tokio::spawn(async move { + c.computation_manager + .run(c.computation_event_rx, c.computation_shutdown_rx) + .await; + }); + + let pending = pending_rx + .await + .map_err(|_| SolverBuildError::PendingChannelClosed)? + .map_err(SolverBuildError::FeedSetup)?; + let controller = controller_rx + .await + .map_err(|_| SolverBuildError::StepControllerChannelClosed)? + .map_err(SolverBuildError::FeedSetup)?; + + Ok(( + Solver { + router: c.router, + worker_pools: c.worker_pools, + market_data: c.market_data, + derived_data: c.derived_data, + feed_handle, + gas_price_handle, + computation_handle, + computation_shutdown_tx: c.computation_shutdown_tx, + chain: c.chain, + router_address: c.router_address, + market_event_tx: c.market_event_tx, + }, + pending, + controller, + )) + } } // impl FyndBuilder /// A running solver assembled by [`FyndBuilder`].