diff --git a/fynd-core/src/encoding/encoder.rs b/fynd-core/src/encoding/encoder.rs index 6b737b2f..e13fe6f0 100644 --- a/fynd-core/src/encoding/encoder.rs +++ b/fynd-core/src/encoding/encoder.rs @@ -77,6 +77,12 @@ impl TryFrom<&OrderQuote> for Solution { SolveError::FailedEncoding("successful quote must have a route".to_string()) })?; + // TODO: when a swap in this route is permissioned, read its `Swap::committed_amount_out` + // and carry it into the encoded `Solution` so the on-chain permissioned hook can be + // parameterised — the user is committed to that amount while the executed route yields the + // surplus. Hook calldata/signature encoding is a separate later extension, out of scope + // here. + let token_in = route .input_token() .ok_or_else(|| SolveError::FailedEncoding("route has no input token".to_string()))?; diff --git a/fynd-core/src/feed/mod.rs b/fynd-core/src/feed/mod.rs index 744d8260..224abac8 100644 --- a/fynd-core/src/feed/mod.rs +++ b/fynd-core/src/feed/mod.rs @@ -7,6 +7,8 @@ pub mod events; pub(crate) mod gas; /// Shared market data store (`MarketState`, `MarketData`). pub mod market_data; +/// Per-worker permission scoping for permissioned components. +pub mod permission; /// Protocol system registry: maps protocol names to their Tycho identifiers. pub mod protocol_registry; /// Tycho WebSocket feed: connects to the Tycho data stream and populates `MarketState`. diff --git a/fynd-core/src/feed/permission.rs b/fynd-core/src/feed/permission.rs new file mode 100644 index 00000000..38e16f3e --- /dev/null +++ b/fynd-core/src/feed/permission.rs @@ -0,0 +1,237 @@ +//! Per-worker permission scoping for permissioned components. +//! +//! "Permissioned" means accessible only to Fynd: such components must never appear in a normal +//! public quote, yet a dedicated worker is allowed to route through them to capture the surplus +//! they offer above the best public-market rate. Isolation is achieved by filtering each worker's +//! local graph topology/events through a `PermissionPolicy` according to its `ComponentScope` — the +//! shared `MarketState` is never duplicated. + +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use tycho_simulation::tycho_common::models::{protocol::ProtocolComponent, Address}; + +use crate::{ + feed::{events::MarketEvent, market_data::MarketState}, + types::ComponentId, +}; + +/// Classifies a [`ProtocolComponent`] as permissioned or public. +/// +/// The predicate is supplied by the caller rather than hard-coded against an id or protocol name, +/// so the notion of "permissioned" can evolve (e.g. a hook address allowlist) without touching the +/// routing core. +#[derive(Clone)] +pub struct PermissionPolicy { + /// Returns `true` when the component is permissioned and therefore excluded from public + /// quotes. + is_permissioned: Arc bool + Send + Sync>, +} + +impl PermissionPolicy { + /// Creates a policy from a predicate identifying permissioned components. + pub fn new(predicate: F) -> Self + where + F: Fn(&ProtocolComponent) -> bool + Send + Sync + 'static, + { + Self { is_permissioned: Arc::new(predicate) } + } + + /// Returns `true` if the component is permissioned. + pub fn is_permissioned(&self, component: &ProtocolComponent) -> bool { + (self.is_permissioned)(component) + } +} + +impl std::fmt::Debug for PermissionPolicy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PermissionPolicy") + .finish_non_exhaustive() + } +} + +/// The set of components a worker is allowed to see in its local graph. +/// +/// Determines whether permissioned components are filtered out before the worker builds or updates +/// its graph. Each worker gets exactly one scope for its lifetime, derived from its pool's role. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ComponentScope { + /// Public workers: permissioned components are dropped so they can never appear in a quote. + ExcludePermissioned, + /// Surplus workers: every component is visible, including permissioned ones. + IncludeAll, +} + +/// Per-worker permission scoping: which components a worker may ingest into its local graph. +/// +/// Bundles a [`ComponentScope`] with an optional [`PermissionPolicy`]. All graph filtering for a +/// worker flows through this type, so the worker never reasons about permissioned-ness itself — it +/// just hands its topology and incoming events here. +#[derive(Clone, Debug)] +pub(crate) struct PermissionContext { + /// Scope governing whether permissioned components are filtered out. + scope: ComponentScope, + /// Predicate classifying components as permissioned. `None` ⇒ no filtering is ever applied. + policy: Option, +} + +impl PermissionContext { + /// Default context: see every component, apply no permission filtering. + pub(crate) fn include_all() -> Self { + Self { scope: ComponentScope::IncludeAll, policy: None } + } + + /// Context derived from a pool's role: a scope plus the (optional) classifying policy. + pub(crate) fn new(scope: ComponentScope, policy: Option) -> Self { + Self { scope, policy } + } + + /// Returns the policy to enforce, or `None` when this worker filters nothing (scope is + /// `IncludeAll`, or no policy was configured). + fn active_policy(&self) -> Option<&PermissionPolicy> { + match (self.scope, &self.policy) { + (ComponentScope::ExcludePermissioned, Some(policy)) => Some(policy), + _ => None, + } + } + + /// Filters a full topology map to the components this worker may see. + /// + /// Public workers drop permissioned components; surplus workers (and workers with no policy) + /// receive the map unchanged. + pub(crate) fn filter_topology( + &self, + market: &MarketState, + topology: HashMap>, + ) -> HashMap> { + let Some(policy) = self.active_policy() else { + return topology; + }; + // TODO: drop entries whose ComponentId resolves (via market.get_component(id)) to a + // ProtocolComponent for which policy.is_permissioned is true. + let _ = (policy, market); + todo!("filter permissioned components from the worker's initial topology") + } + + /// Restricts a market event to the components this worker may see. + /// + /// Public workers drop permissioned component ids from the added/updated/removed lists so a + /// permissioned component is never ingested mid-stream; other workers see the event unchanged. + pub(crate) fn scope_event(&self, market: &MarketState, event: MarketEvent) -> MarketEvent { + let Some(policy) = self.active_policy() else { + return event; + }; + let MarketEvent::MarketUpdated { added_components, removed_components, updated_components } = + event; + + let added_ids: Vec = added_components + .keys() + .cloned() + .collect(); + let permitted_added: HashSet = self + .filter_component_ids(policy, market, &added_ids) + .into_iter() + .collect(); + let added_components = added_components + .into_iter() + .filter(|(id, _)| permitted_added.contains(id)) + .collect(); + let removed_components = self.filter_component_ids(policy, market, &removed_components); + let updated_components = self.filter_component_ids(policy, market, &updated_components); + + MarketEvent::MarketUpdated { added_components, removed_components, updated_components } + } + + /// Keeps only the component ids that are NOT permissioned under `policy`. + fn filter_component_ids( + &self, + policy: &PermissionPolicy, + market: &MarketState, + ids: &[ComponentId], + ) -> Vec { + // TODO: keep only ids whose ProtocolComponent (via market.get_component(id)) is NOT + // permissioned per policy.is_permissioned. + let _ = (policy, market, ids); + todo!("filter permissioned component ids from an incremental market event") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + algorithm::test_utils::{component, token}, + feed::{events::MarketEvent, market_data::MarketState}, + }; + + /// A predicate that treats the `vm:permissioned` protocol system as permissioned. + fn permissioned_protocol_policy() -> PermissionPolicy { + PermissionPolicy::new(|c: &ProtocolComponent| c.protocol_system == "vm:permissioned") + } + + fn permissioned_component(id: &str) -> ProtocolComponent { + let mut c = component(id, &[token(0x01, "A"), token(0x02, "B")]); + c.protocol_system = "vm:permissioned".to_string(); + c + } + + fn public_component(id: &str) -> ProtocolComponent { + component(id, &[token(0x01, "A"), token(0x02, "B")]) + } + + fn market_with(components: Vec) -> MarketState { + let mut market = MarketState::new(); + market.upsert_components(components); + market + } + + #[test] + #[ignore = "scaffold: filter helpers are todo!()"] + fn is_permissioned_reflects_predicate() { + let policy = permissioned_protocol_policy(); + assert!(policy.is_permissioned(&permissioned_component("perm-1"))); + assert!(!policy.is_permissioned(&public_component("pub-1"))); + } + + #[test] + #[ignore = "scaffold: filter helpers are todo!()"] + fn filter_topology_excludes_permissioned() { + let policy = permissioned_protocol_policy(); + let market = market_with(vec![public_component("pub-1"), permissioned_component("perm-1")]); + let topology = market.component_topology(); + + let public = PermissionContext::new(ComponentScope::ExcludePermissioned, Some(policy)); + let public_view = public.filter_topology(&market, topology.clone()); + assert!(public_view.contains_key("pub-1")); + assert!(!public_view.contains_key("perm-1")); + + let surplus_view = + PermissionContext::include_all().filter_topology(&market, topology.clone()); + assert_eq!(surplus_view.len(), topology.len()); + } + + #[test] + #[ignore = "scaffold: filter helpers are todo!()"] + fn scope_event_excludes_permissioned() { + let policy = permissioned_protocol_policy(); + let market = market_with(vec![public_component("pub-1"), permissioned_component("perm-1")]); + let event = MarketEvent::MarketUpdated { + added_components: HashMap::from([ + ("pub-1".to_string(), vec![]), + ("perm-1".to_string(), vec![]), + ]), + removed_components: vec!["pub-1".to_string(), "perm-1".to_string()], + updated_components: vec!["pub-1".to_string(), "perm-1".to_string()], + }; + + let public = PermissionContext::new(ComponentScope::ExcludePermissioned, Some(policy)); + let MarketEvent::MarketUpdated { added_components, removed_components, updated_components } = + public.scope_event(&market, event); + assert!(added_components.contains_key("pub-1")); + assert!(!added_components.contains_key("perm-1")); + assert_eq!(removed_components, vec!["pub-1".to_string()]); + assert_eq!(updated_components, vec!["pub-1".to_string()]); + } +} diff --git a/fynd-core/src/lib.rs b/fynd-core/src/lib.rs index b16b2d99..3582416b 100644 --- a/fynd-core/src/lib.rs +++ b/fynd-core/src/lib.rs @@ -57,7 +57,9 @@ pub use price_guard::{ config::PriceGuardConfig, provider::{ExternalPrice, PriceProvider, PriceProviderError}, }; -pub use solver::{FyndBuilder, PoolConfig, Solver, SolverBuildError, SolverParts, WaitReadyError}; +pub use solver::{ + FyndBuilder, PoolConfig, PoolRoleConfig, Solver, SolverBuildError, SolverParts, WaitReadyError, +}; /// Processes ephemeral pending bundles against live Tycho market state. Obtained by calling /// [`FyndBuilder::build_with_pending`](solver::FyndBuilder::build_with_pending). pub use tycho_simulation::evm::pending::PendingBlockProcessor; @@ -73,11 +75,13 @@ pub use types::{ BlockInfo, ClientFeeParams, ComponentId, EncodingOptions, FeeBreakdown, Order, OrderQuote, OrderSide, OrderValidationError, PermitDetails, PermitSingle, Quote, QuoteOptions, QuoteRequest, QuoteStatus, Route, RouteValidationError, SingleOrderQuote, SolveError, - SolveParams, SolveResult, Swap, TaskId, Transaction, UserTransferType, + SolveParams, SolveResult, SurplusInfo, Swap, TaskId, Transaction, UserTransferType, }; pub use worker_pool::{ pool::{WorkerPool, WorkerPoolBuilder, WorkerPoolConfig}, registry::UnknownAlgorithmError, TaskQueueHandle, }; -pub use worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter}; +pub use worker_pool_router::{ + config::WorkerPoolRouterConfig, PoolRole, SolverPoolHandle, WorkerPoolRouter, +}; diff --git a/fynd-core/src/solver.rs b/fynd-core/src/solver.rs index 6d9af5cc..64d40996 100644 --- a/fynd-core/src/solver.rs +++ b/fynd-core/src/solver.rs @@ -31,6 +31,7 @@ use crate::{ events::{MarketEvent, MarketEventHandler}, gas::GasPriceFetcher, market_data::MarketData, + permission::{ComponentScope, PermissionPolicy}, tycho_feed::TychoFeed, TychoFeedConfig, }, @@ -43,7 +44,9 @@ use crate::{ pool::{WorkerPool, WorkerPoolBuilder}, registry::UnknownAlgorithmError, }, - worker_pool_router::{config::WorkerPoolRouterConfig, SolverPoolHandle, WorkerPoolRouter}, + worker_pool_router::{ + config::WorkerPoolRouterConfig, PoolRole, SolverPoolHandle, WorkerPoolRouter, + }, Algorithm, Quote, QuoteRequest, SolveError, }; @@ -119,6 +122,40 @@ fn parse_connector_tokens( Ok(Some(set)) } +/// Role a configured pool plays in routing: public-only or permissioned-inclusive. +/// +/// Serialized in lowercase (`"public"` / `"surplus"`) in `worker_pools.toml`. Maps to the router's +/// `PoolRole` and the worker's `ComponentScope`: a public pool excludes permissioned components; a +/// surplus pool routes through all liquidity to capture surplus above the public rate. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PoolRoleConfig { + /// Public pool: routes through public liquidity only (the committed reference). Default. + #[default] + Public, + /// Surplus pool: routes through all liquidity, including permissioned components, to capture + /// surplus above the public rate. + Surplus, +} + +impl PoolRoleConfig { + /// The router role this config maps to. + fn pool_role(self) -> PoolRole { + match self { + Self::Public => PoolRole::Public, + Self::Surplus => PoolRole::All, + } + } + + /// The per-worker component scope this config maps to. + fn component_scope(self) -> ComponentScope { + match self { + Self::Public => ComponentScope::ExcludePermissioned, + Self::Surplus => ComponentScope::IncludeAll, + } + } +} + /// Per-pool configuration for [`FyndBuilder::add_pool`]. #[must_use] #[derive(Debug, Clone, Serialize, Deserialize)] @@ -147,6 +184,9 @@ pub struct PoolConfig { /// Absent = no restriction. Typically 3–10 entries (e.g. WETH, USDC, USDT, DAI). #[serde(default)] connector_tokens: Option>, + /// Pool role: `public` (default) or `surplus` (permissioned-inclusive). + #[serde(default)] + role: PoolRoleConfig, } impl PoolConfig { @@ -161,6 +201,7 @@ impl PoolConfig { timeout_ms: defaults::POOL_TIMEOUT_MS, max_routes: None, connector_tokens: None, + role: PoolRoleConfig::Public, } } @@ -169,6 +210,17 @@ impl PoolConfig { &self.algorithm } + /// Returns the pool role. + pub fn role(&self) -> PoolRoleConfig { + self.role + } + + /// Sets the pool role (public or surplus). + pub fn with_role(mut self, role: PoolRoleConfig) -> Self { + self.role = role; + self + } + /// Returns the number of worker threads. pub fn num_workers(&self) -> usize { self.num_workers @@ -304,10 +356,21 @@ enum PoolEntry { timeout_ms: u64, max_routes: Option, connector_tokens: Option>, + role: PoolRoleConfig, }, Custom(CustomPoolEntry), } +impl PoolEntry { + /// Returns the configured role for this pool. + fn role(&self) -> PoolRoleConfig { + match self { + PoolEntry::BuiltIn { role, .. } => *role, + PoolEntry::Custom(custom) => custom.role, + } + } +} + /// Pool entry backed by a custom [`Algorithm`] implementation. struct CustomPoolEntry { name: String, @@ -317,6 +380,8 @@ struct CustomPoolEntry { max_hops: usize, timeout_ms: u64, max_routes: Option, + /// Pool role: public (default) or surplus. + role: PoolRoleConfig, /// Applies the custom algorithm to a `WorkerPoolBuilder`. configure: Box WorkerPoolBuilder + Send>, } @@ -367,6 +432,9 @@ pub struct FyndBuilder { price_guard_enabled: bool, price_providers: Vec>, pending_indexers: Vec<(String, Box)>, + /// Predicate identifying permissioned components. Shared by all pools: public pools exclude + /// matching components, the surplus pool includes them. `None` ⇒ no pool filters anything. + permission_policy: Option, } impl FyndBuilder { @@ -400,6 +468,7 @@ impl FyndBuilder { price_guard_enabled: false, price_providers: Vec::new(), pending_indexers: Vec::new(), + permission_policy: None, } } @@ -503,6 +572,7 @@ impl FyndBuilder { timeout_ms: defaults::POOL_TIMEOUT_MS, max_routes: None, connector_tokens: None, + role: PoolRoleConfig::Public, }); self } @@ -529,6 +599,7 @@ impl FyndBuilder { max_hops: defaults::POOL_MAX_HOPS, timeout_ms: defaults::POOL_TIMEOUT_MS, max_routes: None, + role: PoolRoleConfig::Public, configure, })); self @@ -584,6 +655,21 @@ impl FyndBuilder { self } + /// Sets the predicate that classifies components as permissioned. + /// + /// Public pools exclude matching components from their graphs; the surplus pool includes them. + /// Without a policy, no pool performs any permission filtering. + pub fn permission_policy(mut self, predicate: F) -> Self + where + F: Fn(&tycho_simulation::tycho_common::models::protocol::ProtocolComponent) -> bool + + Send + + Sync + + 'static, + { + self.permission_policy = Some(PermissionPolicy::new(predicate)); + self + } + /// Adds a named pool using the given [`PoolConfig`]. /// /// # Errors @@ -606,6 +692,7 @@ impl FyndBuilder { timeout_ms: config.timeout_ms(), max_routes: config.max_routes(), connector_tokens, + role: config.role(), }); Ok(self) } @@ -668,10 +755,21 @@ impl FyndBuilder { let mut solver_pool_handles: Vec = Vec::new(); let mut worker_pools: Vec = Vec::new(); - for pool_entry in self.pools { + // Shared across all pools: public pools exclude permissioned components, the surplus pool + // includes them. `None` ⇒ no pool filters anything (original behaviour). + let permission_policy = self.permission_policy.take(); + let pools = std::mem::take(&mut self.pools); + + for pool_entry in pools { let pool_event_rx = tycho_feed.subscribe(); let derived_rx = derived_event_tx.subscribe(); + // Derive the per-worker component scope and the router-facing pool role from the + // configured role, then thread the shared permission policy through the builder. + let role_config = pool_entry.role(); + let pool_role = role_config.pool_role(); + let component_scope = role_config.component_scope(); + let (worker_pool, task_handle) = match pool_entry { PoolEntry::BuiltIn { name, @@ -683,6 +781,7 @@ impl FyndBuilder { timeout_ms, max_routes, connector_tokens, + role: _, } => { let mut algo_cfg = AlgorithmConfig::new( min_hops, @@ -693,18 +792,22 @@ impl FyndBuilder { if let Some(tokens) = connector_tokens { algo_cfg = algo_cfg.with_connector_tokens(tokens); } - WorkerPoolBuilder::new() + let mut builder = WorkerPoolBuilder::new() .name(name) .algorithm(algorithm) .algorithm_config(algo_cfg) .num_workers(num_workers) .task_queue_capacity(task_queue_capacity) - .build( - market_data.clone(), - Arc::clone(&derived_data), - pool_event_rx, - derived_rx, - )? + .component_scope(component_scope); + if let Some(policy) = permission_policy.clone() { + builder = builder.permission_policy(policy); + } + builder.build( + market_data.clone(), + Arc::clone(&derived_data), + pool_event_rx, + derived_rx, + )? } PoolEntry::Custom(custom) => { let algo_cfg = AlgorithmConfig::new( @@ -713,11 +816,15 @@ impl FyndBuilder { Duration::from_millis(custom.timeout_ms), custom.max_routes, )?; - let builder = WorkerPoolBuilder::new() + let mut builder = WorkerPoolBuilder::new() .name(custom.name) .algorithm_config(algo_cfg) .num_workers(custom.num_workers) - .task_queue_capacity(custom.task_queue_capacity); + .task_queue_capacity(custom.task_queue_capacity) + .component_scope(component_scope); + if let Some(policy) = permission_policy.clone() { + builder = builder.permission_policy(policy); + } let builder = (custom.configure)(builder); builder.build( market_data.clone(), @@ -728,7 +835,8 @@ impl FyndBuilder { } }; - solver_pool_handles.push(SolverPoolHandle::new(worker_pool.name(), task_handle)); + solver_pool_handles + .push(SolverPoolHandle::new(worker_pool.name(), task_handle).with_role(pool_role)); worker_pools.push(worker_pool); } diff --git a/fynd-core/src/types/mod.rs b/fynd-core/src/types/mod.rs index 19ab1535..5db44511 100644 --- a/fynd-core/src/types/mod.rs +++ b/fynd-core/src/types/mod.rs @@ -25,6 +25,6 @@ pub use primitives::*; pub use quote::{ BlockInfo, ClientFeeParams, EncodingOptions, FeeBreakdown, Order, OrderQuote, OrderSide, OrderValidationError, PermitDetails, PermitSingle, Quote, QuoteOptions, QuoteRequest, - QuoteStatus, Route, RouteResult, RouteValidationError, SingleOrderQuote, SolveParams, Swap, - Transaction, UserTransferType, + QuoteStatus, Route, RouteResult, RouteValidationError, SingleOrderQuote, SolveParams, + SurplusInfo, Swap, Transaction, UserTransferType, }; diff --git a/fynd-core/src/types/quote.rs b/fynd-core/src/types/quote.rs index e05123ae..3524bc12 100644 --- a/fynd-core/src/types/quote.rs +++ b/fynd-core/src/types/quote.rs @@ -701,6 +701,41 @@ impl SingleOrderQuote { } } +/// Order-level surplus summary for a quote routed through a permissioned component: +/// `surplus_amount` is what the protocol captures (realized output minus the committed +/// public-market output the user is quoted), in the order's `token_out`. +/// +/// Informational only (observability). The value the encoder acts on is the per-leg +/// [`Swap::committed_amount_out`], since the on-chain hook captures surplus per component. +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SurplusInfo { + /// Surplus captured by the protocol: realized surplus-route output minus the committed + /// (public-reference) output. + #[serde_as(as = "DisplayFromStr")] + surplus_amount: BigUint, + /// The best public-market output the user is committed to (the quoted `amount_out`). + #[serde_as(as = "DisplayFromStr")] + committed_amount_out: BigUint, +} + +impl SurplusInfo { + /// Creates surplus info from the captured surplus and the committed public output. + pub fn new(surplus_amount: BigUint, committed_amount_out: BigUint) -> Self { + Self { surplus_amount, committed_amount_out } + } + + /// Returns the surplus amount captured by the protocol. + pub fn surplus_amount(&self) -> &BigUint { + &self.surplus_amount + } + + /// Returns the committed public-market output the user is quoted. + pub fn committed_amount_out(&self) -> &BigUint { + &self.committed_amount_out + } +} + /// Quote for a single [`Order`]. /// /// Contains the route to execute (if found), along with expected amounts, @@ -752,6 +787,14 @@ pub struct OrderQuote { /// The state overlay this quote was computed against. /// When no overlay was requested this is the block number of the base state at solve time. solved_against: StateLabel, + /// Order-level surplus summary, populated when this quote executes through a permissioned + /// pool. + /// + /// Informational only (observability); the value the encoder acts on is the per-leg + /// [`Swap::committed_amount_out`]. `None` for pure public quotes. `#[serde(skip)]` — internal + /// reporting data, not part of the wire format. + #[serde(skip)] + surplus: Option, } impl OrderQuote { @@ -786,9 +829,37 @@ impl OrderQuote { sender, receiver, solved_against, + surplus: None, } } + /// Attaches the order-level surplus summary (committed output + captured surplus). + /// + /// Set by the router when a surplus route is selected, for observability. The per-leg + /// [`Swap::committed_amount_out`] (not this) is what the encoder acts on. + // Called by the surplus overlay in `combine_with_surplus` (still scaffolded); no prod caller + // yet. + #[allow(dead_code)] + pub(crate) fn with_surplus(mut self, surplus: SurplusInfo) -> Self { + self.surplus = Some(surplus); + self + } + + /// Returns the captured surplus amount, if this quote routes through a permissioned component. + pub fn surplus_amount(&self) -> Option<&BigUint> { + self.surplus + .as_ref() + .map(SurplusInfo::surplus_amount) + } + + /// Returns the committed public-market output, if this quote routes through a permissioned + /// component. + pub fn committed_amount_out(&self) -> Option<&BigUint> { + self.surplus + .as_ref() + .map(SurplusInfo::committed_amount_out) + } + /// Sets the status of this quote. pub(crate) fn set_status(&mut self, status: QuoteStatus) { self.status = status; @@ -1482,6 +1553,15 @@ pub struct Swap { /// Decimal of the amount to be swapped in this operation (for example, 0.5 means 50%) #[serde_as(as = "DisplayFromStr")] split: f64, + /// Per-leg committed output for a permissioned swap. + /// + /// Set only on the single permissioned leg of a surplus route. The on-chain hook signs a + /// `maxExchangeRate` derived from this value (`committed_amount_out * denom / amount_in`); the + /// component then captures `amount_out - committed_amount_out` as surplus, denominated in this + /// swap's `token_out`. `None` for ordinary public swaps. In-process only — consumed by the + /// encoder; `#[serde(skip)]` so it never enters the wire format. + #[serde(skip)] + committed_amount_out: Option, } impl Swap { @@ -1509,6 +1589,7 @@ impl Swap { protocol_component, protocol_state, split: 0.0, + committed_amount_out: None, } } /// Sets the split fraction for this swap (e.g. 0.5 means 50% of a split route). @@ -1517,6 +1598,18 @@ impl Swap { self } + /// Sets the per-leg committed output for a permissioned swap. + /// + /// The router stamps this onto the single permissioned leg of a surplus route so the encoder + /// can derive the hook's `maxExchangeRate`. See [`Swap::committed_amount_out`]. + // Stamped by `combine_with_surplus`, whose body is still scaffolded (`todo!()`); no prod caller + // yet, mirroring the `OrderQuote::with_surplus` precedent. + #[allow(dead_code)] + pub(crate) fn with_committed_amount_out(mut self, committed_amount_out: BigUint) -> Self { + self.committed_amount_out = Some(committed_amount_out); + self + } + /// Returns the component ID of the liquidity pool. pub fn component_id(&self) -> &str { &self.component_id @@ -1566,6 +1659,14 @@ impl Swap { pub fn split(&self) -> &f64 { &self.split } + + /// Returns the per-leg committed output, if this is the permissioned leg of a surplus route. + /// + /// `None` for ordinary public swaps. When `Some`, the encoder derives the hook's + /// `maxExchangeRate` as `committed_amount_out * denom / amount_in`. + pub fn committed_amount_out(&self) -> Option<&BigUint> { + self.committed_amount_out.as_ref() + } } /// An encoded EVM transaction ready to be submitted on-chain. @@ -1722,6 +1823,42 @@ mod tests { assert!(id.contains('-')); // UUIDs contain dashes } + fn make_quote(amount_out: u64) -> OrderQuote { + OrderQuote::new( + "order-1".to_string(), + QuoteStatus::Success, + BigUint::from(1_000u64), + BigUint::from(amount_out), + BigUint::from(100_000u64), + BigUint::from(amount_out), + BlockInfo::new(1, "0x1".to_string(), 1), + "test".to_string(), + Bytes::from(make_address(0xAA).as_ref()), + Bytes::from(make_address(0xAA).as_ref()), + "1".to_string(), + ) + } + + #[test] + #[ignore = "scaffold: surplus wiring not yet exercised end-to-end"] + fn surplus_round_trips_through_getters() { + let committed = BigUint::from(990u64); + let surplus = BigUint::from(15u64); + let quote = + make_quote(990).with_surplus(SurplusInfo::new(surplus.clone(), committed.clone())); + + assert_eq!(quote.surplus_amount(), Some(&surplus)); + assert_eq!(quote.committed_amount_out(), Some(&committed)); + } + + #[test] + #[ignore = "scaffold: surplus wiring not yet exercised end-to-end"] + fn public_quote_has_no_surplus() { + let quote = make_quote(990); + assert_eq!(quote.surplus_amount(), None); + assert_eq!(quote.committed_amount_out(), None); + } + // ------------------------------------------------------------------------- // Route Tests // ------------------------------------------------------------------------- diff --git a/fynd-core/src/worker_pool/pool.rs b/fynd-core/src/worker_pool/pool.rs index 21dc0c79..09b1be37 100644 --- a/fynd-core/src/worker_pool/pool.rs +++ b/fynd-core/src/worker_pool/pool.rs @@ -19,6 +19,7 @@ use crate::{ feed::{ events::{MarketEvent, MarketEventHandler}, market_data::MarketData, + permission::{ComponentScope, PermissionContext, PermissionPolicy}, }, graph::EdgeWeightUpdaterWithDerived, types::internal::SolveTask, @@ -45,6 +46,10 @@ pub struct WorkerPoolConfig { algorithm_config: AlgorithmConfig, /// Task queue capacity (maximum number of pending tasks). task_queue_capacity: usize, + /// Permission scope for this pool's workers (default: include all, no filtering). + component_scope: ComponentScope, + /// Permission predicate. `None` ⇒ no filtering regardless of scope. + permission_policy: Option, } impl WorkerPoolConfig { @@ -62,6 +67,8 @@ impl Default for WorkerPoolConfig { num_workers: num_cpus::get(), algorithm_config: AlgorithmConfig::default(), task_queue_capacity: 1000, + component_scope: ComponentScope::IncludeAll, + permission_policy: None, } } } @@ -112,6 +119,8 @@ impl WorkerPool { .to_string(); // Spawn workers + let permission = + PermissionContext::new(config.component_scope, config.permission_policy.clone()); let params = SpawnWorkersParams { algorithm: algorithm.clone(), num_workers: config.num_workers, @@ -122,6 +131,7 @@ impl WorkerPool { event_rx, derived_event_rx, shutdown_tx: shutdown_tx.clone(), + permission, }; let workers = config.spawner.spawn(params)?; @@ -252,6 +262,21 @@ impl WorkerPoolBuilder { self } + /// Sets the permission scope for this pool's workers. + /// + /// `ExcludePermissioned` (public pools) drops permissioned components from each worker's graph; + /// `IncludeAll` (surplus pools) keeps them. Only has effect when a policy is also set. + pub fn component_scope(mut self, scope: ComponentScope) -> Self { + self.config.component_scope = scope; + self + } + + /// Sets the permission policy used to classify components as permissioned. + pub fn permission_policy(mut self, policy: PermissionPolicy) -> Self { + self.config.permission_policy = Some(policy); + self + } + /// Builds and spawns the worker pool. /// /// Creates an internal task queue and returns both the worker pool and a handle diff --git a/fynd-core/src/worker_pool/registry.rs b/fynd-core/src/worker_pool/registry.rs index a981613a..bd4617a1 100644 --- a/fynd-core/src/worker_pool/registry.rs +++ b/fynd-core/src/worker_pool/registry.rs @@ -25,7 +25,7 @@ use crate::{ MostLiquidAlgorithm, PathFrankWolfeAlgorithm, }, derived::{events::DerivedDataEvent, SharedDerivedDataRef}, - feed::{events::MarketEvent, market_data::MarketData}, + feed::{events::MarketEvent, market_data::MarketData, permission::PermissionContext}, types::internal::SolveTask, worker_pool::worker::SolverWorker, }; @@ -56,6 +56,11 @@ pub(crate) struct SpawnWorkersParams { pub derived_event_rx: broadcast::Receiver, /// Sender for shutdown signals. pub shutdown_tx: broadcast::Sender<()>, + /// Permission scoping applied to every worker in this pool. + /// + /// Cloned per worker so each gets its own [`PermissionContext`]. Defaults to "include all" + /// (no filtering) for public pools without permission configuration. + pub permission: PermissionContext, } /// Error returned when algorithm registration fails. @@ -159,6 +164,7 @@ where let shutdown_rx = params.shutdown_tx.subscribe(); let algorithm_name = params.algorithm.clone(); let factory = factory.clone(); + let permission = params.permission.clone(); let handle = thread::Builder::new() .name(format!("{}-worker-{}", algorithm_name, worker_id)) @@ -172,7 +178,8 @@ where let algorithm = factory(algorithm_config); let mut worker = - SolverWorker::new(market_data, derived_data, algorithm, worker_id); + SolverWorker::new(market_data, derived_data, algorithm, worker_id) + .with_permission(permission); worker.initialize_graph().await; worker @@ -241,6 +248,7 @@ mod tests { event_rx, derived_event_rx, shutdown_tx, + permission: PermissionContext::include_all(), } } @@ -278,6 +286,7 @@ mod tests { event_rx, derived_event_rx, shutdown_tx: shutdown_tx.clone(), + permission: PermissionContext::include_all(), }; let workers = @@ -318,6 +327,7 @@ mod tests { event_rx: event_tx.subscribe(), derived_event_rx: derived_event_tx.subscribe(), shutdown_tx: shutdown_tx.clone(), + permission: PermissionContext::include_all(), }); assert!(registry_err.is_err()); @@ -344,6 +354,7 @@ mod tests { event_rx: event_tx.subscribe(), derived_event_rx: derived_event_tx.subscribe(), shutdown_tx: shutdown_tx.clone(), + permission: PermissionContext::include_all(), }); assert!(workers.is_ok()); diff --git a/fynd-core/src/worker_pool/worker.rs b/fynd-core/src/worker_pool/worker.rs index 8440d142..38583983 100644 --- a/fynd-core/src/worker_pool/worker.rs +++ b/fynd-core/src/worker_pool/worker.rs @@ -26,6 +26,7 @@ use crate::{ feed::{ events::{MarketEvent, MarketEventHandler}, market_data::MarketData, + permission::PermissionContext, }, graph::{EdgeWeightUpdaterWithDerived, GraphManager}, types::internal::SolveTask, @@ -57,6 +58,12 @@ where /// Worker identifier (for logging). // TODO: make this a string to include pool name worker_id: usize, + /// Permission scoping for this worker's local graph. + /// + /// `IncludeAll` with no policy (the default) preserves the original non-filtered behaviour. + /// A public worker is configured with `ExcludePermissioned` + a `PermissionPolicy` so + /// permissioned components never enter its graph; a surplus worker uses `IncludeAll`. + permission: PermissionContext, } impl SolverWorker @@ -91,9 +98,18 @@ where ready_notify: Arc::new(Notify::new()), initialized: false, worker_id, + permission: PermissionContext::include_all(), } } + /// Configures this worker's permission scoping. + /// + /// Public workers pass `ExcludePermissioned` + a policy; surplus workers pass `IncludeAll`. + pub(crate) fn with_permission(mut self, permission: PermissionContext) -> Self { + self.permission = permission; + self + } + /// Initializes the graph from MarketState. /// /// Call this on startup or to recreate the graph from the latest market topology. @@ -102,7 +118,9 @@ where let topology = { // read lock on market data let market = self.market_data.read().await; - market.component_topology().clone() // clone to avoid holding the lock + let topology = market.component_topology().clone(); // clone to avoid holding the lock + self.permission + .filter_topology(market.base_market_state(), topology) }; self.graph_manager @@ -112,6 +130,11 @@ where /// Processes a single market event. pub async fn process_event(&mut self, event: MarketEvent) { + let event = { + let market = self.market_data.read().await; + self.permission + .scope_event(market.base_market_state(), event) + }; match event { MarketEvent::MarketUpdated { .. } => { if let Err(e) = self diff --git a/fynd-core/src/worker_pool_router/mod.rs b/fynd-core/src/worker_pool_router/mod.rs index 8aac578c..a0a34b2e 100644 --- a/fynd-core/src/worker_pool_router/mod.rs +++ b/fynd-core/src/worker_pool_router/mod.rs @@ -24,7 +24,7 @@ pub mod config; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, time::{Duration, Instant}, }; @@ -45,6 +45,19 @@ use crate::{ QuoteOptions, QuoteRequest, QuoteStatus, SolveError, SolveParams, }; +/// The role a solver pool (a group of workers) plays in a quote. +/// +/// A `Public` pool routes only through public liquidity and provides the committed (quoted) +/// reference output. The single `All` pool also routes through permissioned components and may beat +/// that reference, in which case the protocol captures the surplus. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PoolRole { + /// Routes through public liquidity only. Establishes the committed reference output. + Public, + /// Routes through all liquidity, including permissioned components; source of surplus quotes. + All, +} + /// Handle to a solver pool for dispatching orders. #[derive(Clone)] pub struct SolverPoolHandle { @@ -52,12 +65,20 @@ pub struct SolverPoolHandle { name: String, /// Queue handle for this pool. queue: TaskQueueHandle, + /// Whether this pool routes public-only or all liquidity. + role: PoolRole, } impl SolverPoolHandle { - /// Creates a new solver pool handle. + /// Creates a new solver pool handle with the default [`PoolRole::Public`] role. pub fn new(name: impl Into, queue: TaskQueueHandle) -> Self { - Self { name: name.into(), queue } + Self { name: name.into(), queue, role: PoolRole::Public } + } + + /// Sets the pool's role (e.g. [`PoolRole::All`] for the permissioned-inclusive pool). + pub fn with_role(mut self, role: PoolRole) -> Self { + self.role = role; + self } /// Returns the pool name. @@ -69,6 +90,11 @@ impl SolverPoolHandle { pub fn queue(&self) -> &TaskQueueHandle { &self.queue } + + /// Returns the pool's role. + pub fn role(&self) -> PoolRole { + self.role + } } /// Collected responses for a single order from multiple solvers. @@ -83,6 +109,34 @@ pub(crate) struct OrderResponses { failed_solvers: Vec<(String, SolveError)>, } +impl OrderResponses { + /// Returns a copy keeping only candidates from public-role pools. + /// + /// These form the committed reference and the ranked fallback chain (ranked by `rank_quotes`, + /// consumed by the price guard); surplus-pool candidates are overlaid separately by + /// `combine_with_surplus`. `failed_solvers` is retained so placeholder construction is + /// unchanged. + fn public_only(&self, pool_roles: &HashMap) -> OrderResponses { + let quotes = self + .quotes + .iter() + .filter(|(pool, _)| { + pool_roles + .get(pool) + .copied() + .unwrap_or(PoolRole::Public) == + PoolRole::Public + }) + .cloned() + .collect(); + OrderResponses { + order_id: self.order_id.clone(), + quotes, + failed_solvers: self.failed_solvers.clone(), + } + } +} + /// Orchestrates multiple solver pools to find the best quote. pub struct WorkerPoolRouter { /// All registered solver pools. @@ -160,10 +214,32 @@ impl WorkerPoolRouter { refine_gas_estimates(&mut order_responses, encoding_options)?; } - // Rank quotes for each order (sorted by refined amount_out_net_gas descending) + // Map each pool name to its role so candidate quotes can be split into public vs surplus. + let pool_roles: HashMap = self + .solver_pools + .iter() + .map(|p| (p.name().to_string(), p.role())) + .collect(); + let has_all_pool = pool_roles + .values() + .any(|r| *r == PoolRole::All); + + // Rank quotes for each order (sorted by refined amount_out_net_gas descending). + // `rank_quotes` produces the public ranking — the committed reference AND the price-guard + // fallback chain. When an `All`-role pool is configured, the surplus winner is overlaid + // onto that ranked list (prepended) by `combine_with_surplus`, so the fallbacks are + // preserved. let ranked_quotes: Vec> = order_responses .into_iter() - .map(|responses| self.rank_quotes(&responses, request.options())) + .map(|responses| { + if has_all_pool { + let public_ranked = + self.rank_quotes(&responses.public_only(&pool_roles), request.options()); + combine_with_surplus(&responses, &pool_roles, request.options(), public_ranked) + } else { + self.rank_quotes(&responses, request.options()) + } + }) .collect(); // Validate against external prices when the client explicitly enables it. @@ -283,6 +359,11 @@ impl WorkerPoolRouter { // Extract the OrderQuote from SingleOrderQuote quotes.push((pool_name.clone(), single_quote.order().clone())); + // TODO: make this gating role-aware for surplus quotes. The surplus + // route can only be priced once BOTH at least one public candidate + // (the committed reference) AND the surplus pool have responded (or the + // deadline elapses). A plain count-based early return may fire before + // the surplus pool reports, silently dropping the surplus opportunity. // Early return if min_responses reached if min_responses > 0 && quotes.len() >= min_responses { debug!( @@ -469,6 +550,44 @@ impl WorkerPoolRouter { } } +/// Overlays the surplus winner onto the ranked public fallback list for one order. +/// +/// `public_ranked` is the public-only ranking from `rank_quotes` — both the committed reference and +/// the price-guard fallback chain. If the best surplus candidate beats the committed reference +/// net-of-gas, the executed surplus quote is returned at the head of the list (its `amount_out` +/// pinned to the committed reference, an order-level `SurplusInfo` attached, and each permissioned +/// leg's `Swap::committed_amount_out` set), preserving the public candidates as fallbacks. +/// Otherwise `public_ranked` is returned unchanged, so the user is never quoted worse than the +/// public market. +/// +/// Each permissioned leg's `committed_amount_out` is its realized output reduced by the same +/// proportion as the order-level reduction; the protocol captures the difference. The per-leg +/// attribution formula and the bound that keeps the user at or above the committed reference are +/// derived in the design plan. +fn combine_with_surplus( + responses: &OrderResponses, + pool_roles: &HashMap, + options: &QuoteOptions, + public_ranked: Vec, +) -> Vec { + // The committed reference and fallback chain are already computed (`public_ranked`); this + // function only overlays the surplus winner. TODO: implement the overlay (see the design plan + // for the per-leg attribution formula and the user-never-short-changed bound): + // + // 1. best_surplus = best candidate from surplus-role pools (per `pool_roles`) by + // `amount_out_net_gas`, whose route has exactly one permissioned leg per path positioned as + // that path's terminal leg (`is_permissioned(&swap.protocol_component)`); reject + // multi-permissioned-per-path routes (out of scope for v1); respect `options.max_gas`. + // 2. committed reference = `public_ranked.first()`. If there is none, or best_surplus does not + // beat it net-of-gas, return `public_ranked` unchanged (no surplus). + // 3. Otherwise build the executed quote from the surplus route, set each permissioned leg's + // `Swap::with_committed_amount_out(...)`, pin the user `amount_out` to the committed + // reference, attach `SurplusInfo` via `OrderQuote::with_surplus`, and PREPEND it to + // `public_ranked`. debug_assert! that the user output is ≥ the committed reference. + let _ = (responses, pool_roles, options); + public_ranked +} + fn refine_gas_estimates( order_responses: &mut Vec, encoding_options: &EncodingOptions, @@ -941,4 +1060,66 @@ mod tests { assert_eq!(*result[0].amount_out_net_gas(), BigUint::from(950u64)); assert_eq!(*result[1].amount_out_net_gas(), BigUint::from(800u64)); } + + /// Builds an `OrderResponses` with a public quote and a surplus quote. + /// + /// `amount_out` doubles as `amount_out_net_gas` here for simplicity. + fn surplus_responses(public_out: u64, surplus_out: u64) -> OrderResponses { + let public = make_single_quote(public_out) + .order() + .clone(); + let surplus = make_single_quote(surplus_out) + .order() + .clone(); + OrderResponses { + order_id: "test-order".to_string(), + quotes: vec![ + ("public_pool".to_string(), public), + ("surplus_pool".to_string(), surplus), + ], + failed_solvers: vec![], + } + } + + fn surplus_pool_roles() -> HashMap { + HashMap::from([ + ("public_pool".to_string(), PoolRole::Public), + ("surplus_pool".to_string(), PoolRole::All), + ]) + } + + #[test] + #[ignore = "scaffold: surplus overlay in combine_with_surplus is todo"] + fn combine_prefers_surplus_when_it_beats_public() { + let responses = surplus_responses(900, 950); + let public_ranked = vec![make_single_quote(900).order().clone()]; + let combined = combine_with_surplus( + &responses, + &surplus_pool_roles(), + &QuoteOptions::default(), + public_ranked, + ); + + // The surplus winner is at the head: user is quoted the committed public output, protocol + // captures the surplus. The public candidate remains as a fallback. + assert_eq!(*combined[0].amount_out(), BigUint::from(900u64)); + assert_eq!(combined[0].committed_amount_out(), Some(&BigUint::from(900u64))); + assert_eq!(combined[0].surplus_amount(), Some(&BigUint::from(50u64))); + } + + #[test] + #[ignore = "scaffold: surplus overlay in combine_with_surplus is todo"] + fn combine_falls_back_to_public_when_surplus_does_not_beat_it() { + let responses = surplus_responses(950, 900); + let public_ranked = vec![make_single_quote(950).order().clone()]; + let combined = combine_with_surplus( + &responses, + &surplus_pool_roles(), + &QuoteOptions::default(), + public_ranked, + ); + + assert_eq!(*combined[0].amount_out(), BigUint::from(950u64)); + assert_eq!(combined[0].surplus_amount(), None); + } } diff --git a/fynd-rpc-types/src/lib.rs b/fynd-rpc-types/src/lib.rs index f017965d..0a4efbd1 100644 --- a/fynd-rpc-types/src/lib.rs +++ b/fynd-rpc-types/src/lib.rs @@ -1700,6 +1700,10 @@ mod conversions { } impl From for OrderQuote { + // NOTE: `surplus_amount` and `committed_amount_out` (permissioned-component surplus) are + // intentionally NOT mapped onto this public response DTO — they are internal (the per-leg + // committed amount reaches the encoder; the order-level surplus is for observability). + // Exposing them would leak the captured surplus to clients. fn from(core: fynd_core::OrderQuote) -> Self { let order_id = core.order_id().to_string(); let status = core.status().into(); diff --git a/fynd-rpc/src/config.rs b/fynd-rpc/src/config.rs index 6a611b94..1872ac40 100644 --- a/fynd-rpc/src/config.rs +++ b/fynd-rpc/src/config.rs @@ -23,6 +23,15 @@ num_workers = 3 task_queue_capacity = 1000 max_hops = 2 timeout_ms = 500 + +# Example: a permissioned-inclusive "surplus" pool (see repo-root worker_pools.toml). +# Public pools (role omitted; defaults to "public") never see permissioned components. +# [pools.surplus] +# algorithm = "bellman_ford" +# num_workers = 3 +# max_hops = 2 +# timeout_ms = 500 +# role = "surplus" "#; /// Worker pools configuration loaded from TOML file. diff --git a/worker_pools.toml b/worker_pools.toml index d3f90c09..aea61045 100644 --- a/worker_pools.toml +++ b/worker_pools.toml @@ -12,6 +12,16 @@ task_queue_capacity = 1000 max_hops = 2 timeout_ms = 500 +# Example: a permissioned-inclusive "surplus" pool. `role = "surplus"` lets this pool route +# through permissioned components to capture surplus above the best public rate. +# Public pools (role omitted; defaults to "public") never see permissioned components. +# [pools.surplus] +# algorithm = "bellman_ford" +# num_workers = 3 +# max_hops = 2 +# timeout_ms = 500 +# role = "surplus" + # Example: connector_tokens restricts intermediate hops to trusted, liquid tokens, # reducing on-chain reversion risk. Absent = all tokens reachable (default behaviour). # Run `fynd derive-connector-tokens --top-n 10` to generate a fresh list for your chain.