Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 173 additions & 100 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3271,8 +3271,8 @@ macro_rules! locked_close_channel {
}};
($self: ident, $peer_state: expr, $funded_chan: expr, $shutdown_res_mut: expr, FUNDED) => {{
if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() {
handle_new_monitor_update!($self, funding_txo, update, $peer_state,
$funded_chan.context, REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER);
handle_new_monitor_update_actions_processed_later!($self, funding_txo, update, $peer_state,
$funded_chan.context);
}
// If there's a possibility that we need to generate further monitor updates for this
// channel, we need to store the last update_id of it. However, we don't want to insert
Expand Down Expand Up @@ -3628,57 +3628,122 @@ macro_rules! handle_monitor_update_completion {
} }
}

macro_rules! handle_new_monitor_update {
($self: ident, $update_res: expr, $logger: expr, $channel_id: expr, _internal, $completed: expr) => { {
debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire));
match $update_res {
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!($logger, "{}", err_str);
panic!("{}", err_str);
},
ChannelMonitorUpdateStatus::InProgress => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if $self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
log_debug!($logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
$channel_id);
false
},
ChannelMonitorUpdateStatus::Completed => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if $self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
$completed;
true
},
}
} };
($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr, INITIAL_MONITOR) => {
macro_rules! handle_initial_monitor {
($self: ident, $update_res: expr, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => {
let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
handle_new_monitor_update!($self, $update_res, logger, $chan.context.channel_id(), _internal,
handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan))
let update_completed =
$self.handle_monitor_update_res($update_res, $chan.context.channel_id(), logger);
if update_completed {
handle_monitor_update_completion!(
$self,
$peer_state_lock,
$peer_state,
$per_peer_state_lock,
$chan
);
}
};
}

macro_rules! handle_post_close_monitor_update {
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr
) => {{
let logger =
WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None);
let in_flight_updates;
let idx;
handle_new_monitor_update_internal!(
$self,
$funding_txo,
$update,
$peer_state,
logger,
$channel_id,
$counterparty_node_id,
in_flight_updates,
idx,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() {
let update_actions = $peer_state
.monitor_update_blocked_actions
.remove(&$channel_id)
.unwrap_or(Vec::new());

mem::drop($peer_state_lock);
mem::drop($per_peer_state_lock);

$self.handle_monitor_update_completion_actions(update_actions);
}
}
)
}};
}

/// Handles a new monitor update without dropping peer_state locks and calling
/// [`ChannelManager::handle_monitor_update_completion_actions`] if the monitor update completed
/// synchronously.
///
/// Useful because monitor updates need to be handled in the same mutex where the channel generated
/// them (otherwise they can end up getting applied out-of-order) but it's not always possible to
/// drop the aforementioned peer state locks at a given callsite. In this situation, use this macro
/// to apply the monitor update immediately and handle the monitor update completion actions at a
/// later time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice explanation. I strongly believe that adding comments like this will make the code base a lot easier to work with, especially for new people.

macro_rules! handle_new_monitor_update_actions_processed_later {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe save one word handle_new_monitor_update_actions_deferred?

(
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr
) => {{
let logger = WithChannelContext::from(&$self.logger, &$chan_context, None);
let chan_id = $chan_context.channel_id();
let counterparty_node_id = $chan_context.get_counterparty_node_id();
let in_flight_updates;
let idx;
handle_new_monitor_update_internal!(
$self,
$funding_txo,
$update,
$peer_state,
logger,
chan_id,
counterparty_node_id,
in_flight_updates,
idx,
{
let _ = in_flight_updates.remove(idx);
}
)
}};
}

macro_rules! handle_new_monitor_update_internal {
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr,
$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
_internal_outer, $completed: expr
) => { {
$in_flight_updates = &mut $peer_state.in_flight_monitor_updates.entry($chan_id)
.or_insert_with(|| ($funding_txo, Vec::new())).1;
$completed: expr
) => {{
$in_flight_updates = &mut $peer_state
.in_flight_monitor_updates
.entry($chan_id)
.or_insert_with(|| ($funding_txo, Vec::new()))
.1;
// During startup, we push monitor updates as background events through to here in
// order to replay updates that were in-flight when we shut down. Thus, we have to
// filter for uniqueness here.
$update_idx = $in_flight_updates.iter().position(|upd| upd == &$update)
.unwrap_or_else(|| {
$update_idx =
$in_flight_updates.iter().position(|upd| upd == &$update).unwrap_or_else(|| {
$in_flight_updates.push($update);
$in_flight_updates.len() - 1
});
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
let update_res = $self.chain_monitor.update_channel($chan_id, &$in_flight_updates[$update_idx]);
handle_new_monitor_update!($self, update_res, $logger, $chan_id, _internal, $completed)
let update_res =
$self.chain_monitor.update_channel($chan_id, &$in_flight_updates[$update_idx]);
let update_completed = $self.handle_monitor_update_res(update_res, $chan_id, $logger);
if update_completed {
$completed;
}
update_completed
} else {
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
// fail to persist it. This is a fairly safe assumption, however, since anything we do
Expand All @@ -3700,62 +3765,43 @@ macro_rules! handle_new_monitor_update {
$self.pending_background_events.lock().unwrap().push(event);
false
}
} };
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr,
REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER
) => { {
let logger = WithChannelContext::from(&$self.logger, &$chan_context, None);
let chan_id = $chan_context.channel_id();
let counterparty_node_id = $chan_context.get_counterparty_node_id();
let in_flight_updates;
let idx;
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
counterparty_node_id, in_flight_updates, idx, _internal_outer,
{
let _ = in_flight_updates.remove(idx);
})
} };
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr, POST_CHANNEL_CLOSE
) => { {
let logger = WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None);
let in_flight_updates;
let idx;
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger,
$channel_id, $counterparty_node_id, in_flight_updates, idx, _internal_outer,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() {
let update_actions = $peer_state.monitor_update_blocked_actions
.remove(&$channel_id).unwrap_or(Vec::new());

mem::drop($peer_state_lock);
mem::drop($per_peer_state_lock);
}};
}

