Skip to content

liquidity: Allow setting process_events callback in c_bindings #3533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 4 additions & 65 deletions lightning-liquidity/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::lsps0::ser::{
LSPS_MESSAGE_TYPE_ID,
};
use crate::lsps0::service::LSPS0ServiceHandler;
use crate::message_queue::MessageQueue;
use crate::message_queue::{MessageQueue, ProcessMessagesCallback};

use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
use crate::lsps1::msgs::LSPS1Message;
Expand All @@ -17,7 +17,7 @@ use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler};
use crate::lsps2::msgs::LSPS2Message;
use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler};
use crate::prelude::{new_hash_map, new_hash_set, HashMap, HashSet, ToString, Vec};
use crate::prelude::{new_hash_map, new_hash_set, Box, HashMap, HashSet, ToString, Vec};
use crate::sync::{Arc, Mutex, RwLock};

use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
Expand Down Expand Up @@ -315,69 +315,8 @@ where {
/// ```
///
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[cfg(feature = "std")]
pub fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
self.pending_messages.set_process_msgs_callback(callback)
}

/// Allows to set a callback that will be called after new messages are pushed to the message
/// queue.
///
/// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the
/// message queue. For example:
///
/// ```
/// # use lightning::io;
/// # use lightning_liquidity::LiquidityManager;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
/// # struct MyStore {}
/// # impl lightning::util::persist::KVStore for MyStore {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct MyEntropySource {}
/// # impl lightning::sign::EntropySource for MyEntropySource {
/// # fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] }
/// # }
/// # struct MyEventHandler {}
/// # impl MyEventHandler {
/// # async fn handle_event(&self, _: lightning::events::Event) {}
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct MySocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface;
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner;
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup;
/// # type MyFilter = dyn lightning::chain::Filter;
/// # type MyLogger = dyn lightning::util::logger::Logger;
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
/// # type MyLiquidityManager = LiquidityManager<Arc<MyEntropySource>, Arc<MyChannelManager>, Arc<MyFilter>>;
/// # fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_logger: Arc<MyLogger>, my_peer_manager: Arc<MyPeerManager>, my_liquidity_manager: Arc<MyLiquidityManager>) {
/// let process_msgs_pm = Arc::clone(&my_peer_manager);
/// let process_msgs_callback = move || process_msgs_pm.process_events();
///
/// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback);
/// # }
/// ```
///
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[cfg(not(feature = "std"))]
pub fn set_process_msgs_callback(&self, callback: impl Fn() + 'static) {
self.pending_messages.set_process_msgs_callback(callback)
pub fn set_process_msgs_callback<F: 'static + ProcessMessagesCallback>(&self, callback: F) {
self.pending_messages.set_process_msgs_callback(Box::new(callback));
}

/// Blocks the current thread until next event is ready and returns it.
Expand Down
38 changes: 25 additions & 13 deletions lightning-liquidity/src/message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use bitcoin::secp256k1::PublicKey;
/// [`LiquidityManager`]: crate::LiquidityManager
pub struct MessageQueue {
queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
#[cfg(feature = "std")]
process_msgs_callback: RwLock<Option<Box<dyn Fn() + Send + Sync + 'static>>>,
#[cfg(not(feature = "std"))]
process_msgs_callback: RwLock<Option<Box<dyn Fn() + 'static>>>,
process_msgs_callback: RwLock<Option<Box<dyn ProcessMessagesCallback>>>,
}

impl MessageQueue {
Expand All @@ -24,14 +21,8 @@ impl MessageQueue {
Self { queue, process_msgs_callback }
}

#[cfg(feature = "std")]
pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
*self.process_msgs_callback.write().unwrap() = Some(Box::new(callback));
}

#[cfg(not(feature = "std"))]
pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + 'static) {
*self.process_msgs_callback.write().unwrap() = Some(Box::new(callback));
pub(crate) fn set_process_msgs_callback(&self, callback: Box<dyn ProcessMessagesCallback>) {
*self.process_msgs_callback.write().unwrap() = Some(callback);
}

pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
Expand All @@ -45,7 +36,28 @@ impl MessageQueue {
}

if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() {
(process_msgs_callback)()
process_msgs_callback.call()
}
}
}

macro_rules! define_callback { ($($bounds: path),*) => {
/// A callback which will be called to trigger network message processing.
///
/// Usually, this should call [`PeerManager::process_events`].
///
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
pub trait ProcessMessagesCallback : $($bounds +)* {
/// The method which is called.
fn call(&self);
}

impl<F: Fn() $(+ $bounds)*> ProcessMessagesCallback for F {
fn call(&self) { (self)(); }
}
} }

#[cfg(feature = "std")]
define_callback!(Send, Sync);
#[cfg(not(feature = "std"))]
define_callback!();
Loading