Skip to content

Commit 716c06a

Browse files
committed
Read persisted event queue state in LiquidityManager::new
We read any previously-persisted state upon construction of `LiquidityManager`.
1 parent 73ed082 commit 716c06a

File tree

6 files changed

+48
-8
lines changed

6 files changed

+48
-8
lines changed

lightning-liquidity/src/events/event_queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ impl<K: Deref + Clone> EventQueue<K>
3737
where
3838
K::Target: KVStore,
3939
{
40-
pub fn new(kv_store: K) -> Self {
41-
let queue = Arc::new(Mutex::new(VecDeque::new()));
40+
pub fn new(queue: VecDeque<LiquidityEvent>, kv_store: K) -> Self {
41+
let queue = Arc::new(Mutex::new(queue));
4242
let waker = Arc::new(Mutex::new(None));
4343
Self {
4444
queue,
@@ -208,7 +208,7 @@ mod tests {
208208
use std::time::Duration;
209209

210210
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
211-
let event_queue = Arc::new(EventQueue::new(kv_store));
211+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
212212
assert_eq!(event_queue.next_event(), None);
213213

214214
let secp_ctx = Secp256k1::new();

lightning-liquidity/src/events/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
1818
mod event_queue;
1919

20-
pub(crate) use event_queue::EventQueue;
2120
pub use event_queue::MAX_EVENT_QUEUE_SIZE;
21+
pub(crate) use event_queue::{EventQueue, EventQueueDeserWrapper};
2222

2323
use crate::lsps0;
2424
use crate::lsps1;

lightning-liquidity/src/lsps0/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ where
117117

118118
#[cfg(test)]
119119
mod tests {
120+
use alloc::collections::VecDeque;
120121
use alloc::string::ToString;
121122
use alloc::sync::Arc;
122123

@@ -133,7 +134,7 @@ mod tests {
133134
let pending_messages = Arc::new(MessageQueue::new());
134135
let entropy_source = Arc::new(TestEntropy {});
135136
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
136-
let event_queue = Arc::new(EventQueue::new(kv_store));
137+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
137138

138139
let lsps0_handler = Arc::new(LSPS0ClientHandler::new(
139140
entropy_source,

lightning-liquidity/src/lsps5/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ mod tests {
475475
let message_queue = Arc::new(MessageQueue::new());
476476

477477
let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
478-
let event_queue = Arc::new(EventQueue::new(kv_store));
478+
let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store));
479479
let client = LSPS5ClientHandler::new(
480480
test_entropy_source,
481481
Arc::clone(&message_queue),

lightning-liquidity/src/manager.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use crate::lsps5::client::{LSPS5ClientConfig, LSPS5ClientHandler};
2424
use crate::lsps5::msgs::LSPS5Message;
2525
use crate::lsps5::service::{LSPS5ServiceConfig, LSPS5ServiceHandler};
2626
use crate::message_queue::MessageQueue;
27-
use crate::persist::{read_lsps2_service_peer_states, read_lsps5_service_peer_states};
27+
use crate::persist::{
28+
read_event_queue, read_lsps2_service_peer_states, read_lsps5_service_peer_states,
29+
};
2830

2931
use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
3032
use crate::lsps1::msgs::LSPS1Message;
@@ -384,7 +386,8 @@ where
384386
client_config: Option<LiquidityClientConfig>, time_provider: TP,
385387
) -> Result<Self, lightning::io::Error> {
386388
let pending_messages = Arc::new(MessageQueue::new());
387-
let pending_events = Arc::new(EventQueue::new(kv_store.clone()));
389+
let persisted_queue = read_event_queue(kv_store.clone()).await?.unwrap_or_default();
390+
let pending_events = Arc::new(EventQueue::new(persisted_queue, kv_store.clone()));
388391
let ignored_peers = RwLock::new(new_hash_set());
389392

390393
let mut supported_protocols = Vec::new();

lightning-liquidity/src/persist.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
//! Types and utils for persistence.
1111
12+
use crate::events::{EventQueueDeserWrapper, LiquidityEvent};
1213
use crate::lsps2::service::PeerState as LSPS2ServicePeerState;
1314
use crate::lsps5::service::PeerState as LSPS5ServicePeerState;
1415

@@ -18,6 +19,7 @@ use lightning::util::ser::Readable;
1819

1920
use bitcoin::secp256k1::PublicKey;
2021

22+
use alloc::collections::VecDeque;
2123
use alloc::vec::Vec;
2224

2325
use core::ops::Deref;
@@ -48,6 +50,40 @@ pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";
4850
/// [`LSPS5ServiceHandler`]: crate::lsps5::service::LSPS5ServiceHandler
4951
pub const LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps5_service";
5052

53+
pub(crate) async fn read_event_queue<K: Deref>(
54+
kv_store: K,
55+
) -> Result<Option<VecDeque<LiquidityEvent>>, lightning::io::Error>
56+
where
57+
K::Target: KVStore,
58+
{
59+
let read_fut = kv_store.read(
60+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
61+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
62+
LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY,
63+
);
64+
65+
let mut reader = match read_fut.await {
66+
Ok(r) => Cursor::new(r),
67+
Err(e) => {
68+
if e.kind() == lightning::io::ErrorKind::NotFound {
69+
// Key wasn't found, no error but first time running.
70+
return Ok(None);
71+
} else {
72+
return Err(e);
73+
}
74+
},
75+
};
76+
77+
let queue: EventQueueDeserWrapper = Readable::read(&mut reader).map_err(|_| {
78+
lightning::io::Error::new(
79+
lightning::io::ErrorKind::InvalidData,
80+
"Failed to deserialize liquidity event queue",
81+
)
82+
})?;
83+
84+
Ok(Some(queue.0))
85+
}
86+
5187
pub(crate) async fn read_lsps2_service_peer_states<K: Deref>(
5288
kv_store: K,
5389
) -> Result<Vec<(PublicKey, LSPS2ServicePeerState)>, lightning::io::Error>

0 commit comments

Comments
 (0)