$self.handle_monitor_update_completion_actions(update_actions);
}
})
} };
macro_rules! handle_new_monitor_update {
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $chan: expr
) => { {
) => {{
let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
let chan_id = $chan.context.channel_id();
let counterparty_node_id = $chan.context.get_counterparty_node_id();
let in_flight_updates;
let idx;
handle_new_monitor_update!($self, $funding_txo, $update, $peer_state, logger, chan_id,
counterparty_node_id, in_flight_updates, idx, _internal_outer,
handle_new_monitor_update_internal!(
$self,
$funding_txo,
$update,
$peer_state,
logger,
chan_id,
counterparty_node_id,
in_flight_updates,
idx,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
handle_monitor_update_completion!($self, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
handle_monitor_update_completion!(
$self,
$peer_state_lock,
$peer_state,
$per_peer_state_lock,
$chan
);
}
})
} };
}
)
}};
}

#[rustfmt::skip]
Expand Down Expand Up @@ -4432,9 +4478,9 @@ where
hash_map::Entry::Vacant(_) => {},
}

handle_new_monitor_update!(
handle_post_close_monitor_update!(
self, funding_txo, monitor_update, peer_state_lock, peer_state, per_peer_state,
counterparty_node_id, channel_id, POST_CHANNEL_CLOSE
counterparty_node_id, channel_id
);
}

Expand Down Expand Up @@ -8893,16 +8939,15 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
.push(action);
}

handle_new_monitor_update!(
handle_post_close_monitor_update!(
self,
prev_hop.funding_txo,
preimage_update,
peer_state_lock,
peer_state,
per_peer_state,
prev_hop.counterparty_node_id,
chan_id,
POST_CHANNEL_CLOSE
chan_id
);
}

Expand Down Expand Up @@ -9506,6 +9551,36 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
}
}

/// Returns whether the monitor update is completed, `false` if the update is in-progress.
fn handle_monitor_update_res<LG: Logger>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit for reviewing: keeping it in the same place as the macro makes for a better diff.

&self, update_res: ChannelMonitorUpdateStatus, channel_id: ChannelId, logger: LG,
) -> bool {
debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire));
match update_res {
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
ChannelMonitorUpdateStatus::InProgress => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(1, Ordering::Relaxed) == 2 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
log_debug!(logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.",
channel_id);
false
},
ChannelMonitorUpdateStatus::Completed => {
#[cfg(not(any(test, feature = "_externalize_tests")))]
if self.monitor_update_type.swap(2, Ordering::Relaxed) == 1 {
panic!("Cannot use both ChannelMonitorUpdateStatus modes InProgress and Completed without restart");
}
true
},
}
}

/// Accepts a request to open a channel after a [`Event::OpenChannelRequest`].
///
/// The `temporary_channel_id` parameter indicates which inbound channel should be accepted,
Expand Down Expand Up @@ -10056,8 +10131,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
}

if let Some(funded_chan) = e.insert(Channel::from(chan)).as_funded_mut() {
handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
per_peer_state, funded_chan, INITIAL_MONITOR);
handle_initial_monitor!(self, persist_state, peer_state_lock, peer_state,
per_peer_state, funded_chan);
} else {
unreachable!("This must be a funded channel as we just inserted it.");
}
Expand Down Expand Up @@ -10220,7 +10295,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
})
{
Ok((funded_chan, persist_status)) => {
handle_new_monitor_update!(self, persist_status, peer_state_lock, peer_state, per_peer_state, funded_chan, INITIAL_MONITOR);
handle_initial_monitor!(self, persist_status, peer_state_lock, peer_state, per_peer_state, funded_chan);
Ok(())
},
Err(e) => try_channel_entry!(self, peer_state, Err(e), chan_entry),
Expand Down Expand Up @@ -10845,8 +10920,8 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
if let Some(monitor) = monitor_opt {
let monitor_res = self.chain_monitor.watch_channel(monitor.channel_id(), monitor);
if let Ok(persist_state) = monitor_res {
handle_new_monitor_update!(self, persist_state, peer_state_lock, peer_state,
per_peer_state, chan, INITIAL_MONITOR);
handle_initial_monitor!(self, persist_state, peer_state_lock, peer_state,
per_peer_state, chan);
} else {
let logger = WithChannelContext::from(&self.logger, &chan.context, None);
log_error!(logger, "Persisting initial ChannelMonitor failed, implying the channel ID was duplicated");
Expand Down Expand Up @@ -13283,16 +13358,15 @@ where
};
self.pending_background_events.lock().unwrap().push(event);
} else {
handle_new_monitor_update!(
handle_post_close_monitor_update!(
self,
channel_funding_outpoint,
update,
peer_state,
peer_state,
per_peer_state,
counterparty_node_id,
channel_id,
POST_CHANNEL_CLOSE
channel_id
);
}
},
Expand Down Expand Up @@ -13976,13 +14050,12 @@ where
insert_short_channel_id!(short_to_chan_info, funded_channel);

if let Some(monitor_update) = monitor_update_opt {
handle_new_monitor_update!(
handle_new_monitor_update_actions_processed_later!(
self,
funding_txo,
monitor_update,
peer_state,
funded_channel.context,
REMAIN_LOCKED_UPDATE_ACTIONS_PROCESSED_LATER
funded_channel.context
);
to_process_monitor_update_actions.push((
counterparty_node_id, channel_id
Expand Down