From a104e657a29b572aee71f838736b33c00d016e35 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 28 Jan 2026 08:43:13 -0600 Subject: [PATCH 1/3] prefactor: Refactor read_payments to be generic across other types --- src/io/utils.rs | 62 ++++++++++++++++++++++++++----------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/src/io/utils.rs b/src/io/utils.rs index d2f70377b..5fe74e2ef 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -223,21 +223,17 @@ where }) } -/// Read previously persisted payments information from the store. -pub(crate) async fn read_payments( - kv_store: &DynStore, logger: L, -) -> Result, std::io::Error> +/// Generic helper to read persisted items from a KV store namespace. +async fn read_objects_from_store( + kv_store: &DynStore, logger: L, primary_namespace: &str, secondary_namespace: &str, +) -> Result, std::io::Error> where + T: Readable, L::Target: LdkLogger, { - let mut res = Vec::new(); + let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?; - let mut stored_keys = KVStore::list( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - ) - .await?; + let mut res = Vec::with_capacity(stored_keys.len()); const BATCH_SIZE: usize = 50; @@ -246,52 +242,44 @@ where // Fill JoinSet with tasks if possible while set.len() < BATCH_SIZE && !stored_keys.is_empty() { if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } } + let type_name = std::any::type_name::(); + while let Some(read_res) = set.join_next().await { // Exit early if we get an IO error. let reader = read_res .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })? .map_err(|e| { - log_error!(logger, "Failed to read PaymentDetails: {}", e); + log_error!(logger, "Failed to read {type_name}: {e}"); set.abort_all(); e })?; // Refill set for every finished future, if we still have something to do. if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read( - &*kv_store, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, - &next_key, - ); + let fut = KVStore::read(&*kv_store, primary_namespace, secondary_namespace, &next_key); set.spawn(fut); debug_assert!(set.len() <= BATCH_SIZE); } // Handle result. - let payment = PaymentDetails::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize PaymentDetails: {}", e); + let item = T::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize {type_name}: {e}"); std::io::Error::new( std::io::ErrorKind::InvalidData, - "Failed to deserialize PaymentDetails", + format!("Failed to deserialize {type_name}"), ) })?; - res.push(payment); + res.push(item); } debug_assert!(set.is_empty()); @@ -300,6 +288,22 @@ where Ok(res) } +/// Read previously persisted payments information from the store. +pub(crate) async fn read_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + /// Read `OutputSweeper` state from the store. pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, From 06aee728f119701b7c717353d26e9eb712d2ff91 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 28 Jan 2026 08:44:37 -0600 Subject: [PATCH 2/3] Add storage for forwarded payments Routing nodes and LSPs want to track forwarded payments so they can run accounting on fees earned and track profitability across time. We now store these to make it easier to track and allows for future accounting utils in the future. This shouldn't effect edge user nodes as they should never be forwarding payments. Implementation is mostly just copied how we currently handle normal payments and adapted for forwarded payments. --- bindings/ldk_node.udl | 20 +++++++ src/builder.rs | 52 ++++++++++++----- src/event.rs | 57 ++++++++++++++++--- src/ffi/types.rs | 23 +++++++- src/io/mod.rs | 4 ++ src/io/utils.rs | 18 +++++- src/lib.rs | 40 +++++++++++-- src/payment/mod.rs | 3 +- src/payment/store.rs | 127 ++++++++++++++++++++++++++++++++++++++++++ src/types.rs | 2 + 10 files changed, 314 insertions(+), 32 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 6bd031379..d9827f0be 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -189,6 +189,8 @@ interface Node { void remove_payment([ByRef]PaymentId payment_id); BalanceDetails list_balances(); sequence list_payments(); + ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id); + sequence list_forwarded_payments(); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -507,6 +509,21 @@ dictionary PaymentDetails { u64 latest_update_timestamp; }; +dictionary ForwardedPaymentDetails { + ForwardedPaymentId id; + ChannelId prev_channel_id; + ChannelId next_channel_id; + UserChannelId? prev_user_channel_id; + UserChannelId? next_user_channel_id; + PublicKey? prev_node_id; + PublicKey? next_node_id; + u64? total_fee_earned_msat; + u64? skimmed_fee_msat; + boolean claim_from_onchain_tx; + u64? outbound_amount_forwarded_msat; + u64 forwarded_at_timestamp; +}; + dictionary RouteParametersConfig { u64? max_total_routing_fee_msat; u32 max_total_cltv_expiry_delta; @@ -894,6 +911,9 @@ typedef string OfferId; [Custom] typedef string PaymentId; +[Custom] +typedef string ForwardedPaymentId; + [Custom] typedef string PaymentHash; diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..3351942be 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,13 +55,15 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, - read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, - read_scorer, write_node_metrics, + read_event_queue, read_external_pathfinding_scores_from_cache, read_forwarded_payments, + read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, + read_pending_payments, read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -75,9 +77,9 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, - KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, - Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, ForwardedPaymentStore, + GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, + PendingPaymentStore, Persister, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1060,14 +1062,19 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = - runtime.block_on(async move { - tokio::join!( - read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), - read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) - ) - }); + let ( + payment_store_res, + forwarded_payment_store_res, + node_metris_res, + pending_payment_store_res, + ) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_forwarded_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) + ) + }); // Initialize the status fields. let node_metrics = match node_metris_res { @@ -1096,6 +1103,20 @@ fn build_with_store_internal( }, }; + let forwarded_payment_store = match forwarded_payment_store_res { + Ok(forwarded_payments) => Arc::new(ForwardedPaymentStore::new( + forwarded_payments, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read forwarded payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -1782,6 +1803,7 @@ fn build_with_store_internal( scorer, peer_store, payment_store, + forwarded_payment_store, is_running, node_metrics, om_mailbox, diff --git a/src/event.rs b/src/event.rs index 6f0ed8e09..a6e494a62 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,6 +10,7 @@ use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -45,10 +46,13 @@ use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ - PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, + ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, PaymentDetailsUpdate, + PaymentDirection, PaymentKind, PaymentStatus, }; use crate::runtime::Runtime; -use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet}; +use crate::types::{ + CustomTlvRecord, DynStore, ForwardedPaymentStore, OnionMessenger, PaymentStore, Sweeper, Wallet, +}; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, UserChannelId, @@ -487,6 +491,7 @@ where network_graph: Arc, liquidity_source: Option>>>, payment_store: Arc, + forwarded_payment_store: Arc, peer_store: Arc>, runtime: Arc, logger: L, @@ -506,10 +511,10 @@ where channel_manager: Arc, connection_manager: Arc>, output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, - payment_store: Arc, peer_store: Arc>, - static_invoice_store: Option, onion_messenger: Arc, - om_mailbox: Option>, runtime: Arc, logger: L, - config: Arc, + payment_store: Arc, forwarded_payment_store: Arc, + peer_store: Arc>, static_invoice_store: Option, + onion_messenger: Arc, om_mailbox: Option>, + runtime: Arc, logger: L, config: Arc, ) -> Self { Self { event_queue, @@ -521,6 +526,7 @@ where network_graph, liquidity_source, payment_store, + forwarded_payment_store, peer_store, logger, runtime, @@ -1364,9 +1370,44 @@ where .await; } + // Store the forwarded payment details + let prev_channel_id_value = prev_channel_id + .expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."); + let next_channel_id_value = next_channel_id + .expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."); + + // PaymentForwarded does not have a unique id, so we generate a random one here. + let mut id_bytes = [0u8; 32]; + rng().fill(&mut id_bytes); + + let forwarded_at_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH") + .as_secs(); + + let forwarded_payment = ForwardedPaymentDetails { + id: ForwardedPaymentId(id_bytes), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, + prev_user_channel_id: prev_user_channel_id.map(UserChannelId), + next_user_channel_id: next_user_channel_id.map(UserChannelId), + prev_node_id, + next_node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + forwarded_at_timestamp, + }; + + self.forwarded_payment_store.insert(forwarded_payment).map_err(|e| { + log_error!(self.logger, "Failed to store forwarded payment: {e}"); + ReplayEvent() + })?; + let event = Event::PaymentForwarded { - prev_channel_id: prev_channel_id.expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."), - next_channel_id: next_channel_id.expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, prev_user_channel_id: prev_user_channel_id.map(UserChannelId), next_user_channel_id: next_user_channel_id.map(UserChannelId), prev_node_id, diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 2a349a967..d63c42bd5 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -54,9 +54,10 @@ pub use crate::graph::{ChannelInfo, ChannelUpdateInfo, NodeAnnouncementInfo, Nod pub use crate::liquidity::{LSPS1OrderStatus, LSPS2ServiceConfig}; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; pub use crate::payment::store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDirection, PaymentKind, PaymentStatus, + ConfirmationStatus, ForwardedPaymentId, LSPFeeLimits, PaymentDirection, PaymentKind, + PaymentStatus, }; -pub use crate::payment::UnifiedPaymentResult; +pub use crate::payment::{ForwardedPaymentDetails, UnifiedPaymentResult}; use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; impl UniffiCustomTypeConverter for PublicKey { @@ -722,6 +723,24 @@ impl UniffiCustomTypeConverter for PaymentId { } } +impl UniffiCustomTypeConverter for ForwardedPaymentId { + type Builtin = String; + + fn into_custom(val: Self::Builtin) -> uniffi::Result { + if let Some(bytes_vec) = hex_utils::to_vec(&val) { + let bytes_res = bytes_vec.try_into(); + if let Ok(bytes) = bytes_res { + return Ok(ForwardedPaymentId(bytes)); + } + } + Err(Error::InvalidPaymentId.into()) + } + + fn from_custom(obj: Self) -> Self::Builtin { + hex_utils::to_string(&obj.0) + } +} + impl UniffiCustomTypeConverter for PaymentHash { type Builtin = String; diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..b381c7261 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -27,6 +27,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The forwarded payment information will be persisted under this prefix. +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; +pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index 5fe74e2ef..f94122434 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -46,7 +46,7 @@ use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; -use crate::payment::PendingPaymentDetails; +use crate::payment::{ForwardedPaymentDetails, PendingPaymentDetails}; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; @@ -304,6 +304,22 @@ where .await } +/// Read previously persisted forwarded payments information from the store. +pub(crate) async fn read_forwarded_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + /// Read `OutputSweeper` state from the store. pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, diff --git a/src/lib.rs b/src/lib.rs index d2222d949..0bea78471 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,16 +153,16 @@ use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; use payment::{ - Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, - UnifiedPayment, + Bolt11Payment, Bolt12Payment, ForwardedPaymentDetails, ForwardedPaymentId, OnchainPayment, + PaymentDetails, SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, + ForwardedPaymentStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, + PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use { @@ -222,6 +222,7 @@ pub struct Node { scorer: Arc>, peer_store: Arc>>, payment_store: Arc, + forwarded_payment_store: Arc, is_running: Arc>, node_metrics: Arc>, om_mailbox: Option>, @@ -573,6 +574,7 @@ impl Node { Arc::clone(&self.network_graph), self.liquidity_source.clone(), Arc::clone(&self.payment_store), + Arc::clone(&self.forwarded_payment_store), Arc::clone(&self.peer_store), static_invoice_store, Arc::clone(&self.onion_messenger), @@ -1692,6 +1694,34 @@ impl Node { self.payment_store.list_filter(|_| true) } + /// Retrieve the details of a specific forwarded payment with the given id. + /// + /// Returns `Some` if the forwarded payment was known and `None` otherwise. + pub fn forwarded_payment( + &self, forwarded_payment_id: &ForwardedPaymentId, + ) -> Option { + self.forwarded_payment_store.get(forwarded_payment_id) + } + + /// Retrieves all forwarded payments that match the given predicate. + /// + /// For example, to list all forwarded payments that earned at least 1000 msat in fees: + /// ```ignore + /// node.list_forwarded_payments_with_filter(|p| { + /// p.total_fee_earned_msat.unwrap_or(0) >= 1000 + /// }); + /// ``` + pub fn list_forwarded_payments_with_filter bool>( + &self, f: F, + ) -> Vec { + self.forwarded_payment_store.list_filter(f) + } + + /// Retrieves all forwarded payments. + pub fn list_forwarded_payments(&self) -> Vec { + self.forwarded_payment_store.list_filter(|_| true) + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new(); diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 42b5aff3b..39a9a336a 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -22,6 +22,7 @@ pub use onchain::OnchainPayment; pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ - ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, + ConfirmationStatus, ForwardedPaymentDetails, ForwardedPaymentId, LSPFeeLimits, PaymentDetails, + PaymentDirection, PaymentKind, PaymentStatus, }; pub use unified::{UnifiedPayment, UnifiedPaymentResult}; diff --git a/src/payment/store.rs b/src/payment/store.rs index 15e94190c..8722b9a8f 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -7,9 +7,11 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Txid}; use lightning::ln::channelmanager::PaymentId; use lightning::ln::msgs::DecodeError; +use lightning::ln::types::ChannelId; use lightning::offers::offer::OfferId; use lightning::util::ser::{Readable, Writeable}; use lightning::{ @@ -21,6 +23,7 @@ use lightning_types::string::UntrustedString; use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate}; use crate::hex_utils; +use crate::types::UserChannelId; /// Represents a payment. #[derive(Clone, Debug, PartialEq, Eq)] @@ -762,3 +765,127 @@ mod tests { } } } + +/// A unique identifier for a forwarded payment. +/// +/// This will be a randomly generated 32-byte identifier. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ForwardedPaymentId(pub [u8; 32]); + +impl StorableObjectId for ForwardedPaymentId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl Writeable for ForwardedPaymentId { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + self.0.write(writer) + } +} + +impl Readable for ForwardedPaymentId { + fn read(reader: &mut R) -> Result { + Ok(Self(Readable::read(reader)?)) + } +} + +/// Details of a payment that has been forwarded through this node. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ForwardedPaymentDetails { + /// A unique identifier for this forwarded payment. + pub id: ForwardedPaymentId, + /// The channel id of the incoming channel between the previous node and us. + pub prev_channel_id: ChannelId, + /// The channel id of the outgoing channel between the next node and us. + pub next_channel_id: ChannelId, + /// The `user_channel_id` of the incoming channel between the previous node and us. + /// + /// This is only None for events generated or serialized by versions prior to 0.3.0. + pub prev_user_channel_id: Option, + /// The `user_channel_id` of the outgoing channel between the next node and us. + /// + /// This will be `None` if the payment was settled via an on-chain transaction or if the + /// event was generated or serialized by versions prior to 0.3.0. + pub next_user_channel_id: Option, + /// The node id of the previous node. + pub prev_node_id: Option, + /// The node id of the next node. + pub next_node_id: Option, + /// The total fee, in milli-satoshis, which was earned as a result of the payment. + /// + /// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC + /// was pending, the amount the next hop claimed will have been rounded down to the nearest + /// whole satoshi. Thus, the fee calculated here may be higher than expected as we still + /// claimed the full value in millisatoshis from the source. + /// + /// If the channel which sent us the payment has been force-closed, we will claim the funds + /// via an on-chain transaction. In that case we do not yet know the on-chain transaction + /// fees which we will spend and will instead set this to `None`. It is possible duplicate + /// `PaymentForwarded` events are generated for the same payment iff `total_fee_earned_msat` + /// is `None`. + pub total_fee_earned_msat: Option, + /// The share of the total fee, in milli-satoshis, which was withheld in addition to the + /// forwarding fee. + /// + /// This will be `None` if no fee was skimmed from the forwarded HTLC. + pub skimmed_fee_msat: Option, + /// If this is `true`, the forwarded HTLC was claimed by our counterparty via an on-chain + /// transaction. + pub claim_from_onchain_tx: bool, + /// The final amount forwarded, in milli-satoshis, after the fee is deducted. + /// + /// The caveat described above the total_fee_earned_msat field applies here as well. + pub outbound_amount_forwarded_msat: Option, + /// The timestamp, in seconds since start of the UNIX epoch, when the payment was forwarded. + pub forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ForwardedPaymentDetails, { + (0, id, required), + (2, prev_channel_id, required), + (4, next_channel_id, required), + (6, prev_user_channel_id, option), + (8, next_user_channel_id, option), + (10, prev_node_id, option), + (12, next_node_id, option), + (14, total_fee_earned_msat, option), + (16, skimmed_fee_msat, option), + (18, claim_from_onchain_tx, required), + (20, outbound_amount_forwarded_msat, option), + (22, forwarded_at_timestamp, required), +}); + +/// A no-op update type for [`ForwardedPaymentDetails`]. +/// +/// Forwarded payments are immutable once stored, so updates are not supported. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) struct ForwardedPaymentDetailsUpdate { + id: ForwardedPaymentId, +} + +impl StorableObjectUpdate for ForwardedPaymentDetailsUpdate { + fn id(&self) -> ForwardedPaymentId { + self.id + } +} + +impl StorableObject for ForwardedPaymentDetails { + type Id = ForwardedPaymentId; + type Update = ForwardedPaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.id + } + + fn update(&mut self, _update: &Self::Update) -> bool { + // Forwarded payments are immutable, so updates are no-ops. + false + } + + fn to_update(&self) -> Self::Update { + ForwardedPaymentDetailsUpdate { id: self.id } + } +} diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..13e74f09f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -39,6 +39,7 @@ use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::store::ForwardedPaymentDetails; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; @@ -320,6 +321,7 @@ pub(crate) type BumpTransactionEventHandler = >; pub(crate) type PaymentStore = DataStore>; +pub(crate) type ForwardedPaymentStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. /// From c3f7714435bfef642169f8784e3c87b9af614c99 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 4 Feb 2026 14:55:32 -0600 Subject: [PATCH 3/3] Add per-channel forwarding statistics Track aggregate stats (fees earned, payment counts, amounts) per channel. Add ForwardedPaymentTrackingMode config: Stats (default) for lightweight metrics only, or Detailed to also store individual payment records. Co-Authored-By: Claude Opus 4.5 --- bindings/ldk_node.udl | 23 ++++++ src/builder.rs | 34 +++++++-- src/config.rs | 28 +++++++- src/event.rs | 115 +++++++++++++++++++++++------- src/ffi/types.rs | 5 +- src/io/mod.rs | 5 ++ src/io/utils.rs | 22 +++++- src/lib.rs | 54 ++++++++++++-- src/payment/mod.rs | 4 +- src/payment/store.rs | 162 ++++++++++++++++++++++++++++++++++++++++++ src/types.rs | 3 +- 11 files changed, 408 insertions(+), 47 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index d9827f0be..501e4edd6 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,7 @@ dictionary Config { u64 probing_liquidity_limit_multiplier; AnchorChannelsConfig? anchor_channels_config; RouteParametersConfig? route_parameters; + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode; }; dictionary AnchorChannelsConfig { @@ -191,6 +192,9 @@ interface Node { sequence list_payments(); ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id); sequence list_forwarded_payments(); + ForwardedPaymentTrackingMode forwarded_payment_tracking_mode(); + ChannelForwardingStats? channel_forwarding_stats([ByRef]ChannelId channel_id); + sequence list_channel_forwarding_stats(); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -488,6 +492,11 @@ enum PaymentStatus { "Failed", }; +enum ForwardedPaymentTrackingMode { + "Detailed", + "Stats", +}; + dictionary LSPFeeLimits { u64? max_total_opening_fee_msat; u64? max_proportional_opening_fee_ppm_msat; @@ -524,6 +533,20 @@ dictionary ForwardedPaymentDetails { u64 forwarded_at_timestamp; }; +dictionary ChannelForwardingStats { + ChannelId channel_id; + PublicKey? counterparty_node_id; + u64 inbound_payments_forwarded; + u64 outbound_payments_forwarded; + u64 total_inbound_amount_msat; + u64 total_outbound_amount_msat; + u64 total_fee_earned_msat; + u64 total_skimmed_fee_msat; + u64 onchain_claims_count; + u64 first_forwarded_at_timestamp; + u64 last_forwarded_at_timestamp; +}; + dictionary RouteParametersConfig { u64? max_total_routing_fee_msat; u32 max_total_cltv_expiry_delta; diff --git a/src/builder.rs b/src/builder.rs index 3351942be..de0eb527a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,13 +55,15 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ - read_event_queue, read_external_pathfinding_scores_from_cache, read_forwarded_payments, - read_network_graph, read_node_metrics, read_output_sweeper, read_payments, read_peer_info, - read_pending_payments, read_scorer, write_node_metrics, + read_channel_forwarding_stats, read_event_queue, read_external_pathfinding_scores_from_cache, + read_forwarded_payments, read_network_graph, read_node_metrics, read_output_sweeper, + read_payments, read_peer_info, read_pending_payments, read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + self, CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, @@ -77,9 +79,10 @@ use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, ForwardedPaymentStore, - GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, - PendingPaymentStore, Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelForwardingStatsStore, ChannelManager, DynStore, + DynStoreWrapper, ForwardedPaymentStore, GossipSync, Graph, KeysManager, MessageRouter, + OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1065,12 +1068,14 @@ fn build_with_store_internal( let ( payment_store_res, forwarded_payment_store_res, + channel_forwarding_stats_res, node_metris_res, pending_payment_store_res, ) = runtime.block_on(async move { tokio::join!( read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), read_forwarded_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_channel_forwarding_stats(&*kv_store_ref, Arc::clone(&logger_ref)), read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) ) @@ -1117,6 +1122,20 @@ fn build_with_store_internal( }, }; + let channel_forwarding_stats_store = match channel_forwarding_stats_res { + Ok(stats) => Arc::new(ChannelForwardingStatsStore::new( + stats, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read channel forwarding stats from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -1804,6 +1823,7 @@ fn build_with_store_internal( peer_store, payment_store, forwarded_payment_store, + channel_forwarding_stats_store, is_running, node_metrics, om_mailbox, diff --git a/src/config.rs b/src/config.rs index 103b74657..d38542273 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,6 +21,24 @@ use lightning::util::config::{ use crate::logger::LogLevel; +/// The mode used for tracking forwarded payments. +/// +/// This determines how much detail is stored about payment forwarding activity. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum ForwardedPaymentTrackingMode { + /// Store every individual forwarded payment AND track per-channel aggregate statistics. + /// + /// Use this when you need full history of forwarded payments for accounting, debugging, + /// or detailed analytics. + Detailed, + /// Track only per-channel aggregate statistics without storing individual payment records. + /// + /// This is the default mode. Use this to reduce storage requirements when you only need + /// aggregate metrics like total fees earned per channel. + #[default] + Stats, +} + // Config defaults const DEFAULT_NETWORK: Network = Network::Bitcoin; const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80; @@ -127,9 +145,10 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; /// | `probing_liquidity_limit_multiplier` | 3 | /// | `log_level` | Debug | /// | `anchor_channels_config` | Some(..) | -/// | `route_parameters` | None | +/// | `route_parameters` | None | +/// | `forwarded_payment_tracking_mode` | Detailed | /// -/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their +/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and [`ForwardedPaymentTrackingMode`] for more information regarding their /// respective default values. /// /// [`Node`]: crate::Node @@ -192,6 +211,10 @@ pub struct Config { /// **Note:** If unset, default parameters will be used, and you will be able to override the /// parameters on a per-payment basis in the corresponding method calls. pub route_parameters: Option, + /// The mode used for tracking forwarded payments. + /// + /// See [`ForwardedPaymentTrackingMode`] for more information on the available modes. + pub forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode, } impl Default for Config { @@ -206,6 +229,7 @@ impl Default for Config { anchor_channels_config: Some(AnchorChannelsConfig::default()), route_parameters: None, node_alias: None, + forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode::default(), } } } diff --git a/src/event.rs b/src/event.rs index a6e494a62..6f7b8914c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -33,7 +33,7 @@ use lightning_liquidity::lsps2::utils::compute_opening_fee; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use rand::{rng, Rng}; -use crate::config::{may_announce_channel, Config}; +use crate::config::{may_announce_channel, Config, ForwardedPaymentTrackingMode}; use crate::connection::ConnectionManager; use crate::data_store::DataStoreUpdateResult; use crate::fee_estimator::ConfirmationTarget; @@ -46,12 +46,13 @@ use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ - ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, PaymentDetailsUpdate, - PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ForwardedPaymentDetails, ForwardedPaymentId, PaymentDetails, + PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; use crate::runtime::Runtime; use crate::types::{ - CustomTlvRecord, DynStore, ForwardedPaymentStore, OnionMessenger, PaymentStore, Sweeper, Wallet, + ChannelForwardingStatsStore, CustomTlvRecord, DynStore, ForwardedPaymentStore, OnionMessenger, + PaymentStore, Sweeper, Wallet, }; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, @@ -492,6 +493,7 @@ where liquidity_source: Option>>>, payment_store: Arc, forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, runtime: Arc, logger: L, @@ -512,6 +514,7 @@ where output_sweeper: Arc, network_graph: Arc, liquidity_source: Option>>>, payment_store: Arc, forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, peer_store: Arc>, static_invoice_store: Option, onion_messenger: Arc, om_mailbox: Option>, runtime: Arc, logger: L, config: Arc, @@ -527,6 +530,7 @@ where liquidity_source, payment_store, forwarded_payment_store, + channel_forwarding_stats_store, peer_store, logger, runtime, @@ -1370,40 +1374,99 @@ where .await; } - // Store the forwarded payment details let prev_channel_id_value = prev_channel_id .expect("prev_channel_id expected for events generated by LDK versions greater than 0.0.107."); let next_channel_id_value = next_channel_id .expect("next_channel_id expected for events generated by LDK versions greater than 0.0.107."); - // PaymentForwarded does not have a unique id, so we generate a random one here. - let mut id_bytes = [0u8; 32]; - rng().fill(&mut id_bytes); - let forwarded_at_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH") .as_secs(); - let forwarded_payment = ForwardedPaymentDetails { - id: ForwardedPaymentId(id_bytes), - prev_channel_id: prev_channel_id_value, - next_channel_id: next_channel_id_value, - prev_user_channel_id: prev_user_channel_id.map(UserChannelId), - next_user_channel_id: next_user_channel_id.map(UserChannelId), - prev_node_id, - next_node_id, - total_fee_earned_msat, - skimmed_fee_msat, - claim_from_onchain_tx, - outbound_amount_forwarded_msat, - forwarded_at_timestamp, + // Calculate inbound amount (outbound + fee) + let inbound_amount_msat = outbound_amount_forwarded_msat + .unwrap_or(0) + .saturating_add(total_fee_earned_msat.unwrap_or(0)); + + // Update per-channel forwarding stats for the inbound channel (prev_channel) + // For new entries, this becomes the initial value; for existing entries, + // these values are used as increments via the to_update() -> update() pattern. + let inbound_stats = ChannelForwardingStats { + channel_id: prev_channel_id_value, + counterparty_node_id: prev_node_id, + inbound_payments_forwarded: 1, + outbound_payments_forwarded: 0, + total_inbound_amount_msat: inbound_amount_msat, + total_outbound_amount_msat: 0, + total_fee_earned_msat: total_fee_earned_msat.unwrap_or(0), + total_skimmed_fee_msat: skimmed_fee_msat.unwrap_or(0), + onchain_claims_count: if claim_from_onchain_tx { 1 } else { 0 }, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, + }; + self.channel_forwarding_stats_store + .insert_or_update(inbound_stats) + .map_err(|e| { + log_error!( + self.logger, + "Failed to update inbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; + + // Update per-channel forwarding stats for the outbound channel (next_channel) + let outbound_stats = ChannelForwardingStats { + channel_id: next_channel_id_value, + counterparty_node_id: next_node_id, + inbound_payments_forwarded: 0, + outbound_payments_forwarded: 1, + total_inbound_amount_msat: 0, + total_outbound_amount_msat: outbound_amount_forwarded_msat.unwrap_or(0), + total_fee_earned_msat: 0, + total_skimmed_fee_msat: 0, + onchain_claims_count: 0, + first_forwarded_at_timestamp: forwarded_at_timestamp, + last_forwarded_at_timestamp: forwarded_at_timestamp, }; + self.channel_forwarding_stats_store + .insert_or_update(outbound_stats) + .map_err(|e| { + log_error!( + self.logger, + "Failed to update outbound channel forwarding stats: {e}" + ); + ReplayEvent() + })?; - self.forwarded_payment_store.insert(forwarded_payment).map_err(|e| { - log_error!(self.logger, "Failed to store forwarded payment: {e}"); - ReplayEvent() - })?; + // Only store individual forwarded payment details in Detailed mode + if self.config.forwarded_payment_tracking_mode + == ForwardedPaymentTrackingMode::Detailed + { + // PaymentForwarded does not have a unique id, so we generate a random one here. + let mut id_bytes = [0u8; 32]; + rng().fill(&mut id_bytes); + + let forwarded_payment = ForwardedPaymentDetails { + id: ForwardedPaymentId(id_bytes), + prev_channel_id: prev_channel_id_value, + next_channel_id: next_channel_id_value, + prev_user_channel_id: prev_user_channel_id.map(UserChannelId), + next_user_channel_id: next_user_channel_id.map(UserChannelId), + prev_node_id, + next_node_id, + total_fee_earned_msat, + skimmed_fee_msat, + claim_from_onchain_tx, + outbound_amount_forwarded_msat, + forwarded_at_timestamp, + }; + + self.forwarded_payment_store.insert(forwarded_payment).map_err(|e| { + log_error!(self.logger, "Failed to store forwarded payment: {e}"); + ReplayEvent() + })?; + } let event = Event::PaymentForwarded { prev_channel_id: prev_channel_id_value, diff --git a/src/ffi/types.rs b/src/ffi/types.rs index d63c42bd5..84b1d5577 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -54,10 +54,11 @@ pub use crate::graph::{ChannelInfo, ChannelUpdateInfo, NodeAnnouncementInfo, Nod pub use crate::liquidity::{LSPS1OrderStatus, LSPS2ServiceConfig}; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; pub use crate::payment::store::{ - ConfirmationStatus, ForwardedPaymentId, LSPFeeLimits, PaymentDirection, PaymentKind, - PaymentStatus, + ChannelForwardingStats, ConfirmationStatus, ForwardedPaymentId, LSPFeeLimits, PaymentDirection, + PaymentKind, PaymentStatus, }; pub use crate::payment::{ForwardedPaymentDetails, UnifiedPaymentResult}; +pub use crate::config::ForwardedPaymentTrackingMode; use crate::{hex_utils, SocketAddress, UniffiCustomTypeConverter, UserChannelId}; impl UniffiCustomTypeConverter for PublicKey { diff --git a/src/io/mod.rs b/src/io/mod.rs index b381c7261..d6c4d0c68 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -31,6 +31,11 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "forwarded_payments"; pub(crate) const FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The channel forwarding stats will be persisted under this prefix. +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE: &str = + "channel_forwarding_stats"; +pub(crate) const CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// The node metrics will be persisted under this key. pub(crate) const NODE_METRICS_PRIMARY_NAMESPACE: &str = ""; pub(crate) const NODE_METRICS_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index f94122434..485560bdb 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -39,6 +39,10 @@ use rand::rngs::OsRng; use rand::TryRngCore; use super::*; +use crate::io::{ + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, +}; use crate::chain::ChainSource; use crate::config::WALLET_KEYS_SEED_LEN; use crate::fee_estimator::OnchainFeeEstimator; @@ -46,7 +50,7 @@ use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; -use crate::payment::{ForwardedPaymentDetails, PendingPaymentDetails}; +use crate::payment::{ChannelForwardingStats, ForwardedPaymentDetails, PendingPaymentDetails}; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; @@ -320,6 +324,22 @@ where .await } +/// Read previously persisted channel forwarding stats from the store. +pub(crate) async fn read_channel_forwarding_stats( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + read_objects_from_store( + kv_store, + logger, + CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await +} + /// Read `OutputSweeper` state from the store. pub(crate) async fn read_output_sweeper( broadcaster: Arc, fee_estimator: Arc, diff --git a/src/lib.rs b/src/lib.rs index 0bea78471..1322e099d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,7 +124,8 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, - NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + ForwardedPaymentTrackingMode, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, + RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; pub use error::Error as NodeError; @@ -142,6 +143,7 @@ use lightning::events::bump_transaction::{Input, Wallet as LdkWallet}; use lightning::impl_writeable_tlv_based; use lightning::ln::chan_utils::FUNDING_TRANSACTION_WITNESS_WEIGHT; use lightning::ln::channel_state::{ChannelDetails as LdkChannelDetails, ChannelShutdownState}; +use lightning::ln::types::ChannelId; use lightning::ln::channelmanager::PaymentId; use lightning::ln::funding::SpliceContribution; use lightning::ln::msgs::SocketAddress; @@ -153,16 +155,16 @@ use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; use payment::{ - Bolt11Payment, Bolt12Payment, ForwardedPaymentDetails, ForwardedPaymentId, OnchainPayment, - PaymentDetails, SpontaneousPayment, UnifiedPayment, + Bolt11Payment, Bolt12Payment, ChannelForwardingStats, ForwardedPaymentDetails, + ForwardedPaymentId, OnchainPayment, PaymentDetails, SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use rand::Rng; use runtime::Runtime; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, - ForwardedPaymentStore, Graph, HRNResolver, KeysManager, OnionMessenger, PaymentStore, - PeerManager, Router, Scorer, Sweeper, Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelForwardingStatsStore, + ChannelManager, DynStore, ForwardedPaymentStore, Graph, HRNResolver, KeysManager, + OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use { @@ -223,6 +225,7 @@ pub struct Node { peer_store: Arc>>, payment_store: Arc, forwarded_payment_store: Arc, + channel_forwarding_stats_store: Arc, is_running: Arc>, node_metrics: Arc>, om_mailbox: Option>, @@ -575,6 +578,7 @@ impl Node { self.liquidity_source.clone(), Arc::clone(&self.payment_store), Arc::clone(&self.forwarded_payment_store), + Arc::clone(&self.channel_forwarding_stats_store), Arc::clone(&self.peer_store), static_invoice_store, Arc::clone(&self.onion_messenger), @@ -1718,10 +1722,48 @@ impl Node { } /// Retrieves all forwarded payments. + /// + /// **Note:** In [`ForwardedPaymentTrackingMode::Stats`] mode, this will return an empty vector + /// since individual payment records are not stored in that mode. + /// + /// [`ForwardedPaymentTrackingMode::Stats`]: crate::config::ForwardedPaymentTrackingMode::Stats pub fn list_forwarded_payments(&self) -> Vec { self.forwarded_payment_store.list_filter(|_| true) } + /// Returns the configured forwarded payment tracking mode. + pub fn forwarded_payment_tracking_mode(&self) -> ForwardedPaymentTrackingMode { + self.config.forwarded_payment_tracking_mode + } + + /// Retrieve the forwarding statistics for a specific channel. + /// + /// Returns `Some` if statistics exist for the given channel and `None` otherwise. + pub fn channel_forwarding_stats( + &self, channel_id: &ChannelId, + ) -> Option { + self.channel_forwarding_stats_store.get(channel_id) + } + + /// Retrieves all channel forwarding statistics. + pub fn list_channel_forwarding_stats(&self) -> Vec { + self.channel_forwarding_stats_store.list_filter(|_| true) + } + + /// Retrieves all channel forwarding statistics that match the given predicate. + /// + /// For example, to list stats for all channels that have earned at least 10000 msat in fees: + /// ```ignore + /// node.list_channel_forwarding_stats_with_filter(|s| { + /// s.total_fee_earned_msat >= 10000 + /// }); + /// ``` + pub fn list_channel_forwarding_stats_with_filter bool>( + &self, f: F, + ) -> Vec { + self.channel_forwarding_stats_store.list_filter(f) + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new(); diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 39a9a336a..28b4bb31e 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -22,7 +22,7 @@ pub use onchain::OnchainPayment; pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ - ConfirmationStatus, ForwardedPaymentDetails, ForwardedPaymentId, LSPFeeLimits, PaymentDetails, - PaymentDirection, PaymentKind, PaymentStatus, + ChannelForwardingStats, ConfirmationStatus, ForwardedPaymentDetails, ForwardedPaymentId, + LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, }; pub use unified::{UnifiedPayment, UnifiedPaymentResult}; diff --git a/src/payment/store.rs b/src/payment/store.rs index 8722b9a8f..2ea81f92c 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -889,3 +889,165 @@ impl StorableObject for ForwardedPaymentDetails { ForwardedPaymentDetailsUpdate { id: self.id } } } + +/// Aggregate statistics for forwarded payments through a single channel. +/// +/// Each channel has one stats entry tracking all forwards where it was either +/// the inbound or outbound channel. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ChannelForwardingStats { + /// The channel this stats entry tracks. + pub channel_id: ChannelId, + /// The counterparty node id for this channel. + pub counterparty_node_id: Option, + /// Number of payments forwarded where this was the inbound channel. + pub inbound_payments_forwarded: u64, + /// Number of payments forwarded where this was the outbound channel. + pub outbound_payments_forwarded: u64, + /// Total amount received on this channel for forwarding (msat). + pub total_inbound_amount_msat: u64, + /// Total amount sent on this channel for forwarding (msat). + pub total_outbound_amount_msat: u64, + /// Total fees earned from forwards where this was the inbound channel (msat). + pub total_fee_earned_msat: u64, + /// Total skimmed fees (msat). + pub total_skimmed_fee_msat: u64, + /// Number of forwards claimed via onchain tx. + pub onchain_claims_count: u64, + /// Timestamp of first forward through this channel. + pub first_forwarded_at_timestamp: u64, + /// Timestamp of most recent forward through this channel. + pub last_forwarded_at_timestamp: u64, +} + +impl_writeable_tlv_based!(ChannelForwardingStats, { + (0, channel_id, required), + (2, counterparty_node_id, option), + (4, inbound_payments_forwarded, required), + (6, outbound_payments_forwarded, required), + (8, total_inbound_amount_msat, required), + (10, total_outbound_amount_msat, required), + (12, total_fee_earned_msat, required), + (14, total_skimmed_fee_msat, required), + (16, onchain_claims_count, required), + (18, first_forwarded_at_timestamp, required), + (20, last_forwarded_at_timestamp, required), +}); + +/// Update type for [`ChannelForwardingStats`] that supports incrementing counters. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ChannelForwardingStatsUpdate { + /// The channel ID being updated. + pub channel_id: ChannelId, + /// The counterparty node id (used when creating new entry). + pub counterparty_node_id: Option, + /// Increment for inbound payment count. + pub inbound_payments_increment: u64, + /// Increment for outbound payment count. + pub outbound_payments_increment: u64, + /// Increment for total inbound amount. + pub inbound_amount_increment_msat: u64, + /// Increment for total outbound amount. + pub outbound_amount_increment_msat: u64, + /// Increment for total fee earned. + pub fee_earned_increment_msat: u64, + /// Increment for skimmed fee. + pub skimmed_fee_increment_msat: u64, + /// Increment for onchain claims count. + pub onchain_claims_increment: u64, + /// Current timestamp for updating first/last timestamps. + pub timestamp: u64, +} + +impl StorableObjectUpdate for ChannelForwardingStatsUpdate { + fn id(&self) -> ChannelId { + self.channel_id + } +} + +impl StorableObjectId for ChannelId { + fn encode_to_hex_str(&self) -> String { + hex_utils::to_string(&self.0) + } +} + +impl StorableObject for ChannelForwardingStats { + type Id = ChannelId; + type Update = ChannelForwardingStatsUpdate; + + fn id(&self) -> Self::Id { + self.channel_id + } + + fn update(&mut self, update: &Self::Update) -> bool { + debug_assert_eq!( + self.channel_id, update.channel_id, + "We should only ever update stats for the same channel id" + ); + + let mut updated = false; + + // Update counterparty if not already set + if self.counterparty_node_id.is_none() && update.counterparty_node_id.is_some() { + self.counterparty_node_id = update.counterparty_node_id; + updated = true; + } + + // Increment counters + if update.inbound_payments_increment > 0 { + self.inbound_payments_forwarded += update.inbound_payments_increment; + updated = true; + } + if update.outbound_payments_increment > 0 { + self.outbound_payments_forwarded += update.outbound_payments_increment; + updated = true; + } + if update.inbound_amount_increment_msat > 0 { + self.total_inbound_amount_msat += update.inbound_amount_increment_msat; + updated = true; + } + if update.outbound_amount_increment_msat > 0 { + self.total_outbound_amount_msat += update.outbound_amount_increment_msat; + updated = true; + } + if update.fee_earned_increment_msat > 0 { + self.total_fee_earned_msat += update.fee_earned_increment_msat; + updated = true; + } + if update.skimmed_fee_increment_msat > 0 { + self.total_skimmed_fee_msat += update.skimmed_fee_increment_msat; + updated = true; + } + if update.onchain_claims_increment > 0 { + self.onchain_claims_count += update.onchain_claims_increment; + updated = true; + } + + // Update timestamps + if updated { + if self.first_forwarded_at_timestamp == 0 { + self.first_forwarded_at_timestamp = update.timestamp; + } + self.last_forwarded_at_timestamp = update.timestamp; + } + + updated + } + + fn to_update(&self) -> Self::Update { + // This creates an update representing the current state as increments. + // This is primarily used for insert_or_update behavior. + ChannelForwardingStatsUpdate { + channel_id: self.channel_id, + counterparty_node_id: self.counterparty_node_id, + inbound_payments_increment: self.inbound_payments_forwarded, + outbound_payments_increment: self.outbound_payments_forwarded, + inbound_amount_increment_msat: self.total_inbound_amount_msat, + outbound_amount_increment_msat: self.total_outbound_amount_msat, + fee_earned_increment_msat: self.total_fee_earned_msat, + skimmed_fee_increment_msat: self.total_skimmed_fee_msat, + onchain_claims_increment: self.onchain_claims_count, + timestamp: self.last_forwarded_at_timestamp, + } + } +} diff --git a/src/types.rs b/src/types.rs index 13e74f09f..178d1b1c1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -39,7 +39,7 @@ use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; -use crate::payment::store::ForwardedPaymentDetails; +use crate::payment::store::{ChannelForwardingStats, ForwardedPaymentDetails}; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; @@ -322,6 +322,7 @@ pub(crate) type BumpTransactionEventHandler = pub(crate) type PaymentStore = DataStore>; pub(crate) type ForwardedPaymentStore = DataStore>; +pub(crate) type ChannelForwardingStatsStore = DataStore>; /// A local, potentially user-provided, identifier of a channel. ///