diff --git a/fynd-core/src/algorithm/bellman_ford.rs b/fynd-core/src/algorithm/bellman_ford.rs index 0ac30155..5ec56d37 100644 --- a/fynd-core/src/algorithm/bellman_ford.rs +++ b/fynd-core/src/algorithm/bellman_ford.rs @@ -27,7 +27,7 @@ use tycho_simulation::{ tycho_core::{models::token::Token, simulation::protocol_sim::Price}, }; -use super::{Algorithm, AlgorithmConfig, AlgorithmError, NoPathReason}; +use super::{bf_helpers, Algorithm, AlgorithmConfig, AlgorithmError, NoPathReason}; use crate::{ derived::{ computation::ComputationRequirements, @@ -140,68 +140,6 @@ impl BellmanFordAlgorithm { None } - /// Checks whether the target node or pool conflicts with the existing path to `from`. - /// Walks the predecessor chain once, checking both conditions simultaneously. - fn path_has_conflict( - from: NodeIndex, - target_node: NodeIndex, - target_pool: &ComponentId, - predecessor: &[Option<(NodeIndex, ComponentId)>], - ) -> bool { - let mut current = from; - loop { - if current == target_node { - return true; - } - match &predecessor[current.index()] { - Some((prev, cid)) => { - if cid == target_pool { - return true; - } - current = *prev; - } - None => return false, - } - } - } - - /// Reconstructs the path from token_out back to token_in by walking the predecessor - /// array. - fn reconstruct_path( - token_out: NodeIndex, - token_in: NodeIndex, - predecessor: &[Option<(NodeIndex, ComponentId)>], - ) -> Result, AlgorithmError> { - let mut path = Vec::new(); - let mut current = token_out; - let mut visited = HashSet::new(); - - while current != token_in { - if !visited.insert(current) { - return Err(AlgorithmError::Other("cycle in predecessor chain".to_string())); - } - - let idx = current.index(); - match &predecessor - .get(idx) - .and_then(|p| p.as_ref()) - { - Some((prev_node, component_id)) => { - path.push((*prev_node, current, component_id.clone())); - current = *prev_node; - } - None => { - return Err(AlgorithmError::Other(format!( - "broken predecessor chain at node {idx}" - ))); - } - } - } - - path.reverse(); - Ok(path) - } - /// Extracts the subgraph reachable from `token_in_node` within `max_hops` via BFS. /// /// Returns `(adjacency_list, token_nodes, component_ids)` or `NoPath` if the @@ -455,7 +393,7 @@ impl Algorithm for BellmanFordAlgorithm { let v_idx = v.index(); // Single predecessor walk: skip if target token or pool already in path - if Self::path_has_conflict(u, *v, component_id, &predecessor) { + if bf_helpers::path_has_conflict(u, *v, component_id, &predecessor) { continue; } @@ -541,7 +479,7 @@ impl Algorithm for BellmanFordAlgorithm { // Reconstruct path and build route directly from stored distances/gas // (no re-simulation needed since forbid-revisits guarantees relaxation // amounts match sequential execution). - let path_edges = Self::reconstruct_path(token_out_node, token_in_node, &predecessor)?; + let path_edges = bf_helpers::reconstruct_path(token_out_node, token_in_node, &predecessor)?; let mut swaps = Vec::with_capacity(path_edges.len()); for (from_node, to_node, component_id) in &path_edges { @@ -1284,20 +1222,20 @@ mod tests { pred[2] = Some((NodeIndex::new(1), "pool_b".into())); // Node conflicts: node 0 is in path, node 3 is not - assert!(BellmanFordAlgorithm::path_has_conflict( + assert!(bf_helpers::path_has_conflict( NodeIndex::new(2), NodeIndex::new(0), &"any".into(), &pred )); - assert!(!BellmanFordAlgorithm::path_has_conflict( + assert!(!bf_helpers::path_has_conflict( NodeIndex::new(2), NodeIndex::new(3), &"any".into(), &pred )); // Self-check: node 2 is itself in the "path from 2" - assert!(BellmanFordAlgorithm::path_has_conflict( + assert!(bf_helpers::path_has_conflict( NodeIndex::new(2), NodeIndex::new(2), &"any".into(), @@ -1305,19 +1243,19 @@ mod tests { )); // Pool conflicts: pool_a and pool_b are used, pool_c is not - assert!(BellmanFordAlgorithm::path_has_conflict( + assert!(bf_helpers::path_has_conflict( NodeIndex::new(2), NodeIndex::new(3), &"pool_a".into(), &pred )); - assert!(BellmanFordAlgorithm::path_has_conflict( + assert!(bf_helpers::path_has_conflict( NodeIndex::new(2), NodeIndex::new(3), &"pool_b".into(), &pred )); - assert!(!BellmanFordAlgorithm::path_has_conflict( + assert!(!bf_helpers::path_has_conflict( NodeIndex::new(2), NodeIndex::new(3), &"pool_c".into(), diff --git a/fynd-core/src/algorithm/bellman_ford_pricing.rs b/fynd-core/src/algorithm/bellman_ford_pricing.rs new file mode 100644 index 00000000..165ea1fa --- /dev/null +++ b/fynd-core/src/algorithm/bellman_ford_pricing.rs @@ -0,0 +1,456 @@ +//! Bellman-Ford SPFA for one-to-all token pricing. +//! +//! Runs a single flat-array SPFA from a source token (e.g. WETH) with a probe amount, +//! propagating to all reachable tokens within `max_hops`. Uses forbid-revisits to prevent +//! paths through arbitrage loops that would distort prices. +//! +//! Counterpart to the routing BF in [`bellman_ford`](super::bellman_ford): +//! - Routing: one source, one destination, real trade amount +//! - Pricing: one source, ALL destinations, small probe amount + +use std::collections::{HashMap, HashSet}; + +use num_bigint::BigUint; +use num_traits::Zero; +use petgraph::graph::NodeIndex; +use tracing::trace; +use tycho_simulation::tycho_core::models::token::Token; + +use super::{bf_helpers, AlgorithmError}; +use crate::{ + feed::market_data::SharedMarketData, + graph::petgraph::StableDiGraph, + types::{ComponentId, Route, Swap}, +}; + +/// Result of a one-to-all SPFA run. Contains distances and predecessors for all +/// reachable tokens from the source. +pub(crate) struct SpfaAllResult { + distance: Vec, + predecessor: Vec>, + source: NodeIndex, + token_map: HashMap, +} + +impl SpfaAllResult { + /// Returns true if `node` was reached by the SPFA. + pub fn is_reachable(&self, node: NodeIndex) -> bool { + !self.distance[node.index()].is_zero() + } + + /// Reconstructs the path from source to `dest`. + pub fn reconstruct_path( + &self, + dest: NodeIndex, + ) -> Result, AlgorithmError> { + bf_helpers::reconstruct_path(dest, self.source, &self.predecessor) + } + + /// Returns a reference to the token map built during subgraph extraction. + pub fn token_map(&self) -> &HashMap { + &self.token_map + } + + /// Returns the best forward amount reachable at `node` (test-only). + #[cfg(test)] + pub fn amount_at(&self, node: NodeIndex) -> &BigUint { + &self.distance[node.index()] + } +} + +/// Runs a flat Bellman-Ford SPFA from `source` with `amount`, propagating to all +/// reachable tokens within `max_hops`. Uses forbid-revisits to prevent paths +/// through arbitrage loops. +/// +/// One forward pass prices every token reachable from the source. +pub(crate) fn solve_one_to_all( + source: NodeIndex, + amount: BigUint, + max_hops: usize, + graph: &StableDiGraph<()>, + market: &SharedMarketData, +) -> SpfaAllResult { + let subgraph_edges = bf_helpers::extract_subgraph_edges(source, max_hops, graph); + + let max_idx = graph + .node_indices() + .map(|n| n.index()) + .max() + .unwrap_or(0) + + 1; + + if subgraph_edges.is_empty() { + return SpfaAllResult { + distance: vec![BigUint::ZERO; max_idx], + predecessor: vec![None; max_idx], + source, + token_map: HashMap::new(), + }; + } + + // Build token map for all nodes in subgraph + let subgraph_nodes: HashSet = subgraph_edges + .iter() + .flat_map(|&(from, to, _)| [from, to]) + .collect(); + + let token_map: HashMap = subgraph_nodes + .iter() + .filter_map(|&node| { + let addr = &graph[node]; + market + .get_token(addr) + .cloned() + .map(|t| (node, t)) + }) + .collect(); + + let mut distance: Vec = vec![BigUint::ZERO; max_idx]; + let mut predecessor: Vec> = vec![None; max_idx]; + + distance[source.index()] = amount; + + // Build adjacency list + let mut adj: HashMap> = HashMap::new(); + for (from, to, cid) in &subgraph_edges { + adj.entry(*from) + .or_default() + .push((*to, cid)); + } + + // SPFA: seed active set with source node + let mut active_nodes: Vec = vec![source]; + + for _round in 0..max_hops { + if active_nodes.is_empty() { + break; + } + + let mut next_active: HashSet = HashSet::new(); + + for &u in &active_nodes { + let u_idx = u.index(); + if distance[u_idx].is_zero() { + continue; + } + + let Some(token_u) = token_map.get(&u) else { + continue; + }; + + let Some(edges) = adj.get(&u) else { + continue; + }; + + for &(v, component_id) in edges { + let v_idx = v.index(); + + // Forbid token and pool revisits (single predecessor walk) + if bf_helpers::path_has_conflict(u, v, component_id, &predecessor) { + continue; + } + + let Some(token_v) = token_map.get(&v) else { + continue; + }; + + let Some(sim_state) = market.get_simulation_state(component_id) else { + continue; + }; + + let result = + match sim_state.get_amount_out(distance[u_idx].clone(), token_u, token_v) { + Ok(r) => r, + Err(e) => { + trace!( + component_id, + error = %e, + "get_amount_out failed during relaxation, skipping edge" + ); + continue; + } + }; + + let amount_out = result.amount; + + if amount_out > distance[v_idx] { + distance[v_idx] = amount_out; + predecessor[v_idx] = Some((u, component_id.clone())); + next_active.insert(v); + } + } + } + + active_nodes = next_active.into_iter().collect(); + } + + SpfaAllResult { distance, predecessor, source, token_map } +} + +/// Simulates a path to build a Route with Swaps. +/// +/// Forbid-revisits guarantees each pool appears at most once, so each step +/// uses the original pool state directly (no state-override tracking needed). +pub(crate) fn resimulate_path( + path: &[(NodeIndex, NodeIndex, ComponentId)], + amount_in: &BigUint, + market: &SharedMarketData, + token_map: &HashMap, +) -> Result<(Route, BigUint), AlgorithmError> { + let mut current_amount = amount_in.clone(); + let mut swaps = Vec::with_capacity(path.len()); + + for (from_node, to_node, component_id) in path { + let token_in = token_map + .get(from_node) + .ok_or_else(|| AlgorithmError::DataNotFound { + kind: "token", + id: Some(format!("{:?}", from_node)), + })?; + let token_out = token_map + .get(to_node) + .ok_or_else(|| AlgorithmError::DataNotFound { + kind: "token", + id: Some(format!("{:?}", to_node)), + })?; + + let component = market + .get_component(component_id) + .ok_or_else(|| AlgorithmError::DataNotFound { + kind: "component", + id: Some(component_id.clone()), + })?; + let state = market + .get_simulation_state(component_id) + .ok_or_else(|| AlgorithmError::DataNotFound { + kind: "simulation state", + id: Some(component_id.clone()), + })?; + + let result = state + .get_amount_out(current_amount.clone(), token_in, token_out) + .map_err(|e| AlgorithmError::SimulationFailed { + component_id: component_id.clone(), + error: format!("{:?}", e), + })?; + + swaps.push(Swap::new( + component_id.clone(), + component.protocol_system.clone(), + token_in.address.clone(), + token_out.address.clone(), + current_amount.clone(), + result.amount.clone(), + result.gas, + component.clone(), + state.clone_box(), + )); + + current_amount = result.amount; + } + + let route = Route::new(swaps); + Ok((route, current_amount)) +} + +#[cfg(test)] +mod tests { + use tycho_simulation::{ + tycho_common::simulation::protocol_sim::ProtocolSim, tycho_core::models::token::Token, + }; + + use super::*; + use crate::{ + algorithm::test_utils::{component, token, MockProtocolSim}, + graph::{GraphManager, PetgraphStableDiGraphManager}, + }; + + fn setup_market_and_graph( + pools: Vec<(&str, &Token, &Token, MockProtocolSim)>, + ) -> (SharedMarketData, PetgraphStableDiGraphManager<()>) { + let mut market = SharedMarketData::new(); + + for (pool_id, token_in, token_out, state) in pools { + let tokens = vec![token_in.clone(), token_out.clone()]; + let comp = component(pool_id, &tokens); + market.upsert_components(std::iter::once(comp)); + market.update_states([(pool_id.to_string(), Box::new(state) as Box)]); + market.upsert_tokens(tokens); + } + + let mut graph_manager = PetgraphStableDiGraphManager::default(); + graph_manager.initialize_graph(&market.component_topology()); + + (market, graph_manager) + } + + #[test] + fn solve_one_to_all_prices_all_reachable_tokens() { + let eth = token(0, "ETH"); + let usdc = token(1, "USDC"); + let dai = token(2, "DAI"); + + let (market, gm) = setup_market_and_graph(vec![ + ("eth_usdc", ð, &usdc, MockProtocolSim::new(2000.0)), + ("usdc_dai", &usdc, &dai, MockProtocolSim::new(1.0)), + ]); + + let graph = gm.graph(); + let eth_node = graph + .node_indices() + .find(|&n| graph[n] == eth.address) + .unwrap(); + + let result = solve_one_to_all(eth_node, BigUint::from(100u64), 2, graph, &market); + + // ETH -> USDC: 100 * 2000 = 200_000 + let usdc_node = graph + .node_indices() + .find(|&n| graph[n] == usdc.address) + .unwrap(); + assert!(result.is_reachable(usdc_node)); + assert_eq!(*result.amount_at(usdc_node), BigUint::from(200_000u64)); + + // ETH -> USDC -> DAI: 200_000 * 1 = 200_000 + let dai_node = graph + .node_indices() + .find(|&n| graph[n] == dai.address) + .unwrap(); + assert!(result.is_reachable(dai_node)); + assert_eq!(*result.amount_at(dai_node), BigUint::from(200_000u64)); + } + + #[test] + fn solve_one_to_all_picks_best_path() { + // Diamond: ETH->A->TARGET (2*3=6x) vs ETH->B->TARGET (4*1=4x) + let eth = token(0, "ETH"); + let a = token(1, "A"); + let b = token(2, "B"); + let target = token(3, "TARGET"); + + let (market, gm) = setup_market_and_graph(vec![ + ("eth_a", ð, &a, MockProtocolSim::new(2.0)), + ("a_target", &a, &target, MockProtocolSim::new(3.0)), + ("eth_b", ð, &b, MockProtocolSim::new(4.0)), + ("b_target", &b, &target, MockProtocolSim::new(1.0)), + ]); + + let graph = gm.graph(); + let eth_node = graph + .node_indices() + .find(|&n| graph[n] == eth.address) + .unwrap(); + + let result = solve_one_to_all(eth_node, BigUint::from(100u64), 2, graph, &market); + + let target_node = graph + .node_indices() + .find(|&n| graph[n] == target.address) + .unwrap(); + + // Best path: ETH->A->TARGET = 100*2*3 = 600 + assert_eq!(*result.amount_at(target_node), BigUint::from(600u64)); + } + + #[test] + fn solve_one_to_all_respects_max_hops() { + let eth = token(0, "ETH"); + let a = token(1, "A"); + let b = token(2, "B"); + let c = token(3, "C"); + + let (market, gm) = setup_market_and_graph(vec![ + ("eth_a", ð, &a, MockProtocolSim::new(2.0)), + ("a_b", &a, &b, MockProtocolSim::new(3.0)), + ("b_c", &b, &c, MockProtocolSim::new(4.0)), + ]); + + let graph = gm.graph(); + let eth_node = graph + .node_indices() + .find(|&n| graph[n] == eth.address) + .unwrap(); + + // max_hops=2: can reach A (1 hop) and B (2 hops), but NOT C (3 hops) + let result = solve_one_to_all(eth_node, BigUint::from(100u64), 2, graph, &market); + + let c_node = graph + .node_indices() + .find(|&n| graph[n] == c.address) + .unwrap(); + assert!(!result.is_reachable(c_node), "C should not be reachable with max_hops=2"); + } + + #[test] + fn reconstruct_and_resimulate_round_trip() { + let eth = token(0, "ETH"); + let usdc = token(1, "USDC"); + + let (market, gm) = + setup_market_and_graph(vec![("pool", ð, &usdc, MockProtocolSim::new(2000.0))]); + + let graph = gm.graph(); + let eth_node = graph + .node_indices() + .find(|&n| graph[n] == eth.address) + .unwrap(); + let usdc_node = graph + .node_indices() + .find(|&n| graph[n] == usdc.address) + .unwrap(); + + let result = solve_one_to_all(eth_node, BigUint::from(100u64), 2, graph, &market); + + // Reconstruct path + let path = result + .reconstruct_path(usdc_node) + .unwrap(); + assert_eq!(path.len(), 1); + assert_eq!(path[0].2, "pool"); + + // Re-simulate + let (route, amount_out) = + resimulate_path(&path, &BigUint::from(100u64), &market, result.token_map()).unwrap(); + assert_eq!(route.swaps().len(), 1); + assert_eq!(amount_out, BigUint::from(200_000u64)); + } + + #[test] + fn forbid_revisits_prevents_cycles() { + // ETH -> A -> ETH would be a token revisit; should be forbidden + let eth = token(0, "ETH"); + let a = token(1, "A"); + let b = token(2, "B"); + + let (market, gm) = setup_market_and_graph(vec![ + ("eth_a", ð, &a, MockProtocolSim::new(2.0)), + ("a_b", &a, &b, MockProtocolSim::new(3.0)), + ]); + + let graph = gm.graph(); + let eth_node = graph + .node_indices() + .find(|&n| graph[n] == eth.address) + .unwrap(); + + // With forbid-revisits, ETH should not appear as reachable + // (it's the source, distance is set to probe amount, not a revisit result) + let result = solve_one_to_all(eth_node, BigUint::from(100u64), 4, graph, &market); + + // A should be reachable (1 hop) + let a_node = graph + .node_indices() + .find(|&n| graph[n] == a.address) + .unwrap(); + assert!(result.is_reachable(a_node)); + assert_eq!(*result.amount_at(a_node), BigUint::from(200u64)); + + // B should be reachable (2 hops) + let b_node = graph + .node_indices() + .find(|&n| graph[n] == b.address) + .unwrap(); + assert!(result.is_reachable(b_node)); + assert_eq!(*result.amount_at(b_node), BigUint::from(600u64)); + } +} diff --git a/fynd-core/src/algorithm/bf_helpers.rs b/fynd-core/src/algorithm/bf_helpers.rs new file mode 100644 index 00000000..7a8bc417 --- /dev/null +++ b/fynd-core/src/algorithm/bf_helpers.rs @@ -0,0 +1,153 @@ +//! Shared helpers for Bellman-Ford based algorithms (routing and pricing). +//! +//! Both `bellman_ford.rs` (A-to-B routing) and `bellman_ford_pricing.rs` (one-to-all pricing) +//! use forbid-revisits SPFA and share predecessor-chain utilities. + +use std::collections::{HashSet, VecDeque}; + +use petgraph::{graph::NodeIndex, prelude::EdgeRef}; + +use super::AlgorithmError; +use crate::{graph::petgraph::StableDiGraph, types::ComponentId}; + +/// Checks whether extending the path at `from` to `target_node` via `target_pool` +/// would create a node or pool revisit. Single walk of the predecessor chain. +pub(crate) fn path_has_conflict( + from: NodeIndex, + target_node: NodeIndex, + target_pool: &ComponentId, + predecessor: &[Option<(NodeIndex, ComponentId)>], +) -> bool { + let mut current = from; + loop { + if current == target_node { + return true; + } + match &predecessor[current.index()] { + Some((prev, cid)) => { + if cid == target_pool { + return true; + } + current = *prev; + } + None => return false, + } + } +} + +/// Reconstructs the path from `dest` back to `source` by walking the predecessor array. +pub(crate) fn reconstruct_path( + dest: NodeIndex, + source: NodeIndex, + predecessor: &[Option<(NodeIndex, ComponentId)>], +) -> Result, AlgorithmError> { + let mut path = Vec::new(); + let mut current = dest; + let mut visited = HashSet::new(); + + while current != source { + if !visited.insert(current) { + return Err(AlgorithmError::Other("cycle in predecessor chain".to_string())); + } + + let idx = current.index(); + match predecessor + .get(idx) + .and_then(|p| p.as_ref()) + { + Some((prev_node, component_id)) => { + path.push((*prev_node, current, component_id.clone())); + current = *prev_node; + } + None => { + return Err(AlgorithmError::Other(format!( + "broken predecessor chain at node {idx}" + ))); + } + } + } + + path.reverse(); + Ok(path) +} + +/// Extracts subgraph edges via BFS from `start` up to `max_depth` hops. +pub(crate) fn extract_subgraph_edges( + start: NodeIndex, + max_depth: usize, + graph: &StableDiGraph<()>, +) -> Vec<(NodeIndex, NodeIndex, ComponentId)> { + let mut visited = HashSet::new(); + let mut queue = VecDeque::new(); + let mut edges = Vec::new(); + + visited.insert(start); + queue.push_back((start, 0usize)); + + while let Some((node, depth)) = queue.pop_front() { + if depth >= max_depth { + continue; + } + + for edge in graph.edges(node) { + let target = edge.target(); + let component_id = &edge.weight().component_id; + + edges.push((node, target, component_id.clone())); + + if visited.insert(target) { + queue.push_back((target, depth + 1)); + } + } + } + + edges +} + +#[cfg(test)] +mod tests { + use petgraph::graph::NodeIndex; + + use super::*; + + #[test] + fn path_has_conflict_detects_node_and_pool() { + // Path: 0 -[pool_a]-> 1 -[pool_b]-> 2 + let mut pred: Vec> = vec![None; 4]; + pred[1] = Some((NodeIndex::new(0), "pool_a".into())); + pred[2] = Some((NodeIndex::new(1), "pool_b".into())); + + // Node conflict: node 0 is in path + assert!(path_has_conflict(NodeIndex::new(2), NodeIndex::new(0), &"any".into(), &pred)); + // No conflict: node 3 is not in path + assert!(!path_has_conflict(NodeIndex::new(2), NodeIndex::new(3), &"any".into(), &pred)); + // Self-check: node 2 is itself in the path + assert!(path_has_conflict(NodeIndex::new(2), NodeIndex::new(2), &"any".into(), &pred)); + // Pool conflict: pool_a used + assert!(path_has_conflict(NodeIndex::new(2), NodeIndex::new(3), &"pool_a".into(), &pred)); + // Pool conflict: pool_b used + assert!(path_has_conflict(NodeIndex::new(2), NodeIndex::new(3), &"pool_b".into(), &pred)); + // No pool conflict: pool_c not used + assert!(!path_has_conflict(NodeIndex::new(2), NodeIndex::new(3), &"pool_c".into(), &pred)); + } + + #[test] + fn reconstruct_path_simple() { + // Path: 0 -[pool_a]-> 1 -[pool_b]-> 2 + let mut pred: Vec> = vec![None; 3]; + pred[1] = Some((NodeIndex::new(0), "pool_a".into())); + pred[2] = Some((NodeIndex::new(1), "pool_b".into())); + + let path = reconstruct_path(NodeIndex::new(2), NodeIndex::new(0), &pred).unwrap(); + assert_eq!(path.len(), 2); + assert_eq!(path[0], (NodeIndex::new(0), NodeIndex::new(1), "pool_a".into())); + assert_eq!(path[1], (NodeIndex::new(1), NodeIndex::new(2), "pool_b".into())); + } + + #[test] + fn reconstruct_path_broken_chain() { + let pred: Vec> = vec![None; 3]; + let result = reconstruct_path(NodeIndex::new(2), NodeIndex::new(0), &pred); + assert!(result.is_err()); + } +} diff --git a/fynd-core/src/algorithm/mod.rs b/fynd-core/src/algorithm/mod.rs index 774cdd18..53c281a0 100644 --- a/fynd-core/src/algorithm/mod.rs +++ b/fynd-core/src/algorithm/mod.rs @@ -19,6 +19,8 @@ //! 3. Register it in `registry.rs` pub mod bellman_ford; +pub mod bellman_ford_pricing; +pub(crate) mod bf_helpers; pub mod most_liquid; #[cfg(test)] diff --git a/fynd-core/src/derived/computations/token_gas_price.rs b/fynd-core/src/derived/computations/token_gas_price.rs index e467695b..fe033e11 100644 --- a/fynd-core/src/derived/computations/token_gas_price.rs +++ b/fynd-core/src/derived/computations/token_gas_price.rs @@ -1,71 +1,55 @@ -//! Computes the `mid_price` of tokens relative to a gas token (e.g., ETH), selecting paths -//! by the lowest spread (the most reliable price) derived from full simulation of both buy and sell -//! directions. +//! Computes the `mid_price` of tokens relative to a gas token (e.g., ETH), using +//! Bellman-Ford SPFA to find the optimal path per token and full simulation of both +//! buy and sell directions to derive spread and mid-price. //! //! # Algorithm //! -//! 1. **Path Discovery (DFS)**: Enumerate all paths from gas_token to each reachable token, scoring -//! by spot-price spread: `|forward_spot - 1/reverse_spot|`. Lower spread = better score. +//! 1. **BF Forward Pass (one-to-all)**: Run SPFA from gas_token with a probe amount. Each token's +//! distance = the best amount reachable via simulation during relaxation. This replaces DFS path +//! enumeration AND spot-price scoring in a single pass. //! -//! 2. **Sort**: Order paths per token by spread score (lowest spread first). -//! -//! 3. **Round-Robin Simulation**: For each token, simulate paths in ranked order and compute their -//! spread and mid_price by simulating both directions on the same path. Pick the path with the -//! tightest spread for each token, as this indicates the most reliable/liquid route, and provide -//! its mid_price as the token's price. +//! 2. **Reverse Simulation**: For each priced token, reverse the winning path and simulate the sell +//! direction. Compute spread and mid_price from forward + reverse amounts. //! //! # Price Formulas //! //! For a path P from gas_token to target: -//! - `buy_out` = simulate(P, probe_amount) → tokens received -//! - `sell_out` = simulate(reverse(P), buy_out) → gas_token received back +//! - `buy_out` = simulate(P, probe_amount) -> tokens received +//! - `sell_out` = simulate(reverse(P), buy_out) -> gas_token received back //! - `buy_price` = buy_out / (probe_amount + gas_cost) //! - `sell_price` = buy_out / (sell_out - gas_cost) //! - `mid_price` = (buy_price + sell_price) / 2 //! - `spread` = |sell_price - buy_price| -//! -//! # Dependencies -//! -//! This computation depends on [`SpotPrices`](crate::derived::types::SpotPrices) being -//! available in the [`DerivedDataStore`](crate::derived::store::DerivedDataStore). -//! Ensure `SpotPriceComputation` runs before this computation. use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use num_bigint::BigUint; use num_traits::ToPrimitive; -use petgraph::{graph::NodeIndex, prelude::EdgeRef}; +use petgraph::graph::NodeIndex; use tracing::{debug, instrument, trace, Span}; use tycho_simulation::{ tycho_common::models::Address, tycho_core::simulation::protocol_sim::Price, }; use crate::{ + algorithm::bellman_ford_pricing::{resimulate_path, solve_one_to_all, SpfaAllResult}, derived::{ computation::{ComputationId, DerivedComputation}, error::ComputationError, manager::{ChangedComponents, SharedDerivedDataRef}, - types::{SpotPriceKey, SpotPrices, TokenGasPrices, TokenPriceEntry, TokenPricesWithDeps}, + types::{TokenGasPrices, TokenPriceEntry, TokenPricesWithDeps}, }, feed::market_data::{SharedMarketData, SharedMarketDataRef}, - graph::{GraphManager, Path, PetgraphStableDiGraphManager}, + graph::{GraphManager, PetgraphStableDiGraphManager}, types::ComponentId, - MostLiquidAlgorithm, }; -/// A path with its score -#[derive(Clone)] -struct CandidatePath<'a> { - path: Path<'a, ()>, - score: f64, -} - -/// Computes token prices relative to the gas token. Returns the buy price for the path -/// with the lowest spread (most reliable) that we managed to find. +/// Computes token prices relative to the gas token using Bellman-Ford SPFA. /// -/// Uses DFS to discover paths, spot prices for ranking, and full simulation -/// for accurate output amounts and spread calculation. +/// Runs a single BF forward pass to find the optimal path per token, then +/// simulates the reverse direction on each winning path to compute spread +/// and mid-price. #[derive(Debug, Clone)] pub struct TokenGasPriceComputation { /// The gas token address (e.g., ETH). @@ -102,99 +86,7 @@ impl TokenGasPriceComputation { Self { gas_token, ..self } } - /// DFS to discover all paths from gas_token, scored by spot-price spread. - fn discover_paths<'a>( - &self, - graph_manager: &'a PetgraphStableDiGraphManager<()>, - spot_prices: &SpotPrices, - ) -> Result>>, ComputationError> { - let graph = graph_manager.graph(); - - // If gas token has no pools, it won't be in the graph → no paths to discover - let Ok(entry_node) = graph_manager.find_node(&self.gas_token) else { - return Ok(HashMap::new()); - }; - - let mut paths_by_token: HashMap> = HashMap::new(); - - // DFS state - struct DfsFrame<'a> { - token_node: NodeIndex, - path: Path<'a, ()>, - forward_spot: f64, - reverse_spot: f64, - } - - let mut stack = vec![DfsFrame { - token_node: entry_node, - path: Path::new(), - forward_spot: 1.0, - reverse_spot: 1.0, - }]; - - while let Some(frame) = stack.pop() { - // Token that we reached in this frame - let token_reached = &graph[frame.token_node]; - - // Record non-empty paths (skip the starting node's empty path) - if !frame.path.is_empty() { - // Compute spread from spot prices: - // buy_price = forward_spot (target per gas when buying) - // sell_price = 1/reverse_spot (target per gas when selling) - // spread = |buy_price - sell_price| - // Score = spread directly (lower = better, 0 for symmetric pools) - let buy_price = frame.forward_spot; - let sell_price = 1.0 / frame.reverse_spot; - let spot_spread = (buy_price - sell_price).abs(); - - paths_by_token - .entry(token_reached.clone()) - .or_default() - .push(CandidatePath { path: frame.path.clone(), score: spot_spread }); - } - - // Stop exploring further if max depth reached - if frame.path.len() >= self.max_hops { - continue; - } - - // Explore neighbors - for edge in graph.edges(frame.token_node) { - let next_node = edge.target(); - let next_token = &graph[next_node]; - - let mut new_path = frame.path.clone(); - new_path.add_hop(token_reached, edge.weight(), next_token); - - let component_id = edge.weight().component_id.clone(); - - // Look up spot prices for this edge - let fwd_key: SpotPriceKey = - (component_id.clone(), token_reached.clone(), next_token.clone()); - let rev_key: SpotPriceKey = - (component_id.clone(), next_token.clone(), token_reached.clone()); - - // Skip edges with missing spot prices (pool may have failed spot price computation) - let Some(&fwd_spot) = spot_prices.get(&fwd_key) else { - continue; - }; - let Some(&rev_spot) = spot_prices.get(&rev_key) else { - continue; - }; - - stack.push(DfsFrame { - token_node: next_node, - path: new_path, - forward_spot: frame.forward_spot * fwd_spot, - reverse_spot: frame.reverse_spot * rev_spot, - }); - } - } - - Ok(paths_by_token) - } - - /// Compute the spread and mid_price for a given path by simulating both directions. + /// Computes spread and mid_price for a given BF path by simulating both directions. /// /// Returns (spread_ratio, mid_price, path_components) where: /// - spread_ratio: |sell - buy|, lower = more reliable @@ -202,53 +94,39 @@ impl TokenGasPriceComputation { /// - path_components: component IDs used in this path (for incremental invalidation) fn compute_spread_and_mid_price( &self, - path: Path<()>, + forward_path: &[(NodeIndex, NodeIndex, ComponentId)], market: &SharedMarketData, gas_price: &BigUint, + spfa_result: &SpfaAllResult, ) -> Result<(f64, Price, HashSet), ComputationError> { - // Extract component IDs from path edges for dependency tracking - let path_components: HashSet = path - .edge_data + let path_components: HashSet = forward_path .iter() - .map(|edge| edge.component_id.clone()) + .map(|(_, _, cid)| cid.clone()) .collect(); - // Forward: gas_token → target_token - let buy_result = - MostLiquidAlgorithm::simulate_path(&path, market, None, self.simulation_amount.clone()) - .map_err(|e| { - ComputationError::SimulationFailed(format!("buy simulation failed: {}", e)) - })?; - let buy_gas_units = buy_result.route().total_gas(); - let buy_gas_cost = &buy_gas_units * gas_price; // Convert gas units to actual cost - let buy_out = buy_result - .into_route() - .into_swaps() - .into_iter() - .last() - .ok_or(ComputationError::Internal("no output from buy simulation".into()))? - .amount_out() - .clone(); - - // Reverse: target_token → gas_token - let reversed_path = path.reversed(); - - let sell_result = - MostLiquidAlgorithm::simulate_path(&reversed_path, market, None, buy_out.clone()) - .map_err(|e| { - ComputationError::SimulationFailed(format!("sell simulation failed: {}", e)) - })?; - let sell_gas_units = sell_result.route().total_gas(); - let sell_gas_cost = &sell_gas_units * gas_price; // Convert gas units to actual cost - let sell_out = sell_result - .into_route() - .into_swaps() - .into_iter() - .last() - .ok_or(ComputationError::Internal("no output from sell simulation".into()))? - .amount_out() - .clone(); - - // Convert to f64 for mid_price calculation + + let token_map = spfa_result.token_map(); + + // Forward: gas_token -> target_token + let (forward_route, buy_out) = + resimulate_path(forward_path, &self.simulation_amount, market, token_map).map_err( + |e| ComputationError::SimulationFailed(format!("buy simulation failed: {}", e)), + )?; + let buy_gas_cost = forward_route.total_gas() * gas_price; + + // Reverse: target_token -> gas_token + let reversed_path: Vec<_> = forward_path + .iter() + .rev() + .map(|(from, to, cid)| (*to, *from, cid.clone())) + .collect(); + + let (reverse_route, sell_out) = + resimulate_path(&reversed_path, &buy_out, market, token_map).map_err(|e| { + ComputationError::SimulationFailed(format!("sell simulation failed: {}", e)) + })?; + let sell_gas_cost = reverse_route.total_gas() * gas_price; + + // Convert to f64 for spread calculation let buy_out_f = buy_out .to_f64() .ok_or(ComputationError::Internal("overflow computing buy_out".into()))?; @@ -282,8 +160,6 @@ impl TokenGasPriceComputation { let spread = (sell_price - buy_price).abs(); // Compute mid_price in numerator/denominator form (precise BigUint arithmetic) - // numerator = buy_out * (sell_out - sell_gas_cost) + buy_out * (sim_amount + buy_gas_cost) - // denominator = 2 * (sim_amount + buy_gas_cost) * (sell_out - sell_gas_cost) let sell_out_net = &sell_out - &sell_gas_cost; // Safe: checked above let buy_price_precise = Price { numerator: &buy_out * &sell_out_net + @@ -296,152 +172,86 @@ impl TokenGasPriceComputation { Ok((spread, buy_price_precise, path_components)) } - /// Core simulation logic: discovers paths, runs round-robin simulation, - /// returns best prices with dependency tracking and block number. + /// Core pricing logic: runs BF forward pass, then reverse-simulates each winning path. /// - /// Takes two brief read locks on market: - /// 1. Clone topology + gas_price + block (cheap) - /// 2. `extract_subset` with only the components on candidate paths - /// - /// Path discovery (cheap DFS) runs twice to avoid holding borrows across await - /// points. The expensive part — EVM simulation — runs lock-free on the subset. - /// - /// # Arguments - /// - /// * `market`: The market data to simulate token prices on. - /// * `spot_prices`: The spot prices to use for the simulation. - /// * `filter_tokens`: An optional set of tokens to filter the simulation by. If None, all - /// tokens are simulated. - /// - /// # Returns - /// - /// A tuple containing the best prices and the block number. + /// If `filter_tokens` is `Some`, only prices those tokens (incremental mode). + /// If `None`, prices all reachable tokens (full mode). #[allow(clippy::type_complexity)] - async fn simulate_token_prices( + fn simulate_token_prices( &self, - market: &SharedMarketDataRef, - spot_prices: &SpotPrices, + market: &SharedMarketData, + gas_price: &BigUint, filter_tokens: Option<&HashSet
>, - ) -> Result<(HashMap)>, u64), ComputationError> { - // Brief lock 1: topology + gas_price + block (all cheap clones) - let (topology, gas_price, block) = { - let guard = market.read().await; - let topology = guard.component_topology(); - let block = guard - .last_updated() - .map(|b| b.number()) - .unwrap_or(0); - let gas_price = guard - .gas_price() - .ok_or(ComputationError::MissingDependency("gas_price"))? - .effective_gas_price(); - (topology, gas_price, block) - }; - - // Discover paths to find which components candidate paths need (cheap DFS) - let needed_component_ids = { - let mut graph_manager = PetgraphStableDiGraphManager::new(); - graph_manager.initialize_graph(&topology); - let mut paths = self.discover_paths(&graph_manager, spot_prices)?; - if let Some(tokens) = filter_tokens { - paths.retain(|token, _| tokens.contains(token)); - } - paths - .values() - .flatten() - .flat_map(|c| { - c.path - .edge_data - .iter() - .map(|e| e.component_id.clone()) - }) - .collect::>() - }; - - // Brief lock 2: extract only the simulation states we need - let subset = { - market - .read() - .await - .extract_subset(&needed_component_ids) - }; - - // Rediscover paths from subset + simulate (no lock, expensive EVM simulation) + ) -> Result)>, ComputationError> { let mut graph_manager = PetgraphStableDiGraphManager::new(); - graph_manager.initialize_graph(&subset.component_topology()); - let mut paths_by_token = self.discover_paths(&graph_manager, spot_prices)?; - - // Optionally filter to only requested tokens - if let Some(tokens) = filter_tokens { - paths_by_token.retain(|token, _| tokens.contains(token)); - } + graph_manager.initialize_graph(&market.component_topology()); + let graph = graph_manager.graph(); - // Collect all component IDs from every candidate path per token. - // This ensures path_components captures any pool that could flip which path is best, - // not just pools on the currently-selected path. - let all_candidate_components: HashMap> = paths_by_token - .iter() - .map(|(token, candidates)| { - let components = candidates - .iter() - .flat_map(|c| { - c.path - .edge_data - .iter() - .map(|e| e.component_id.clone()) - }) - .collect::>(); - (token.clone(), components) - }) - .collect(); + // If gas token has no pools, it won't be in the graph + let Ok(source_node) = graph_manager.find_node(&self.gas_token) else { + return Ok(HashMap::new()); + }; - // Sort each token's paths: lowest spread last (for popping) - for paths in paths_by_token.values_mut() { - paths.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap()); - } + // BF forward pass: prices all tokens in one traversal + let spfa_result = solve_one_to_all( + source_node, + self.simulation_amount.clone(), + self.max_hops, + graph, + market, + ); - // Round-robin: pop one candidate per token each round, keep best by spread + let token_map = spfa_result.token_map(); let mut best_prices: HashMap)> = HashMap::new(); - let mut candidates_exhausted = false; - while !candidates_exhausted { - candidates_exhausted = true; + for (&node, token) in token_map { + // Skip source token and unreachable tokens + if token.address == self.gas_token || !spfa_result.is_reachable(node) { + continue; + } - for (token, candidate_paths) in paths_by_token.iter_mut() { - let Some(candidate) = candidate_paths.pop() else { + // Optionally filter to only requested tokens + if let Some(filter) = filter_tokens { + if !filter.contains(&token.address) { continue; - }; - candidates_exhausted = false; - - match self.compute_spread_and_mid_price(candidate.path, &subset, &gas_price) { - Ok((spread, price, components)) => { - let is_better = best_prices - .get(token) - .map(|(existing_spread, _, _)| spread < *existing_spread) - .unwrap_or(true); - if is_better { - trace!( - token = ?token, - spread_ratio = spread, - "found better price (lower spread)" - ); - best_prices.insert(token.clone(), (spread, price, components)); - } - } - Err(_) => continue, } } - } - // Extend each token's path_components with all candidate path components so - // incremental recomputation fires when any competing path's pool changes. - for (token, (_, _, components)) in best_prices.iter_mut() { - if let Some(all_comps) = all_candidate_components.get(token) { - components.extend(all_comps.iter().cloned()); + // Reconstruct the winning forward path + let forward_path = match spfa_result.reconstruct_path(node) { + Ok(p) => p, + Err(e) => { + trace!( + token = ?token.address, + error = %e, + "failed to reconstruct path, skipping" + ); + continue; + } + }; + + // Compute spread and mid-price via forward + reverse simulation + match self.compute_spread_and_mid_price(&forward_path, market, gas_price, &spfa_result) + { + Ok((spread, price, components)) => { + trace!( + token = ?token.address, + spread_ratio = spread, + "computed token price" + ); + best_prices.insert(token.address.clone(), (spread, price, components)); + } + Err(e) => { + trace!( + token = ?token.address, + error = %e, + "price computation failed, skipping" + ); + } } } - Ok((best_prices, block)) + Ok(best_prices) } /// Attempts incremental recomputation for state-only changes. @@ -456,28 +266,19 @@ impl TokenGasPriceComputation { store: &SharedDerivedDataRef, changed: &ChangedComponents, ) -> Result, ComputationError> { - // Read all needed data from store in a single lock acquisition. - let (existing_deps, existing_prices, spot_prices) = { - let store_guard = store.read().await; - - // Need existing deps to do incremental computation. - let Some(existing_deps) = store_guard.token_prices_deps().cloned() else { - return Ok(None); // No deps stored yet, need full compute - }; - let Some(existing_prices) = store_guard.token_prices().cloned() else { - return Ok(None); - }; - let spot_prices = store_guard - .spot_prices() - .ok_or(ComputationError::MissingDependency("spot_prices"))? - .clone(); + let store_guard = store.read().await; - (existing_deps, existing_prices, spot_prices) + // Need existing deps to do incremental computation + let Some(existing_deps) = store_guard.token_prices_deps() else { + return Ok(None); // No deps stored yet, need full compute + }; + let Some(existing_prices) = store_guard.token_prices() else { + return Ok(None); }; let changed_components = changed.all_changed_ids(); - // Find tokens whose paths intersect with changed components. + // Find tokens whose paths intersect with changed components let tokens_to_recompute: HashSet
= existing_deps .iter() .filter(|(_, entry)| { @@ -489,18 +290,32 @@ impl TokenGasPriceComputation { .collect(); if tokens_to_recompute.is_empty() { - return Ok(Some(existing_prices)); + return Ok(Some(existing_prices.clone())); } + let existing_prices = existing_prices.clone(); + let existing_deps = existing_deps.clone(); + drop(store_guard); + debug!( affected_tokens = tokens_to_recompute.len(), total_tokens = existing_prices.len(), "incremental token price recomputation" ); - let (best_prices, block) = self - .simulate_token_prices(market, &spot_prices, Some(&tokens_to_recompute)) - .await?; + let market = market.read().await; + let block = market + .last_updated() + .map(|b| b.number()) + .unwrap_or(0); + + let gas_price = market + .gas_price() + .ok_or(ComputationError::MissingDependency("gas_price"))? + .effective_gas_price(); + + let best_prices = + self.simulate_token_prices(&market, &gas_price, Some(&tokens_to_recompute))?; // Merge results into existing prices and deps let mut result = existing_prices; @@ -545,27 +360,27 @@ impl DerivedComputation for TokenGasPriceComputation { // For topology changes or full recompute, do a full computation // For state-only changes, use incremental computation if !changed.is_full_recompute && !changed.is_topology_change() { - // Try incremental computation if we have existing path dependencies if let Some(result) = self .try_incremental_compute(market, store, changed) .await? { return Ok(result); } - // Fall through to full compute if incremental is not possible } - // Read spot prices from store (independent of market lock). - let spot_prices = store - .read() - .await - .spot_prices() - .ok_or(ComputationError::MissingDependency("spot_prices"))? - .clone(); + let market = market.read().await; + + let block = market + .last_updated() + .map(|b| b.number()) + .unwrap_or(0); + + let gas_price = market + .gas_price() + .ok_or(ComputationError::MissingDependency("gas_price"))? + .effective_gas_price(); - let (best_prices, block) = self - .simulate_token_prices(market, &spot_prices, None) - .await?; + let best_prices = self.simulate_token_prices(&market, &gas_price, None)?; // Build token prices with dependencies for incremental computation let mut token_prices_with_deps = TokenPricesWithDeps::new(); @@ -608,7 +423,7 @@ mod tests { use super::*; use crate::{ algorithm::test_utils::{component, market_read, setup_market, token, MockProtocolSim}, - derived::{computations::spot_price::SpotPriceComputation, store::DerivedData}, + derived::store::DerivedData, }; // ==================== Constants ==================== @@ -620,203 +435,21 @@ mod tests { // ==================== Test Helpers ==================== - /// Sets up a complete test environment: market with pools + precomputed spot prices. + /// Sets up a complete test environment: market with pools. /// Returns (market_guard, store) ready for computation. async fn setup_test_env( pools: Vec<(&str, &Token, &Token, MockProtocolSim)>, ) -> (SharedMarketDataRef, SharedDerivedDataRef) { - let (wrapped_market, _) = setup_market(pools.clone()); - + let (wrapped_market, _) = setup_market(pools); let wrapped_store = DerivedData::new_shared(); - let spot_comp = SpotPriceComputation::new(); - let changed = ChangedComponents { - added: pools - .iter() - .map(|(id, t1, t2, _)| { - (id.to_string(), vec![t1.address.clone(), t2.address.clone()]) - }) - .collect(), - removed: vec![], - updated: vec![], - is_full_recompute: true, - }; - let spot_prices = spot_comp - .compute(&wrapped_market, &wrapped_store, &changed) - .await - .expect("spot price computation should succeed"); - wrapped_store - .try_write() - .unwrap() - .set_spot_prices(spot_prices, 0); - (wrapped_market, wrapped_store) } - async fn setup_graph_and_spot_prices( - pools: Vec<(&str, &Token, &Token, MockProtocolSim)>, - ) -> (PetgraphStableDiGraphManager<()>, SpotPrices) { - let (market, derived) = setup_test_env(pools).await; - let market = market_read(&market); - - let mut graph = PetgraphStableDiGraphManager::new(); - graph.initialize_graph(&market.component_topology()); - - let spot_prices = derived - .try_write() - .unwrap() - .spot_prices() - .unwrap() - .clone(); - (graph, spot_prices) - } - /// Creates a computation configured for the given gas token with standard settings. fn computation_for(gas_token: &Address) -> TokenGasPriceComputation { TokenGasPriceComputation::new(gas_token.clone(), 2, BigUint::from(SIM_AMOUNT)) } - // ==================== discover_paths tests ==================== - - #[tokio::test] - async fn test_discover_paths_single_hop() { - let eth = token(0, "ETH"); - let usdc = token(1, "USDC"); - - let (graph_manager, spot_prices) = - setup_graph_and_spot_prices(vec![("pool", ð, &usdc, MockProtocolSim::new(2000.0))]) - .await; - - let computation = computation_for(ð.address); - let paths = computation - .discover_paths(&graph_manager, &spot_prices) - .unwrap(); - - // Exactly 1 path to USDC (single hop via "pool") - let usdc_paths = &paths[&usdc.address]; - assert_eq!(usdc_paths.len(), 1, "should have exactly 1 path to USDC"); - - let path = &usdc_paths[0]; - assert_eq!(path.path.len(), 1, "path should be single hop"); - assert_eq!(path.path.edge_data[0].component_id, "pool"); - - // For a symmetric pool, spread = 0 - assert_eq!(path.score, 0.0); - } - - #[tokio::test] - async fn test_discover_paths_multi_hop() { - let eth = token(0, "ETH"); - let mid = token(2, "MID"); - let target = token(3, "TARGET"); - - let (graph, spot_prices) = setup_graph_and_spot_prices(vec![ - ("hop1", ð, &mid, MockProtocolSim::new(2.0)), - ("hop2", &mid, &target, MockProtocolSim::new(3.0)), - ]) - .await; - - let computation = computation_for(ð.address); - let paths = computation - .discover_paths(&graph, &spot_prices) - .unwrap(); - - // MID: exactly 1 path (1-hop via hop1) - let mid_paths = &paths[&mid.address]; - assert_eq!(mid_paths.len(), 1, "should have exactly 1 path to MID"); - assert_eq!(mid_paths[0].path.len(), 1, "MID path should be 1 hop"); - assert_eq!(mid_paths[0].path.edge_data[0].component_id, "hop1"); - assert_eq!(mid_paths[0].score, 0.0); - - // TARGET: exactly 1 path (2-hop via hop1 → hop2) - let target_paths = &paths[&target.address]; - assert_eq!(target_paths.len(), 1, "should have exactly 1 path to TARGET"); - assert_eq!(target_paths[0].path.len(), 2, "TARGET path should be 2 hops"); - assert_eq!(target_paths[0].path.edge_data[0].component_id, "hop1"); - assert_eq!(target_paths[0].path.edge_data[1].component_id, "hop2"); - assert_eq!(target_paths[0].score, 0.0); - } - - #[tokio::test] - async fn test_discover_paths_respects_max_hops() { - let eth = token(0, "ETH"); - let a = token(2, "A"); - let b = token(3, "B"); - let c = token(4, "C"); - - let (graph, spot_prices) = setup_graph_and_spot_prices(vec![ - ("eth_a", ð, &a, MockProtocolSim::new(2.0)), - ("a_b", &a, &b, MockProtocolSim::new(2.0)), - ("b_c", &b, &c, MockProtocolSim::new(2.0)), - ]) - .await; - - // max_hops = 2 - let computation = computation_for(ð.address); - let paths = computation - .discover_paths(&graph, &spot_prices) - .unwrap(); - - // A: exactly 1 path (1 hop via eth_a) - let a_paths = &paths[&a.address]; - assert_eq!(a_paths.len(), 1, "should have exactly 1 path to A"); - assert_eq!(a_paths[0].path.len(), 1, "A path should be 1 hop"); - assert_eq!(a_paths[0].path.edge_data[0].component_id, "eth_a"); - assert_eq!(a_paths[0].score, 0.0); - - // B: exactly 1 path (2 hops via eth_a → a_b) - let b_paths = &paths[&b.address]; - assert_eq!(b_paths.len(), 1, "should have exactly 1 path to B"); - assert_eq!(b_paths[0].path.len(), 2, "B path should be 2 hops"); - assert_eq!(b_paths[0].path.edge_data[0].component_id, "eth_a"); - assert_eq!(b_paths[0].path.edge_data[1].component_id, "a_b"); - assert_eq!(b_paths[0].score, 0.0); - - // C: not reachable (would require 3 hops, exceeds max_hops=2) - assert!(!paths.contains_key(&c.address), "C should NOT be reachable (3 hops)"); - } - - #[tokio::test] - async fn test_discover_paths_returns_multiple_candidates() { - let eth = token(0, "ETH"); - let usdc = token(1, "USDC"); - - // Two pools with different spot prices - let (graph, spot_prices) = setup_graph_and_spot_prices(vec![ - ("pool_low", ð, &usdc, MockProtocolSim::new(1000.0)), - ("pool_high", ð, &usdc, MockProtocolSim::new(2000.0)), - ]) - .await; - - let computation = computation_for(ð.address); - let paths = computation - .discover_paths(&graph, &spot_prices) - .unwrap(); - - // Exactly 2 paths to USDC (one via each pool) - let usdc_paths = &paths[&usdc.address]; - assert_eq!(usdc_paths.len(), 2, "should have exactly 2 paths to USDC"); - - // MockProtocolSim's spot_price is symmetric: forward_spot = 1/reverse_spot, - // so spread = |forward - 1/reverse| = 0 for all pools. - // TODO: Test with asymmetric simulation component to verify non-zero spread ranking. - for path in usdc_paths { - assert_eq!(path.path.len(), 1, "path should be single hop"); - assert_eq!(path.score, 0.0, "symmetric mock produces zero spread"); - } - - // Verify both pools are discovered (order is arbitrary when scores are equal) - let component_ids: Vec<_> = usdc_paths - .iter() - .map(|p| { - p.path.edge_data[0] - .component_id - .as_str() - }) - .collect(); - assert!(component_ids.contains(&"pool_low")); - assert!(component_ids.contains(&"pool_high")); - } - // ==================== compute_spread_and_mid_price tests ==================== #[tokio::test] @@ -827,22 +460,22 @@ mod tests { // Non-trivial setup: 10% fee + significant gas (10% of sim_amount) // gas_units = 1e15, gas_cost = 1e15 * 100 = 1e17 (10% of 1e18) // - // Forward (ETH→USDC): + // Forward (ETH->USDC): // buy_out = 1e18 * 2000 * 0.9 = 1.8e21 // buy_gas_cost = 1e17 // - // Reverse (USDC→ETH): + // Reverse (USDC->ETH): // sell_out = 1.8e21 / 2000 * 0.9 = 8.1e17 // sell_gas_cost = 1e17 // // buy_price = buy_out / (sim_amount + buy_gas_cost) - // = 1.8e21 / (1e18 + 1e17) = 1.8e21 / 1.1e18 = 18000/11 ≈ 1636.36 + // = 1.8e21 / (1e18 + 1e17) = 1.8e21 / 1.1e18 = 18000/11 // // sell_price = buy_out / (sell_out - sell_gas_cost) - // = 1.8e21 / (8.1e17 - 1e17) = 1.8e21 / 7.1e17 = 180000/71 ≈ 2535.21 + // = 1.8e21 / (8.1e17 - 1e17) = 1.8e21 / 7.1e17 = 180000/71 // - // spread = |sell_price - buy_price| = 180000/71 - 18000/11 = 702000/781 ≈ 898.85 - // mid_price = (buy_price + sell_price) / 2 ≈ 2085.79 + // spread = |sell_price - buy_price| = 180000/71 - 18000/11 + // mid_price = (buy_price + sell_price) / 2 let gas_units: u64 = 1_000_000_000_000_000; // 1e15 let (market, _) = setup_test_env(vec![( "pool", @@ -855,22 +488,27 @@ mod tests { .await; let market = market_read(&market); - // Build path manually using graph - let mut graph = PetgraphStableDiGraphManager::new(); - graph.initialize_graph(&market.component_topology()); + // Build graph and run BF forward pass + let mut graph_manager = PetgraphStableDiGraphManager::new(); + graph_manager.initialize_graph(&market.component_topology()); + let graph = graph_manager.graph(); + let source = graph_manager + .find_node(ð.address) + .unwrap(); - let eth_node = graph.find_node(ð.address).unwrap(); - let path_edges: Vec<_> = graph.graph().edges(eth_node).collect(); - assert_eq!(path_edges.len(), 1); + let spfa_result = solve_one_to_all(source, BigUint::from(SIM_AMOUNT), 2, graph, &market); - let edge = path_edges[0].weight(); - let mut path = Path::new(); - path.add_hop(ð.address, edge, &usdc.address); + let dest = graph_manager + .find_node(&usdc.address) + .unwrap(); + let forward_path = spfa_result + .reconstruct_path(dest) + .unwrap(); let gas_price = BigUint::from(GAS_PRICE); let computation = computation_for(ð.address); let (spread, mid_price, _path_components) = computation - .compute_spread_and_mid_price(path, &market, &gas_price) + .compute_spread_and_mid_price(&forward_path, &market, &gas_price, &spfa_result) .unwrap(); // Expected values from exact fractions @@ -944,7 +582,7 @@ mod tests { } #[tokio::test] - async fn test_compute_selects_best_path_by_spread() { + async fn test_compute_selects_best_path_by_output() { // Diamond topology: two paths to C // // A (10% fee on eth_a) @@ -956,19 +594,18 @@ mod tests { // Only first hops have fees; second hops (a_c, b_c) are fee-free. // Gas = 0 to simplify calculations. // - // Path via A (eth_a=10% fee, a_c=0% fee): - // Forward: 1e18 * 2 * 0.9 * 5 = 9e18 - // Reverse: 9e18 / 5 / 2 * 0.9 = 0.81e18 - // buy_price = 9, sell_price = 9/0.81 = 100/9 - // spread_A = |100/9 - 9| = 19/9 ≈ 2.11 + // BF picks by highest forward output: + // + // Path via A: 1e18 * 2 * 0.9 * 5 = 9e18 (higher output) + // Path via B: 1e18 * 3 * 0.95 * 2 = 5.7e18 // - // Path via B (eth_b=5% fee, b_c=0% fee): - // Forward: 1e18 * 3 * 0.95 * 2 = 5.7e18 = (57/10)e18 - // Reverse: 5.7e18 / 2 / 3 * 0.95 = 0.9025e18 = (361/400)e18 - // buy_price = 57/10, sell_price = (57/10)/(361/400) = 2280/361 - // spread_B = |2280/361 - 57/10| = 2223/3610 ≈ 0.62 + // BF selects path via A for C. // - // spread_B < spread_A → Path via B selected. + // C via A: + // buy_out = 9e18 + // sell: C->A (9e18/5 = 1.8e18) -> A->ETH (1.8e18*0.9/2 = 0.81e18) + // buy_price = 9, sell_price = 9/0.81 = 100/9 + // mid_price = (81+100)/18 = 181/18 let eth = token(0, "ETH"); let a = token(2, "A"); let b = token(3, "B"); @@ -1006,10 +643,10 @@ mod tests { assert_eq!(prices.len(), 4, "should have prices for ETH, A, B, C"); // A: 1-hop from ETH with 10% fee - // buy_out = 1e18 * 2 * 0.9 = 1.8e18 = (9/5)e18 - // sell_out = 1.8e18 / 2 * 0.9 = 0.81e18 = (81/100)e18 - // buy_price = 9/5, sell_price = (9/5)/(81/100) = 9*100/(5*81) = 20/9 - // mid_price = (9/5 + 20/9) / 2 = (81 + 100) / 90 = 181/90 + // buy_out = 1e18 * 2 * 0.9 = 1.8e18 + // sell_out = 1.8e18 / 2 * 0.9 = 0.81e18 + // buy_price = 9/5, sell_price = (9/5)/(81/100) = 20/9 + // mid_price = (9/5 + 20/9) / 2 = 181/90 let a_price = prices .get(&a.address) .expect("A should have price"); @@ -1021,11 +658,10 @@ mod tests { ); // B: 1-hop from ETH with 5% fee - // buy_out = 1e18 * 3 * 0.95 = 2.85e18 = (57/20)e18 - // sell_out = 2.85e18 / 3 * 0.95 = 0.9025e18 = (361/400)e18 - // buy_price = 57/20, sell_price = (57/20)/(361/400) = 57*400/(20*361) = 1140/361 - // mid_price = (57/20 + 1140/361) / 2 = (57*361 + 1140*20) / (2*20*361) - // = (20577 + 22800) / 14440 = 43377/14440 + // buy_out = 1e18 * 3 * 0.95 = 2.85e18 + // sell_out = 2.85e18 / 3 * 0.95 = 0.9025e18 + // buy_price = 57/20, sell_price = 1140/361 + // mid_price = 43377/14440 let b_price = prices .get(&b.address) .expect("B should have price"); @@ -1036,40 +672,19 @@ mod tests { "B mid_price should be 43377/14440 = {expected_b}, got {b_ratio}" ); - // C: Path via B selected (lower spread) - // buy_out = 1e18 * 3 * 0.95 * 2 = 5.7e18 = (57/10)e18 - // sell_out = 5.7e18 / 2 / 3 * 0.95 = 0.9025e18 = (361/400)e18 - // buy_price = 57/10, sell_price = (57/10)/(361/400) = 2280/361 - // mid_price = (57/10 + 2280/361) / 2 = (20577 + 22800) / 7220 = 43377/7220 + // C: BF selects path via A (higher output: 9e18 > 5.7e18) + // buy_out = 9e18 + // sell_out = 0.81e18 + // buy_price = 9, sell_price = 100/9 + // mid_price = 181/18 let c_price = prices .get(&c.address) .expect("C should have price"); let c_ratio = c_price.numerator.to_f64().unwrap() / c_price.denominator.to_f64().unwrap(); - let expected_c = 43377.0 / 7220.0; + let expected_c = 181.0 / 18.0; assert!( (c_ratio - expected_c).abs() < 1e-10, - "C mid_price should be 43377/7220 = {expected_c} (via B), got {c_ratio}" - ); - } - - #[tokio::test] - async fn test_compute_missing_spot_prices_returns_error() { - let eth = token(0, "ETH"); - let usdc = token(1, "USDC"); - - // Create market without spot prices set - let (market, _) = setup_market(vec![("pool", ð, &usdc, MockProtocolSim::new(2000.0))]); - let derived = DerivedData::new_shared(); // No spot prices - let changed = ChangedComponents::default(); - - let computation = computation_for(ð.address); - let result = computation - .compute(&market, &derived, &changed) - .await; - - assert!( - matches!(result, Err(ComputationError::MissingDependency("spot_prices"))), - "should return MissingDependency for spot_prices" + "C mid_price should be 181/18 = {expected_c} (via A, highest output), got {c_ratio}" ); } @@ -1102,176 +717,96 @@ mod tests { } #[tokio::test] - async fn test_path_components_includes_all_candidate_paths() { - // Diamond topology: two paths to token_a - // - // pool_direct: ETH → token_a (fee-free, ratio=2, lower spread → selected) - // pool_indirect_1 + pool_indirect_2: ETH → token_b → token_a (higher spread) - // - // After full compute, token_a's path_components must include all three pool IDs - // even though only pool_direct is on the best path. + async fn test_compute_missing_gas_price_returns_error() { let eth = token(0, "ETH"); - let token_a = token(1, "A"); - let token_b = token(2, "B"); + let usdc = token(1, "USDC"); - let (market, derived) = setup_test_env(vec![ - ("pool_direct", ð, &token_a, MockProtocolSim::new(2.0).with_gas(0)), - ( - "pool_indirect_1", - ð, - &token_b, - MockProtocolSim::new(3.0) - .with_fee(0.1) - .with_gas(0), - ), - ("pool_indirect_2", &token_b, &token_a, MockProtocolSim::new(1.0).with_gas(0)), - ]) - .await; - let changed = ChangedComponents::default(); + // Create market without gas price set + let mut market = SharedMarketData::new(); + let comp = component("pool", &[eth.clone(), usdc.clone()]); + market.upsert_components(std::iter::once(comp)); + market.update_states([("pool".to_string(), Box::new(MockProtocolSim::new(2000.0)) as _)]); + market.upsert_tokens([eth.clone(), usdc.clone()]); + let market = SharedMarketData::new_shared(); + + let derived = DerivedData::new_shared(); + let changed = ChangedComponents { + added: std::collections::HashMap::from([( + "pool".to_string(), + vec![eth.address.clone(), usdc.address.clone()], + )]), + removed: vec![], + updated: vec![], + is_full_recompute: true, + }; let computation = computation_for(ð.address); - computation + let result = computation .compute(&market, &derived, &changed) - .await - .unwrap(); - - // Inspect stored deps to verify path_components - let store = derived.read().await; - let deps = store - .token_prices_deps() - .expect("deps should be stored"); - let entry = deps - .get(&token_a.address) - .expect("token_a should have deps"); + .await; assert!( - entry - .path_components - .contains("pool_direct"), - "path_components should contain pool_direct (best path)" - ); - assert!( - entry - .path_components - .contains("pool_indirect_1"), - "path_components should contain pool_indirect_1 (competing path)" - ); - assert!( - entry - .path_components - .contains("pool_indirect_2"), - "path_components should contain pool_indirect_2 (competing path)" + matches!(result, Err(ComputationError::MissingDependency("gas_price"))), + "should return MissingDependency for gas_price" ); } #[tokio::test] - async fn test_incremental_recompute_triggered_by_competing_path_pool() { - // Same diamond topology as above. - // After full compute, changing pool_indirect_1 (not on best path) must - // put token_a in tokens_to_recompute because it's now in path_components. + async fn test_compute_respects_max_hops() { let eth = token(0, "ETH"); - let token_a = token(1, "A"); - let token_b = token(2, "B"); + let a = token(2, "A"); + let b = token(3, "B"); + let c = token(4, "C"); let (market, derived) = setup_test_env(vec![ - ("pool_direct", ð, &token_a, MockProtocolSim::new(2.0).with_gas(0)), - ( - "pool_indirect_1", - ð, - &token_b, - MockProtocolSim::new(3.0) - .with_fee(0.1) - .with_gas(0), - ), - ("pool_indirect_2", &token_b, &token_a, MockProtocolSim::new(1.0).with_gas(0)), + ("eth_a", ð, &a, MockProtocolSim::new(2.0)), + ("a_b", &a, &b, MockProtocolSim::new(2.0)), + ("b_c", &b, &c, MockProtocolSim::new(2.0)), ]) .await; + let changed = ChangedComponents::default(); - // Full compute to store deps - let full_changed = ChangedComponents::default(); + // max_hops = 2 let computation = computation_for(ð.address); - computation - .compute(&market, &derived, &full_changed) + let prices = computation + .compute(&market, &derived, &changed) .await .unwrap(); - // Incremental change: only pool_indirect_1 updated - let incremental_changed = ChangedComponents { - added: HashMap::new(), - removed: vec![], - updated: vec!["pool_indirect_1".to_string()], - is_full_recompute: false, - }; - - let store = derived.read().await; - let deps = store - .token_prices_deps() - .expect("deps should be stored"); - let changed_ids = incremental_changed.all_changed_ids(); - - let tokens_to_recompute: HashSet
= deps - .iter() - .filter(|(_, entry)| { - !entry - .path_components - .is_disjoint(&changed_ids) - }) - .map(|(addr, _)| addr.clone()) - .collect(); - - assert!( - tokens_to_recompute.contains(&token_a.address), - "token_a should be scheduled for recomputation when pool_indirect_1 changes" - ); - assert!( - tokens_to_recompute.contains(&token_b.address), - "token_b should be scheduled for recomputation when pool_indirect_1 changes" - ); + // A (1 hop) and B (2 hops) should be priced, C (3 hops) should not + assert!(prices.contains_key(&a.address), "A should be priced (1 hop)"); + assert!(prices.contains_key(&b.address), "B should be priced (2 hops)"); + assert!(!prices.contains_key(&c.address), "C should NOT be priced (3 hops)"); } #[tokio::test] - async fn test_compute_missing_gas_price_returns_error() { + async fn test_compute_multiple_pools_same_pair() { let eth = token(0, "ETH"); let usdc = token(1, "USDC"); - // Create market without gas price set - let mut market = SharedMarketData::new(); - let comp = component("pool", &[eth.clone(), usdc.clone()]); - market.upsert_components(std::iter::once(comp)); - market.update_states([("pool".to_string(), Box::new(MockProtocolSim::new(2000.0)) as _)]); - market.upsert_tokens([eth.clone(), usdc.clone()]); - let market = SharedMarketData::new_shared(); - - // Compute spot prices - let derived = DerivedData::new_shared(); - let changed = ChangedComponents { - added: std::collections::HashMap::from([( - "pool".to_string(), - vec![eth.address.clone(), usdc.address.clone()], - )]), - removed: vec![], - updated: vec![], - is_full_recompute: true, - }; + // Two pools with different spot prices; BF picks higher output + let (market, derived) = setup_test_env(vec![ + ("pool_low", ð, &usdc, MockProtocolSim::new(1000.0)), + ("pool_high", ð, &usdc, MockProtocolSim::new(2000.0)), + ]) + .await; + let changed = ChangedComponents::default(); - let spot_comp = SpotPriceComputation::new(); - let spot_prices = spot_comp + let computation = computation_for(ð.address); + let prices = computation .compute(&market, &derived, &changed) .await .unwrap(); - derived - .try_write() - .unwrap() - .set_spot_prices(spot_prices, 0); - - let computation = computation_for(ð.address); - let result = computation - .compute(&market, &derived, &changed) - .await; + // BF picks pool_high (higher output: 2000 > 1000) + let usdc_price = prices + .get(&usdc.address) + .expect("USDC should have price"); + let ratio = + usdc_price.numerator.to_f64().unwrap() / usdc_price.denominator.to_f64().unwrap(); assert!( - matches!(result, Err(ComputationError::MissingDependency("gas_price"))), - "should return MissingDependency for gas_price" + (ratio - 2000.0).abs() < 1e-6, + "mid-price should be ~2000 (via pool_high), got {ratio}" ); } } diff --git a/fynd-core/src/derived/manager.rs b/fynd-core/src/derived/manager.rs index bd59a956..7b3ad159 100644 --- a/fynd-core/src/derived/manager.rs +++ b/fynd-core/src/derived/manager.rs @@ -247,8 +247,9 @@ impl ComputationManager { /// /// **Dependency order**: /// 1. `SpotPriceComputation` - no dependencies - /// 2. `TokenGasPriceComputation` - depends on spot_prices in store - /// 3. `PoolDepthComputation` - no dependencies (runs in parallel with token prices) + /// 2. `TokenGasPriceComputation` - depends on gas_price (uses BF SPFA, no spot_prices + /// dependency) + /// 3. `PoolDepthComputation` - depends on spot_prices in store async fn compute_all(&self, changed: &ChangedComponents) { let total_start = Instant::now(); diff --git a/fynd-core/src/derived/mod.rs b/fynd-core/src/derived/mod.rs index 90cff15d..22a5cebf 100644 --- a/fynd-core/src/derived/mod.rs +++ b/fynd-core/src/derived/mod.rs @@ -18,14 +18,15 @@ //! //! ```text //! SpotPriceComputation -//! / \ -//! v v +//! / +//! v //! PoolDepthComputation TokenGasPriceComputation //! ``` //! //! - **SpotPriceComputation**: No dependencies, computes spot prices for all pools //! - **PoolDepthComputation**: Depends on `spot_prices` -//! - **TokenGasPriceComputation**: Depends on `spot_prices` and `gas_price` (from market data) +//! - **TokenGasPriceComputation**: Depends on `gas_price` (from market data); uses Bellman-Ford +//! SPFA //! //! # Example //! diff --git a/fynd-core/src/graph/mod.rs b/fynd-core/src/graph/mod.rs index b52b755b..22302f73 100644 --- a/fynd-core/src/graph/mod.rs +++ b/fynd-core/src/graph/mod.rs @@ -71,17 +71,6 @@ impl<'a, D> Path<'a, D> { .zip(self.edge_data.iter()) .map(|(tokens, edge)| (tokens[0], *edge, tokens[1])) } - - /// Creates a new reversed Path from the current one. - pub fn reversed(self) -> Self { - let reversed_tokens = self.tokens.into_iter().rev().collect(); - let reversed_edge_data = self - .edge_data - .into_iter() - .rev() - .collect(); - Self { tokens: reversed_tokens, edge_data: reversed_edge_data } - } } #[derive(Error, Debug)]