diff --git a/Cargo.toml b/Cargo.toml index 3dcad31a5..e08a6b7e1 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -169,15 +169,15 @@ harness = false #vss-client-ng = { path = "../vss-client" } #vss-client-ng = { git = "https://github.com/lightningdevkit/vss-client", branch = "main" } # -#[patch."https://github.com/lightningdevkit/rust-lightning"] -#lightning = { path = "../rust-lightning/lightning" } -#lightning-types = { path = "../rust-lightning/lightning-types" } -#lightning-invoice = { path = "../rust-lightning/lightning-invoice" } -#lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" } -#lightning-persister = { path = "../rust-lightning/lightning-persister" } -#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" } -#lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } -#lightning-block-sync = { path = "../rust-lightning/lightning-block-sync" } -#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync" } -#lightning-liquidity = { path = "../rust-lightning/lightning-liquidity" } -#lightning-macros = { path = "../rust-lightning/lightning-macros" } +[patch."https://github.com/lightningdevkit/rust-lightning"] +lightning = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-types = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-invoice = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-net-tokio = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-persister = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-background-processor = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-rapid-gossip-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-block-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-transaction-sync = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-liquidity = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } +lightning-macros = { git = "https://github.com/joostjager/rust-lightning", branch = "chain-mon-deferred-writes" } diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..1beec8c70 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -599,7 +599,8 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, vss_store) + // VSS handles its own persistence ordering, so don't use deferred chain monitor + self.build_with_store_internal(node_entropy, vss_store, false) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -626,7 +627,8 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, vss_store) + // VSS handles its own persistence ordering, so don't use deferred chain monitor + self.build_with_store_internal(node_entropy, vss_store, false) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -651,12 +653,25 @@ impl NodeBuilder { BuildError::KVStoreSetupFailed })?; - self.build_with_store(node_entropy, vss_store) + // VSS handles its own persistence ordering, so don't use deferred chain monitor + self.build_with_store_internal(node_entropy, vss_store, false) } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// For local storage backends (SQLite, filesystem), this uses `DeferredChainMonitor` which + /// enables safe persistence ordering where the `ChannelManager` is persisted before the + /// channel monitors. pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, + ) -> Result { + // Use deferred chain monitor for local storage backends + self.build_with_store_internal(node_entropy, kv_store, true) + } + + /// Internal method that builds a node with configurable chain monitor type. + fn build_with_store_internal( + &self, node_entropy: NodeEntropy, kv_store: S, use_deferred_chain_monitor: bool, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; @@ -683,6 +698,7 @@ impl NodeBuilder { runtime, logger, Arc::new(DynStoreWrapper(kv_store)), + use_deferred_chain_monitor, ) } } @@ -1028,13 +1044,18 @@ impl ArcedNodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. +/// +/// If `use_deferred_chain_monitor` is true, uses `DeferredChainMonitor` which defers monitor +/// operations until explicitly flushed. This enables safe persistence ordering where the +/// `ChannelManager` is persisted before the channel monitors. This should be used for local +/// storage backends (SQLite, etc.) but NOT for VSS which handles its own persistence ordering. fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, gossip_source_config: Option<&GossipSourceConfig>, liquidity_source_config: Option<&LiquiditySourceConfig>, pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>, async_payments_role: Option, seed_bytes: [u8; 64], runtime: Arc, - logger: Arc, kv_store: Arc, + logger: Arc, kv_store: Arc, use_deferred_chain_monitor: bool, ) -> Result { optionally_install_rustls_cryptoprovider(); @@ -1334,15 +1355,29 @@ fn build_with_store_internal( )); // Initialize the ChainMonitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - Some(Arc::clone(&chain_source)), - Arc::clone(&tx_broadcaster), - Arc::clone(&logger), - Arc::clone(&fee_estimator), - Arc::clone(&persister), - Arc::clone(&keys_manager), - peer_storage_key, - )); + // Use DeferredChainMonitor for local storage backends to enable safe persistence ordering. + // VSS handles its own persistence ordering, so it uses the regular ChainMonitor. + let chain_monitor: Arc = if use_deferred_chain_monitor { + Arc::new(ChainMonitor::Deferred(lightning::chain::deferred::DeferredChainMonitor::new( + Some(Arc::clone(&chain_source)), + Arc::clone(&tx_broadcaster), + Arc::clone(&logger), + Arc::clone(&fee_estimator), + Arc::clone(&persister), + Arc::clone(&keys_manager), + peer_storage_key, + ))) + } else { + Arc::new(ChainMonitor::Regular(chainmonitor::ChainMonitor::new( + Some(Arc::clone(&chain_source)), + Arc::clone(&tx_broadcaster), + Arc::clone(&logger), + Arc::clone(&fee_estimator), + Arc::clone(&persister), + Arc::clone(&keys_manager), + peer_storage_key, + ))) + }; // Initialize the network graph, scorer, and router let network_graph = match network_graph_res { diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..8510fc782 100644 --- a/src/types.rs +++ b/src/types.rs @@ -14,9 +14,16 @@ use bitcoin::secp256k1::PublicKey; use bitcoin::{OutPoint, ScriptBuf}; use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; use lightning::chain::chainmonitor; +use lightning::chain::channelmonitor::{ChannelMonitor as LdkChannelMonitor, MonitorEvent}; +use lightning::chain::deferred; +use lightning::chain::transaction::TransactionData; +use lightning::chain::{BestBlock, ChannelMonitorUpdateStatus, Watch}; +use lightning::events::{EventHandler, EventsProvider}; use lightning::impl_writeable_tlv_based; use lightning::ln::channel_state::ChannelDetails as LdkChannelDetails; -use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; +use lightning::ln::msgs::RoutingMessageHandler; +use lightning::ln::msgs::SocketAddress; +use lightning::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler}; use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::types::ChannelId; use lightning::routing::gossip; @@ -206,7 +213,7 @@ pub type Persister = MonitorUpdatingPersister< Arc, >; -pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< +pub(crate) type ChainMonitorImpl = chainmonitor::ChainMonitor< InMemorySigner, Arc, Arc, @@ -216,6 +223,241 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< Arc, >; +pub(crate) type DeferredChainMonitor = deferred::DeferredChainMonitor< + InMemorySigner, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + +/// Enum wrapper that can hold either a regular ChainMonitor or a DeferredChainMonitor. +/// DeferredChainMonitor is used for non-VSS storage backends to ensure proper persistence ordering. +pub(crate) enum ChainMonitor { + /// Regular ChainMonitor - used with VSS storage backends + Regular(ChainMonitorImpl), + /// Deferred ChainMonitor - used with local storage backends (SQLite, etc.) + Deferred(DeferredChainMonitor), +} + +impl ChainMonitor { + /// Returns the inner ChainMonitorImpl, regardless of whether this is a regular or deferred monitor. + pub(crate) fn inner(&self) -> &ChainMonitorImpl { + match self { + ChainMonitor::Regular(cm) => cm, + ChainMonitor::Deferred(dcm) => dcm.inner(), + } + } + + /// Returns the number of pending operations. Always 0 for regular ChainMonitor. + pub(crate) fn pending_operation_count(&self) -> usize { + match self { + ChainMonitor::Regular(_) => 0, + ChainMonitor::Deferred(dcm) => { + use lightning::chain::chainmonitor::AChainMonitor; + dcm.pending_operation_count() + }, + } + } + + /// Flushes pending operations. No-op for regular ChainMonitor. + pub(crate) fn flush(&self, count: usize, logger: &Arc) { + match self { + ChainMonitor::Regular(_) => {}, + ChainMonitor::Deferred(dcm) => { + use lightning::chain::chainmonitor::AChainMonitor; + dcm.flush(count, logger) + }, + } + } + + /// Lists all channel monitor channel IDs. + pub(crate) fn list_monitors(&self) -> Vec { + self.inner().list_monitors() + } + + /// Gets a specific channel monitor. + pub(crate) fn get_monitor( + &self, channel_id: ChannelId, + ) -> Result, ()> { + self.inner().get_monitor(channel_id) + } + + /// Loads an existing channel monitor into the chain monitor. + pub(crate) fn load_existing_monitor( + &self, channel_id: ChannelId, monitor: LdkChannelMonitor, + ) -> Result { + self.inner().load_existing_monitor(channel_id, monitor) + } +} + +impl Watch for ChainMonitor { + fn watch_channel( + &self, channel_id: ChannelId, monitor: LdkChannelMonitor, + ) -> Result { + match self { + ChainMonitor::Regular(cm) => cm.watch_channel(channel_id, monitor), + ChainMonitor::Deferred(dcm) => dcm.watch_channel(channel_id, monitor), + } + } + + fn update_channel( + &self, channel_id: ChannelId, + update: &lightning::chain::channelmonitor::ChannelMonitorUpdate, + ) -> ChannelMonitorUpdateStatus { + match self { + ChainMonitor::Regular(cm) => cm.update_channel(channel_id, update), + ChainMonitor::Deferred(dcm) => dcm.update_channel(channel_id, update), + } + } + + fn release_pending_monitor_events( + &self, + ) -> Vec<(lightning::chain::transaction::OutPoint, ChannelId, Vec, PublicKey)> { + match self { + ChainMonitor::Regular(cm) => cm.release_pending_monitor_events(), + ChainMonitor::Deferred(dcm) => dcm.release_pending_monitor_events(), + } + } +} + +impl lightning::chain::Listen for ChainMonitor { + fn filtered_block_connected( + &self, header: &bitcoin::block::Header, txdata: &TransactionData, height: u32, + ) { + match self { + ChainMonitor::Regular(cm) => cm.filtered_block_connected(header, txdata, height), + ChainMonitor::Deferred(dcm) => dcm.filtered_block_connected(header, txdata, height), + } + } + + fn block_connected(&self, block: &bitcoin::Block, height: u32) { + match self { + ChainMonitor::Regular(cm) => cm.block_connected(block, height), + ChainMonitor::Deferred(dcm) => dcm.block_connected(block, height), + } + } + + fn blocks_disconnected(&self, fork_point: BestBlock) { + match self { + ChainMonitor::Regular(cm) => cm.blocks_disconnected(fork_point), + ChainMonitor::Deferred(dcm) => dcm.blocks_disconnected(fork_point), + } + } +} + +impl lightning::chain::Confirm for ChainMonitor { + fn transactions_confirmed( + &self, header: &bitcoin::block::Header, txdata: &TransactionData, height: u32, + ) { + match self { + ChainMonitor::Regular(cm) => cm.transactions_confirmed(header, txdata, height), + ChainMonitor::Deferred(dcm) => dcm.transactions_confirmed(header, txdata, height), + } + } + + fn transaction_unconfirmed(&self, txid: &bitcoin::Txid) { + match self { + ChainMonitor::Regular(cm) => cm.transaction_unconfirmed(txid), + ChainMonitor::Deferred(dcm) => dcm.transaction_unconfirmed(txid), + } + } + + fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) { + match self { + ChainMonitor::Regular(cm) => cm.best_block_updated(header, height), + ChainMonitor::Deferred(dcm) => dcm.best_block_updated(header, height), + } + } + + fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option)> { + match self { + ChainMonitor::Regular(cm) => cm.get_relevant_txids(), + ChainMonitor::Deferred(dcm) => dcm.get_relevant_txids(), + } + } +} + +impl BaseMessageHandler for ChainMonitor { + fn get_and_clear_pending_msg_events(&self) -> Vec { + match self { + ChainMonitor::Regular(cm) => cm.get_and_clear_pending_msg_events(), + ChainMonitor::Deferred(dcm) => dcm.get_and_clear_pending_msg_events(), + } + } + + fn peer_disconnected(&self, their_node_id: PublicKey) { + match self { + ChainMonitor::Regular(cm) => cm.peer_disconnected(their_node_id), + ChainMonitor::Deferred(dcm) => dcm.peer_disconnected(their_node_id), + } + } + + fn provided_node_features(&self) -> lightning::types::features::NodeFeatures { + match self { + ChainMonitor::Regular(cm) => cm.provided_node_features(), + ChainMonitor::Deferred(dcm) => dcm.provided_node_features(), + } + } + + fn provided_init_features( + &self, their_node_id: PublicKey, + ) -> lightning::types::features::InitFeatures { + match self { + ChainMonitor::Regular(cm) => cm.provided_init_features(their_node_id), + ChainMonitor::Deferred(dcm) => dcm.provided_init_features(their_node_id), + } + } + + fn peer_connected( + &self, their_node_id: PublicKey, msg: &Init, inbound: bool, + ) -> Result<(), ()> { + match self { + ChainMonitor::Regular(cm) => cm.peer_connected(their_node_id, msg, inbound), + ChainMonitor::Deferred(dcm) => dcm.peer_connected(their_node_id, msg, inbound), + } + } +} + +impl SendOnlyMessageHandler for ChainMonitor {} + +impl EventsProvider for ChainMonitor { + fn process_pending_events(&self, handler: H) + where + H::Target: EventHandler, + { + match self { + ChainMonitor::Regular(cm) => cm.process_pending_events(handler), + ChainMonitor::Deferred(dcm) => dcm.process_pending_events(handler), + } + } +} + +impl lightning::chain::chainmonitor::AChainMonitor for ChainMonitor { + type Signer = InMemorySigner; + type Filter = Arc; + type Broadcaster = Arc; + type FeeEstimator = Arc; + type Logger = Arc; + type Persister = Arc; + type PersisterTarget = Persister; + type EntropySource = Arc; + + fn get_cm(&self) -> &ChainMonitorImpl { + self.inner() + } + + fn pending_operation_count(&self) -> usize { + ChainMonitor::pending_operation_count(self) + } + + fn flush(&self, count: usize, logger: &Arc) { + ChainMonitor::flush(self, count, logger) + } +} + pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< SocketDescriptor, Arc,