Skip to content

Commit 659f78a

Browse files
committed
relay-pool: add SendMessageOptions to Relay and RelayPool message sending APIs
Introduce `SendMessageOptions` and `SendMessagePolicy` to control message sending behaviors, enhancing flexibility and enabling policies like waiting for a send confirmation. Signed-off-by: Yuki Kishimoto <[email protected]>
1 parent 0d1b841 commit 659f78a

File tree

6 files changed

+157
-50
lines changed

6 files changed

+157
-50
lines changed

crates/nostr-relay-pool/src/pool/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub use self::options::RelayPoolOptions;
3030
pub use self::output::Output;
3131
use crate::monitor::Monitor;
3232
use crate::relay::flags::FlagCheck;
33-
use crate::relay::options::{RelayOptions, ReqExitPolicy, SyncOptions};
33+
use crate::relay::options::{RelayOptions, ReqExitPolicy, SendMessageOptions, SyncOptions};
3434
use crate::relay::Relay;
3535
use crate::shared::SharedState;
3636
use crate::stream::{BoxedStream, ReceiverStream};
@@ -653,13 +653,14 @@ impl RelayPool {
653653
&self,
654654
urls: I,
655655
msg: ClientMessage<'_>,
656+
opts: SendMessageOptions,
656657
) -> Result<Output<()>, Error>
657658
where
658659
I: IntoIterator<Item = U>,
659660
U: TryIntoUrl,
660661
Error: From<<U as TryIntoUrl>::Err>,
661662
{
662-
self.batch_msg_to(urls, &[msg]).await
663+
self.batch_msg_to(urls, &[msg], opts).await
663664
}
664665

