From aa8bf83bb6276f2f16eac4116a46ffa1fcc9c2b2 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 3 Feb 2026 11:31:32 +0100 Subject: [PATCH] Use DeferredChainMonitor for non-VSS storage backends This change uses LDK's DeferredChainMonitor for local storage backends (SQLite, filesystem) instead of the regular ChainMonitor. The deferred variant queues watch_channel and update_channel operations for later flushing, enabling safe persistence ordering where the ChannelManager is persisted before the channel monitors. This ensures crash safety. VSS storage backends continue to use the regular ChainMonitor since VSS handles its own persistence ordering. The implementation: - Adds ChainMonitor enum that wraps both Regular and Deferred variants - Implements all required traits (Watch, Listen, Confirm, AChainMonitor, BaseMessageHandler, SendOnlyMessageHandler, EventsProvider) for the enum - Adds use_deferred_chain_monitor parameter to build_with_store_internal - Updates VSS build methods to use regular ChainMonitor (false) - Updates non-VSS build methods to use DeferredChainMonitor (true) Co-Authored-By: Claude Opus 4.5 --- Cargo.toml | 24 ++--- src/builder.rs | 61 +++++++++--- src/types.rs | 246 ++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 304 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3dcad31a5..e08a6b7e1 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,15 +169,15 @@ harness = false #vss-client-ng = { path = "../vss-client" } #vss-client-ng = { git = "https://github.com/lightningdevkit/vss-client", branch = "main" } # -#[patch."https://github.com/lightningdevkit/rust-lightning"] -#lightning = { path = "../rust-lightning/lightning" } -#lightning-types = { path = "../rust-lightning/lightning-types" } -#lightning-invoice = { path = "../rust-lightning/lightning-invoice" } -#lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" } -#lightning-persister = { path = "../rust-lightning/lightning-persister" } -#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" } -#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } -#lightning-block-sync = { path = "../rust-lightning/lightning-block-sync" } -#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync" } -#lightning-liquidity = { path = "../rust-lightning/lightning-liquidity" } -#lightning-macros = { path = "../rust-lightning/lightning-macros" } +[patch."https://github.com/lightningdevkit/rust-lightning"] +lightning = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-types = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-invoice = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-net-tokio = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-persister = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-background-processor = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-rapid-gossip-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-block-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-transaction-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-liquidity = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-macros = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..1beec8c70 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -599,7 +599,8 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, vss_store) + // VSS handles its own persistence ordering, so don't use deferred chain monitor + self.build_with_store_internal(node_entropy, vss_store, false) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -626,7 +627,8 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, vss_store) + // VSS handles its own persistence ordering, so don't use deferred chain monitor + self.build_with_store_internal(node_entropy, vss_store, false) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -651,12 +653,25 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, vss_store) + // VSS handles its own persistence ordering, so don't use deferred chain monitor + self.build_with_store_internal(node_entropy, vss_store, false) } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// For local storage backends (SQLite, filesystem), this uses `DeferredChainMonitor` which + /// enables safe persistence ordering where the `ChannelManager` is persisted before the + /// channel monitors. pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, + ) -> Result { + // Use deferred chain monitor for local storage backends + self.build_with_store_internal(node_entropy, kv_store, true) + } + + /// Internal method that builds a node with configurable chain monitor type. + fn build_with_store_internal( + &self, node_entropy: NodeEntropy, kv_store: S, use_deferred_chain_monitor: bool, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; @@ -683,6 +698,7 @@ impl NodeBuilder { runtime, logger, Arc::new(DynStoreWrapper(kv_store)), + use_deferred_chain_monitor, ) } } @@ -1028,13 +1044,18 @@ impl ArcedNodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. +/// +/// If `use_deferred_chain_monitor` is true, uses `DeferredChainMonitor` which defers monitor +/// operations until explicitly flushed. This enables safe persistence ordering where the +/// `ChannelManager` is persisted before the channel monitors. This should be used for local +/// storage backends (SQLite, etc.) but NOT for VSS which handles its own persistence ordering. fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, async_payments_role: Option, seed_bytes: [u8; 64], runtime: Arc, - logger: Arc, kv_store: Arc, + logger: Arc, kv_store: Arc, use_deferred_chain_monitor: bool, ) -> Result { optionally_install_rustls_cryptoprovider(); @@ -1334,15 +1355,29 @@ fn build_with_store_internal( )); // Initialize the ChainMonitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - Some(Arc::clone(&chain_source)), - Arc::clone(&tx_broadcaster), - Arc::clone(&logger), - Arc::clone(&fee_estimator), - Arc::clone(&persister), - Arc::clone(&keys_manager), - peer_storage_key, - )); + // Use DeferredChainMonitor for local storage backends to enable safe persistence ordering. + // VSS handles its own persistence ordering, so it uses the regular ChainMonitor. + let chain_monitor: Arc = if use_deferred_chain_monitor { + Arc::new(ChainMonitor::Deferred(lightning::chain::deferred::DeferredChainMonitor::new( + Some(Arc::clone(&chain_source)), + Arc::clone(&tx_broadcaster), + Arc::clone(&logger), + Arc::clone(&fee_estimator), + Arc::clone(&persister), + Arc::clone(&keys_manager), + peer_storage_key, + ))) + } else { + Arc::new(ChainMonitor::Regular(chainmonitor::ChainMonitor::new( + Some(Arc::clone(&chain_source)), + Arc::clone(&tx_broadcaster), + Arc::clone(&logger), + Arc::clone(&fee_estimator), + Arc::clone(&persister), + Arc::clone(&keys_manager), + peer_storage_key, + ))) + }; // Initialize the network graph, scorer, and router let network_graph = match network_graph_res { diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..8510fc782 100644 --- a/src/types.rs +++ b/src/types.rs @@ -14,9 +14,16 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::{OutPoint, ScriptBuf}; use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::chainmonitor; +use lightning::chain::channelmonitor::{ChannelMonitor as LdkChannelMonitor, MonitorEvent}; +use lightning::chain::deferred; +use lightning::chain::transaction::TransactionData; +use lightning::chain::{BestBlock, ChannelMonitorUpdateStatus, Watch}; +use lightning::events::{EventHandler, EventsProvider}; use lightning::impl_writeable_tlv_based; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; -use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; +use lightning::ln::msgs::RoutingMessageHandler; +use lightning::ln::msgs::SocketAddress; +use lightning::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler}; use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::types::ChannelId; use lightning::routing::gossip; @@ -206,7 +213,7 @@ pub type Persister = MonitorUpdatingPersister< Arc, >; -pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< +pub(crate) type ChainMonitorImpl = chainmonitor::ChainMonitor< InMemorySigner, Arc, Arc, @@ -216,6 +223,241 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< Arc, >; +pub(crate) type DeferredChainMonitor = deferred::DeferredChainMonitor< + InMemorySigner, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + +/// Enum wrapper that can hold either a regular ChainMonitor or a DeferredChainMonitor. +/// DeferredChainMonitor is used for non-VSS storage backends to ensure proper persistence ordering. +pub(crate) enum ChainMonitor { + /// Regular ChainMonitor - used with VSS storage backends + Regular(ChainMonitorImpl), + /// Deferred ChainMonitor - used with local storage backends (SQLite, etc.) + Deferred(DeferredChainMonitor), +} + +impl ChainMonitor { + /// Returns the inner ChainMonitorImpl, regardless of whether this is a regular or deferred monitor. + pub(crate) fn inner(&self) -> &ChainMonitorImpl { + match self { + ChainMonitor::Regular(cm) => cm, + ChainMonitor::Deferred(dcm) => dcm.inner(), + } + } + + /// Returns the number of pending operations. Always 0 for regular ChainMonitor. + pub(crate) fn pending_operation_count(&self) -> usize { + match self { + ChainMonitor::Regular(_) => 0, + ChainMonitor::Deferred(dcm) => { + use lightning::chain::chainmonitor::AChainMonitor; + dcm.pending_operation_count() + }, + } + } + + /// Flushes pending operations. No-op for regular ChainMonitor. + pub(crate) fn flush(&self, count: usize, logger: &Arc) { + match self { + ChainMonitor::Regular(_) => {}, + ChainMonitor::Deferred(dcm) => { + use lightning::chain::chainmonitor::AChainMonitor; + dcm.flush(count, logger) + }, + } + } + + /// Lists all channel monitor channel IDs. + pub(crate) fn list_monitors(&self) -> Vec { + self.inner().list_monitors() + } + + /// Gets a specific channel monitor. + pub(crate) fn get_monitor( + &self, channel_id: ChannelId, + ) -> Result, ()> { + self.inner().get_monitor(channel_id) + } + + /// Loads an existing channel monitor into the chain monitor. + pub(crate) fn load_existing_monitor( + &self, channel_id: ChannelId, monitor: LdkChannelMonitor, + ) -> Result { + self.inner().load_existing_monitor(channel_id, monitor) + } +} + +impl Watch for ChainMonitor { + fn watch_channel( + &self, channel_id: ChannelId, monitor: LdkChannelMonitor, + ) -> Result { + match self { + ChainMonitor::Regular(cm) => cm.watch_channel(channel_id, monitor), + ChainMonitor::Deferred(dcm) => dcm.watch_channel(channel_id, monitor), + } + } + + fn update_channel( + &self, channel_id: ChannelId, + update: &lightning::chain::channelmonitor::ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + match self { + ChainMonitor::Regular(cm) => cm.update_channel(channel_id, update), + ChainMonitor::Deferred(dcm) => dcm.update_channel(channel_id, update), + } + } + + fn release_pending_monitor_events( + &self, + ) -> Vec<(lightning::chain::transaction::OutPoint, ChannelId, Vec, PublicKey)> { + match self { + ChainMonitor::Regular(cm) => cm.release_pending_monitor_events(), + ChainMonitor::Deferred(dcm) => dcm.release_pending_monitor_events(), + } + } +} + +impl lightning::chain::Listen for ChainMonitor { + fn filtered_block_connected( + &self, header: &bitcoin::block::Header, txdata: &TransactionData, height: u32, + ) { + match self { + ChainMonitor::Regular(cm) => cm.filtered_block_connected(header, txdata, height), + ChainMonitor::Deferred(dcm) => dcm.filtered_block_connected(header, txdata, height), + } + } + + fn block_connected(&self, block: &bitcoin::Block, height: u32) { + match self { + ChainMonitor::Regular(cm) => cm.block_connected(block, height), + ChainMonitor::Deferred(dcm) => dcm.block_connected(block, height), + } + } + + fn blocks_disconnected(&self, fork_point: BestBlock) { + match self { + ChainMonitor::Regular(cm) => cm.blocks_disconnected(fork_point), + ChainMonitor::Deferred(dcm) => dcm.blocks_disconnected(fork_point), + } + } +} + +impl lightning::chain::Confirm for ChainMonitor { + fn transactions_confirmed( + &self, header: &bitcoin::block::Header, txdata: &TransactionData, height: u32, + ) { + match self { + ChainMonitor::Regular(cm) => cm.transactions_confirmed(header, txdata, height), + ChainMonitor::Deferred(dcm) => dcm.transactions_confirmed(header, txdata, height), + } + } + + fn transaction_unconfirmed(&self, txid: &bitcoin::Txid) { + match self { + ChainMonitor::Regular(cm) => cm.transaction_unconfirmed(txid), + ChainMonitor::Deferred(dcm) => dcm.transaction_unconfirmed(txid), + } + } + + fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) { + match self { + ChainMonitor::Regular(cm) => cm.best_block_updated(header, height), + ChainMonitor::Deferred(dcm) => dcm.best_block_updated(header, height), + } + } + + fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option)> { + match self { + ChainMonitor::Regular(cm) => cm.get_relevant_txids(), + ChainMonitor::Deferred(dcm) => dcm.get_relevant_txids(), + } + } +} + +impl BaseMessageHandler for ChainMonitor { + fn get_and_clear_pending_msg_events(&self) -> Vec { + match self { + ChainMonitor::Regular(cm) => cm.get_and_clear_pending_msg_events(), + ChainMonitor::Deferred(dcm) => dcm.get_and_clear_pending_msg_events(), + } + } + + fn peer_disconnected(&self, their_node_id: PublicKey) { + match self { + ChainMonitor::Regular(cm) => cm.peer_disconnected(their_node_id), + ChainMonitor::Deferred(dcm) => dcm.peer_disconnected(their_node_id), + } + } + + fn provided_node_features(&self) -> lightning::types::features::NodeFeatures { + match self { + ChainMonitor::Regular(cm) => cm.provided_node_features(), + ChainMonitor::Deferred(dcm) => dcm.provided_node_features(), + } + } + + fn provided_init_features( + &self, their_node_id: PublicKey, + ) -> lightning::types::features::InitFeatures { + match self { + ChainMonitor::Regular(cm) => cm.provided_init_features(their_node_id), + ChainMonitor::Deferred(dcm) => dcm.provided_init_features(their_node_id), + } + } + + fn peer_connected( + &self, their_node_id: PublicKey, msg: &Init, inbound: bool, + ) -> Result<(), ()> { + match self { + ChainMonitor::Regular(cm) => cm.peer_connected(their_node_id, msg, inbound), + ChainMonitor::Deferred(dcm) => dcm.peer_connected(their_node_id, msg, inbound), + } + } +} + +impl SendOnlyMessageHandler for ChainMonitor {} + +impl EventsProvider for ChainMonitor { + fn process_pending_events(&self, handler: H) + where + H::Target: EventHandler, + { + match self { + ChainMonitor::Regular(cm) => cm.process_pending_events(handler), + ChainMonitor::Deferred(dcm) => dcm.process_pending_events(handler), + } + } +} + +impl lightning::chain::chainmonitor::AChainMonitor for ChainMonitor { + type Signer = InMemorySigner; + type Filter = Arc; + type Broadcaster = Arc; + type FeeEstimator = Arc; + type Logger = Arc; + type Persister = Arc; + type PersisterTarget = Persister; + type EntropySource = Arc; + + fn get_cm(&self) -> &ChainMonitorImpl { + self.inner() + } + + fn pending_operation_count(&self) -> usize { + ChainMonitor::pending_operation_count(self) + } + + fn flush(&self, count: usize, logger: &Arc) { + ChainMonitor::flush(self, count, logger) + } +} + pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< SocketDescriptor, Arc,