@@ -78,26 +78,48 @@ impl MonitorUpdateId {
78
78
/// `Persist` defines behavior for persisting channel monitors: this could mean
79
79
/// writing once to disk, and/or uploading to one or more backup services.
80
80
///
81
- /// Each method can return two possible values:
82
- /// * If persistence (including any relevant `fsync()` calls) happens immediately, the
83
- /// implementation should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal
84
- /// channel operation should continue.
81
+ /// Persistence can happen in one of two ways - synchronously completing before the trait method
82
+ /// calls return or asynchronously in the background.
85
83
///
86
- /// * If persistence happens asynchronously, implementations can return
87
- /// [`ChannelMonitorUpdateStatus::InProgress`] while the update continues in the background.
88
- /// Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be called with
89
- /// the corresponding [`MonitorUpdateId`].
84
+ /// # For those implementing synchronous persistence
90
85
///
91
- /// Note that unlike the direct [`chain::Watch`] interface,
92
- /// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
86
+ /// * If persistence completes fully (including any relevant `fsync()` calls), the implementation
87
+ /// should return [`ChannelMonitorUpdateStatus::Completed`], indicating normal channel operation
88
+ /// should continue.
93
89
///
94
- /// If persistence fails for some reason, implementations should still return
95
- /// [`ChannelMonitorUpdateStatus::InProgress`] and attempt to shut down or otherwise resolve the
96
- /// situation ASAP.
90
+ /// * If persistence fails for some reason, implementations should consider returning
91
+ /// [`ChannelMonitorUpdateStatus::InProgress`] and retry all pending persistence operations in
92
+ /// the background with [`ChainMonitor::list_pending_monitor_updates`] and
93
+ /// [`ChainMonitor::get_monitor`].
97
94
///
98
- /// Third-party watchtowers may be built as a part of an implementation of this trait, with the
99
- /// advantage that you can control whether to resume channel operation depending on if an update
100
- /// has been persisted to a watchtower. For this, you may find the following methods useful:
95
+ /// Once a full [`ChannelMonitor`] has been persisted, all pending updates for that channel can
96
+ /// be marked as complete via [`ChainMonitor::channel_monitor_updated`].
97
+ ///
98
+ /// If at some point no further progress can be made towards persisting the pending updates, the
99
+ /// node should simply shut down.
100
+ ///
101
+ /// * If the persistence has failed and cannot be retried further (e.g. because of some timeout),
102
+ /// [`ChannelMonitorUpdateStatus::UnrecoverableError`] can be used, though this will result in
103
+ /// an immediate panic and future operations in LDK generally failing.
104
+ ///
105
+ /// # For those implementing asynchronous persistence
106
+ ///
107
+ /// All calls should generally spawn a background task and immediately return
108
+ /// [`ChannelMonitorUpdateStatus::InProgress`]. Once the update completes,
109
+ /// [`ChainMonitor::channel_monitor_updated`] should be called with the corresponding
110
+ /// [`MonitorUpdateId`].
111
+ ///
112
+ /// Note that unlike the direct [`chain::Watch`] interface,
113
+ /// [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
114
+ ///
115
+ /// If at some point no further progress can be made towards persisting a pending update, the node
116
+ /// should simply shut down.
117
+ ///
118
+ /// # Using remote watchtowers
119
+ ///
120
+ /// Watchtowers may be updated as a part of an implementation of this trait, utilizing the async
121
+ /// update process described above while the watchtower is being updated. The following methods are
122
+ /// provided for bulding transactions for a watchtower:
101
123
/// [`ChannelMonitor::initial_counterparty_commitment_tx`],
102
124
/// [`ChannelMonitor::counterparty_commitment_txs_from_update`],
103
125
/// [`ChannelMonitor::sign_to_local_justice_tx`], [`TrustedCommitmentTransaction::revokeable_output_index`],
@@ -279,19 +301,31 @@ where C::Target: chain::Filter,
279
301
where
280
302
FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
281
303
{
304
+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
282
305
let funding_outpoints: HashSet < OutPoint > = HashSet :: from_iter ( self . monitors . read ( ) . unwrap ( ) . keys ( ) . cloned ( ) ) ;
283
306
for funding_outpoint in funding_outpoints. iter ( ) {
284
307
let monitor_lock = self . monitors . read ( ) . unwrap ( ) ;
285
308
if let Some ( monitor_state) = monitor_lock. get ( funding_outpoint) {
286
- self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
309
+ if self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
310
+ // Take the monitors lock for writing so that we poison it and any future
311
+ // operations going forward fail immediately.
312
+ core:: mem:: drop ( monitor_state) ;
313
+ core:: mem:: drop ( monitor_lock) ;
314
+ let _poison = self . monitors . write ( ) . unwrap ( ) ;
315
+ log_error ! ( self . logger, "{}" , err_str) ;
316
+ panic ! ( "{}" , err_str) ;
317
+ }
287
318
}
288
319
}
289
320
290
321
// do some followup cleanup if any funding outpoints were added in between iterations
291
322
let monitor_states = self . monitors . write ( ) . unwrap ( ) ;
292
323
for ( funding_outpoint, monitor_state) in monitor_states. iter ( ) {
293
324
if !funding_outpoints. contains ( funding_outpoint) {
294
- self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) ;
325
+ if self . update_monitor_with_chain_data ( header, best_height, txdata, & process, funding_outpoint, & monitor_state) . is_err ( ) {
326
+ log_error ! ( self . logger, "{}" , err_str) ;
327
+ panic ! ( "{}" , err_str) ;
328
+ }
295
329
}
296
330
}
297
331
@@ -306,7 +340,10 @@ where C::Target: chain::Filter,
306
340
}
307
341
}
308
342
309
- fn update_monitor_with_chain_data < FN > ( & self , header : & BlockHeader , best_height : Option < u32 > , txdata : & TransactionData , process : FN , funding_outpoint : & OutPoint , monitor_state : & MonitorHolder < ChannelSigner > ) where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
343
+ fn update_monitor_with_chain_data < FN > (
344
+ & self , header : & BlockHeader , best_height : Option < u32 > , txdata : & TransactionData ,
345
+ process : FN , funding_outpoint : & OutPoint , monitor_state : & MonitorHolder < ChannelSigner >
346
+ ) -> Result < ( ) , ( ) > where FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs > {
310
347
let monitor = & monitor_state. monitor ;
311
348
let mut txn_outputs;
312
349
{
@@ -331,7 +368,10 @@ where C::Target: chain::Filter,
331
368
ChannelMonitorUpdateStatus :: InProgress => {
332
369
log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
333
370
pending_monitor_updates. push ( update_id) ;
334
- }
371
+ } ,
372
+ ChannelMonitorUpdateStatus :: UnrecoverableError => {
373
+ return Err ( ( ) ) ;
374
+ } ,
335
375
}
336
376
}
337
377
@@ -351,6 +391,7 @@ where C::Target: chain::Filter,
351
391
}
352
392
}
353
393
}
394
+ Ok ( ( ) )
354
395
}
355
396
356
397
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
@@ -674,7 +715,12 @@ where C::Target: chain::Filter,
674
715
} ,
675
716
ChannelMonitorUpdateStatus :: Completed => {
676
717
log_info ! ( self . logger, "Persistence of new ChannelMonitor for channel {} completed" , log_funding_info!( monitor) ) ;
677
- }
718
+ } ,
719
+ ChannelMonitorUpdateStatus :: UnrecoverableError => {
720
+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
721
+ log_error ! ( self . logger, "{}" , err_str) ;
722
+ panic ! ( "{}" , err_str) ;
723
+ } ,
678
724
}
679
725
if let Some ( ref chain_source) = self . chain_source {
680
726
monitor. load_outputs_to_watch ( chain_source) ;
@@ -690,7 +736,7 @@ where C::Target: chain::Filter,
690
736
fn update_channel ( & self , funding_txo : OutPoint , update : & ChannelMonitorUpdate ) -> ChannelMonitorUpdateStatus {
691
737
// Update the monitor that watches the channel referred to by the given outpoint.
692
738
let monitors = self . monitors . read ( ) . unwrap ( ) ;
693
- match monitors. get ( & funding_txo) {
739
+ let ret = match monitors. get ( & funding_txo) {
694
740
None => {
695
741
log_error ! ( self . logger, "Failed to update channel monitor: no such monitor registered" ) ;
696
742
@@ -722,14 +768,25 @@ where C::Target: chain::Filter,
722
768
ChannelMonitorUpdateStatus :: Completed => {
723
769
log_debug ! ( self . logger, "Persistence of ChannelMonitorUpdate for channel {} completed" , log_funding_info!( monitor) ) ;
724
770
} ,
771
+ ChannelMonitorUpdateStatus :: UnrecoverableError => { /* we'll panic in a moment */ } ,
725
772
}
726
773
if update_res. is_err ( ) {
727
774
ChannelMonitorUpdateStatus :: InProgress
728
775
} else {
729
776
persist_res
730
777
}
731
778
}
779
+ } ;
780
+ if let ChannelMonitorUpdateStatus :: UnrecoverableError = ret {
781
+ // Take the monitors lock for writing so that we poison it and any future
782
+ // operations going forward fail immediately.
783
+ core:: mem:: drop ( monitors) ;
784
+ let _poison = self . monitors . write ( ) . unwrap ( ) ;
785
+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down." ;
786
+ log_error ! ( self . logger, "{}" , err_str) ;
787
+ panic ! ( "{}" , err_str) ;
732
788
}
789
+ ret
733
790
}
734
791
735
792
fn release_pending_monitor_events ( & self ) -> Vec < ( OutPoint , Vec < MonitorEvent > , Option < PublicKey > ) > {
@@ -973,4 +1030,25 @@ mod tests {
973
1030
do_chainsync_pauses_events ( false ) ;
974
1031
do_chainsync_pauses_events ( true ) ;
975
1032
}
1033
+
1034
+ #[ test]
1035
+ fn update_during_chainsync_poisons_channel ( ) {
1036
+ let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
1037
+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
1038
+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
1039
+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
1040
+ create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
1041
+
1042
+ chanmon_cfgs[ 0 ] . persister . chain_sync_monitor_persistences . lock ( ) . unwrap ( ) . clear ( ) ;
1043
+ chanmon_cfgs[ 0 ] . persister . set_update_ret ( ChannelMonitorUpdateStatus :: UnrecoverableError ) ;
1044
+
1045
+ assert ! ( std:: panic:: catch_unwind( || {
1046
+ // Returning an UnrecoverableError should always panic immediately
1047
+ connect_blocks( & nodes[ 0 ] , 1 ) ;
1048
+ } ) . is_err( ) ) ;
1049
+ assert ! ( std:: panic:: catch_unwind( || {
1050
+ // ...and also poison our locks causing later use to panic as well
1051
+ core:: mem:: drop( nodes) ;
1052
+ } ) . is_err( ) ) ;
1053
+ }
976
1054
}
0 commit comments