diff --git a/crates/nostr-relay-pool/CHANGELOG.md b/crates/nostr-relay-pool/CHANGELOG.md index 716d3f83e..f537bfe66 100644 --- a/crates/nostr-relay-pool/CHANGELOG.md +++ b/crates/nostr-relay-pool/CHANGELOG.md @@ -23,6 +23,13 @@ --> +## Unreleased + +### Breaking changes + +- Change `Relay::send_msg` and `Relay::batch_msg` signatures (https://github.com/rust-nostr/nostr/pull/1124) +- Change `RelayPool::send_msg_to` and `RelayPool::batch_msg_to` signatures (https://github.com/rust-nostr/nostr/pull/1124) + ## v0.44.0 - 2025/11/06 ### Breaking changes diff --git a/crates/nostr-relay-pool/src/pool/mod.rs b/crates/nostr-relay-pool/src/pool/mod.rs index cfbb44a63..e5c8f5ff4 100644 --- a/crates/nostr-relay-pool/src/pool/mod.rs +++ b/crates/nostr-relay-pool/src/pool/mod.rs @@ -30,7 +30,7 @@ pub use self::options::RelayPoolOptions; pub use self::output::Output; use crate::monitor::Monitor; use crate::relay::flags::FlagCheck; -use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions}; +use crate::relay::options::{RelayOptions, ReqExitPolicy, SendMessageOptions, SyncOptions}; use crate::relay::Relay; use crate::shared::SharedState; use crate::stream::{BoxedStream, ReceiverStream}; @@ -653,13 +653,14 @@ impl RelayPool { &self, urls: I, msg: ClientMessage<'_>, + opts: SendMessageOptions, ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, Error: From<::Err>, { - self.batch_msg_to(urls, vec![msg]).await + self.batch_msg_to(urls, &[msg], opts).await } /// Send multiple client messages at once to specific relays @@ -668,7 +669,8 @@ impl RelayPool { pub async fn batch_msg_to( &self, urls: I, - msgs: Vec>, + msgs: &[ClientMessage<'_>], + opts: SendMessageOptions, ) -> Result, Error> where I: IntoIterator, @@ -705,12 +707,23 @@ impl RelayPool { } } + let mut urls: Vec = Vec::with_capacity(set.len()); + let mut futures = Vec::with_capacity(set.len()); let mut output: Output<()> = Output::default(); - // Batch messages and construct outputs + // Compose futures for url in set.into_iter() { let relay: &Relay = self.internal_relay(&relays, &url)?; - match relay.batch_msg(msgs.clone()) { + urls.push(url); + futures.push(relay.batch_msg(msgs, opts)); + } + + // Join futures + let list = future::join_all(futures).await; + + // Iter results and construct output + for (url, result) in urls.into_iter().zip(list.into_iter()) { + match result { Ok(..) => { // Success, insert relay url in 'success' set result output.success.insert(url); diff --git a/crates/nostr-relay-pool/src/relay/constants.rs b/crates/nostr-relay-pool/src/relay/constants.rs index 21d3d5bc0..ce3503b11 100644 --- a/crates/nostr-relay-pool/src/relay/constants.rs +++ b/crates/nostr-relay-pool/src/relay/constants.rs @@ -9,6 +9,7 @@ use core::time::Duration; pub(super) const WAIT_FOR_OK_TIMEOUT: Duration = Duration::from_secs(10); pub(super) const WAIT_FOR_AUTHENTICATION_TIMEOUT: Duration = Duration::from_secs(7); +pub(super) const WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(10); pub(super) const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60); /// Relay default notification channel size diff --git a/crates/nostr-relay-pool/src/relay/error.rs b/crates/nostr-relay-pool/src/relay/error.rs index ea9c13a77..d214ab60f 100644 --- a/crates/nostr-relay-pool/src/relay/error.rs +++ b/crates/nostr-relay-pool/src/relay/error.rs @@ -34,6 +34,8 @@ pub enum Error { Negentropy(negentropy::Error), /// Database error Database(DatabaseError), + /// Can't receive send confirmation + CantReceiveSendConfirmation, /// Generic timeout Timeout, /// Not replied to ping @@ -139,6 +141,7 @@ impl fmt::Display for Error { Self::Hex(e) => e.fmt(f), Self::Negentropy(e) => e.fmt(f), Self::Database(e) => e.fmt(f), + Self::CantReceiveSendConfirmation => f.write_str("can't receive send confirmation"), Self::Timeout => f.write_str("timeout"), Self::NotRepliedToPing => f.write_str("not replied to ping"), Self::CantParsePong => f.write_str("can't parse pong"), diff --git a/crates/nostr-relay-pool/src/relay/inner.rs b/crates/nostr-relay-pool/src/relay/inner.rs index 72c6a66b6..53af6445d 100644 --- a/crates/nostr-relay-pool/src/relay/inner.rs +++ b/crates/nostr-relay-pool/src/relay/inner.rs @@ -17,16 +17,19 @@ use negentropy::{Id, Negentropy, NegentropyStorageVector}; use nostr::secp256k1::rand::{self, Rng}; use nostr_database::prelude::*; use tokio::sync::mpsc::{self, Receiver, Sender}; -use tokio::sync::{broadcast, Mutex, MutexGuard, Notify, RwLock, RwLockWriteGuard}; +use tokio::sync::{broadcast, oneshot, Mutex, MutexGuard, Notify, RwLock, RwLockWriteGuard}; use super::constants::{ DEFAULT_CONNECTION_TIMEOUT, JITTER_RANGE, MAX_RETRY_INTERVAL, MIN_ATTEMPTS, MIN_SUCCESS_RATE, NEGENTROPY_BATCH_SIZE_DOWN, NEGENTROPY_FRAME_SIZE_LIMIT, NEGENTROPY_HIGH_WATER_UP, - NEGENTROPY_LOW_WATER_UP, PING_INTERVAL, SLEEP_INTERVAL, WAIT_FOR_OK_TIMEOUT, - WEBSOCKET_TX_TIMEOUT, + NEGENTROPY_LOW_WATER_UP, PING_INTERVAL, SLEEP_INTERVAL, WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT, + WAIT_FOR_OK_TIMEOUT, WEBSOCKET_TX_TIMEOUT, }; use super::flags::AtomicRelayServiceFlags; -use super::options::{RelayOptions, ReqExitPolicy, SubscribeAutoCloseOptions, SyncOptions}; +use super::options::{ + RelayOptions, ReqExitPolicy, SendMessageOptions, SendMessagePolicy, SubscribeAutoCloseOptions, + SyncOptions, +}; use super::ping::PingTracker; use super::stats::RelayConnectionStats; use super::{ @@ -58,11 +61,16 @@ struct HandleAutoClosing { reason: Option, } +struct SendMessageRequest { + msgs: Vec, + confirmation: Option>, +} + #[derive(Debug)] struct RelayChannels { nostr: ( - Sender>, - Mutex>>, + Sender, + Mutex>, ), ping: Notify, terminate: Notify, @@ -79,21 +87,28 @@ impl RelayChannels { } } - pub fn send_client_msgs(&self, msgs: Vec) -> Result<(), Error> { + fn send_client_msgs( + &self, + msgs: &[ClientMessage], + confirmation: Option>, + ) -> Result<(), Error> { // Serialize messages to JSON - let msgs: Vec = msgs.into_iter().map(|msg| msg.as_json()).collect(); + let msgs: Vec = msgs.iter().map(|msg| msg.as_json()).collect(); + + // Build request + let req: SendMessageRequest = SendMessageRequest { msgs, confirmation }; // Send self.nostr .0 - .try_send(msgs) + .try_send(req) .map_err(|_| Error::CantSendChannelMessage { channel: String::from("nostr"), }) } #[inline] - pub async fn rx_nostr(&self) -> MutexGuard<'_, Receiver>> { + async fn rx_nostr(&self) -> MutexGuard<'_, Receiver> { self.nostr.1.lock().await } @@ -700,7 +715,7 @@ impl InnerRelay { async fn connect_and_run( &self, stream: Option<(WebSocketSink, WebSocketStream)>, - rx_nostr: &mut MutexGuard<'_, Receiver>>, + rx_nostr: &mut MutexGuard<'_, Receiver>, last_ws_error: &mut Option, ) { match stream { @@ -743,7 +758,7 @@ impl InnerRelay { &self, mut ws_tx: WebSocketSink, ws_rx: WebSocketStream, - rx_nostr: &mut MutexGuard<'_, Receiver>>, + rx_nostr: &mut MutexGuard<'_, Receiver>, ) { // (Re)subscribe to relay if self.flags.can_read() { @@ -792,7 +807,7 @@ impl InnerRelay { async fn sender_message_handler( &self, ws_tx: &mut WebSocketSink, - rx_nostr: &mut MutexGuard<'_, Receiver>>, + rx_nostr: &mut MutexGuard<'_, Receiver>, ping: &PingTracker, ) -> Result<(), Error> { #[cfg(target_arch = "wasm32")] @@ -801,7 +816,7 @@ impl InnerRelay { loop { tokio::select! { // Nostr channel receiver - Some(msgs) = rx_nostr.recv() => { + Some(SendMessageRequest { msgs, confirmation }) = rx_nostr.recv() => { // Compose WebSocket text messages let msgs: Vec = msgs .into_iter() @@ -823,6 +838,14 @@ impl InnerRelay { // Send WebSocket messages send_ws_msgs(ws_tx, msgs).await?; + // Send confirmation that messages has been sent + if let Some(confirmation) = confirmation { + match confirmation.send(()) { + Ok(()) => tracing::trace!("Message confirmation sent."), + Err(_) => tracing::error!("Can't send message confirmation."), + } + } + // Increase sent bytes self.stats.add_bytes_sent(size); } @@ -1304,11 +1327,19 @@ impl InnerRelay { } #[inline] - pub fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> { - self.batch_msg(vec![msg]) + pub async fn send_msg( + &self, + msg: ClientMessage<'_>, + opts: SendMessageOptions, + ) -> Result<(), Error> { + self.batch_msg(&[msg], opts).await } - pub fn batch_msg(&self, msgs: Vec>) -> Result<(), Error> { + pub async fn batch_msg( + &self, + msgs: &[ClientMessage<'_>], + opts: SendMessageOptions, + ) -> Result<(), Error> { // Check if relay is operational self.ensure_operational()?; @@ -1327,21 +1358,46 @@ impl InnerRelay { return Err(Error::ReadDisabled); } - // Send messages - self.atomic.channels.send_client_msgs(msgs) + // Handle send policy + match opts.policy { + SendMessagePolicy::DontWait => { + // Send messages + self.atomic.channels.send_client_msgs(msgs, None) + } + SendMessagePolicy::WaitForSendConfirmation { timeout } => { + let (tx, rx) = oneshot::channel(); + + // Send messages + self.atomic.channels.send_client_msgs(msgs, Some(tx))?; + + // Wait for confirmation and propagate error, if any + time::timeout(Some(timeout), rx) + .await + .ok_or(Error::Timeout)? + .map_err(|_| Error::CantReceiveSendConfirmation) + } + } } - fn send_neg_msg(&self, id: &SubscriptionId, message: &str) -> Result<(), Error> { - self.send_msg(ClientMessage::NegMsg { - subscription_id: Cow::Borrowed(id), - message: Cow::Borrowed(message), - }) + async fn send_neg_msg(&self, id: &SubscriptionId, message: &str) -> Result<(), Error> { + self.send_msg( + ClientMessage::NegMsg { + subscription_id: Cow::Borrowed(id), + message: Cow::Borrowed(message), + }, + SendMessageOptions::default(), + ) + .await } - fn send_neg_close(&self, id: &SubscriptionId) -> Result<(), Error> { - self.send_msg(ClientMessage::NegClose { - subscription_id: Cow::Borrowed(id), - }) + async fn send_neg_close(&self, id: &SubscriptionId) -> Result<(), Error> { + self.send_msg( + ClientMessage::NegClose { + subscription_id: Cow::Borrowed(id), + }, + SendMessageOptions::default(), + ) + .await } async fn auth(&self, challenge: String) -> Result<(), Error> { @@ -1357,7 +1413,11 @@ impl InnerRelay { let mut notifications = self.internal_notification_sender.subscribe(); // Send the AUTH message - self.send_msg(ClientMessage::Auth(Cow::Borrowed(&event)))?; + self.send_msg( + ClientMessage::Auth(Cow::Borrowed(&event)), + SendMessageOptions::default(), + ) + .await?; // Wait for OK // The event ID is already checked in `wait_for_ok` method @@ -1416,7 +1476,11 @@ impl InnerRelay { let subscriptions = self.subscriptions().await; for (id, filters) in subscriptions.into_iter() { if !filters.is_empty() && self.should_resubscribe(&id).await { - self.send_msg(ClientMessage::req(id, filters))?; + self.send_msg( + ClientMessage::req(id, filters), + SendMessageOptions::default(), + ) + .await?; } else { tracing::debug!("Skip re-subscription of '{id}'"); } @@ -1464,7 +1528,13 @@ impl InnerRelay { // Close subscription let send_result = if to_close { tracing::debug!(id = %id, "Auto-closing subscription."); - relay.send_msg(ClientMessage::Close(Cow::Borrowed(&id))) + let msg: ClientMessage<'_> = ClientMessage::Close(Cow::Borrowed(&id)); + let opts: SendMessageOptions = SendMessageOptions::default().policy( + SendMessagePolicy::WaitForSendConfirmation { + timeout: WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT, + }, + ); + relay.send_msg(msg, opts).await } else { Ok(()) }; @@ -1620,7 +1690,7 @@ impl InnerRelay { subscription_id: Cow::Borrowed(id), filters: filters.iter().map(Cow::Borrowed).collect(), }; - let _ = self.send_msg(msg); + let _ = self.send_msg(msg, SendMessageOptions::default()).await; } } RelayNotification::AuthenticationFailed => { @@ -1695,10 +1765,10 @@ impl InnerRelay { .await? } - fn _unsubscribe_long_lived_subscription( + async fn _unsubscribe_long_lived_subscription( &self, - subscriptions: &mut RwLockWriteGuard>, - id: Cow, + subscriptions: &mut RwLockWriteGuard<'_, HashMap>, + id: Cow<'_, SubscriptionId>, ) -> Result<(), Error> { // Remove the subscription from the map if let Some(sub) = subscriptions.remove(&id) { @@ -1710,12 +1780,14 @@ impl InnerRelay { } // Send CLOSE message - self.send_msg(ClientMessage::Close(id)) + self.send_msg(ClientMessage::Close(id), SendMessageOptions::default()) + .await } pub async fn unsubscribe(&self, id: &SubscriptionId) -> Result<(), Error> { let mut subscriptions = self.atomic.subscriptions.write().await; self._unsubscribe_long_lived_subscription(&mut subscriptions, Cow::Borrowed(id)) + .await } pub async fn unsubscribe_all(&self) -> Result<(), Error> { @@ -1726,14 +1798,15 @@ impl InnerRelay { // Unsubscribe for id in ids.into_iter() { - self._unsubscribe_long_lived_subscription(&mut subscriptions, Cow::Owned(id))?; + self._unsubscribe_long_lived_subscription(&mut subscriptions, Cow::Owned(id)) + .await?; } Ok(()) } #[inline(never)] - fn handle_neg_msg( + async fn handle_neg_msg( &self, subscription_id: &SubscriptionId, msg: Option>, @@ -1775,13 +1848,16 @@ impl InnerRelay { } match msg { - Some(query) => self.send_neg_msg(subscription_id, &hex::encode(query)), + Some(query) => { + self.send_neg_msg(subscription_id, &hex::encode(query)) + .await + } None => { // Mark sync as done *sync_done = true; // Send NEG-CLOSE message - self.send_neg_close(subscription_id) + self.send_neg_close(subscription_id).await } } } @@ -1805,7 +1881,8 @@ impl InnerRelay { match self.state.database().event_by_id(&id).await { Ok(Some(event)) => { in_flight_up.insert(id); - self.send_msg(ClientMessage::event(event))?; + self.send_msg(ClientMessage::event(event), SendMessageOptions::default()) + .await?; num_sent += 1; } Ok(None) => { @@ -1885,8 +1962,14 @@ impl InnerRelay { self.add_auto_closing_subscription(down_sub_id.clone(), vec![filter.clone()]) .await; + // Construct send msg options + let opts: SendMessageOptions = + SendMessageOptions::default().policy(SendMessagePolicy::WaitForSendConfirmation { + timeout: WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT, + }); + // Send msg - if let Err(e) = self.send_msg(msg) { + if let Err(e) = self.send_msg(msg, opts).await { // Remove previously added subscription self.remove_subscription(down_sub_id).await; @@ -1960,7 +2043,8 @@ impl InnerRelay { id_size: None, initial_message: Cow::Owned(hex::encode(initial_message)), }; - self.send_msg(open_msg)?; + self.send_msg(open_msg, SendMessageOptions::default()) + .await?; // Check if negentropy is supported check_negentropy_support(&sub_id, opts, &mut temp_notifications).await?; @@ -2006,7 +2090,8 @@ impl InnerRelay { &mut have_ids, &mut need_ids, &mut sync_done, - )?; + ) + .await?; } } RelayMessage::NegErr { @@ -2046,7 +2131,11 @@ impl InnerRelay { self.remove_subscription(&down_sub_id).await; // Close subscription - self.send_msg(ClientMessage::Close(Cow::Borrowed(&down_sub_id)))?; + self.send_msg( + ClientMessage::Close(Cow::Borrowed(&down_sub_id)), + SendMessageOptions::default(), + ) + .await?; } } RelayMessage::Closed { diff --git a/crates/nostr-relay-pool/src/relay/mod.rs b/crates/nostr-relay-pool/src/relay/mod.rs index ca790f58a..238699328 100644 --- a/crates/nostr-relay-pool/src/relay/mod.rs +++ b/crates/nostr-relay-pool/src/relay/mod.rs @@ -35,8 +35,8 @@ pub use self::flags::{AtomicRelayServiceFlags, FlagCheck, RelayServiceFlags}; use self::inner::InnerRelay; pub use self::limits::RelayLimits; pub use self::options::{ - RelayOptions, ReqExitPolicy, SubscribeAutoCloseOptions, SubscribeOptions, SyncDirection, - SyncOptions, SyncProgress, + RelayOptions, ReqExitPolicy, SendMessageOptions, SendMessagePolicy, SubscribeAutoCloseOptions, + SubscribeOptions, SyncDirection, SyncOptions, SyncProgress, }; pub use self::stats::RelayConnectionStats; pub use self::status::RelayStatus; @@ -405,14 +405,22 @@ impl Relay { /// Send msg to relay #[inline] - pub fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> { - self.inner.send_msg(msg) + pub async fn send_msg( + &self, + msg: ClientMessage<'_>, + opts: SendMessageOptions, + ) -> Result<(), Error> { + self.inner.send_msg(msg, opts).await } /// Send multiple [`ClientMessage`] at once #[inline] - pub fn batch_msg(&self, msgs: Vec>) -> Result<(), Error> { - self.inner.batch_msg(msgs) + pub async fn batch_msg( + &self, + msgs: &[ClientMessage<'_>], + opts: SendMessageOptions, + ) -> Result<(), Error> { + self.inner.batch_msg(msgs, opts).await } async fn _send_event( @@ -422,7 +430,11 @@ impl Relay { ) -> Result<(bool, String), Error> { // Send the EVENT message self.inner - .send_msg(ClientMessage::Event(Cow::Borrowed(event)))?; + .send_msg( + ClientMessage::Event(Cow::Borrowed(event)), + SendMessageOptions::default(), + ) + .await?; // Wait for OK self.inner @@ -572,7 +584,11 @@ impl Relay { .await; // Send REQ message - if let Err(e) = self.inner.send_msg(msg) { + if let Err(e) = self + .inner + .send_msg(msg, SendMessageOptions::default()) + .await + { // Remove previously added subscription self.inner.remove_subscription(&id).await; @@ -600,7 +616,9 @@ impl Relay { }; // Send REQ message - self.inner.send_msg(msg)?; + self.inner + .send_msg(msg, SendMessageOptions::default()) + .await?; // No auto-close subscription: update subscription filter self.inner.update_subscription(id, filters, true).await; @@ -688,7 +706,9 @@ impl Relay { subscription_id: Cow::Borrowed(&id), filter: Cow::Owned(filter), }; - self.inner.send_msg(msg)?; + self.inner + .send_msg(msg, SendMessageOptions::default()) + .await?; let mut count = 0; @@ -714,7 +734,9 @@ impl Relay { .ok_or(Error::Timeout)?; // Unsubscribe - self.inner.send_msg(ClientMessage::close(id))?; + self.inner + .send_msg(ClientMessage::close(id), SendMessageOptions::default()) + .await?; Ok(count) } diff --git a/crates/nostr-relay-pool/src/relay/options.rs b/crates/nostr-relay-pool/src/relay/options.rs index bb643f39c..c2655f3e5 100644 --- a/crates/nostr-relay-pool/src/relay/options.rs +++ b/crates/nostr-relay-pool/src/relay/options.rs @@ -166,6 +166,34 @@ impl RelayOptions { } } +/// Send message policy +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum SendMessagePolicy { + /// Don't wait for the send confirmation. + #[default] + DontWait, + /// Wait for send confirmation. + WaitForSendConfirmation { + /// Timeout + timeout: Duration, + }, +} + +/// Send message options +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct SendMessageOptions { + /// Policy to use when sending messages. + pub policy: SendMessagePolicy, +} + +impl SendMessageOptions { + /// Set policy + pub const fn policy(mut self, policy: SendMessagePolicy) -> Self { + self.policy = policy; + self + } +} + /// Auto-closing subscribe options #[derive(Debug, Clone, Copy, Default)] pub struct SubscribeAutoCloseOptions { diff --git a/crates/nostr-sdk/CHANGELOG.md b/crates/nostr-sdk/CHANGELOG.md index fab419b22..f3bf35311 100644 --- a/crates/nostr-sdk/CHANGELOG.md +++ b/crates/nostr-sdk/CHANGELOG.md @@ -23,6 +23,12 @@ --> +## Unreleased + +### Breaking changes + +- Change `Client::batch_msg_to` signature (https://github.com/rust-nostr/nostr/pull/1124) + ## v0.44.0 - 2025/11/06 ### Breaking changes diff --git a/crates/nostr-sdk/src/client/mod.rs b/crates/nostr-sdk/src/client/mod.rs index 45f7626dc..65e5fda5b 100644 --- a/crates/nostr-sdk/src/client/mod.rs +++ b/crates/nostr-sdk/src/client/mod.rs @@ -927,7 +927,10 @@ impl Client { U: TryIntoUrl, pool::Error: From<::Err>, { - Ok(self.pool.send_msg_to(urls, msg).await?) + Ok(self + .pool + .send_msg_to(urls, msg, SendMessageOptions::default()) + .await?) } /// Batch send client messages to **specific relays** @@ -935,14 +938,17 @@ impl Client { pub async fn batch_msg_to( &self, urls: I, - msgs: Vec>, + msgs: &[ClientMessage<'_>], ) -> Result, Error> where I: IntoIterator, U: TryIntoUrl, pool::Error: From<::Err>, { - Ok(self.pool.batch_msg_to(urls, msgs).await?) + Ok(self + .pool + .batch_msg_to(urls, msgs, SendMessageOptions::default()) + .await?) } /// Send the event to relays