diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs
index 3fa2073d5ba..8e616927679 100644
--- a/lightning/src/ln/chanmon_update_fail_tests.rs
+++ b/lightning/src/ln/chanmon_update_fail_tests.rs
@@ -49,13 +49,6 @@ use crate::prelude::*;
 use crate::sync::{Arc, Mutex};
 use bitcoin::hashes::Hash;
 
-fn get_latest_mon_update_id<'a, 'b, 'c>(
-	node: &Node<'a, 'b, 'c>, channel_id: ChannelId,
-) -> (u64, u64) {
-	let monitor_id_state = node.chain_monitor.latest_monitor_update_id.lock().unwrap();
-	monitor_id_state.get(&channel_id).unwrap().clone()
-}
-
 #[test]
 fn test_monitor_and_persister_update_fail() {
 	// Test that if both updating the `ChannelMonitor` and persisting the updated
@@ -212,7 +205,7 @@ fn do_test_simple_monitor_temporary_update_fail(disconnect: bool) {
 	}
 
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[0], channel_id);
+	let (latest_update, _) = nodes[0].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[0], 0);
 
@@ -404,7 +397,7 @@ fn do_test_monitor_temporary_update_fail(disconnect_count: usize) {
 
 	// Now fix monitor updating...
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[0], channel_id);
+	let (latest_update, _) = nodes[0].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[0], 0);
 
@@ -757,7 +750,7 @@ fn test_monitor_update_fail_cs() {
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[1], 0);
 	let responses = nodes[1].node.get_and_clear_pending_msg_events();
@@ -792,7 +785,7 @@ fn test_monitor_update_fail_cs() {
 	}
 
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[0], channel_id);
+	let (latest_update, _) = nodes[0].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[0], 0);
 
@@ -868,7 +861,7 @@ fn test_monitor_update_fail_no_rebroadcast() {
 	check_added_monitors(&nodes[1], 1);
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 	check_added_monitors(&nodes[1], 0);
@@ -938,7 +931,7 @@ fn test_monitor_update_raa_while_paused() {
 	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 	check_added_monitors(&nodes[0], 1);
 
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[0], channel_id);
+	let (latest_update, _) = nodes[0].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[0], 0);
@@ -1080,7 +1073,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {
 	// Restore monitor updating, ensuring we immediately get a fail-back update and a
 	// update_add update.
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], chan_2.2);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_2.2);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_2.2, latest_update);
 	check_added_monitors(&nodes[1], 0);
 	expect_and_process_pending_htlcs_and_htlc_handling_failed(
@@ -1354,7 +1347,7 @@ fn test_monitor_update_fail_reestablish() {
 	assert_eq!(bs_channel_upd.contents.channel_flags & 2, 0);
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], chan_1.2);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_1.2);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_1.2, latest_update);
 	check_added_monitors(&nodes[1], 0);
 
@@ -1439,7 +1432,7 @@ fn raa_no_response_awaiting_raa_state() {
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 	check_added_monitors(&nodes[1], 1);
 
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	// nodes[1] should be AwaitingRAA here!
 	check_added_monitors(&nodes[1], 0);
@@ -1568,7 +1561,7 @@ fn claim_while_disconnected_monitor_update_fail() {
 
 	// Now un-fail the monitor, which will result in B sending its original commitment update,
 	// receiving the commitment update from A, and the resulting commitment dances.
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[1], 0);
 
@@ -1697,7 +1690,7 @@ fn monitor_failed_no_reestablish_response() {
 		get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_b_id);
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[1], 0);
 	let bs_responses = get_revoke_commit_msgs(&nodes[1], &node_a_id);
@@ -1795,7 +1788,7 @@ fn first_message_on_recv_ordering() {
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[1], 0);
 
@@ -1894,7 +1887,7 @@ fn test_monitor_update_fail_claim() {
 
 	// Now restore monitor updating on the 0<->1 channel and claim the funds on B.
 	let channel_id = chan_1.2;
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
 	check_added_monitors(&nodes[1], 0);
@@ -2023,7 +2016,7 @@ fn test_monitor_update_on_pending_forwards() {
 	check_added_monitors(&nodes[1], 1);
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], chan_1.2);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_1.2);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_1.2, latest_update);
 	check_added_monitors(&nodes[1], 0);
 
@@ -2094,7 +2087,7 @@ fn monitor_update_claim_fail_no_response() {
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
 	check_added_monitors(&nodes[1], 0);
@@ -2166,7 +2159,7 @@ fn do_during_funding_monitor_fail(
 	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
 	assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[0], channel_id);
+	let (latest_update, _) = nodes[0].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[0], 0);
 	expect_channel_pending_event(&nodes[0], &node_b_id);
@@ -2221,7 +2214,7 @@ fn do_during_funding_monitor_fail(
 	}
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	check_added_monitors(&nodes[1], 0);
 
@@ -2339,7 +2332,7 @@ fn test_path_paused_mpp() {
 
 	// And check that, after we successfully update the monitor for chan_2 we can pass the second
 	// HTLC along to nodes[3] and claim the whole payment back to nodes[0].
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[0], chan_2_id);
+	let (latest_update, _) = nodes[0].chain_monitor.get_latest_mon_update_id(chan_2_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_2_id, latest_update);
 
 	let mut events = nodes[0].node.get_and_clear_pending_msg_events();
@@ -2787,7 +2780,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) {
 	// If we finish updating the monitor, we should free the holding cell right away (this did
 	// not occur prior to #756). This should result in a new monitor update.
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (mon_id, _) = get_latest_mon_update_id(&nodes[0], chan_id);
+	let (mon_id, _) = nodes[0].chain_monitor.get_latest_mon_update_id(chan_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id, mon_id);
 	expect_payment_claimed!(nodes[0], payment_hash_0, 100_000);
 	check_added_monitors(&nodes[0], 1);
@@ -3039,7 +3032,7 @@ fn test_temporary_error_during_shutdown() {
 	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
 
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[0], channel_id);
+	let (latest_update, _) = nodes[0].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 	nodes[1].node.handle_closing_signed(
 		node_a_id,
@@ -3049,7 +3042,7 @@ fn test_temporary_error_during_shutdown() {
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update);
 
 	nodes[0].node.handle_closing_signed(
@@ -3095,7 +3088,7 @@ fn double_temp_error() {
 	// `claim_funds` results in a ChannelMonitorUpdate.
 	nodes[1].node.claim_funds(payment_preimage_1);
 	check_added_monitors(&nodes[1], 1);
-	let (latest_update_1, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update_1, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
 
 	// Previously, this would've panicked due to a double-call to `Channel::monitor_update_failed`,
@@ -3104,7 +3097,7 @@ fn double_temp_error() {
 	check_added_monitors(&nodes[1], 1);
 
 	chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-	let (latest_update_2, _) = get_latest_mon_update_id(&nodes[1], channel_id);
+	let (latest_update_2, _) = nodes[1].chain_monitor.get_latest_mon_update_id(channel_id);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(channel_id, latest_update_1);
 	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
 	check_added_monitors(&nodes[1], 0);
@@ -3511,7 +3504,7 @@ fn do_test_blocked_chan_preimage_release(completion_mode: BlockedUpdateComplMode
 		reconnect_nodes(a_b_reconnect);
 		reconnect_nodes(ReconnectArgs::new(&nodes[2], &nodes[1]));
 	} else if completion_mode == BlockedUpdateComplMode::Async {
-		let (latest_update, _) = get_latest_mon_update_id(&nodes[1], chan_id_2);
+		let (latest_update, _) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_id_2);
 		nodes[1]
 			.chain_monitor
 			.chain_monitor
@@ -3689,7 +3682,7 @@ fn do_test_inverted_mon_completion_order(
 	// (Finally) complete the A <-> B ChannelMonitorUpdate, ensuring the preimage is durably on
 	// disk in the proper ChannelMonitor, unblocking the B <-> C ChannelMonitor updating
 	// process.
-	let (_, ab_update_id) = get_latest_mon_update_id(&nodes[1], chan_id_ab);
+	let (_, ab_update_id) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_id_ab);
 	nodes[1]
 		.chain_monitor
 		.chain_monitor
@@ -3722,7 +3715,7 @@ fn do_test_inverted_mon_completion_order(
 	// ChannelMonitorUpdate hasn't yet completed.
 	reconnect_nodes(ReconnectArgs::new(&nodes[0], &nodes[1]));
 
-	let (_, ab_update_id) = get_latest_mon_update_id(&nodes[1], chan_id_ab);
+	let (_, ab_update_id) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_id_ab);
 	nodes[1]
 		.chain_monitor
 		.chain_monitor
@@ -3935,7 +3928,7 @@ fn do_test_durable_preimages_on_closed_channel(
 
 	// Once the blocked `ChannelMonitorUpdate` *finally* completes, the pending
 	// `PaymentForwarded` event will finally be released.
-	let (_, ab_update_id) = get_latest_mon_update_id(&nodes[1], chan_id_ab);
+	let (_, ab_update_id) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_id_ab);
 	nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id_ab, ab_update_id);
 
 	// If the A<->B channel was closed before we reload, we'll replay the claim against it on
@@ -4047,7 +4040,7 @@ fn do_test_reload_mon_update_completion_actions(close_during_reload: bool) {
 		mine_transaction_without_consistency_checks(&nodes[1], &as_closing_tx[0]);
 	}
 
-	let (_, bc_update_id) = get_latest_mon_update_id(&nodes[1], chan_id_bc);
+	let (_, bc_update_id) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_id_bc);
 	let mut events = nodes[1].node.get_and_clear_pending_events();
 	assert_eq!(events.len(), if close_during_reload { 2 } else { 1 });
 	expect_payment_forwarded(
@@ -4072,7 +4065,7 @@ fn do_test_reload_mon_update_completion_actions(close_during_reload: bool) {
 	// Once we run event processing the monitor should free, check that it was indeed the B<->C
 	// channel which was updated.
 	check_added_monitors(&nodes[1], if close_during_reload { 2 } else { 1 });
-	let (_, post_ev_bc_update_id) = get_latest_mon_update_id(&nodes[1], chan_id_bc);
+	let (_, post_ev_bc_update_id) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_id_bc);
 	assert!(bc_update_id != post_ev_bc_update_id);
 
 	// Finally, check that there's nothing left to do on B<->C reconnect and the channel operates
@@ -4162,7 +4155,7 @@ fn do_test_glacial_peer_cant_hang(hold_chan_a: bool) {
 
 	// ...but once we complete the A<->B channel preimage persistence, the B<->C channel
 	// unlocks and we send both peers commitment updates.
-	let (ab_update_id, _) = get_latest_mon_update_id(&nodes[1], chan_id_ab);
+	let (ab_update_id, _) = nodes[1].chain_monitor.get_latest_mon_update_id(chan_id_ab);
 	assert!(nodes[1]
 		.chain_monitor
 		.chain_monitor
@@ -5122,7 +5115,7 @@ fn test_mpp_claim_to_holding_cell() {
 	check_added_monitors(&nodes[3], 2);
 
 	// Complete the B <-> D monitor update, freeing the first fulfill.
-	let (latest_id, _) = get_latest_mon_update_id(&nodes[3], chan_3_id);
+	let (latest_id, _) = nodes[3].chain_monitor.get_latest_mon_update_id(chan_3_id);
 	nodes[3].chain_monitor.chain_monitor.channel_monitor_updated(chan_3_id, latest_id).unwrap();
 
 	let mut b_claim = get_htlc_update_msgs(&nodes[3], &node_b_id);
@@ -5133,7 +5126,7 @@ fn test_mpp_claim_to_holding_cell() {
 
 	// Finally, complete the C <-> D monitor update. Previously, this unlock failed to be processed
 	// due to the existence of the blocked RAA update above.
-	let (latest_id, _) = get_latest_mon_update_id(&nodes[3], chan_4_id);
+	let (latest_id, _) = nodes[3].chain_monitor.get_latest_mon_update_id(chan_4_id);
 	nodes[3].chain_monitor.chain_monitor.channel_monitor_updated(chan_4_id, latest_id).unwrap();
 	// Once we process monitor events (in this case by checking for the `PaymentClaimed` event, the
 	// RAA monitor update blocked above will be released.
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 532514a3ae9..1c7a99300f2 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -1277,7 +1277,11 @@ enum BackgroundEvent {
 	/// Some [`ChannelMonitorUpdate`] (s) completed before we were serialized but we still have
 	/// them marked pending, thus we need to run any [`MonitorUpdateCompletionAction`] (s) pending
 	/// on a channel.
-	MonitorUpdatesComplete { counterparty_node_id: PublicKey, channel_id: ChannelId },
+	MonitorUpdatesComplete {
+		counterparty_node_id: PublicKey,
+		channel_id: ChannelId,
+		highest_update_id_completed: u64,
+	},
 }
 
 /// A pointer to a channel that is unblocked when an event is surfaced
@@ -8136,9 +8140,11 @@ impl<
 	/// Free the background events, generally called from [`PersistenceNotifierGuard`] constructors.
 	///
 	/// Expects the caller to have a total_consistency_lock read lock.
-	#[rustfmt::skip]
 	fn process_background_events(&self) -> NotifyOption {
-		debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread);
+		debug_assert_ne!(
+			self.total_consistency_lock.held_by_thread(),
+			LockHeldState::NotHeldByThread
+		);
 
 		self.background_events_processed_since_startup.store(true, Ordering::Release);
 
@@ -8150,11 +8156,34 @@ impl<
 
 		for event in background_events.drain(..) {
 			match event {
-				BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, channel_id, update } => {
-					self.apply_post_close_monitor_update(counterparty_node_id, channel_id, funding_txo, update);
+				BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+					counterparty_node_id,
+					funding_txo,
+					channel_id,
+					update,
+				} => {
+					self.apply_post_close_monitor_update(
+						counterparty_node_id,
+						channel_id,
+						funding_txo,
+						update,
+					);
 				},
-				BackgroundEvent::MonitorUpdatesComplete { counterparty_node_id, channel_id } => {
-					self.channel_monitor_updated(&channel_id, None, &counterparty_node_id);
+				BackgroundEvent::MonitorUpdatesComplete {
+					counterparty_node_id,
+					channel_id,
+					highest_update_id_completed,
+				} => {
+					// Now that we can finally handle the background event, remove all in-flight
+					// monitor updates for this channel that we know have completed, as they have
+					// already been persisted to the monitor and can be applied to our internal
+					// state such that the channel resumes operation if no new updates have been
+					// made since.
+					self.channel_monitor_updated(
+						&channel_id,
+						Some(highest_update_id_completed),
+						&counterparty_node_id,
+					);
 				},
 			}
 		}
@@ -18134,39 +18163,58 @@ impl<
 		($counterparty_node_id: expr, $chan_in_flight_upds: expr, $monitor: expr,
 			$peer_state: expr, $logger: expr, $channel_info_log: expr
 		) => { {
+			// When all in-flight updates have completed after we were last serialized, we
+			// need to remove them. However, we can't guarantee that the next serialization
+			// will happen after we process the
+			// `BackgroundEvent::MonitorUpdatesComplete`, so removing them now could lead to the
+			// channel never being resumed, as the event would not be regenerated after another
+			// reload. At the same time, we don't want to resume the channel now because there
+			// may be post-update actions to handle. Therefore, we're forced to keep tracking
+			// the completed in-flight updates (but only once they have all completed) until we
+			// process the `BackgroundEvent::MonitorUpdatesComplete`.
 			let mut max_in_flight_update_id = 0;
-			let starting_len = $chan_in_flight_upds.len();
-			$chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
-			if $chan_in_flight_upds.len() < starting_len {
+			let num_updates_completed = $chan_in_flight_upds
+				.iter()
+				.filter(|update| {
+					max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
+					update.update_id <= $monitor.get_latest_update_id()
+				})
+				.count();
+			if num_updates_completed > 0 {
 				log_debug!(
 					$logger,
 					"{} ChannelMonitorUpdates completed after ChannelManager was last serialized",
-					starting_len - $chan_in_flight_upds.len()
+					num_updates_completed,
 				);
 			}
+			let all_updates_completed = num_updates_completed == $chan_in_flight_upds.len();
+
 			let funding_txo = $monitor.get_funding_txo();
-			for update in $chan_in_flight_upds.iter() {
-				log_debug!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
-					update.update_id, $channel_info_log, &$monitor.channel_id());
-				max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
-				pending_background_events.push(
-					BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
-						counterparty_node_id: $counterparty_node_id,
-						funding_txo: funding_txo,
-						channel_id: $monitor.channel_id(),
-						update: update.clone(),
-					});
-			}
-			if $chan_in_flight_upds.is_empty() {
-				// We had some updates to apply, but it turns out they had completed before we
-				// were serialized, we just weren't notified of that. Thus, we may have to run
-				// the completion actions for any monitor updates, but otherwise are done.
+			if all_updates_completed {
+				log_debug!($logger, "All monitor updates completed since the ChannelManager was last serialized");
 				pending_background_events.push(
 					BackgroundEvent::MonitorUpdatesComplete {
 						counterparty_node_id: $counterparty_node_id,
 						channel_id: $monitor.channel_id(),
+						highest_update_id_completed: max_in_flight_update_id,
 					});
 			} else {
+				$chan_in_flight_upds.retain(|update| {
+					let replay = update.update_id > $monitor.get_latest_update_id();
+					if replay {
+						log_debug!($logger, "Replaying ChannelMonitorUpdate {} for {}channel {}",
+							update.update_id, $channel_info_log, &$monitor.channel_id());
+						pending_background_events.push(
+							BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
+								counterparty_node_id: $counterparty_node_id,
+								funding_txo: funding_txo,
+								channel_id: $monitor.channel_id(),
+								update: update.clone(),
+							}
+						);
+					}
+					replay
+				});
 				$peer_state.closed_channel_monitor_update_ids.entry($monitor.channel_id())
 					.and_modify(|v| *v = cmp::max(max_in_flight_update_id, *v))
 					.or_insert(max_in_flight_update_id);
diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs
index a8206dfe850..c0432051a62 100644
--- a/lightning/src/ln/reload_tests.rs
+++ b/lightning/src/ln/reload_tests.rs
@@ -1566,3 +1566,84 @@ fn test_peer_storage() {
 	assert!(res.is_err());
 }
 
+#[test]
+fn test_hold_completed_inflight_monitor_updates_upon_manager_reload() {
+	// Test that if a `ChannelMonitorUpdate` completes after the `ChannelManager` is serialized,
+	// but before it is deserialized, we hold any completed in-flight updates until background event
+	// processing. Previously, we would remove completed monitor updates from
+	// `in_flight_monitor_updates` during deserialization, relying on
+	// [`ChannelManager::process_background_events`] to eventually be called before the
+	// `ChannelManager` is serialized again such that the channel is resumed and further updates can
+	// be made.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let (persister_a, persister_b);
+	let (chain_monitor_a, chain_monitor_b);
+
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+	let nodes_0_deserialized_a;
+	let nodes_0_deserialized_b;
+
+	let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+	let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1).2;
+
+	send_payment(&nodes[0], &[&nodes[1]], 1_000_000);
+
+	chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
+
+	// Send a payment that will be pending due to an async monitor update.
+	let (route, payment_hash, _, payment_secret) =
+		get_route_and_payment_hash!(nodes[0], nodes[1], 1_000_000);
+	let payment_id = PaymentId(payment_hash.0);
+	let onion = RecipientOnionFields::secret_only(payment_secret);
+	nodes[0].node.send_payment_with_route(route, payment_hash, onion, payment_id).unwrap();
+	check_added_monitors(&nodes[0], 1);
+
+	assert!(nodes[0].node.get_and_clear_pending_msg_events().is_empty());
+	assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
+
+	// Serialize the ChannelManager while the monitor update is still in-flight.
+	let node_0_serialized = nodes[0].node.encode();
+
+	// Now complete the monitor update by calling force_channel_monitor_updated.
+	// This updates the monitor's state, but the ChannelManager still thinks it's pending.
+	let (_, latest_update_id) = nodes[0].chain_monitor.get_latest_mon_update_id(chan_id);
+	nodes[0].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id, latest_update_id);
+	let monitor_serialized_updated = get_monitor!(nodes[0], chan_id).encode();
+
+	// Reload the node with the updated monitor. Upon deserialization, the ChannelManager will
+	// detect that the monitor update completed (monitor's update_id >= the in-flight update_id)
+	// and queue a `BackgroundEvent::MonitorUpdatesComplete`.
+	nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
+	nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id());
+	reload_node!(
+		nodes[0],
+		test_default_channel_config(),
+		&node_0_serialized,
+		&[&monitor_serialized_updated[..]],
+		persister_a,
+		chain_monitor_a,
+		nodes_0_deserialized_a
+	);
+
+	// If we serialize again, even though we haven't processed any background events yet, we should
+	// still see the `BackgroundEvent::MonitorUpdatesComplete` be regenerated on startup.
+	let node_0_serialized = nodes[0].node.encode();
+	reload_node!(
+		nodes[0],
+		test_default_channel_config(),
+		&node_0_serialized,
+		&[&monitor_serialized_updated[..]],
+		persister_b,
+		chain_monitor_b,
+		nodes_0_deserialized_b
+	);
+
+	// Reconnect the nodes. We should finally see the `update_add_htlc` go out, as the reconnection
+	// should first process `BackgroundEvent::MonitorUpdatesComplete`, allowing the channel to be
+	// resumed.
+	let mut reconnect_args = ReconnectArgs::new(&nodes[0], &nodes[1]);
+	reconnect_args.pending_htlc_adds = (0, 1);
+	reconnect_nodes(reconnect_args);
+}
+
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 34f5d5fe36e..d7f23d32e2a 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -581,6 +581,11 @@ impl<'a> TestChainMonitor<'a> {
 		self.added_monitors.lock().unwrap().push((channel_id, monitor));
 		self.chain_monitor.load_existing_monitor(channel_id, new_monitor)
 	}
+
+	pub fn get_latest_mon_update_id(&self, channel_id: ChannelId) -> (u64, u64) {
+		let monitor_id_state = self.latest_monitor_update_id.lock().unwrap();
+		monitor_id_state.get(&channel_id).unwrap().clone()
+	}
 }
 
 impl<'a> chain::Watch for TestChainMonitor<'a> {
 	fn watch_channel(
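
The core of the change is the deserialization-time bookkeeping in `channelmanager.rs` above: in-flight updates are split into completed and still-pending ones, and the completed set is only dropped once the `BackgroundEvent::MonitorUpdatesComplete` is processed, not during deserialization. A minimal standalone sketch of that split follows; `InFlightUpdate` and `partition_in_flight` are invented names for illustration, not LDK API:

```rust
use std::cmp;

// Hypothetical stand-in for a pending `ChannelMonitorUpdate`; only the id matters here.
struct InFlightUpdate {
	update_id: u64,
}

/// Mirrors the macro's logic: count how many in-flight updates the monitor has already
/// persisted while tracking the highest in-flight update_id. If everything completed,
/// retain the updates (the caller queues a MonitorUpdatesComplete-style event carrying
/// `max_in_flight_update_id`); otherwise drop only the completed ones and replay the rest.
fn partition_in_flight(
	in_flight: &mut Vec<InFlightUpdate>, monitor_latest_update_id: u64,
) -> (u64, bool) {
	let mut max_in_flight_update_id = 0;
	let num_updates_completed = in_flight
		.iter()
		.filter(|update| {
			max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
			update.update_id <= monitor_latest_update_id
		})
		.count();
	let all_updates_completed = num_updates_completed == in_flight.len();
	if !all_updates_completed {
		// Keep only the updates that still need to be replayed on startup.
		in_flight.retain(|update| update.update_id > monitor_latest_update_id);
	}
	(max_in_flight_update_id, all_updates_completed)
}

fn main() {
	let mut in_flight: Vec<InFlightUpdate> =
		(5..=7).map(|update_id| InFlightUpdate { update_id }).collect();
	// The monitor persisted through update 7 after the manager was serialized:
	let (highest, all_done) = partition_in_flight(&mut in_flight, 7);
	assert_eq!((highest, all_done), (7, true));
	assert_eq!(in_flight.len(), 3); // retained until the background event runs
}
```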
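Why the completed updates must outlive another serialization round-trip is what the new reload test exercises: a second serialization can happen before any background events run, and the `MonitorUpdatesComplete` event must still be regenerated from the retained state. A toy model of that double-reload scenario, assuming a single channel; `PersistedManager`, `deserialize`, and `process_background_events` here are illustrative stand-ins, not LDK's actual structures:

```rust
#[derive(Clone)]
struct PersistedManager {
	in_flight_update_ids: Vec<u64>,
}

enum BackgroundEvent {
	MonitorUpdatesComplete { highest_update_id_completed: u64 },
}

fn deserialize(
	persisted: &PersistedManager, monitor_latest_update_id: u64,
) -> (PersistedManager, Vec<BackgroundEvent>) {
	let all_done =
		persisted.in_flight_update_ids.iter().all(|id| *id <= monitor_latest_update_id);
	let mut events = Vec::new();
	if all_done && !persisted.in_flight_update_ids.is_empty() {
		let highest = *persisted.in_flight_update_ids.iter().max().unwrap();
		events.push(BackgroundEvent::MonitorUpdatesComplete {
			highest_update_id_completed: highest,
		});
	}
	// Crucially, the completed updates stay tracked in the reloaded state, so the
	// event above can be regenerated by any later reload.
	(persisted.clone(), events)
}

fn process_background_events(manager: &mut PersistedManager, events: Vec<BackgroundEvent>) {
	for event in events {
		match event {
			BackgroundEvent::MonitorUpdatesComplete { highest_update_id_completed } => {
				// Only now is it safe to drop the completed updates (and run any
				// post-update completion actions).
				manager
					.in_flight_update_ids
					.retain(|id| *id > highest_update_id_completed);
			},
		}
	}
}

fn main() {
	// One in-flight update (id 3) completed after the manager was serialized.
	let manager = PersistedManager { in_flight_update_ids: vec![3] };
	let (reloaded, _unprocessed_events) = deserialize(&manager, 3);
	// Serialize and reload again *before* processing background events: the event
	// must be regenerated, which only works because update 3 was retained.
	let (mut reloaded, events) = deserialize(&reloaded, 3);
	assert_eq!(events.len(), 1);
	process_background_events(&mut reloaded, events);
	assert!(reloaded.in_flight_update_ids.is_empty());
}
```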