-
Notifications
You must be signed in to change notification settings - Fork 436
Hold in-flight monitor updates until background event processing #4377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1277,7 +1277,11 @@ enum BackgroundEvent { | |
| /// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have | ||
| /// them marked pending, thus we need to run any [`MonitorUpdateCompletionAction`] (s) pending | ||
| /// on a channel. | ||
| MonitorUpdatesComplete { counterparty_node_id: PublicKey, channel_id: ChannelId }, | ||
| MonitorUpdatesComplete { | ||
| counterparty_node_id: PublicKey, | ||
| channel_id: ChannelId, | ||
| highest_update_id_completed: u64, | ||
| }, | ||
| } | ||
|
|
||
| /// A pointer to a channel that is unblocked when an event is surfaced | ||
|
|
@@ -8136,9 +8140,11 @@ impl< | |
| /// Free the background events, generally called from [`PersistenceNotifierGuard`] constructors. | ||
| /// | ||
| /// Expects the caller to have a total_consistency_lock read lock. | ||
| #[rustfmt::skip] | ||
| fn process_background_events(&self) -> NotifyOption { | ||
| debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread); | ||
| debug_assert_ne!( | ||
| self.total_consistency_lock.held_by_thread(), | ||
| LockHeldState::NotHeldByThread | ||
| ); | ||
|
|
||
| self.background_events_processed_since_startup.store(true, Ordering::Release); | ||
|
|
||
|
|
@@ -8150,10 +8156,49 @@ impl< | |
|
|
||
| for event in background_events.drain(..) { | ||
| match event { | ||
| BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, channel_id, update } => { | ||
| self.apply_post_close_monitor_update(counterparty_node_id, channel_id, funding_txo, update); | ||
| BackgroundEvent::MonitorUpdateRegeneratedOnStartup { | ||
| counterparty_node_id, | ||
| funding_txo, | ||
| channel_id, | ||
| update, | ||
| } => { | ||
| self.apply_post_close_monitor_update( | ||
| counterparty_node_id, | ||
| channel_id, | ||
| funding_txo, | ||
| update, | ||
| ); | ||
| }, | ||
| BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => { | ||
| BackgroundEvent::MonitorUpdatesComplete { | ||
| counterparty_node_id, | ||
| channel_id, | ||
| highest_update_id_completed, | ||
| } => { | ||
| // Now that we can finally handle the background event, remove all in-flight | ||
| // monitor updates for this channel that we've known to complete, as they have | ||
| // already been persisted to the monitor and can be applied to our internal | ||
| // state such that the channel resumes operation if no new updates have been | ||
| // made since. | ||
| { | ||
| let per_peer_state = self.per_peer_state.read().unwrap(); | ||
| let mut peer_state_lock; | ||
| let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); | ||
| if peer_state_mutex_opt.is_none() { | ||
| continue; | ||
| } | ||
| peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); | ||
| let peer_state = &mut *peer_state_lock; | ||
| if let btree_map::Entry::Occupied(mut e) = | ||
| peer_state.in_flight_monitor_updates.entry(channel_id) | ||
| { | ||
| let (_, in_flight_updates) = e.get_mut(); | ||
| in_flight_updates | ||
| .retain(|update| update.update_id > highest_update_id_completed); | ||
| if in_flight_updates.is_empty() { | ||
| e.remove(); | ||
| } | ||
| } | ||
| } | ||
| self.channel_monitor_updated(&channel_id, None, &counterparty_node_id); | ||
| }, | ||
| } | ||
|
|
@@ -18134,39 +18179,58 @@ impl< | |
| ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $monitor: expr, | ||
| $peer_state: expr, $logger: expr, $channel_info_log: expr | ||
| ) => { { | ||
| // When all in-flight updates have completed after we were last serialized, we | ||
| // need to remove them. However, we can't guarantee that the next serialization | ||
| // will have happened after processing the | ||
| // `BackgroundEvent::MonitorUpdatesComplete`, so removing them now could lead to the | ||
| // channel never being resumed as the event would not be regenerated after another | ||
| // reload. At the same time, we don't want to resume the channel now because there | ||
| // may be post-update actions to handle. Therefore, we're forced to keep tracking | ||
| // the completed in-flight updates (but only when they have all completed) until we | ||
| // are processing the `BackgroundEvent::MonitorUpdatesComplete`. | ||
| let mut max_in_flight_update_id = 0; | ||
| let starting_len = $chan_in_flight_upds.len(); | ||
| $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id()); | ||
| if $chan_in_flight_upds.len() < starting_len { | ||
| let num_updates_completed = $chan_in_flight_upds | ||
| .iter() | ||
| .filter(|update| { | ||
| max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id); | ||
| update.update_id <= $monitor.get_latest_update_id() | ||
| }) | ||
| .count(); | ||
| if num_updates_completed > 0 { | ||
| log_debug!( | ||
| $logger, | ||
| "{} ChannelMonitorUpdates completed after ChannelManager was last serialized", | ||
| starting_len - $chan_in_flight_upds.len() | ||
| num_updates_completed, | ||
| ); | ||
| } | ||
| let all_updates_completed = num_updates_completed == $chan_in_flight_upds.len(); | ||
|
|
||
| let funding_txo = $monitor.get_funding_txo(); | ||
| for update in $chan_in_flight_upds.iter() { | ||
| log_debug!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}", | ||
| update.update_id, $channel_info_log, &$monitor.channel_id()); | ||
| max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id); | ||
| pending_background_events.push( | ||
| BackgroundEvent::MonitorUpdateRegeneratedOnStartup { | ||
| counterparty_node_id: $counterparty_node_id, | ||
| funding_txo: funding_txo, | ||
| channel_id: $monitor.channel_id(), | ||
| update: update.clone(), | ||
| }); | ||
| } | ||
| if $chan_in_flight_upds.is_empty() { | ||
| // We had some updates to apply, but it turns out they had completed before we | ||
| // were serialized, we just weren't notified of that. Thus, we may have to run | ||
| // the completion actions for any monitor updates, but otherwise are done. | ||
| if all_updates_completed { | ||
| log_debug!($logger, "All monitor updates completed since the ChannelManager was last serialized"); | ||
| pending_background_events.push( | ||
| BackgroundEvent::MonitorUpdatesComplete { | ||
| counterparty_node_id: $counterparty_node_id, | ||
| channel_id: $monitor.channel_id(), | ||
| highest_update_id_completed: max_in_flight_update_id, | ||
| }); | ||
| } else { | ||
| $chan_in_flight_upds.retain(|update| { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I do find these updates a bit subtle. It might be easier to reason about if we symmetrically didn't remove any |
||
| let replay = update.update_id > $monitor.get_latest_update_id(); | ||
| if replay { | ||
| log_debug!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}", | ||
| update.update_id, $channel_info_log, &$monitor.channel_id()); | ||
| pending_background_events.push( | ||
| BackgroundEvent::MonitorUpdateRegeneratedOnStartup { | ||
| counterparty_node_id: $counterparty_node_id, | ||
| funding_txo: funding_txo, | ||
| channel_id: $monitor.channel_id(), | ||
| update: update.clone(), | ||
| } | ||
| ); | ||
| } | ||
| replay | ||
| }); | ||
| $peer_state.closed_channel_monitor_update_ids.entry($monitor.channel_id()) | ||
| .and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v)) | ||
| .or_insert(max_in_flight_update_id); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,6 +37,13 @@ use crate::prelude::*; | |
|
|
||
| use crate::ln::functional_test_utils::*; | ||
|
|
||
| fn get_latest_mon_update_id<'a, 'b, 'c>( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: this can be DRY'd with the same util in |
||
| node: &Node<'a, 'b, 'c>, channel_id: ChannelId, | ||
| ) -> (u64, u64) { | ||
| let monitor_id_state = node.chain_monitor.latest_monitor_update_id.lock().unwrap(); | ||
| monitor_id_state.get(&channel_id).unwrap().clone() | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_funding_peer_disconnect() { | ||
| // Test that we can lock in our funding tx while disconnected | ||
|
|
@@ -1566,3 +1573,84 @@ fn test_peer_storage() { | |
| assert!(res.is_err()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_hold_completed_inflight_monitor_updates_upon_manager_reload() { | ||
| // Test that if a `ChannelMonitorUpdate` completes after the `ChannelManager` is serialized, | ||
| // but before it is deserialized, we hold any completed in-flight updates until background event | ||
| // processing. Previously, we would remove completed monitor updates from | ||
| // `in_flight_monitor_updates` during deserialization, relying on | ||
| // [`ChannelManager::process_background_events`] to eventually be called before the | ||
| // `ChannelManager` is serialized again such that the channel is resumed and further updates can | ||
| // be made. | ||
| let chanmon_cfgs = create_chanmon_cfgs(2); | ||
| let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); | ||
| let (persister_a, persister_b); | ||
| let (chain_monitor_a, chain_monitor_b); | ||
|
|
||
| let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); | ||
| let nodes_0_deserialized_a; | ||
| let nodes_0_deserialized_b; | ||
|
|
||
| let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); | ||
| let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2; | ||
|
|
||
| send_payment(&nodes[0], &[&nodes[1]], 1_000_000); | ||
|
|
||
| chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); | ||
|
|
||
| // Send a payment that will be pending due to an async monitor update. | ||
| let (route, payment_hash, _, payment_secret) = | ||
| get_route_and_payment_hash!(nodes[0], nodes[1], 1_000_000); | ||
| let payment_id = PaymentId(payment_hash.0); | ||
| let onion = RecipientOnionFields::secret_only(payment_secret); | ||
| nodes[0].node.send_payment_with_route(route, payment_hash, onion, payment_id).unwrap(); | ||
| check_added_monitors(&nodes[0], 1); | ||
|
|
||
| assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty()); | ||
| assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); | ||
|
|
||
| // Serialize the ChannelManager while the monitor update is still in-flight. | ||
| let node_0_serialized = nodes[0].node.encode(); | ||
|
|
||
| // Now complete the monitor update by calling force_channel_monitor_updated. | ||
| // This updates the monitor's state, but the ChannelManager still thinks it's pending. | ||
| let (_, latest_update_id) = get_latest_mon_update_id(&nodes[0], chan_id); | ||
| nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id, latest_update_id); | ||
| let monitor_serialized_updated = get_monitor!(nodes[0], chan_id).encode(); | ||
|
|
||
| // Reload the node with the updated monitor. Upon deserialization, the ChannelManager will | ||
| // detect that the monitor update completed (monitor's update_id >= the in-flight update_id) | ||
| // and queue a `BackgroundEvent::MonitorUpdatesComplete`. | ||
| nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id()); | ||
| nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id()); | ||
| reload_node!( | ||
| nodes[0], | ||
| test_default_channel_config(), | ||
| &node_0_serialized, | ||
| &[&monitor_serialized_updated[..]], | ||
| persister_a, | ||
| chain_monitor_a, | ||
| nodes_0_deserialized_a | ||
| ); | ||
|
|
||
| // If we serialize again, even though we haven't processed any background events yet, we should | ||
| // still see the `BackgroundEvent::MonitorUpdatesComplete` be regenerated on startup. | ||
| let node_0_serialized = nodes[0].node.encode(); | ||
| reload_node!( | ||
| nodes[0], | ||
| test_default_channel_config(), | ||
| &node_0_serialized, | ||
| &[&monitor_serialized_updated[..]], | ||
| persister_b, | ||
| chain_monitor_b, | ||
| nodes_0_deserialized_b | ||
| ); | ||
|
|
||
| // Reconnect the nodes. We should finally see the `update_add_htlc` go out, as the reconnection | ||
| // should first process `BackgroundEvent::MonitorUpdatesComplete`, allowing the channel to be | ||
| // resumed. | ||
| let mut reconnect_args = ReconnectArgs::new(&nodes[0], &nodes[1]); | ||
| reconnect_args.pending_htlc_adds = (0, 1); | ||
| reconnect_nodes(reconnect_args); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can drop this hunk and just pass in
`Some(highest_update_id_completed)` to the call to `channel_monitor_updated`? All tests pass at least. If not, needs a comment for why.