665666
/// Send multiple client messages at once to specific relays
@@ -669,6 +670,7 @@ impl RelayPool {
669670
&self,
670671
urls: I,
671672
msgs: &[ClientMessage<'_>],
673+
opts: SendMessageOptions,
672674
) -> Result<Output<()>, Error>
673675
where
674676
I: IntoIterator<Item = U>,
@@ -713,7 +715,7 @@ impl RelayPool {
713715
for url in set.into_iter() {
714716
let relay: &Relay = self.internal_relay(&relays, &url)?;
715717
urls.push(url);
716-
futures.push(relay.batch_msg(msgs));
718+
futures.push(relay.batch_msg(msgs, opts));
717719
}
718720

719721
// Join futures

crates/nostr-relay-pool/src/relay/constants.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use core::time::Duration;
99

1010
pub(super) const WAIT_FOR_OK_TIMEOUT: Duration = Duration::from_secs(10);
1111
pub(super) const WAIT_FOR_AUTHENTICATION_TIMEOUT: Duration = Duration::from_secs(7);
12+
pub(super) const WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT: Duration = Duration::from_secs(10);
1213
pub(super) const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
1314

1415
/// Relay default notification channel size

crates/nostr-relay-pool/src/relay/inner.rs

Lines changed: 83 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@ use tokio::sync::{broadcast, oneshot, Mutex, MutexGuard, Notify, RwLock, RwLockW
2222
use super::constants::{
2323
DEFAULT_CONNECTION_TIMEOUT, JITTER_RANGE, MAX_RETRY_INTERVAL, MIN_ATTEMPTS, MIN_SUCCESS_RATE,
2424
NEGENTROPY_BATCH_SIZE_DOWN, NEGENTROPY_FRAME_SIZE_LIMIT, NEGENTROPY_HIGH_WATER_UP,
25-
NEGENTROPY_LOW_WATER_UP, PING_INTERVAL, SLEEP_INTERVAL, WAIT_FOR_OK_TIMEOUT,
26-
WEBSOCKET_TX_TIMEOUT,
25+
NEGENTROPY_LOW_WATER_UP, PING_INTERVAL, SLEEP_INTERVAL, WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT,
26+
WAIT_FOR_OK_TIMEOUT, WEBSOCKET_TX_TIMEOUT,
2727
};
2828
use super::flags::AtomicRelayServiceFlags;
29-
use super::options::{RelayOptions, ReqExitPolicy, SubscribeAutoCloseOptions, SyncOptions};
29+
use super::options::{
30+
RelayOptions, ReqExitPolicy, SendMessageOptions, SendMessagePolicy, SubscribeAutoCloseOptions,
31+
SyncOptions,
32+
};
3033
use super::ping::PingTracker;
3134
use super::stats::RelayConnectionStats;
3235
use super::{
@@ -1324,11 +1327,19 @@ impl InnerRelay {
13241327
}
13251328

13261329
#[inline]
1327-
pub async fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> {
1328-
self.batch_msg(&[msg]).await
1330+
pub async fn send_msg(
1331+
&self,
1332+
msg: ClientMessage<'_>,
1333+
opts: SendMessageOptions,
1334+
) -> Result<(), Error> {
1335+
self.batch_msg(&[msg], opts).await
13291336
}
13301337

1331-
pub async fn batch_msg(&self, msgs: &[ClientMessage<'_>]) -> Result<(), Error> {
1338+
pub async fn batch_msg(
1339+
&self,
1340+
msgs: &[ClientMessage<'_>],
1341+
opts: SendMessageOptions,
1342+
) -> Result<(), Error> {
13321343
// Check if relay is operational
13331344
self.ensure_operational()?;
13341345

@@ -1347,30 +1358,45 @@ impl InnerRelay {
13471358
return Err(Error::ReadDisabled);
13481359
}
13491360

1350-
let (tx, rx) = oneshot::channel();
1361+
// Handle send policy
1362+
match opts.policy {
1363+
SendMessagePolicy::DontWait => {
1364+
// Send messages
1365+
self.atomic.channels.send_client_msgs(msgs, None)
1366+
}
1367+
SendMessagePolicy::WaitForSendConfirmation { timeout } => {
1368+
let (tx, rx) = oneshot::channel();
13511369

1352-
// Send messages
1353-
self.atomic.channels.send_client_msgs(msgs, Some(tx))?;
1370+
// Send messages
1371+
self.atomic.channels.send_client_msgs(msgs, Some(tx))?;
13541372

1355-
// Wait for confirmation and propagate error, if any
1356-
time::timeout(Some(Duration::from_secs(10)), rx)
1357-
.await
1358-
.ok_or(Error::Timeout)?
1359-
.map_err(|_| Error::CantReceiveSendConfirmation)
1373+
// Wait for confirmation and propagate error, if any
1374+
time::timeout(Some(timeout), rx)
1375+
.await
1376+
.ok_or(Error::Timeout)?
1377+
.map_err(|_| Error::CantReceiveSendConfirmation)
1378+
}
1379+
}
13601380
}
13611381

13621382
async fn send_neg_msg(&self, id: &SubscriptionId, message: &str) -> Result<(), Error> {
1363-
self.send_msg(ClientMessage::NegMsg {
1364-
subscription_id: Cow::Borrowed(id),
1365-
message: Cow::Borrowed(message),
1366-
})
1383+
self.send_msg(
1384+
ClientMessage::NegMsg {
1385+
subscription_id: Cow::Borrowed(id),
1386+
message: Cow::Borrowed(message),
1387+
},
1388+
SendMessageOptions::default(),
1389+
)
13671390
.await
13681391
}
13691392

13701393
async fn send_neg_close(&self, id: &SubscriptionId) -> Result<(), Error> {
1371-
self.send_msg(ClientMessage::NegClose {
1372-
subscription_id: Cow::Borrowed(id),
1373-
})
1394+
self.send_msg(
1395+
ClientMessage::NegClose {
1396+
subscription_id: Cow::Borrowed(id),
1397+
},
1398+
SendMessageOptions::default(),
1399+
)
13741400
.await
13751401
}
13761402

@@ -1387,8 +1413,11 @@ impl InnerRelay {
13871413
let mut notifications = self.internal_notification_sender.subscribe();
13881414

13891415
// Send the AUTH message
1390-
self.send_msg(ClientMessage::Auth(Cow::Borrowed(&event)))
1391-
.await?;
1416+
self.send_msg(
1417+
ClientMessage::Auth(Cow::Borrowed(&event)),
1418+
SendMessageOptions::default(),
1419+
)
1420+
.await?;
13921421

13931422
// Wait for OK
13941423
// The event ID is already checked in `wait_for_ok` method
@@ -1447,7 +1476,11 @@ impl InnerRelay {
14471476
let subscriptions = self.subscriptions().await;
14481477
for (id, filters) in subscriptions.into_iter() {
14491478
if !filters.is_empty() && self.should_resubscribe(&id).await {
1450-
self.send_msg(ClientMessage::req(id, filters)).await?;
1479+
self.send_msg(
1480+
ClientMessage::req(id, filters),
1481+
SendMessageOptions::default(),
1482+
)
1483+
.await?;
14511484
} else {
14521485
tracing::debug!("Skip re-subscription of '{id}'");
14531486
}
@@ -1495,9 +1528,13 @@ impl InnerRelay {
14951528
// Close subscription
14961529
let send_result = if to_close {
14971530
tracing::debug!(id = %id, "Auto-closing subscription.");
1498-
relay
1499-
.send_msg(ClientMessage::Close(Cow::Borrowed(&id)))
1500-
.await
1531+
let msg: ClientMessage<'_> = ClientMessage::Close(Cow::Borrowed(&id));
1532+
let opts: SendMessageOptions = SendMessageOptions::default().policy(
1533+
SendMessagePolicy::WaitForSendConfirmation {
1534+
timeout: WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT,
1535+
},
1536+
);
1537+
relay.send_msg(msg, opts).await
15011538
} else {
15021539
Ok(())
15031540
};
@@ -1653,7 +1690,7 @@ impl InnerRelay {
16531690
subscription_id: Cow::Borrowed(id),
16541691
filters: filters.iter().map(Cow::Borrowed).collect(),
16551692
};
1656-
let _ = self.send_msg(msg).await;
1693+
let _ = self.send_msg(msg, SendMessageOptions::default()).await;
16571694
}
16581695
}
16591696
RelayNotification::AuthenticationFailed => {
@@ -1743,7 +1780,8 @@ impl InnerRelay {
17431780
}
17441781

17451782
// Send CLOSE message
1746-
self.send_msg(ClientMessage::Close(id)).await
1783+
self.send_msg(ClientMessage::Close(id), SendMessageOptions::default())
1784+
.await
17471785
}
17481786

17491787
pub async fn unsubscribe(&self, id: &SubscriptionId) -> Result<(), Error> {
@@ -1843,7 +1881,8 @@ impl InnerRelay {
18431881
match self.state.database().event_by_id(&id).await {
18441882
Ok(Some(event)) => {
18451883
in_flight_up.insert(id);
1846-
self.send_msg(ClientMessage::event(event)).await?;
1884+
self.send_msg(ClientMessage::event(event), SendMessageOptions::default())
1885+
.await?;
18471886
num_sent += 1;
18481887
}
18491888
Ok(None) => {
@@ -1923,8 +1962,14 @@ impl InnerRelay {
19231962
self.add_auto_closing_subscription(down_sub_id.clone(), vec![filter.clone()])
19241963
.await;
19251964

1965+
// Construct send msg options
1966+
let opts: SendMessageOptions =
1967+
SendMessageOptions::default().policy(SendMessagePolicy::WaitForSendConfirmation {
1968+
timeout: WAIT_FOR_MSG_SEND_CONFIRMATION_TIMEOUT,
1969+
});
1970+
19261971
// Send msg
1927-
if let Err(e) = self.send_msg(msg).await {
1972+
if let Err(e) = self.send_msg(msg, opts).await {
19281973
// Remove previously added subscription
19291974
self.remove_subscription(down_sub_id).await;
19301975

@@ -1998,7 +2043,8 @@ impl InnerRelay {
19982043
id_size: None,
19992044
initial_message: Cow::Owned(hex::encode(initial_message)),
20002045
};
2001-
self.send_msg(open_msg).await?;
2046+
self.send_msg(open_msg, SendMessageOptions::default())
2047+
.await?;
20022048

20032049
// Check if negentropy is supported
20042050
check_negentropy_support(&sub_id, opts, &mut temp_notifications).await?;
@@ -2085,8 +2131,11 @@ impl InnerRelay {
20852131
self.remove_subscription(&down_sub_id).await;
20862132

20872133
// Close subscription
2088-
self.send_msg(ClientMessage::Close(Cow::Borrowed(&down_sub_id)))
2089-
.await?;
2134+
self.send_msg(
2135+
ClientMessage::Close(Cow::Borrowed(&down_sub_id)),
2136+
SendMessageOptions::default(),
2137+
)
2138+
.await?;
20902139
}
20912140
}
20922141
RelayMessage::Closed {

crates/nostr-relay-pool/src/relay/mod.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub use self::flags::{AtomicRelayServiceFlags, FlagCheck, RelayServiceFlags};
3535
use self::inner::InnerRelay;
3636
pub use self::limits::RelayLimits;
3737
pub use self::options::{
38-
RelayOptions, ReqExitPolicy, SubscribeAutoCloseOptions, SubscribeOptions, SyncDirection,
39-
SyncOptions, SyncProgress,
38+
RelayOptions, ReqExitPolicy, SendMessageOptions, SendMessagePolicy, SubscribeAutoCloseOptions,
39+
SubscribeOptions, SyncDirection, SyncOptions, SyncProgress,
4040
};
4141
pub use self::stats::RelayConnectionStats;
4242
pub use self::status::RelayStatus;
@@ -405,14 +405,22 @@ impl Relay {
405405

406406
/// Send msg to relay
407407
#[inline]
408-
pub async fn send_msg(&self, msg: ClientMessage<'_>) -> Result<(), Error> {
409-
self.inner.send_msg(msg).await
408+
pub async fn send_msg(
409+
&self,
410+
msg: ClientMessage<'_>,
411+
opts: SendMessageOptions,
412+
) -> Result<(), Error> {
413+
self.inner.send_msg(msg, opts).await
410414
}
411415

412416
/// Send multiple [`ClientMessage`] at once
413417
#[inline]
414-
pub async fn batch_msg(&self, msgs: &[ClientMessage<'_>]) -> Result<(), Error> {
415-
self.inner.batch_msg(msgs).await
418+
pub async fn batch_msg(
419+
&self,
420+
msgs: &[ClientMessage<'_>],
421+
opts: SendMessageOptions,
422+
) -> Result<(), Error> {
423+
self.inner.batch_msg(msgs, opts).await
416424
}
417425

418426
async fn _send_event(
@@ -422,7 +430,10 @@ impl Relay {
422430
) -> Result<(bool, String), Error> {
423431
// Send the EVENT message
424432
self.inner
425-
.send_msg(ClientMessage::Event(Cow::Borrowed(event)))
433+
.send_msg(
434+
ClientMessage::Event(Cow::Borrowed(event)),
435+
SendMessageOptions::default(),
436+
)
426437
.await?;
427438

428439
// Wait for OK
@@ -573,7 +584,11 @@ impl Relay {
573584
.await;
574585

575586
// Send REQ message
576-
if let Err(e) = self.inner.send_msg(msg).await {
587+
if let Err(e) = self
588+
.inner
589+
.send_msg(msg, SendMessageOptions::default())
590+
.await
591+
{
577592
// Remove previously added subscription
578593
self.inner.remove_subscription(&id).await;
579594

@@ -601,7 +616,9 @@ impl Relay {
601616
};
602617

603618
// Send REQ message
604-
self.inner.send_msg(msg).await?;
619+
self.inner
620+
.send_msg(msg, SendMessageOptions::default())
621+
.await?;
605622

606623
// No auto-close subscription: update subscription filter
607624
self.inner.update_subscription(id, filters, true).await;
@@ -689,7 +706,9 @@ impl Relay {
689706
subscription_id: Cow::Borrowed(&id),
690707
filter: Cow::Owned(filter),
691708
};
692-
self.inner.send_msg(msg).await?;
709+
self.inner
710+
.send_msg(msg, SendMessageOptions::default())
711+
.await?;
693712

694713
let mut count = 0;
695714

@@ -715,7 +734,9 @@ impl Relay {
715734
.ok_or(Error::Timeout)?;
716735

717736
// Unsubscribe
718-
self.inner.send_msg(ClientMessage::close(id)).await?;
737+
self.inner
738+
.send_msg(ClientMessage::close(id), SendMessageOptions::default())
739+
.await?;
719740

720741
Ok(count)
721742
}

crates/nostr-relay-pool/src/relay/options.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,34 @@ impl RelayOptions {
166166
}
167167
}
168168

169+
/// Send message policy
170+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
171+
pub enum SendMessagePolicy {
172+
/// Don't wait for the send confirmation.
173+
#[default]
174+
DontWait,
175+
/// Wait for send confirmation.
176+
WaitForSendConfirmation {
177+
/// Timeout
178+
timeout: Duration,
179+
},
180+
}
181+
182+
/// Send message options
183+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
184+
pub struct SendMessageOptions {
185+
/// Policy to use when sending messages.
186+
pub policy: SendMessagePolicy,
187+
}
188+
189+
impl SendMessageOptions {
190+
/// Set policy
191+
pub const fn policy(mut self, policy: SendMessagePolicy) -> Self {
192+
self.policy = policy;
193+
self
194+
}
195+
}
196+
169197
/// Auto-closing subscribe options
170198
#[derive(Debug, Clone, Copy, Default)]
171199
pub struct SubscribeAutoCloseOptions {

0 commit comments

Comments
 (0)