diff --git a/Cargo.toml b/Cargo.toml index e98d70b..5a8c908 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ signet-node-config = { version = "0.16.0-rc.7", path = "crates/node-config" } signet-node-tests = { version = "0.16.0-rc.7", path = "crates/node-tests" } signet-node-types = { version = "0.16.0-rc.7", path = "crates/node-types" } signet-rpc = { version = "0.16.0-rc.7", path = "crates/rpc" } +signet-rpc-storage = { version = "0.16.0-rc.7", path = "crates/rpc-storage" } init4-bin-base = { version = "0.18.0-rc.8", features = ["alloy"] } @@ -55,6 +56,10 @@ signet-tx-cache = "0.16.0-rc.8" signet-types = "0.16.0-rc.8" signet-zenith = "0.16.0-rc.8" signet-journal = "0.16.0-rc.8" +signet-storage = "0.2.0" +signet-cold = "0.2.0" +signet-hot = "0.2.0" +signet-storage-types = "0.2.0" # ajj ajj = { version = "0.3.4" } diff --git a/crates/node-tests/tests/rpc.rs b/crates/node-tests/tests/rpc.rs index 9c183e1..a293665 100644 --- a/crates/node-tests/tests/rpc.rs +++ b/crates/node-tests/tests/rpc.rs @@ -223,13 +223,13 @@ async fn getLogs_post(ctx: &SignetTestContext, contract: &TestCounterInstance) { .await .unwrap(); - // Two logs: one from the host transact, one from the alloy tx + // Two logs: one from the alloy tx, one from the host transact assert_eq!(logs.len(), 2); let log_inner = &logs[0].inner; assert_eq!(log_inner.address, *contract.address()); - // First increment is from the host transact (system tx runs first) + // First increment is from the alloy tx (regular txs execute before system txs) assert_eq!(log_inner.topics(), &[Counter::Count::SIGNATURE_HASH, B256::with_last_byte(1)]); - // Second increment is from the alloy tx + // Second increment is from the host transact (system tx) let log_inner = &logs[1].inner; assert_eq!(log_inner.address, *contract.address()); assert_eq!(log_inner.topics(), &[Counter::Count::SIGNATURE_HASH, B256::with_last_byte(2)]); diff --git a/crates/rpc-storage/Cargo.toml b/crates/rpc-storage/Cargo.toml new file mode 100644 index 0000000..4a1fe01 --- /dev/null +++ b/crates/rpc-storage/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "signet-rpc-storage" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Ethereum JSON-RPC server backed by signet-storage" + +[dependencies] +signet-storage.workspace = true +signet-cold.workspace = true +signet-hot.workspace = true +signet-storage-types.workspace = true +signet-evm.workspace = true +trevm = { workspace = true, features = ["call", "estimate_gas"] } +signet-types.workspace = true +signet-tx-cache.workspace = true +signet-bundle.workspace = true +alloy.workspace = true +ajj.workspace = true +tokio.workspace = true +tokio-util = "0.7" +tracing.workspace = true +thiserror.workspace = true +serde.workspace = true +dashmap = "6.1.0" +revm-inspectors.workspace = true +itertools.workspace = true + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio-util = "0.7" +signet-cold = { workspace = true, features = ["test-utils"] } +signet-hot = { workspace = true, features = ["test-utils"] } +signet-storage.workspace = true +signet-storage-types.workspace = true +signet-constants.workspace = true +alloy.workspace = true +serde_json.workspace = true +axum = "0.8" +tower = { version = "0.5", features = ["util"] } +http = "1" +trevm.workspace = true diff --git a/crates/rpc-storage/README.md b/crates/rpc-storage/README.md new file mode 100644 index 0000000..ec3247e --- /dev/null +++ b/crates/rpc-storage/README.md @@ -0,0 +1,16 @@ +# signet-rpc-storage + +Ethereum JSON-RPC server backed by `signet-storage`'s unified storage backend. + +This crate provides a standalone ETH RPC implementation that uses hot storage +for state queries and cold storage for block, transaction, and receipt data. +Unlike `signet-rpc`, it does not depend on reth's `FullNodeComponents`. + +## Supported Methods + +- Block queries: `eth_blockNumber`, `eth_getBlockByHash`, `eth_getBlockByNumber`, etc. +- Transaction queries: `eth_getTransactionByHash`, `eth_getTransactionReceipt`, etc. +- Account state: `eth_getBalance`, `eth_getStorageAt`, `eth_getCode`, `eth_getTransactionCount` +- EVM execution: `eth_call`, `eth_estimateGas` +- Logs: `eth_getLogs` +- Transaction submission: `eth_sendRawTransaction` (optional, via `TxCache`) diff --git a/crates/rpc-storage/src/config.rs b/crates/rpc-storage/src/config.rs new file mode 100644 index 0000000..98c1f26 --- /dev/null +++ b/crates/rpc-storage/src/config.rs @@ -0,0 +1,85 @@ +//! Configuration for the storage-backed RPC server. + +use std::time::Duration; + +/// Configuration for the storage-backed ETH RPC server. +/// +/// Mirrors the subset of reth's `EthConfig` that applies to +/// storage-backed RPC. +/// +/// # Example +/// +/// ``` +/// use signet_rpc_storage::StorageRpcConfig; +/// +/// // Use defaults (matches reth defaults). +/// let config = StorageRpcConfig::default(); +/// assert_eq!(config.rpc_gas_cap, 30_000_000); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct StorageRpcConfig { + /// Maximum gas for `eth_call` and `eth_estimateGas`. + /// + /// Default: `30_000_000` (30M gas). + pub rpc_gas_cap: u64, + + /// Maximum block range per `eth_getLogs` query. + /// + /// Default: `10_000`. + pub max_blocks_per_filter: u64, + + /// Maximum number of logs returned per `eth_getLogs` response. + /// Set to `0` to disable the limit. + /// + /// Default: `20_000`. + pub max_logs_per_response: usize, + + /// Maximum concurrent tracing/debug requests. + /// + /// Controls the size of the semaphore that gates debug + /// namespace calls. + /// + /// Default: `25`. + pub max_tracing_requests: usize, + + /// Time-to-live for stale filters and subscriptions. + /// + /// Default: `5 minutes`. + pub stale_filter_ttl: Duration, + + /// Number of recent blocks to consider for gas price suggestions. + /// + /// Default: `20`. + pub gas_oracle_block_count: u64, + + /// Percentile of effective tips to use as the gas price suggestion. + /// + /// Default: `60.0`. + pub gas_oracle_percentile: f64, + + /// Maximum header history for `eth_feeHistory` without percentiles. + /// + /// Default: `1024`. + pub max_header_history: u64, + + /// Maximum block history for `eth_feeHistory` with percentiles. + /// + /// Default: `1024`. + pub max_block_history: u64, +} + +impl Default for StorageRpcConfig { + fn default() -> Self { + Self { + rpc_gas_cap: 30_000_000, + max_blocks_per_filter: 10_000, + max_logs_per_response: 20_000, + max_tracing_requests: 25, + stale_filter_ttl: Duration::from_secs(5 * 60), + gas_oracle_block_count: 20, + gas_oracle_percentile: 60.0, + max_header_history: 1024, + max_block_history: 1024, + } + } +} diff --git a/crates/rpc-storage/src/ctx.rs b/crates/rpc-storage/src/ctx.rs new file mode 100644 index 0000000..a65f845 --- /dev/null +++ b/crates/rpc-storage/src/ctx.rs @@ -0,0 +1,236 @@ +//! RPC context wrapping [`UnifiedStorage`]. + +use crate::{ + EthError, StorageRpcConfig, + interest::{FilterManager, NewBlockNotification, SubscriptionManager}, + resolve::{BlockTags, ResolveError}, +}; +use alloy::eips::{BlockId, BlockNumberOrTag}; +use signet_cold::ColdStorageReadHandle; +use signet_hot::HotKv; +use signet_hot::model::{HotKvRead, RevmRead}; +use signet_storage::UnifiedStorage; +use signet_tx_cache::TxCache; +use signet_types::constants::SignetSystemConstants; +use std::sync::Arc; +use tokio::sync::{Semaphore, broadcast}; +use trevm::revm::database::DBErrorMarker; +use trevm::revm::database::StateBuilder; + +/// Resolved block context for EVM execution. +/// +/// Contains the header and a revm-compatible database snapshot at the +/// resolved block height, ready for use with `signet_evm`. +#[derive(Debug)] +pub(crate) struct EvmBlockContext { + /// The resolved block header. + pub header: alloy::consensus::Header, + /// The revm database at the resolved height. + pub db: trevm::revm::database::State, +} + +/// RPC context backed by [`UnifiedStorage`]. +/// +/// Provides access to hot storage (state), cold storage (blocks/txs/receipts), +/// block tag resolution, and optional transaction forwarding. +/// +/// # Construction +/// +/// ```ignore +/// let ctx = StorageRpcCtx::new(storage, constants, tags, Some(tx_cache), StorageRpcConfig::default()); +/// ``` +#[derive(Debug)] +pub struct StorageRpcCtx { + inner: Arc>, +} + +impl Clone for StorageRpcCtx { + fn clone(&self) -> Self { + Self { inner: Arc::clone(&self.inner) } + } +} + +#[derive(Debug)] +struct StorageRpcCtxInner { + storage: UnifiedStorage, + constants: SignetSystemConstants, + tags: BlockTags, + tx_cache: Option, + config: StorageRpcConfig, + tracing_semaphore: Arc, + filter_manager: FilterManager, + sub_manager: SubscriptionManager, +} + +impl StorageRpcCtx { + /// Create a new storage-backed RPC context. + /// + /// The `notif_sender` is used by the subscription manager to receive + /// new block notifications. Callers send [`NewBlockNotification`]s on + /// this channel as blocks are appended to storage. + pub fn new( + storage: UnifiedStorage, + constants: SignetSystemConstants, + tags: BlockTags, + tx_cache: Option, + config: StorageRpcConfig, + notif_sender: broadcast::Sender, + ) -> Self { + let tracing_semaphore = Arc::new(Semaphore::new(config.max_tracing_requests)); + let filter_manager = FilterManager::new(config.stale_filter_ttl, config.stale_filter_ttl); + let sub_manager = SubscriptionManager::new(notif_sender, config.stale_filter_ttl); + Self { + inner: Arc::new(StorageRpcCtxInner { + storage, + constants, + tags, + tx_cache, + config, + tracing_semaphore, + filter_manager, + sub_manager, + }), + } + } + + /// Access the unified storage. + pub fn storage(&self) -> &UnifiedStorage { + &self.inner.storage + } + + /// Get a cold storage read handle. + pub fn cold(&self) -> ColdStorageReadHandle { + self.inner.storage.cold_reader() + } + + /// Get a hot storage read transaction. + pub fn hot_reader(&self) -> signet_storage::StorageResult { + self.inner.storage.reader() + } + + /// Access the block tags. + pub fn tags(&self) -> &BlockTags { + &self.inner.tags + } + + /// Access the system constants. + pub fn constants(&self) -> &SignetSystemConstants { + &self.inner.constants + } + + /// Get the chain ID. + pub fn chain_id(&self) -> u64 { + self.inner.constants.ru_chain_id() + } + + /// Access the RPC configuration. + pub fn config(&self) -> &StorageRpcConfig { + &self.inner.config + } + + /// Acquire a permit from the tracing semaphore. + /// + /// Limits concurrent tracing/debug requests. Callers should hold + /// the permit for the duration of their tracing operation. + pub async fn acquire_tracing_permit(&self) -> tokio::sync::OwnedSemaphorePermit { + Arc::clone(&self.inner.tracing_semaphore) + .acquire_owned() + .await + .expect("tracing semaphore closed") + } + + /// Access the optional tx cache. + pub fn tx_cache(&self) -> Option<&TxCache> { + self.inner.tx_cache.as_ref() + } + + /// Access the filter manager. + pub(crate) fn filter_manager(&self) -> &FilterManager { + &self.inner.filter_manager + } + + /// Access the subscription manager. + pub(crate) fn sub_manager(&self) -> &SubscriptionManager { + &self.inner.sub_manager + } + + /// Resolve a [`BlockNumberOrTag`] to a block number. + /// + /// This is synchronous — no cold storage lookup is needed. + /// + /// - `Latest` / `Pending` → latest tag + /// - `Safe` → safe tag + /// - `Finalized` → finalized tag + /// - `Earliest` → `0` + /// - `Number(n)` → `n` + pub(crate) fn resolve_block_tag(&self, tag: BlockNumberOrTag) -> u64 { + match tag { + BlockNumberOrTag::Latest | BlockNumberOrTag::Pending => self.tags().latest(), + BlockNumberOrTag::Safe => self.tags().safe(), + BlockNumberOrTag::Finalized => self.tags().finalized(), + BlockNumberOrTag::Earliest => 0, + BlockNumberOrTag::Number(n) => n, + } + } + + /// Resolve a [`BlockId`] to a block number. + /// + /// For tag/number-based IDs, resolves synchronously via + /// [`resolve_block_tag`](Self::resolve_block_tag). For hash-based IDs, + /// fetches the header from cold storage to obtain the block number. + pub(crate) async fn resolve_block_id(&self, id: BlockId) -> Result { + match id { + BlockId::Number(tag) => Ok(self.resolve_block_tag(tag)), + BlockId::Hash(h) => { + let header = self + .cold() + .get_header_by_hash(h.block_hash) + .await? + .ok_or(ResolveError::HashNotFound(h.block_hash))?; + Ok(header.number) + } + } + } + + /// Create a revm-compatible database at a specific block height. + /// + /// The returned `State>` implements both `Database` and + /// `DatabaseCommit`, making it suitable for use with `signet_evm`. + pub fn revm_state_at_height( + &self, + height: u64, + ) -> signet_storage::StorageResult>> + where + ::Error: DBErrorMarker, + { + let revm_read = self.inner.storage.revm_reader_at_height(height)?; + Ok(StateBuilder::new_with_database(revm_read).build()) + } + + /// Resolve a [`BlockId`] to a header and revm database in one pass. + /// + /// For hash-based IDs, fetches the header directly by hash. For + /// tag/number-based IDs, resolves the tag then fetches the header by + /// number. This avoids a redundant header lookup that would occur if + /// resolving to a block number first. + pub(crate) async fn resolve_evm_block( + &self, + id: BlockId, + ) -> Result>, EthError> + where + ::Error: DBErrorMarker, + { + let cold = self.cold(); + let header = match id { + BlockId::Hash(h) => cold.get_header_by_hash(h.block_hash).await?, + BlockId::Number(tag) => { + let height = self.resolve_block_tag(tag); + cold.get_header_by_number(height).await? + } + } + .ok_or(EthError::BlockNotFound(id))?; + + let db = self.revm_state_at_height(header.number)?; + Ok(EvmBlockContext { header, db }) + } +} diff --git a/crates/rpc-storage/src/debug/endpoints.rs b/crates/rpc-storage/src/debug/endpoints.rs new file mode 100644 index 0000000..7662429 --- /dev/null +++ b/crates/rpc-storage/src/debug/endpoints.rs @@ -0,0 +1,199 @@ +//! Debug namespace RPC endpoint implementations. + +use crate::{ + ctx::StorageRpcCtx, + debug::DebugError, + eth::helpers::{CfgFiller, await_handler, response_tri}, +}; +use ajj::{HandlerCtx, ResponsePayload}; +use alloy::{ + consensus::BlockHeader, + eips::BlockId, + primitives::B256, + rpc::types::trace::geth::{GethDebugTracingOptions, GethTrace, TraceResult}, +}; +use itertools::Itertools; +use signet_evm::EvmErrored; +use signet_hot::HotKv; +use signet_hot::model::HotKvRead; +use signet_types::MagicSig; +use tracing::Instrument; +use trevm::revm::database::DBErrorMarker; + +/// Params for `debug_traceBlockByNumber` and `debug_traceBlockByHash`. +#[derive(Debug, serde::Deserialize)] +pub(super) struct TraceBlockParams(T, #[serde(default)] Option); + +/// Params for `debug_traceTransaction`. +#[derive(Debug, serde::Deserialize)] +pub(super) struct TraceTransactionParams(B256, #[serde(default)] Option); + +/// `debug_traceBlockByNumber` and `debug_traceBlockByHash` handler. +pub(super) async fn trace_block( + hctx: HandlerCtx, + TraceBlockParams(id, opts): TraceBlockParams, + ctx: StorageRpcCtx, +) -> ResponsePayload, DebugError> +where + T: Into, + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let opts = response_tri!(opts.ok_or(DebugError::InvalidTracerConfig)); + + let _permit = ctx.acquire_tracing_permit().await; + + let id = id.into(); + let span = tracing::debug_span!("traceBlock", ?id, tracer = ?opts.tracer.as_ref()); + + let fut = async move { + let cold = ctx.cold(); + let block_num = response_tri!( + ctx.resolve_block_id(id) + .await + .map_err(|e| { DebugError::BlockNotFound(e.to_string()) }) + ); + + let (header, txs) = response_tri!( + tokio::try_join!( + cold.get_header_by_number(block_num), + cold.get_transactions_in_block(block_num), + ) + .map_err(|e| DebugError::Cold(e.to_string())) + ); + + let Some(header) = header else { + return ResponsePayload::internal_error_message( + format!("block not found: {id}").into(), + ); + }; + + let block_hash = header.hash_slow(); + + tracing::debug!(number = header.number, "Loaded block"); + + let mut frames = Vec::with_capacity(txs.len()); + + // State BEFORE this block + let db = response_tri!( + ctx.revm_state_at_height(header.number.saturating_sub(1)) + .map_err(|e| DebugError::Hot(e.to_string())) + ); + + let mut trevm = signet_evm::signet_evm(db, ctx.constants().clone()) + .fill_cfg(&CfgFiller(ctx.chain_id())) + .fill_block(&header); + + let mut txns = txs.iter().enumerate().peekable(); + for (idx, tx) in txns + .by_ref() + .peeking_take_while(|(_, t)| MagicSig::try_from_signature(t.signature()).is_none()) + { + let tx_info = alloy::rpc::types::TransactionInfo { + hash: Some(*tx.tx_hash()), + index: Some(idx as u64), + block_hash: Some(block_hash), + block_number: Some(header.number), + base_fee: header.base_fee_per_gas(), + }; + + let t = trevm.fill_tx(tx); + let frame; + (frame, trevm) = response_tri!(crate::debug::tracer::trace(t, &opts, tx_info)); + frames.push(TraceResult::Success { result: frame, tx_hash: Some(*tx.tx_hash()) }); + + tracing::debug!(tx_index = idx, tx_hash = ?tx.tx_hash(), "Traced transaction"); + } + + ResponsePayload::Success(frames) + } + .instrument(span); + + await_handler!(@response_option hctx.spawn_blocking(fut)) +} + +/// `debug_traceTransaction` handler. +pub(super) async fn trace_transaction( + hctx: HandlerCtx, + TraceTransactionParams(tx_hash, opts): TraceTransactionParams, + ctx: StorageRpcCtx, +) -> ResponsePayload +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let opts = response_tri!(opts.ok_or(DebugError::InvalidTracerConfig)); + + let _permit = ctx.acquire_tracing_permit().await; + + let span = tracing::debug_span!("traceTransaction", %tx_hash, tracer = ?opts.tracer.as_ref()); + + let fut = async move { + let cold = ctx.cold(); + + // Look up the transaction and its containing block + let confirmed = response_tri!( + cold.get_tx_by_hash(tx_hash).await.map_err(|e| DebugError::Cold(e.to_string())) + ); + + let confirmed = response_tri!(confirmed.ok_or(DebugError::TransactionNotFound)); + let (_tx, meta) = confirmed.into_parts(); + + let block_num = meta.block_number(); + let block_hash = meta.block_hash(); + + let (header, txs) = response_tri!( + tokio::try_join!( + cold.get_header_by_number(block_num), + cold.get_transactions_in_block(block_num), + ) + .map_err(|e| DebugError::Cold(e.to_string())) + ); + + let header = + response_tri!(header.ok_or(DebugError::BlockNotFound(format!("block {block_num}")))); + + tracing::debug!(number = block_num, "Loaded containing block"); + + // State BEFORE this block + let db = response_tri!( + ctx.revm_state_at_height(block_num.saturating_sub(1)) + .map_err(|e| DebugError::Hot(e.to_string())) + ); + + let mut trevm = signet_evm::signet_evm(db, ctx.constants().clone()) + .fill_cfg(&CfgFiller(ctx.chain_id())) + .fill_block(&header); + + // Replay all transactions up to (but not including) the target + let mut txns = txs.iter().enumerate().peekable(); + for (_idx, tx) in txns.by_ref().peeking_take_while(|(_, t)| t.tx_hash() != &tx_hash) { + if MagicSig::try_from_signature(tx.signature()).is_some() { + return ResponsePayload::internal_error_message( + DebugError::TransactionNotFound.to_string().into(), + ); + } + + trevm = response_tri!(trevm.run_tx(tx).map_err(EvmErrored::into_error)).accept_state(); + } + + let (index, tx) = response_tri!(txns.next().ok_or(DebugError::TransactionNotFound)); + + let trevm = trevm.fill_tx(tx); + + let tx_info = alloy::rpc::types::TransactionInfo { + hash: Some(*tx.tx_hash()), + index: Some(index as u64), + block_hash: Some(block_hash), + block_number: Some(header.number), + base_fee: header.base_fee_per_gas(), + }; + + let res = response_tri!(crate::debug::tracer::trace(trevm, &opts, tx_info)).0; + + ResponsePayload::Success(res) + } + .instrument(span); + + await_handler!(@response_option hctx.spawn_blocking(fut)) +} diff --git a/crates/rpc-storage/src/debug/error.rs b/crates/rpc-storage/src/debug/error.rs new file mode 100644 index 0000000..bc87c42 --- /dev/null +++ b/crates/rpc-storage/src/debug/error.rs @@ -0,0 +1,43 @@ +//! Error types for the debug namespace. + +/// Errors that can occur in the `debug` namespace. +#[derive(Debug, Clone, thiserror::Error)] +pub enum DebugError { + /// Cold storage error. + #[error("cold storage: {0}")] + Cold(String), + /// Hot storage error. + #[error("hot storage: {0}")] + Hot(String), + /// Invalid tracer configuration. + #[error("invalid tracer config")] + InvalidTracerConfig, + /// Unsupported tracer type. + #[error("unsupported: {0}")] + Unsupported(&'static str), + /// EVM execution error. + #[error("evm: {0}")] + Evm(String), + /// Block not found. + #[error("block not found: {0}")] + BlockNotFound(String), + /// Transaction not found. + #[error("transaction not found")] + TransactionNotFound, +} + +impl DebugError { + /// Convert to a string by value. + pub fn into_string(self) -> String { + self.to_string() + } +} + +impl serde::Serialize for DebugError { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} diff --git a/crates/rpc-storage/src/debug/mod.rs b/crates/rpc-storage/src/debug/mod.rs new file mode 100644 index 0000000..172a2fc --- /dev/null +++ b/crates/rpc-storage/src/debug/mod.rs @@ -0,0 +1,25 @@ +//! Debug namespace RPC router backed by storage. + +mod endpoints; +use endpoints::{trace_block, trace_transaction}; +mod error; +pub use error::DebugError; +pub(crate) mod tracer; + +use crate::ctx::StorageRpcCtx; +use alloy::{eips::BlockNumberOrTag, primitives::B256}; +use signet_hot::HotKv; +use signet_hot::model::HotKvRead; +use trevm::revm::database::DBErrorMarker; + +/// Instantiate a `debug` API router backed by storage. +pub(crate) fn debug() -> ajj::Router> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + ajj::Router::new() + .route("traceBlockByNumber", trace_block::) + .route("traceBlockByHash", trace_block::) + .route("traceTransaction", trace_transaction::) +} diff --git a/crates/rpc-storage/src/debug/tracer.rs b/crates/rpc-storage/src/debug/tracer.rs new file mode 100644 index 0000000..52ed546 --- /dev/null +++ b/crates/rpc-storage/src/debug/tracer.rs @@ -0,0 +1,224 @@ +//! Core tracing logic for the debug namespace. +//! +//! Largely adapted from reth: `crates/rpc/rpc/src/debug.rs`. + +use crate::debug::DebugError; +use alloy::rpc::types::{ + TransactionInfo, + trace::geth::{ + FourByteFrame, GethDebugBuiltInTracerType, GethDebugTracerConfig, GethDebugTracerType, + GethDebugTracingOptions, GethTrace, NoopFrame, + }, +}; +use revm_inspectors::tracing::{ + FourByteInspector, MuxInspector, TracingInspector, TracingInspectorConfig, +}; +use signet_evm::{EvmNeedsTx, EvmReady}; +use tracing::instrument; +use trevm::{ + helpers::Ctx, + revm::{Database, DatabaseCommit, DatabaseRef, Inspector, context::ContextTr}, +}; + +/// Trace a transaction using the provided EVM and tracing options. +#[instrument(skip(trevm, config, tx_info), fields(tx_hash = ?tx_info.hash))] +pub(super) fn trace( + trevm: EvmReady, + config: &GethDebugTracingOptions, + tx_info: TransactionInfo, +) -> Result<(GethTrace, EvmNeedsTx), DebugError> +where + Db: Database + DatabaseCommit + DatabaseRef, + Insp: Inspector>, +{ + let Some(tracer) = &config.tracer else { + return Err(DebugError::InvalidTracerConfig); + }; + + let GethDebugTracerType::BuiltInTracer(built_in) = tracer else { + return Err(DebugError::Unsupported("JS tracer")); + }; + + match built_in { + GethDebugBuiltInTracerType::Erc7562Tracer => { + Err(DebugError::Unsupported("ERC-7562 tracing is not yet implemented")) + } + GethDebugBuiltInTracerType::FourByteTracer => trace_four_byte(trevm), + GethDebugBuiltInTracerType::CallTracer => trace_call(&config.tracer_config, trevm), + GethDebugBuiltInTracerType::FlatCallTracer => { + trace_flat_call(&config.tracer_config, trevm, tx_info) + } + GethDebugBuiltInTracerType::PreStateTracer => trace_pre_state(&config.tracer_config, trevm), + GethDebugBuiltInTracerType::NoopTracer => Ok(( + NoopFrame::default().into(), + trevm + .run() + .map_err(|err| DebugError::Evm(err.into_error().to_string()))? + .accept_state(), + )), + GethDebugBuiltInTracerType::MuxTracer => trace_mux(&config.tracer_config, trevm, tx_info), + } +} + +fn trace_four_byte( + trevm: EvmReady, +) -> Result<(GethTrace, EvmNeedsTx), DebugError> +where + Db: Database + DatabaseCommit, + Insp: Inspector>, +{ + let mut four_byte = FourByteInspector::default(); + let trevm = trevm + .try_with_inspector(&mut four_byte, |trevm| trevm.run()) + .map_err(|e| DebugError::Evm(e.into_error().to_string()))?; + Ok((FourByteFrame::from(four_byte).into(), trevm.accept_state())) +} + +fn trace_call( + tracer_config: &GethDebugTracerConfig, + trevm: EvmReady, +) -> Result<(GethTrace, EvmNeedsTx), DebugError> +where + Db: Database + DatabaseCommit, + Insp: Inspector>, +{ + let call_config = + tracer_config.clone().into_call_config().map_err(|_| DebugError::InvalidTracerConfig)?; + + let mut inspector = + TracingInspector::new(TracingInspectorConfig::from_geth_call_config(&call_config)); + + let trevm = trevm + .try_with_inspector(&mut inspector, |trevm| trevm.run()) + .map_err(|e| DebugError::Evm(e.into_error().to_string()))?; + + let frame = inspector + .with_transaction_gas_limit(trevm.gas_limit()) + .into_geth_builder() + .geth_call_traces(call_config, trevm.gas_used()); + + Ok((frame.into(), trevm.accept_state())) +} + +fn trace_pre_state( + tracer_config: &GethDebugTracerConfig, + trevm: EvmReady, +) -> Result<(GethTrace, EvmNeedsTx), DebugError> +where + Db: Database + DatabaseCommit + DatabaseRef, + Insp: Inspector>, +{ + let prestate_config = tracer_config + .clone() + .into_pre_state_config() + .map_err(|_| DebugError::InvalidTracerConfig)?; + + let mut inspector = + TracingInspector::new(TracingInspectorConfig::from_geth_prestate_config(&prestate_config)); + + let trevm = trevm + .try_with_inspector(&mut inspector, |trevm| trevm.run()) + .map_err(|e| DebugError::Evm(e.into_error().to_string()))?; + let gas_limit = trevm.gas_limit(); + + // NB: state must be UNCOMMITTED for prestate diff computation. + let (result, mut trevm) = trevm.take_result_and_state(); + + let frame = inspector + .with_transaction_gas_limit(gas_limit) + .into_geth_builder() + .geth_prestate_traces(&result, &prestate_config, trevm.inner_mut_unchecked().db_mut()) + .map_err(|err| DebugError::Evm(err.to_string()))?; + + // Equivalent to `trevm.accept_state()`. + trevm.inner_mut_unchecked().db_mut().commit(result.state); + + Ok((frame.into(), trevm)) +} + +fn trace_flat_call( + tracer_config: &GethDebugTracerConfig, + trevm: EvmReady, + tx_info: TransactionInfo, +) -> Result<(GethTrace, EvmNeedsTx), DebugError> +where + Db: Database + DatabaseCommit, + Insp: Inspector>, +{ + let flat_call_config = tracer_config + .clone() + .into_flat_call_config() + .map_err(|_| DebugError::InvalidTracerConfig)?; + + let mut inspector = + TracingInspector::new(TracingInspectorConfig::from_flat_call_config(&flat_call_config)); + + let trevm = trevm + .try_with_inspector(&mut inspector, |trevm| trevm.run()) + .map_err(|e| DebugError::Evm(e.into_error().to_string()))?; + + let frame = inspector + .with_transaction_gas_limit(trevm.gas_limit()) + .into_parity_builder() + .into_localized_transaction_traces(tx_info); + + Ok((frame.into(), trevm.accept_state())) +} + +fn trace_mux( + tracer_config: &GethDebugTracerConfig, + trevm: EvmReady, + tx_info: TransactionInfo, +) -> Result<(GethTrace, EvmNeedsTx), DebugError> +where + Db: Database + DatabaseCommit + DatabaseRef, + Insp: Inspector>, +{ + let mux_config = + tracer_config.clone().into_mux_config().map_err(|_| DebugError::InvalidTracerConfig)?; + + let mut inspector = MuxInspector::try_from_config(mux_config) + .map_err(|err| DebugError::Evm(err.to_string()))?; + + let trevm = trevm + .try_with_inspector(&mut inspector, |trevm| trevm.run()) + .map_err(|e| DebugError::Evm(e.into_error().to_string()))?; + + // NB: state must be UNCOMMITTED for prestate diff computation. + let (result, mut trevm) = trevm.take_result_and_state(); + + let frame = inspector + .try_into_mux_frame(&result, trevm.inner_mut_unchecked().db_mut(), tx_info) + .map_err(|err| DebugError::Evm(err.to_string()))?; + + // Equivalent to `trevm.accept_state()`. + trevm.inner_mut_unchecked().db_mut().commit(result.state); + + Ok((frame.into(), trevm)) +} + +// Some code in this file has been copied and modified from reth +// +// The original license is included below: +// +// The MIT License (MIT) +// +// Copyright (c) 2022-2025 Reth Contributors +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +//. +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. diff --git a/crates/rpc-storage/src/eth/endpoints.rs b/crates/rpc-storage/src/eth/endpoints.rs new file mode 100644 index 0000000..4e010f0 --- /dev/null +++ b/crates/rpc-storage/src/eth/endpoints.rs @@ -0,0 +1,1132 @@ +//! ETH namespace RPC endpoint implementations. + +use crate::{ + ctx::{EvmBlockContext, StorageRpcCtx}, + eth::helpers::{ + AddrWithBlock, BlockParams, BlockRangeInclusiveIter, CfgFiller, FeeHistoryArgs, + StorageAtArgs, SubscribeArgs, TxParams, await_handler, build_receipt, + build_receipt_from_parts, build_rpc_transaction, normalize_gas_stateless, response_tri, + }, + gas_oracle, + interest::{FilterOutput, InterestKind}, +}; +use ajj::{HandlerCtx, ResponsePayload}; +use alloy::{ + consensus::{BlockHeader, Transaction, TxReceipt}, + eips::{ + BlockId, BlockNumberOrTag, + eip1559::BaseFeeParams, + eip2718::{Decodable2718, Encodable2718}, + }, + primitives::{B256, U64, U256}, + rpc::types::{Block, BlockTransactions, FeeHistory, Filter, FilteredParams, Log}, +}; +use signet_cold::{HeaderSpecifier, ReceiptSpecifier}; +use signet_hot::model::HotKvRead; +use signet_hot::{HistoryRead, HotKv, db::HotDbRead}; +use std::borrow::Cow; +use tracing::{Instrument, debug, trace_span}; +use trevm::{EstimationResult, revm::database::DBErrorMarker}; + +use super::error::CallErrorData; + +// --------------------------------------------------------------------------- +// Not Supported +// --------------------------------------------------------------------------- + +pub(crate) async fn not_supported() -> ResponsePayload<(), ()> { + ResponsePayload::internal_error_message(Cow::Borrowed("Method not supported.")) +} + +// --------------------------------------------------------------------------- +// Simple Queries +// --------------------------------------------------------------------------- + +pub(crate) async fn block_number(ctx: StorageRpcCtx) -> Result { + Ok(U64::from(ctx.tags().latest())) +} + +pub(crate) async fn chain_id(ctx: StorageRpcCtx) -> Result { + Ok(U64::from(ctx.chain_id())) +} + +// --------------------------------------------------------------------------- +// Gas & Fee Queries +// --------------------------------------------------------------------------- + +pub(crate) async fn gas_price(hctx: HandlerCtx, ctx: StorageRpcCtx) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let latest = ctx.tags().latest(); + let cold = ctx.cold(); + + let tip = gas_oracle::suggest_tip_cap(&cold, latest, ctx.config()) + .await + .map_err(|e| e.to_string())?; + + let base_fee = cold + .get_header_by_number(latest) + .await + .map_err(|e| e.to_string())? + .and_then(|h| h.base_fee_per_gas) + .unwrap_or_default(); + + Ok(tip + U256::from(base_fee)) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn max_priority_fee_per_gas( + hctx: HandlerCtx, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let latest = ctx.tags().latest(); + gas_oracle::suggest_tip_cap(&ctx.cold(), latest, ctx.config()) + .await + .map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn fee_history( + hctx: HandlerCtx, + FeeHistoryArgs(block_count, newest, reward_percentiles): FeeHistoryArgs, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let mut block_count = block_count.to::(); + + if block_count == 0 { + return Ok(FeeHistory::default()); + } + + let max_fee_history = if reward_percentiles.is_none() { + ctx.config().max_header_history + } else { + ctx.config().max_block_history + }; + + block_count = block_count.min(max_fee_history); + + let mut newest = newest; + if newest.is_pending() { + newest = BlockNumberOrTag::Latest; + block_count = block_count.saturating_sub(1); + } + + let end_block = ctx.resolve_block_tag(newest); + let end_block_plus = end_block + 1; + + block_count = block_count.min(end_block_plus); + + // Validate percentiles + if let Some(percentiles) = &reward_percentiles + && percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.) + { + return Err("invalid reward percentiles".to_string()); + } + + let start_block = end_block_plus - block_count; + let cold = ctx.cold(); + + let specs: Vec<_> = (start_block..=end_block).map(HeaderSpecifier::Number).collect(); + let headers = cold.get_headers(specs).await.map_err(|e| e.to_string())?; + + let mut base_fee_per_gas: Vec = Vec::with_capacity(headers.len() + 1); + let mut gas_used_ratio: Vec = Vec::with_capacity(headers.len()); + let mut rewards: Vec> = Vec::new(); + + for (offset, maybe_header) in headers.iter().enumerate() { + let Some(header) = maybe_header else { + return Err(format!("missing header at block {}", start_block + offset as u64)); + }; + + base_fee_per_gas.push(header.base_fee_per_gas.unwrap_or_default() as u128); + gas_used_ratio.push(if header.gas_limit > 0 { + header.gas_used as f64 / header.gas_limit as f64 + } else { + 0.0 + }); + + if let Some(percentiles) = &reward_percentiles { + let block_num = start_block + offset as u64; + + let (txs, receipts) = tokio::try_join!( + cold.get_transactions_in_block(block_num), + cold.get_receipts_in_block(block_num), + ) + .map_err(|e| e.to_string())?; + + let block_rewards = calculate_reward_percentiles( + percentiles, + header.gas_used, + header.base_fee_per_gas.unwrap_or_default(), + &txs, + &receipts, + ); + rewards.push(block_rewards); + } + } + + // Next block base fee + if let Some(last_header) = headers.last().and_then(|h| h.as_ref()) { + base_fee_per_gas.push( + last_header.next_block_base_fee(BaseFeeParams::ethereum()).unwrap_or_default() + as u128, + ); + } + + let base_fee_per_blob_gas = vec![0; base_fee_per_gas.len()]; + let blob_gas_used_ratio = vec![0.; gas_used_ratio.len()]; + + Ok(FeeHistory { + base_fee_per_gas, + gas_used_ratio, + base_fee_per_blob_gas, + blob_gas_used_ratio, + oldest_block: start_block, + reward: reward_percentiles.map(|_| rewards), + }) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +/// Calculate reward percentiles for a single block. +/// +/// Sorts transactions by effective tip ascending, then walks +/// cumulative gas used to find the tip value at each percentile. +fn calculate_reward_percentiles( + percentiles: &[f64], + gas_used: u64, + base_fee: u64, + txs: &[signet_storage_types::TransactionSigned], + receipts: &[signet_storage_types::Receipt], +) -> Vec { + if gas_used == 0 || txs.is_empty() { + return vec![0; percentiles.len()]; + } + + // Pair each tx's effective tip with its gas used (from receipt cumulative deltas) + let mut tx_gas_and_tip: Vec<(u64, u128)> = txs + .iter() + .zip(receipts.iter()) + .enumerate() + .map(|(i, (tx, receipt))| { + let prev_cumulative = + if i > 0 { receipts[i - 1].inner.cumulative_gas_used() } else { 0 }; + let tx_gas = receipt.inner.cumulative_gas_used() - prev_cumulative; + let tip = tx.effective_tip_per_gas(base_fee).unwrap_or_default(); + (tx_gas, tip) + }) + .collect(); + + // Sort by tip ascending + tx_gas_and_tip.sort_by_key(|&(_, tip)| tip); + + let mut result = Vec::with_capacity(percentiles.len()); + let mut cumulative_gas: u64 = 0; + let mut tx_idx = 0; + + for &percentile in percentiles { + let threshold = (gas_used as f64 * percentile / 100.0) as u64; + + while tx_idx < tx_gas_and_tip.len() { + cumulative_gas += tx_gas_and_tip[tx_idx].0; + if cumulative_gas >= threshold { + break; + } + tx_idx += 1; + } + + result.push(tx_gas_and_tip.get(tx_idx).map(|&(_, tip)| tip).unwrap_or_default()); + } + + result +} + +// --------------------------------------------------------------------------- +// Block Queries +// --------------------------------------------------------------------------- + +pub(crate) async fn block( + hctx: HandlerCtx, + BlockParams(t, full): BlockParams, + ctx: StorageRpcCtx, +) -> Result, String> +where + T: Into, + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let id = t.into(); + let full = full.unwrap_or(false); + + let task = async move { + let cold = ctx.cold(); + let block_num = ctx.resolve_block_id(id).await.map_err(|e| e.to_string())?; + + let (header, txs) = tokio::try_join!( + cold.get_header_by_number(block_num), + cold.get_transactions_in_block(block_num), + ) + .map_err(|e| e.to_string())?; + + let Some(header) = header else { + return Ok(None); + }; + + let block_hash = header.hash_slow(); + + let transactions = if full { + let base_fee = header.base_fee_per_gas(); + let rpc_txs: Vec<_> = txs + .into_iter() + .enumerate() + .map(|(i, tx)| { + let meta = signet_storage_types::ConfirmationMeta::new( + block_num, block_hash, i as u64, + ); + build_rpc_transaction(tx, &meta, base_fee).map_err(|e| e.to_string()) + }) + .collect::>()?; + BlockTransactions::Full(rpc_txs) + } else { + let hashes: Vec = txs.iter().map(|tx| *tx.tx_hash()).collect(); + BlockTransactions::Hashes(hashes) + }; + + Ok(Some(Block { + header: alloy::rpc::types::Header::new(header), + transactions, + uncles: vec![], + withdrawals: None, + })) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn block_tx_count( + hctx: HandlerCtx, + (t,): (T,), + ctx: StorageRpcCtx, +) -> Result, String> +where + T: Into, + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let id = t.into(); + + let task = async move { + let cold = ctx.cold(); + let block_num = ctx.resolve_block_id(id).await.map_err(|e| e.to_string())?; + + cold.get_transaction_count(block_num) + .await + .map(|c| Some(U64::from(c))) + .map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn block_receipts( + hctx: HandlerCtx, + (id,): (BlockId,), + ctx: StorageRpcCtx, +) -> Result>, String> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let cold = ctx.cold(); + let block_num = ctx.resolve_block_id(id).await.map_err(|e| e.to_string())?; + + let (header, txs, receipts) = tokio::try_join!( + cold.get_header_by_number(block_num), + cold.get_transactions_in_block(block_num), + cold.get_receipts_in_block(block_num), + ) + .map_err(|e| e.to_string())?; + + let Some(header) = header else { + return Ok(None); + }; + + let block_hash = header.hash_slow(); + let mut log_index: u64 = 0; + + let rpc_receipts = txs + .into_iter() + .zip(receipts.iter()) + .enumerate() + .map(|(idx, (tx, receipt))| { + let prev_cumulative = idx + .checked_sub(1) + .and_then(|i| receipts.get(i)) + .map(|r| r.inner.cumulative_gas_used()) + .unwrap_or_default(); + + let gas_used = receipt.inner.cumulative_gas_used() - prev_cumulative; + let offset = log_index; + log_index += receipt.inner.logs().len() as u64; + + build_receipt_from_parts( + tx, + &header, + block_hash, + idx as u64, + receipt.clone(), + gas_used, + offset, + ) + .map_err(|e| e.to_string()) + }) + .collect::, _>>()?; + + Ok(Some(rpc_receipts)) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn header_by( + hctx: HandlerCtx, + (t,): (T,), + ctx: StorageRpcCtx, +) -> Result, String> +where + T: Into, + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let id = t.into(); + + let task = async move { + let cold = ctx.cold(); + let block_num = ctx.resolve_block_id(id).await.map_err(|e| e.to_string())?; + + cold.get_header_by_number(block_num) + .await + .map(|h| h.map(alloy::rpc::types::Header::new)) + .map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +// --------------------------------------------------------------------------- +// Transaction Queries +// --------------------------------------------------------------------------- + +pub(crate) async fn transaction_by_hash( + hctx: HandlerCtx, + (hash,): (B256,), + ctx: StorageRpcCtx, +) -> Result, String> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let cold = ctx.cold(); + let Some(confirmed) = cold.get_tx_by_hash(hash).await.map_err(|e| e.to_string())? else { + return Ok(None); + }; + + let (tx, meta) = confirmed.into_parts(); + + // Fetch header for base_fee + let header = + cold.get_header_by_number(meta.block_number()).await.map_err(|e| e.to_string())?; + let base_fee = header.and_then(|h| h.base_fee_per_gas()); + + build_rpc_transaction(tx, &meta, base_fee).map(Some).map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn raw_transaction_by_hash( + hctx: HandlerCtx, + (hash,): (B256,), + ctx: StorageRpcCtx, +) -> Result, String> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + ctx.cold() + .get_tx_by_hash(hash) + .await + .map(|opt| opt.map(|c| c.into_inner().encoded_2718().into())) + .map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn tx_by_block_and_index( + hctx: HandlerCtx, + (t, index): (T, U64), + ctx: StorageRpcCtx, +) -> Result, String> +where + T: Into, + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let id = t.into(); + + let task = async move { + let cold = ctx.cold(); + let block_num = ctx.resolve_block_id(id).await.map_err(|e| e.to_string())?; + + let Some(confirmed) = cold + .get_tx_by_block_and_index(block_num, index.to::()) + .await + .map_err(|e| e.to_string())? + else { + return Ok(None); + }; + + let (tx, meta) = confirmed.into_parts(); + let header = + cold.get_header_by_number(meta.block_number()).await.map_err(|e| e.to_string())?; + let base_fee = header.and_then(|h| h.base_fee_per_gas()); + + build_rpc_transaction(tx, &meta, base_fee).map(Some).map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn raw_tx_by_block_and_index( + hctx: HandlerCtx, + (t, index): (T, U64), + ctx: StorageRpcCtx, +) -> Result, String> +where + T: Into, + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let id = t.into(); + + let task = async move { + let cold = ctx.cold(); + let block_num = ctx.resolve_block_id(id).await.map_err(|e| e.to_string())?; + + cold.get_tx_by_block_and_index(block_num, index.to::()) + .await + .map(|opt| opt.map(|c| c.into_inner().encoded_2718().into())) + .map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn transaction_receipt( + hctx: HandlerCtx, + (hash,): (B256,), + ctx: StorageRpcCtx, +) -> Result, String> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let Some(receipt_ctx) = ctx + .cold() + .get_receipt_with_context(ReceiptSpecifier::TxHash(hash)) + .await + .map_err(|e| e.to_string())? + else { + return Ok(None); + }; + + build_receipt(receipt_ctx).map(Some).map_err(|e| e.to_string()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +// --------------------------------------------------------------------------- +// Account State (Hot Storage) +// --------------------------------------------------------------------------- + +pub(crate) async fn balance( + hctx: HandlerCtx, + AddrWithBlock(address, block): AddrWithBlock, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let block = block.unwrap_or(BlockId::latest()); + + let task = async move { + let height = ctx.resolve_block_id(block).await.map_err(|e| e.to_string())?; + + let reader = ctx.hot_reader().map_err(|e| e.to_string())?; + let acct = + reader.get_account_at_height(&address, Some(height)).map_err(|e| e.to_string())?; + + Ok(acct.map(|a| a.balance).unwrap_or_default()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn storage_at( + hctx: HandlerCtx, + StorageAtArgs(address, key, block): StorageAtArgs, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let block = block.unwrap_or(BlockId::latest()); + + let task = async move { + let height = ctx.resolve_block_id(block).await.map_err(|e| e.to_string())?; + + let reader = ctx.hot_reader().map_err(|e| e.to_string())?; + let val = reader + .get_storage_at_height(&address, &key, Some(height)) + .map_err(|e| e.to_string())?; + + Ok(val.unwrap_or_default().to_be_bytes().into()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn addr_tx_count( + hctx: HandlerCtx, + AddrWithBlock(address, block): AddrWithBlock, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let block = block.unwrap_or(BlockId::latest()); + + let task = async move { + let height = ctx.resolve_block_id(block).await.map_err(|e| e.to_string())?; + + let reader = ctx.hot_reader().map_err(|e| e.to_string())?; + let acct = + reader.get_account_at_height(&address, Some(height)).map_err(|e| e.to_string())?; + + Ok(U64::from(acct.map(|a| a.nonce).unwrap_or_default())) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn code_at( + hctx: HandlerCtx, + AddrWithBlock(address, block): AddrWithBlock, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let block = block.unwrap_or(BlockId::latest()); + + let task = async move { + let height = ctx.resolve_block_id(block).await.map_err(|e| e.to_string())?; + + let reader = ctx.hot_reader().map_err(|e| e.to_string())?; + let acct = + reader.get_account_at_height(&address, Some(height)).map_err(|e| e.to_string())?; + + let Some(acct) = acct else { + return Ok(alloy::primitives::Bytes::new()); + }; + + let Some(code_hash) = acct.bytecode_hash else { + return Ok(alloy::primitives::Bytes::new()); + }; + + let code = reader.get_bytecode(&code_hash).map_err(|e| e.to_string())?; + + Ok(code.map(|c| c.original_bytes()).unwrap_or_default()) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +// --------------------------------------------------------------------------- +// EVM Execution +// --------------------------------------------------------------------------- + +pub(crate) async fn call( + hctx: HandlerCtx, + TxParams(mut request, block, state_overrides, block_overrides): TxParams, + ctx: StorageRpcCtx, +) -> ResponsePayload +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let max_gas = ctx.config().rpc_gas_cap; + normalize_gas_stateless(&mut request, max_gas); + + let id = block.unwrap_or(BlockId::latest()); + let span = trace_span!("eth_call", block_id = %id); + + let task = async move { + let EvmBlockContext { header, db } = response_tri!(ctx.resolve_evm_block(id).await); + + let trevm = signet_evm::signet_evm(db, ctx.constants().clone()) + .fill_cfg(&CfgFiller(ctx.chain_id())) + .fill_block(&header); + + let trevm = response_tri!(trevm.maybe_apply_state_overrides(state_overrides.as_ref())) + .maybe_apply_block_overrides(block_overrides.as_deref()) + .fill_tx(&request); + + let mut trevm = trevm; + let new_gas = response_tri!(trevm.cap_tx_gas()); + if Some(new_gas) != request.gas { + debug!(req_gas = ?request.gas, new_gas, "capping gas for call"); + } + + let result = response_tri!(trevm.call().map_err(signet_evm::EvmErrored::into_error)); + + match result.0 { + trevm::revm::context::result::ExecutionResult::Success { output, .. } => { + ResponsePayload::Success(output.data().clone()) + } + trevm::revm::context::result::ExecutionResult::Revert { output, .. } => { + ResponsePayload::internal_error_with_message_and_obj( + "execution reverted".into(), + output.clone().into(), + ) + } + trevm::revm::context::result::ExecutionResult::Halt { reason, .. } => { + ResponsePayload::internal_error_with_message_and_obj( + "execution halted".into(), + format!("{reason:?}").into(), + ) + } + } + } + .instrument(span); + + await_handler!(@response_option hctx.spawn_blocking(task)) +} + +pub(crate) async fn estimate_gas( + hctx: HandlerCtx, + TxParams(mut request, block, state_overrides, block_overrides): TxParams, + ctx: StorageRpcCtx, +) -> ResponsePayload +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let max_gas = ctx.config().rpc_gas_cap; + normalize_gas_stateless(&mut request, max_gas); + + let id = block.unwrap_or(BlockId::pending()); + let span = trace_span!("eth_estimateGas", block_id = %id); + + let task = async move { + let EvmBlockContext { header, db } = response_tri!(ctx.resolve_evm_block(id).await); + + let trevm = signet_evm::signet_evm(db, ctx.constants().clone()) + .fill_cfg(&CfgFiller(ctx.chain_id())) + .fill_block(&header); + + let trevm = response_tri!(trevm.maybe_apply_state_overrides(state_overrides.as_ref())) + .maybe_apply_block_overrides(block_overrides.as_deref()) + .fill_tx(&request); + + let (estimate, _) = + response_tri!(trevm.estimate_gas().map_err(signet_evm::EvmErrored::into_error)); + + match estimate { + EstimationResult::Success { limit, .. } => ResponsePayload::Success(U64::from(limit)), + EstimationResult::Revert { reason, .. } => { + ResponsePayload::internal_error_with_message_and_obj( + "execution reverted".into(), + reason.clone().into(), + ) + } + EstimationResult::Halt { reason, .. } => { + ResponsePayload::internal_error_with_message_and_obj( + "execution halted".into(), + format!("{reason:?}").into(), + ) + } + } + } + .instrument(span); + + await_handler!(@response_option hctx.spawn_blocking(task)) +} + +// --------------------------------------------------------------------------- +// Transaction Submission +// --------------------------------------------------------------------------- + +pub(crate) async fn send_raw_transaction( + hctx: HandlerCtx, + (tx,): (alloy::primitives::Bytes,), + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let Some(tx_cache) = ctx.tx_cache().cloned() else { + return Err("tx-cache URL not provided".to_string()); + }; + + let task = |hctx: HandlerCtx| async move { + let envelope = alloy::consensus::TxEnvelope::decode_2718(&mut tx.as_ref()) + .map_err(|e| e.to_string())?; + + let hash = *envelope.tx_hash(); + hctx.spawn(async move { + if let Err(e) = tx_cache.forward_raw_transaction(envelope).await { + tracing::warn!(%hash, err = %e, "failed to forward raw transaction"); + } + }); + + Ok(hash) + }; + + await_handler!(@option hctx.spawn_blocking_with_ctx(task)) +} + +// --------------------------------------------------------------------------- +// Logs +// --------------------------------------------------------------------------- + +/// Maximum headers fetched per batch when scanning bloom filters. +const MAX_HEADERS_RANGE: u64 = 1_000; + +pub(crate) async fn get_logs( + hctx: HandlerCtx, + (filter,): (Filter,), + ctx: StorageRpcCtx, +) -> Result, String> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let cold = ctx.cold(); + + // Build bloom filters for efficient block-level filtering. + let address_filter = FilteredParams::address_filter(&filter.address); + let topics_filter = FilteredParams::topics_filter(&filter.topics); + + match filter.block_option { + alloy::rpc::types::FilterBlockOption::AtBlockHash(hash) => { + let header = cold + .get_header_by_hash(hash) + .await + .map_err(|e| e.to_string())? + .ok_or_else(|| format!("block not found: {hash}"))?; + + if !FilteredParams::matches_address(header.logs_bloom, &address_filter) + || !FilteredParams::matches_topics(header.logs_bloom, &topics_filter) + { + return Ok(vec![]); + } + + let block_num = header.number; + let (txs, receipts) = tokio::try_join!( + cold.get_transactions_in_block(block_num), + cold.get_receipts_in_block(block_num), + ) + .map_err(|e| e.to_string())?; + + Ok(collect_matching_logs(&header, hash, &txs, &receipts, &filter)) + } + + alloy::rpc::types::FilterBlockOption::Range { from_block, to_block } => { + let from = from_block.map(|b| ctx.resolve_block_tag(b)).unwrap_or(0); + let to = to_block + .map(|b| ctx.resolve_block_tag(b)) + .unwrap_or_else(|| ctx.tags().latest()); + + if from > to { + return Err("fromBlock must not exceed toBlock".to_string()); + } + let max_blocks = ctx.config().max_blocks_per_filter; + if to - from > max_blocks { + return Err(format!("query exceeds max block range ({max_blocks})")); + } + + let mut all_logs = Vec::new(); + + for (chunk_start, chunk_end) in + BlockRangeInclusiveIter::new(from..=to, MAX_HEADERS_RANGE) + { + let specs: Vec<_> = + (chunk_start..=chunk_end).map(HeaderSpecifier::Number).collect(); + + let headers = cold.get_headers(specs).await.map_err(|e| e.to_string())?; + + for (offset, maybe_header) in headers.into_iter().enumerate() { + let Some(header) = maybe_header else { + continue; + }; + + if !FilteredParams::matches_address(header.logs_bloom, &address_filter) + || !FilteredParams::matches_topics(header.logs_bloom, &topics_filter) + { + continue; + } + + let block_num = chunk_start + offset as u64; + let block_hash = header.hash_slow(); + + let (txs, receipts) = tokio::try_join!( + cold.get_transactions_in_block(block_num), + cold.get_receipts_in_block(block_num), + ) + .map_err(|e| e.to_string())?; + + let logs = + collect_matching_logs(&header, block_hash, &txs, &receipts, &filter); + all_logs.extend(logs); + + let max_logs = ctx.config().max_logs_per_response; + if max_logs > 0 && all_logs.len() > max_logs { + return Err(format!( + "query exceeds max logs per response ({max_logs})" + )); + } + } + } + + Ok(all_logs) + } + } + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +/// Extract logs from a block's receipts that match the filter's address and topic criteria. +pub(crate) fn collect_matching_logs( + header: &alloy::consensus::Header, + block_hash: B256, + txs: &[signet_storage_types::TransactionSigned], + receipts: &[signet_storage_types::Receipt], + filter: &Filter, +) -> Vec { + let mut logs = Vec::new(); + let mut log_index: u64 = 0; + + for (tx_idx, (tx, receipt)) in txs.iter().zip(receipts.iter()).enumerate() { + let tx_hash = *tx.tx_hash(); + + for log in &receipt.inner.logs { + if filter.matches_address(log.address) && filter.matches_topics(log.topics()) { + logs.push(Log { + inner: log.clone(), + block_hash: Some(block_hash), + block_number: Some(header.number), + block_timestamp: Some(header.timestamp), + transaction_hash: Some(tx_hash), + transaction_index: Some(tx_idx as u64), + log_index: Some(log_index), + removed: false, + }); + } + log_index += 1; + } + } + + logs +} + +// --------------------------------------------------------------------------- +// Filters +// --------------------------------------------------------------------------- + +pub(crate) async fn new_filter( + hctx: HandlerCtx, + (filter,): (Filter,), + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let latest = ctx.tags().latest(); + Ok(ctx.filter_manager().install_log_filter(latest, filter)) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn new_block_filter( + hctx: HandlerCtx, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let latest = ctx.tags().latest(); + Ok(ctx.filter_manager().install_block_filter(latest)) + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +pub(crate) async fn uninstall_filter( + (id,): (U64,), + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + Ok(ctx.filter_manager().uninstall(id).is_some()) +} + +pub(crate) async fn get_filter_changes( + hctx: HandlerCtx, + (id,): (U64,), + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let task = async move { + let fm = ctx.filter_manager(); + let mut entry = fm.get_mut(id).ok_or_else(|| format!("filter not found: {id}"))?; + + let latest = ctx.tags().latest(); + let start = entry.next_start_block(); + + if start > latest { + entry.mark_polled(latest); + return Ok(entry.empty_output()); + } + + let cold = ctx.cold(); + + if entry.is_block() { + let specs: Vec<_> = (start..=latest).map(HeaderSpecifier::Number).collect(); + let headers = cold.get_headers(specs).await.map_err(|e| e.to_string())?; + let hashes: Vec = headers.into_iter().flatten().map(|h| h.hash_slow()).collect(); + entry.mark_polled(latest); + Ok(FilterOutput::from(hashes)) + } else { + let filter = entry.as_filter().cloned().unwrap(); + let address_filter = FilteredParams::address_filter(&filter.address); + let topics_filter = FilteredParams::topics_filter(&filter.topics); + + let mut all_logs = Vec::new(); + + for (chunk_start, chunk_end) in + BlockRangeInclusiveIter::new(start..=latest, MAX_HEADERS_RANGE) + { + let specs: Vec<_> = + (chunk_start..=chunk_end).map(HeaderSpecifier::Number).collect(); + let headers = cold.get_headers(specs).await.map_err(|e| e.to_string())?; + + for (offset, maybe_header) in headers.into_iter().enumerate() { + let Some(header) = maybe_header else { continue }; + + if !FilteredParams::matches_address(header.logs_bloom, &address_filter) + || !FilteredParams::matches_topics(header.logs_bloom, &topics_filter) + { + continue; + } + + let block_num = chunk_start + offset as u64; + let block_hash = header.hash_slow(); + + let (txs, receipts) = tokio::try_join!( + cold.get_transactions_in_block(block_num), + cold.get_receipts_in_block(block_num), + ) + .map_err(|e| e.to_string())?; + + all_logs.extend(collect_matching_logs( + &header, block_hash, &txs, &receipts, &filter, + )); + } + } + + entry.mark_polled(latest); + Ok(FilterOutput::from(all_logs)) + } + }; + + await_handler!(@option hctx.spawn_blocking(task)) +} + +// --------------------------------------------------------------------------- +// Subscriptions +// --------------------------------------------------------------------------- + +pub(crate) async fn subscribe( + hctx: HandlerCtx, + SubscribeArgs(kind, filter): SubscribeArgs, + ctx: StorageRpcCtx, +) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let interest = match kind { + alloy::rpc::types::pubsub::SubscriptionKind::NewHeads => InterestKind::Block, + alloy::rpc::types::pubsub::SubscriptionKind::Logs => { + let f = filter.unwrap_or_default(); + InterestKind::Log(f) + } + other => { + return Err(format!("unsupported subscription kind: {other:?}")); + } + }; + + ctx.sub_manager() + .subscribe(&hctx, interest) + .ok_or_else(|| "notifications not enabled on this transport".to_string()) +} + +pub(crate) async fn unsubscribe((id,): (U64,), ctx: StorageRpcCtx) -> Result +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + Ok(ctx.sub_manager().unsubscribe(id)) +} diff --git a/crates/rpc-storage/src/eth/error.rs b/crates/rpc-storage/src/eth/error.rs new file mode 100644 index 0000000..93322e8 --- /dev/null +++ b/crates/rpc-storage/src/eth/error.rs @@ -0,0 +1,58 @@ +//! Error types for the storage-backed ETH RPC. + +use alloy::{eips::BlockId, primitives::Bytes}; +use serde::Serialize; + +/// Errors from the storage-backed ETH RPC. +#[derive(Debug, thiserror::Error)] +pub enum EthError { + /// Cold storage error. + #[error("cold storage: {0}")] + Cold(#[from] signet_cold::ColdStorageError), + /// Hot storage error. + #[error("hot storage: {0}")] + Hot(#[from] signet_storage::StorageError), + /// Block resolution error. + #[error("resolve: {0}")] + Resolve(#[from] crate::resolve::ResolveError), + /// Invalid transaction signature. + #[error("invalid transaction signature")] + InvalidSignature, + /// Block not found. + #[error("block not found: {0}")] + BlockNotFound(BlockId), + /// EVM execution error. + #[error("evm: {0}")] + Evm(String), +} + +impl EthError { + /// Convert the error to a string for JSON-RPC responses. + pub fn into_string(self) -> String { + self.to_string() + } +} + +/// Error data for `eth_call` and `eth_estimateGas` responses. +/// +/// Serialized as JSON in the error response `data` field. +#[derive(Debug, Clone, Serialize)] +#[serde(untagged)] +pub(crate) enum CallErrorData { + /// Revert data bytes. + Bytes(Bytes), + /// Error message string. + String(String), +} + +impl From for CallErrorData { + fn from(b: Bytes) -> Self { + Self::Bytes(b) + } +} + +impl From for CallErrorData { + fn from(s: String) -> Self { + Self::String(s) + } +} diff --git a/crates/rpc-storage/src/eth/helpers.rs b/crates/rpc-storage/src/eth/helpers.rs new file mode 100644 index 0000000..38ae472 --- /dev/null +++ b/crates/rpc-storage/src/eth/helpers.rs @@ -0,0 +1,290 @@ +//! Parameter types, macros, and utility helpers for ETH RPC endpoints. + +use crate::eth::error::EthError; +use alloy::{ + consensus::{ + BlockHeader, ReceiptEnvelope, ReceiptWithBloom, Transaction, TxReceipt, + transaction::{Recovered, SignerRecoverable}, + }, + eips::BlockId, + primitives::{Address, TxKind, U256}, + rpc::types::{ + BlockOverrides, Log, TransactionReceipt, TransactionRequest, state::StateOverride, + }, +}; +use serde::Deserialize; +use signet_cold::ReceiptContext; +use signet_storage_types::ConfirmationMeta; +use trevm::MIN_TRANSACTION_GAS; + +/// Args for `eth_call` and `eth_estimateGas`. +#[derive(Debug, Deserialize)] +pub(crate) struct TxParams( + pub TransactionRequest, + #[serde(default)] pub Option, + #[serde(default)] pub Option, + #[serde(default)] pub Option>, +); + +/// Args for `eth_getBlockByHash` and `eth_getBlockByNumber`. +#[derive(Debug, Deserialize)] +pub(crate) struct BlockParams(pub T, #[serde(default)] pub Option); + +/// Args for `eth_getStorageAt`. +#[derive(Debug, Deserialize)] +pub(crate) struct StorageAtArgs(pub Address, pub U256, #[serde(default)] pub Option); + +/// Args for `eth_getBalance`, `eth_getTransactionCount`, and `eth_getCode`. +#[derive(Debug, Deserialize)] +pub(crate) struct AddrWithBlock(pub Address, #[serde(default)] pub Option); + +/// Args for `eth_feeHistory`. +#[derive(Debug, Deserialize)] +pub(crate) struct FeeHistoryArgs( + pub alloy::primitives::U64, + pub alloy::eips::BlockNumberOrTag, + #[serde(default)] pub Option>, +); + +/// Args for `eth_subscribe`. +#[derive(Debug, Deserialize)] +pub(crate) struct SubscribeArgs( + pub alloy::rpc::types::pubsub::SubscriptionKind, + #[serde(default)] pub Option>, +); + +/// Normalize transaction request gas without making DB reads. +/// +/// - If the gas is below `MIN_TRANSACTION_GAS`, set it to `None` +/// - If the gas is above the `rpc_gas_cap`, set it to the `rpc_gas_cap` +pub(crate) const fn normalize_gas_stateless(request: &mut TransactionRequest, max_gas: u64) { + match request.gas { + Some(..MIN_TRANSACTION_GAS) => request.gas = None, + Some(val) if val > max_gas => request.gas = Some(max_gas), + _ => {} + } +} + +/// Await a handler task, returning an error string on panic/cancel. +macro_rules! await_handler { + ($h:expr) => { + match $h.await { + Ok(res) => res, + Err(_) => return Err("task panicked or cancelled".to_string()), + } + }; + + (@option $h:expr) => { + match $h.await { + Ok(Some(res)) => res, + _ => return Err("task panicked or cancelled".to_string()), + } + }; + + (@response_option $h:expr) => { + match $h.await { + Ok(Some(res)) => res, + _ => { + return ajj::ResponsePayload::internal_error_message(std::borrow::Cow::Borrowed( + "task panicked or cancelled", + )) + } + } + }; +} +pub(crate) use await_handler; + +/// Try-operator for `ResponsePayload`. +macro_rules! response_tri { + ($h:expr) => { + match $h { + Ok(res) => res, + Err(err) => return ajj::ResponsePayload::internal_error_message(err.to_string().into()), + } + }; +} +pub(crate) use response_tri; + +/// An iterator that yields inclusive block ranges of a given step size. +#[derive(Debug)] +pub(crate) struct BlockRangeInclusiveIter { + iter: std::iter::StepBy>, + step: u64, + end: u64, +} + +impl BlockRangeInclusiveIter { + pub(crate) fn new(range: std::ops::RangeInclusive, step: u64) -> Self { + Self { end: *range.end(), iter: range.step_by(step as usize + 1), step } + } +} + +impl Iterator for BlockRangeInclusiveIter { + type Item = (u64, u64); + + fn next(&mut self) -> Option { + let start = self.iter.next()?; + let end = (start + self.step).min(self.end); + if start > end { + return None; + } + Some((start, end)) + } +} + +/// Small wrapper implementing [`trevm::Cfg`] to set the chain ID. +pub(crate) struct CfgFiller(pub u64); + +impl trevm::Cfg for CfgFiller { + fn fill_cfg_env(&self, cfg: &mut trevm::revm::context::CfgEnv) { + cfg.chain_id = self.0; + } +} + +/// Recover the sender of a transaction, falling back to [`MagicSig`]. +/// +/// [`MagicSig`]: signet_types::MagicSig +pub(crate) fn recover_sender( + tx: &signet_storage_types::TransactionSigned, +) -> Result { + signet_types::MagicSig::try_from_signature(tx.signature()) + .map(|s| s.rollup_sender()) + .or_else(|| SignerRecoverable::recover_signer_unchecked(tx).ok()) + .ok_or(EthError::InvalidSignature) +} + +/// Build an [`alloy::rpc::types::Transaction`] from cold storage types. +pub(crate) fn build_rpc_transaction( + tx: signet_storage_types::TransactionSigned, + meta: &ConfirmationMeta, + base_fee: Option, +) -> Result { + let sender = recover_sender(&tx)?; + + // Convert EthereumTxEnvelope → TxEnvelope (EthereumTxEnvelope) + let tx_envelope: alloy::consensus::TxEnvelope = tx.into(); + let inner = Recovered::new_unchecked(tx_envelope, sender); + + let egp = base_fee + .map(|bf| inner.effective_tip_per_gas(bf).unwrap_or_default() as u64 + bf) + .unwrap_or_else(|| inner.max_fee_per_gas() as u64); + + Ok(alloy::rpc::types::Transaction { + inner, + block_hash: Some(meta.block_hash()), + block_number: Some(meta.block_number()), + transaction_index: Some(meta.transaction_index()), + effective_gas_price: Some(egp as u128), + }) +} + +/// Build a [`TransactionReceipt`] from a [`ReceiptContext`]. +pub(crate) fn build_receipt( + ctx: ReceiptContext, +) -> Result>, EthError> { + let (receipt, meta) = ctx.receipt.into_parts(); + let gas_used = receipt.inner.cumulative_gas_used() - ctx.prior_cumulative_gas; + + build_receipt_inner( + ctx.transaction, + &ctx.header, + &meta, + receipt, + gas_used, + 0, // log_index_offset: single receipt, no prior logs + ) +} + +/// Build a [`TransactionReceipt`] from individual components. +/// +/// Used by `eth_getBlockReceipts` where all receipts in the block are available. +pub(crate) fn build_receipt_from_parts( + tx: signet_storage_types::TransactionSigned, + header: &alloy::consensus::Header, + block_hash: alloy::primitives::B256, + tx_index: u64, + receipt: signet_storage_types::Receipt, + gas_used: u64, + log_index_offset: u64, +) -> Result>, EthError> { + let meta = ConfirmationMeta::new(header.number, block_hash, tx_index); + build_receipt_inner(tx, header, &meta, receipt, gas_used, log_index_offset) +} + +/// Shared receipt builder. +fn build_receipt_inner( + tx: signet_storage_types::TransactionSigned, + header: &alloy::consensus::Header, + meta: &ConfirmationMeta, + receipt: signet_storage_types::Receipt, + gas_used: u64, + log_index_offset: u64, +) -> Result>, EthError> { + let sender = recover_sender(&tx)?; + let tx_hash = *tx.tx_hash(); + + let logs_bloom = receipt.inner.bloom(); + let status = receipt.inner.status_or_post_state(); + let cumulative_gas_used = receipt.inner.cumulative_gas_used(); + + let logs: Vec = receipt + .inner + .logs + .into_iter() + .enumerate() + .map(|(i, log)| Log { + inner: log, + block_hash: Some(meta.block_hash()), + block_number: Some(meta.block_number()), + block_timestamp: Some(header.timestamp), + transaction_hash: Some(tx_hash), + transaction_index: Some(meta.transaction_index()), + log_index: Some(log_index_offset + i as u64), + removed: false, + }) + .collect(); + + let rpc_receipt = alloy::rpc::types::eth::Receipt { status, cumulative_gas_used, logs }; + + let (contract_address, to) = match tx.kind() { + TxKind::Create => (Some(sender.create(tx.nonce())), None), + TxKind::Call(addr) => (None, Some(Address(*addr))), + }; + + let base_fee = header.base_fee_per_gas(); + let egp = base_fee + .map(|bf| tx.effective_tip_per_gas(bf).unwrap_or_default() as u64 + bf) + .unwrap_or_else(|| tx.max_fee_per_gas() as u64); + + Ok(TransactionReceipt { + inner: build_receipt_envelope( + ReceiptWithBloom { receipt: rpc_receipt, logs_bloom }, + receipt.tx_type, + ), + transaction_hash: tx_hash, + transaction_index: Some(meta.transaction_index()), + block_hash: Some(meta.block_hash()), + block_number: Some(meta.block_number()), + from: sender, + to, + gas_used, + contract_address, + effective_gas_price: egp as u128, + blob_gas_price: None, + blob_gas_used: None, + }) +} + +/// Wrap a receipt in the appropriate [`ReceiptEnvelope`] variant. +const fn build_receipt_envelope( + receipt: ReceiptWithBloom>, + tx_type: alloy::consensus::TxType, +) -> ReceiptEnvelope { + match tx_type { + alloy::consensus::TxType::Legacy => ReceiptEnvelope::Legacy(receipt), + alloy::consensus::TxType::Eip2930 => ReceiptEnvelope::Eip2930(receipt), + alloy::consensus::TxType::Eip1559 => ReceiptEnvelope::Eip1559(receipt), + alloy::consensus::TxType::Eip4844 => ReceiptEnvelope::Eip4844(receipt), + alloy::consensus::TxType::Eip7702 => ReceiptEnvelope::Eip7702(receipt), + } +} diff --git a/crates/rpc-storage/src/eth/mod.rs b/crates/rpc-storage/src/eth/mod.rs new file mode 100644 index 0000000..463a9aa --- /dev/null +++ b/crates/rpc-storage/src/eth/mod.rs @@ -0,0 +1,91 @@ +//! ETH namespace RPC router backed by storage. + +mod endpoints; +use endpoints::{ + addr_tx_count, balance, block, block_number, block_receipts, block_tx_count, call, chain_id, + code_at, estimate_gas, fee_history, gas_price, get_filter_changes, get_logs, header_by, + max_priority_fee_per_gas, new_block_filter, new_filter, not_supported, raw_transaction_by_hash, + raw_tx_by_block_and_index, send_raw_transaction, storage_at, subscribe, transaction_by_hash, + transaction_receipt, tx_by_block_and_index, uninstall_filter, unsubscribe, +}; + +mod error; +pub use error::EthError; + +pub(crate) mod helpers; + +use crate::StorageRpcCtx; +use alloy::{eips::BlockNumberOrTag, primitives::B256}; +use signet_hot::HotKv; +use signet_hot::model::HotKvRead; +use trevm::revm::database::DBErrorMarker; + +/// Instantiate the `eth` API router backed by storage. +pub(crate) fn eth() -> ajj::Router> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + ajj::Router::new() + .route("blockNumber", block_number::) + .route("chainId", chain_id::) + .route("getBlockByHash", block::) + .route("getBlockByNumber", block::) + .route("getBlockTransactionCountByHash", block_tx_count::) + .route("getBlockTransactionCountByNumber", block_tx_count::) + .route("getBlockReceipts", block_receipts::) + .route("getRawTransactionByHash", raw_transaction_by_hash::) + .route("getTransactionByHash", transaction_by_hash::) + .route("getRawTransactionByBlockHashAndIndex", raw_tx_by_block_and_index::) + .route( + "getRawTransactionByBlockNumberAndIndex", + raw_tx_by_block_and_index::, + ) + .route("getTransactionByBlockHashAndIndex", tx_by_block_and_index::) + .route("getTransactionByBlockNumberAndIndex", tx_by_block_and_index::) + .route("getTransactionReceipt", transaction_receipt::) + .route("getBlockHeaderByHash", header_by::) + .route("getBlockHeaderByNumber", header_by::) + .route("getBalance", balance::) + .route("getStorageAt", storage_at::) + .route("getTransactionCount", addr_tx_count::) + .route("getCode", code_at::) + .route("call", call::) + .route("estimateGas", estimate_gas::) + .route("sendRawTransaction", send_raw_transaction::) + .route("getLogs", get_logs::) + // --- + // Unsupported methods + // --- + .route("protocolVersion", not_supported) + .route("syncing", not_supported) + .route("gasPrice", gas_price::) + .route("maxPriorityFeePerGas", max_priority_fee_per_gas::) + .route("feeHistory", fee_history::) + .route("coinbase", not_supported) + .route("accounts", not_supported) + .route("blobBaseFee", not_supported) + .route("getUncleCountByBlockHash", not_supported) + .route("getUncleCountByBlockNumber", not_supported) + .route("getUncleByBlockHashAndIndex", not_supported) + .route("getUncleByBlockNumberAndIndex", not_supported) + .route("getWork", not_supported) + .route("hashrate", not_supported) + .route("mining", not_supported) + .route("submitHashrate", not_supported) + .route("submitWork", not_supported) + .route("sendTransaction", not_supported) + .route("sign", not_supported) + .route("signTransaction", not_supported) + .route("signTypedData", not_supported) + .route("getProof", not_supported) + .route("createAccessList", not_supported) + .route("newFilter", new_filter::) + .route("newBlockFilter", new_block_filter::) + .route("newPendingTransactionFilter", not_supported) + .route("uninstallFilter", uninstall_filter::) + .route("getFilterChanges", get_filter_changes::) + .route("getFilterLogs", get_filter_changes::) + .route("subscribe", subscribe::) + .route("unsubscribe", unsubscribe::) +} diff --git a/crates/rpc-storage/src/gas_oracle.rs b/crates/rpc-storage/src/gas_oracle.rs new file mode 100644 index 0000000..47c23c2 --- /dev/null +++ b/crates/rpc-storage/src/gas_oracle.rs @@ -0,0 +1,55 @@ +//! Cold-storage gas oracle for computing gas price suggestions. +//! +//! Reads recent block headers and transactions from cold storage to +//! compute a suggested tip cap based on recent transaction activity. + +use alloy::{consensus::Transaction, primitives::U256}; +use signet_cold::{ColdStorageError, ColdStorageReadHandle, HeaderSpecifier}; + +use crate::StorageRpcConfig; + +/// Suggest a tip cap based on recent transaction tips. +/// +/// Reads the last `gas_oracle_block_count` blocks from cold storage, +/// computes the effective tip per gas for each transaction, sorts all +/// tips, and returns the value at `gas_oracle_percentile`. +/// +/// Returns `U256::ZERO` if no transactions are found in the range. +pub(crate) async fn suggest_tip_cap( + cold: &ColdStorageReadHandle, + latest: u64, + config: &StorageRpcConfig, +) -> Result { + let block_count = config.gas_oracle_block_count.min(latest + 1); + let start = latest.saturating_sub(block_count - 1); + + let specs: Vec<_> = (start..=latest).map(HeaderSpecifier::Number).collect(); + let headers = cold.get_headers(specs).await?; + + let mut all_tips: Vec = Vec::new(); + + for (offset, maybe_header) in headers.into_iter().enumerate() { + let Some(header) = maybe_header else { continue }; + let base_fee = header.base_fee_per_gas.unwrap_or_default(); + let block_num = start + offset as u64; + + let txs = cold.get_transactions_in_block(block_num).await?; + + for tx in &txs { + if let Some(tip) = tx.effective_tip_per_gas(base_fee) { + all_tips.push(tip); + } + } + } + + if all_tips.is_empty() { + return Ok(U256::ZERO); + } + + all_tips.sort_unstable(); + + let index = ((config.gas_oracle_percentile / 100.0) * (all_tips.len() - 1) as f64) as usize; + let index = index.min(all_tips.len() - 1); + + Ok(U256::from(all_tips[index])) +} diff --git a/crates/rpc-storage/src/interest/filters.rs b/crates/rpc-storage/src/interest/filters.rs new file mode 100644 index 0000000..6d98fd0 --- /dev/null +++ b/crates/rpc-storage/src/interest/filters.rs @@ -0,0 +1,326 @@ +//! Filter management for `eth_newFilter` / `eth_getFilterChanges`. + +use crate::interest::InterestKind; +use alloy::{ + primitives::{B256, U64}, + rpc::types::{Filter, Log}, +}; +use dashmap::{DashMap, mapref::one::RefMut}; +use std::{ + collections::VecDeque, + sync::{ + Arc, Weak, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, Instant}, +}; +use tracing::trace; + +type FilterId = U64; + +/// Either type for filter outputs. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +#[serde(untagged)] +#[allow(dead_code)] +pub(crate) enum Either { + /// Log + Log(Log), + /// Block hash + Block(B256), +} + +/// The output of a filter. +/// +/// This will be either a list of logs or a list of block hashes. Pending tx +/// filters are not supported by Signet. For convenience, there is a special +/// variant for empty results. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +#[serde(untagged)] +pub(crate) enum FilterOutput { + /// Empty output. Holds a `[(); 0]` to make sure it serializes as an empty + /// array. + Empty([(); 0]), + /// Logs + Log(VecDeque), + /// Block hashes + Block(VecDeque), +} + +#[allow(dead_code)] +impl FilterOutput { + /// Create an empty filter output. + pub(crate) const fn empty() -> Self { + Self::Empty([]) + } + + /// True if this is an empty filter output. + pub(crate) fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// The length of this filter output. + pub(crate) fn len(&self) -> usize { + match self { + Self::Empty(_) => 0, + Self::Log(logs) => logs.len(), + Self::Block(blocks) => blocks.len(), + } + } + + /// Extend this filter output with another. + /// + /// # Panics + /// + /// If the two filter outputs are of different types. + pub(crate) fn extend(&mut self, other: Self) { + match (self, other) { + (Self::Log(logs), Self::Log(other_logs)) => logs.extend(other_logs), + (Self::Block(blocks), Self::Block(other_blocks)) => blocks.extend(other_blocks), + (_, Self::Empty(_)) => (), + (this @ Self::Empty(_), other) => *this = other, + _ => panic!("attempted to mix log and block outputs"), + } + } + + /// Pop a value from the front of the filter output. + pub(crate) fn pop_front(&mut self) -> Option { + match self { + Self::Log(logs) => logs.pop_front().map(Either::Log), + Self::Block(blocks) => blocks.pop_front().map(Either::Block), + Self::Empty(_) => None, + } + } +} + +impl From> for FilterOutput { + fn from(block_hashes: Vec) -> Self { + Self::Block(block_hashes.into()) + } +} + +impl From> for FilterOutput { + fn from(logs: Vec) -> Self { + Self::Log(logs.into()) + } +} + +impl FromIterator for FilterOutput { + fn from_iter>(iter: T) -> Self { + let inner: VecDeque<_> = iter.into_iter().collect(); + if inner.is_empty() { Self::empty() } else { Self::Log(inner) } + } +} + +impl FromIterator for FilterOutput { + fn from_iter>(iter: T) -> Self { + let inner: VecDeque<_> = iter.into_iter().collect(); + if inner.is_empty() { Self::empty() } else { Self::Block(inner) } + } +} + +/// An active filter. +/// +/// Records the filter details, the [`Instant`] at which the filter was last +/// polled, and the first block whose contents should be considered. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct ActiveFilter { + next_start_block: u64, + last_poll_time: Instant, + kind: InterestKind, +} + +impl core::fmt::Display for ActiveFilter { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!( + f, + "ActiveFilter {{ next_start_block: {}, ms_since_last_poll: {}, kind: {:?} }}", + self.next_start_block, + self.last_poll_time.elapsed().as_millis(), + self.kind + ) + } +} + +impl ActiveFilter { + /// True if this is a log filter. + #[allow(dead_code)] + pub(crate) const fn is_filter(&self) -> bool { + self.kind.is_filter() + } + + /// True if this is a block filter. + pub(crate) const fn is_block(&self) -> bool { + self.kind.is_block() + } + + /// Fallible cast to a filter. + pub(crate) const fn as_filter(&self) -> Option<&Filter> { + self.kind.as_filter() + } + + /// Mark the filter as having been polled at the given block. + pub(crate) fn mark_polled(&mut self, current_block: u64) { + self.next_start_block = current_block + 1; + self.last_poll_time = Instant::now(); + } + + /// Get the next start block for the filter. + pub(crate) const fn next_start_block(&self) -> u64 { + self.next_start_block + } + + /// Get the duration since the filter was last polled. + pub(crate) fn time_since_last_poll(&self) -> Duration { + self.last_poll_time.elapsed() + } + + /// Return an empty output of the same kind as this filter. + pub(crate) const fn empty_output(&self) -> FilterOutput { + self.kind.empty_output() + } +} + +/// Inner logic for [`FilterManager`]. +#[derive(Debug)] +pub(crate) struct FilterManagerInner { + current_id: AtomicU64, + filters: DashMap, +} + +impl FilterManagerInner { + /// Create a new filter manager. + fn new() -> Self { + // Start from 1, as 0 is weird in quantity encoding. + Self { current_id: AtomicU64::new(1), filters: DashMap::new() } + } + + /// Get the next filter ID. + fn next_id(&self) -> FilterId { + FilterId::from(self.current_id.fetch_add(1, Ordering::Relaxed)) + } + + /// Get a filter by ID. + pub(crate) fn get_mut(&self, id: FilterId) -> Option> { + self.filters.get_mut(&id) + } + + fn install(&self, current_block: u64, kind: InterestKind) -> FilterId { + let id = self.next_id(); + let next_start_block = current_block + 1; + let _ = self + .filters + .insert(id, ActiveFilter { next_start_block, last_poll_time: Instant::now(), kind }); + id + } + + /// Install a new log filter. + pub(crate) fn install_log_filter(&self, current_block: u64, filter: Filter) -> FilterId { + self.install(current_block, InterestKind::Log(Box::new(filter))) + } + + /// Install a new block filter. + pub(crate) fn install_block_filter(&self, current_block: u64) -> FilterId { + self.install(current_block, InterestKind::Block) + } + + /// Uninstall a filter, returning the kind of filter that was uninstalled. + pub(crate) fn uninstall(&self, id: FilterId) -> Option<(U64, ActiveFilter)> { + self.filters.remove(&id) + } + + /// Clean stale filters that have not been polled in a while. + fn clean_stale(&self, older_than: Duration) { + self.filters.retain(|_, filter| filter.time_since_last_poll() < older_than); + } +} + +/// Manager for filters. +/// +/// The manager tracks active filters, and periodically cleans stale filters. +/// Filters are stored in a [`DashMap`] that maps filter IDs to active filters. +/// Filter IDs are assigned sequentially, starting from 1. +/// +/// Calling [`Self::new`] spawns a task that periodically cleans stale filters. +/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlock. +/// See [`DashMap`] documentation for more information. +#[derive(Debug, Clone)] +pub(crate) struct FilterManager { + inner: Arc, +} + +impl FilterManager { + /// Create a new filter manager. Spawn a task to clean stale filters. + pub(crate) fn new(clean_interval: Duration, age_limit: Duration) -> Self { + let inner = Arc::new(FilterManagerInner::new()); + let manager = Self { inner }; + FilterCleanTask::new(Arc::downgrade(&manager.inner), clean_interval, age_limit).spawn(); + manager + } +} + +impl std::ops::Deref for FilterManager { + type Target = FilterManagerInner; + + fn deref(&self) -> &Self::Target { + self.inner.deref() + } +} + +/// Task to clean up unpolled filters. +/// +/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlocks. +#[derive(Debug)] +struct FilterCleanTask { + manager: Weak, + sleep: Duration, + age_limit: Duration, +} + +impl FilterCleanTask { + /// Create a new filter cleaner task. + const fn new(manager: Weak, sleep: Duration, age_limit: Duration) -> Self { + Self { manager, sleep, age_limit } + } + + /// Run the task. This task runs on a separate thread, which ensures that + /// [`DashMap::retain`]'s deadlock condition is not met. See [`DashMap`] + /// documentation for more information. + fn spawn(self) { + std::thread::spawn(move || { + loop { + std::thread::sleep(self.sleep); + trace!("cleaning stale filters"); + match self.manager.upgrade() { + Some(manager) => manager.clean_stale(self.age_limit), + None => break, + } + } + }); + } +} + +// Some code in this file has been copied and modified from reth +// +// The original license is included below: +// +// The MIT License (MIT) +// +// Copyright (c) 2022-2025 Reth Contributors +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +//. +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. diff --git a/crates/rpc-storage/src/interest/kind.rs b/crates/rpc-storage/src/interest/kind.rs new file mode 100644 index 0000000..ff2392e --- /dev/null +++ b/crates/rpc-storage/src/interest/kind.rs @@ -0,0 +1,106 @@ +//! Filter kinds for subscriptions and polling filters. + +use crate::interest::{NewBlockNotification, filters::FilterOutput, subs::SubscriptionBuffer}; +use alloy::rpc::types::{Filter, Header, Log}; +use std::collections::VecDeque; + +/// The different kinds of filters that can be created. +/// +/// Pending tx filters are not supported by Signet. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum InterestKind { + /// Log filter with a user-supplied [`Filter`]. + Log(Box), + /// New-block filter. + Block, +} + +impl InterestKind { + /// True if this is a log filter. + #[allow(dead_code)] + pub(crate) const fn is_filter(&self) -> bool { + matches!(self, Self::Log(_)) + } + + /// True if this is a block filter. + pub(crate) const fn is_block(&self) -> bool { + matches!(self, Self::Block) + } + + /// Fallible cast to a filter. + pub(crate) const fn as_filter(&self) -> Option<&Filter> { + match self { + Self::Log(f) => Some(f), + _ => None, + } + } + + fn apply_block(notif: &NewBlockNotification) -> SubscriptionBuffer { + let header = Header { + hash: notif.header.hash_slow(), + inner: notif.header.clone(), + total_difficulty: None, + size: None, + }; + SubscriptionBuffer::Block(VecDeque::from([header])) + } + + fn apply_filter(&self, notif: &NewBlockNotification) -> SubscriptionBuffer { + let filter = self.as_filter().unwrap(); + let block_hash = notif.header.hash_slow(); + let block_number = notif.header.number; + let block_timestamp = notif.header.timestamp; + + let logs: VecDeque = notif + .receipts + .iter() + .enumerate() + .flat_map(|(tx_idx, receipt)| { + let tx_hash = *notif.transactions[tx_idx].tx_hash(); + receipt.inner.logs.iter().enumerate().filter_map(move |(log_idx, log)| { + if filter.matches(log) { + Some(Log { + inner: log.clone(), + block_hash: Some(block_hash), + block_number: Some(block_number), + block_timestamp: Some(block_timestamp), + transaction_hash: Some(tx_hash), + transaction_index: Some(tx_idx as u64), + log_index: Some(log_idx as u64), + removed: false, + }) + } else { + None + } + }) + }) + .collect(); + + SubscriptionBuffer::Log(logs) + } + + /// Apply the filter to a [`NewBlockNotification`], producing a + /// subscription buffer. + pub(crate) fn filter_notification_for_sub( + &self, + notif: &NewBlockNotification, + ) -> SubscriptionBuffer { + if self.is_block() { Self::apply_block(notif) } else { self.apply_filter(notif) } + } + + /// Return an empty output of the same kind as this filter. + pub(crate) const fn empty_output(&self) -> FilterOutput { + match self { + Self::Log(_) => FilterOutput::Log(VecDeque::new()), + Self::Block => FilterOutput::Block(VecDeque::new()), + } + } + + /// Return an empty subscription buffer of the same kind as this filter. + pub(crate) const fn empty_sub_buffer(&self) -> SubscriptionBuffer { + match self { + Self::Log(_) => SubscriptionBuffer::Log(VecDeque::new()), + Self::Block => SubscriptionBuffer::Block(VecDeque::new()), + } + } +} diff --git a/crates/rpc-storage/src/interest/mod.rs b/crates/rpc-storage/src/interest/mod.rs new file mode 100644 index 0000000..3c611d9 --- /dev/null +++ b/crates/rpc-storage/src/interest/mod.rs @@ -0,0 +1,22 @@ +//! Filter and subscription management for block/log notifications. + +mod filters; +pub(crate) use filters::{FilterManager, FilterOutput}; +mod kind; +pub(crate) use kind::InterestKind; +mod subs; +pub(crate) use subs::SubscriptionManager; + +/// Notification sent when a new block is available in storage. +/// +/// The caller constructs and sends these via a +/// [`tokio::sync::broadcast::Sender`]. +#[derive(Debug, Clone)] +pub struct NewBlockNotification { + /// The block header. + pub header: alloy::consensus::Header, + /// Transactions in the block. + pub transactions: Vec, + /// Receipts for the block. + pub receipts: Vec, +} diff --git a/crates/rpc-storage/src/interest/subs.rs b/crates/rpc-storage/src/interest/subs.rs new file mode 100644 index 0000000..8fb7b04 --- /dev/null +++ b/crates/rpc-storage/src/interest/subs.rs @@ -0,0 +1,333 @@ +//! Subscription management for `eth_subscribe` / `eth_unsubscribe`. + +use crate::interest::{InterestKind, NewBlockNotification}; +use ajj::{HandlerCtx, serde_json}; +use alloy::{primitives::U64, rpc::types::Log}; +use dashmap::DashMap; +use std::{ + cmp::min, + collections::VecDeque, + future::pending, + sync::{ + Arc, Weak, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; +use tokio::sync::broadcast::{self, error::RecvError}; +use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; +use tracing::{Instrument, debug, debug_span, enabled, trace}; + +/// Either type for subscription outputs. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +#[serde(untagged)] +pub(crate) enum Either { + /// A log entry. + Log(Box), + /// A block header. + Block(Box), +} + +/// Buffer for subscription outputs. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum SubscriptionBuffer { + /// Log buffer. + Log(VecDeque), + /// Block header buffer. + Block(VecDeque), +} + +impl SubscriptionBuffer { + /// True if the buffer is empty. + pub(crate) fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Get the number of items in the buffer. + pub(crate) fn len(&self) -> usize { + match self { + Self::Log(buf) => buf.len(), + Self::Block(buf) => buf.len(), + } + } + + /// Extend this buffer with another buffer. + /// + /// # Panics + /// + /// Panics if the buffers are of different types. + pub(crate) fn extend(&mut self, other: Self) { + match (self, other) { + (Self::Log(buf), Self::Log(other)) => buf.extend(other), + (Self::Block(buf), Self::Block(other)) => buf.extend(other), + _ => panic!("mismatched buffer types"), + } + } + + /// Pop the front of the buffer. + pub(crate) fn pop_front(&mut self) -> Option { + match self { + Self::Log(buf) => buf.pop_front().map(|log| Either::Log(Box::new(log))), + Self::Block(buf) => buf.pop_front().map(|header| Either::Block(Box::new(header))), + } + } +} + +impl From> for SubscriptionBuffer { + fn from(logs: Vec) -> Self { + Self::Log(logs.into()) + } +} + +impl FromIterator for SubscriptionBuffer { + fn from_iter>(iter: T) -> Self { + Self::Log(iter.into_iter().collect()) + } +} + +impl From> for SubscriptionBuffer { + fn from(headers: Vec) -> Self { + Self::Block(headers.into()) + } +} + +impl FromIterator for SubscriptionBuffer { + fn from_iter>(iter: T) -> Self { + Self::Block(iter.into_iter().collect()) + } +} + +/// Tracks ongoing subscription tasks. +/// +/// Performs the following functions: +/// - assigns unique subscription IDs +/// - spawns tasks to manage each subscription +/// - allows cancelling subscriptions by ID +/// +/// Calling [`Self::new`] spawns a task that periodically cleans stale filters. +/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlock. +/// See [`DashMap`] documentation for more information. +#[derive(Clone)] +pub(crate) struct SubscriptionManager { + inner: Arc, +} + +impl SubscriptionManager { + /// Instantiate a new subscription manager, start a task to clean up + /// subscriptions cancelled by user disconnection. + pub(crate) fn new( + notif_sender: broadcast::Sender, + clean_interval: Duration, + ) -> Self { + let inner = Arc::new(SubscriptionManagerInner::new(notif_sender)); + let task = SubCleanerTask::new(Arc::downgrade(&inner), clean_interval); + task.spawn(); + Self { inner } + } +} + +impl core::ops::Deref for SubscriptionManager { + type Target = SubscriptionManagerInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl core::fmt::Debug for SubscriptionManager { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("SubscriptionManager").finish_non_exhaustive() + } +} + +/// Inner logic for [`SubscriptionManager`]. +#[derive(Debug)] +pub(crate) struct SubscriptionManagerInner { + next_id: AtomicU64, + tasks: DashMap, + notif_sender: broadcast::Sender, +} + +impl SubscriptionManagerInner { + /// Create a new subscription manager. + fn new(notif_sender: broadcast::Sender) -> Self { + Self { next_id: AtomicU64::new(1), tasks: DashMap::new(), notif_sender } + } + + /// Assign a new subscription ID. + fn next_id(&self) -> U64 { + U64::from(self.next_id.fetch_add(1, Ordering::Relaxed)) + } + + /// Cancel a subscription task. + pub(crate) fn unsubscribe(&self, id: U64) -> bool { + if let Some(task) = self.tasks.remove(&id) { + task.1.cancel(); + true + } else { + false + } + } + + /// Subscribe to notifications. Returns `None` if notifications are + /// disabled. + pub(crate) fn subscribe(&self, ajj_ctx: &HandlerCtx, filter: InterestKind) -> Option { + if !ajj_ctx.notifications_enabled() { + return None; + } + + let id = self.next_id(); + let token = CancellationToken::new(); + let task = SubscriptionTask { + id, + filter, + token: token.clone(), + notifs: self.notif_sender.subscribe(), + }; + task.spawn(ajj_ctx); + + debug!(%id, "registered new subscription"); + + Some(id) + } +} + +/// Task to manage a single subscription. +#[derive(Debug)] +struct SubscriptionTask { + id: U64, + filter: InterestKind, + token: CancellationToken, + notifs: broadcast::Receiver, +} + +impl SubscriptionTask { + /// Create the task future. + async fn task_future(self, ajj_ctx: HandlerCtx, ajj_cancel: WaitForCancellationFutureOwned) { + let SubscriptionTask { id, filter, token, mut notifs } = self; + + let Some(sender) = ajj_ctx.notifications() else { return }; + + let mut notif_buffer = filter.empty_sub_buffer(); + tokio::pin!(ajj_cancel); + + loop { + let span = debug_span!(parent: None, "SubscriptionTask::task_future", %id, filter = tracing::field::Empty); + if enabled!(tracing::Level::TRACE) { + span.record("filter", format!("{filter:?}")); + } + + let guard = span.enter(); + + let permit_fut = async { + if !notif_buffer.is_empty() { + sender.reserve_many(min(sender.max_capacity() / 2, notif_buffer.len())).await + } else { + pending().await + } + } + .in_current_span(); + drop(guard); + + tokio::select! { + biased; + _ = &mut ajj_cancel => { + let _guard = span.enter(); + trace!("subscription cancelled by client disconnect"); + token.cancel(); + break; + } + _ = token.cancelled() => { + let _guard = span.enter(); + trace!("subscription cancelled by user"); + break; + } + permits = permit_fut => { + let _guard = span.enter(); + let Ok(permits) = permits else { + trace!("channel to client closed"); + break + }; + + for permit in permits { + let Some(item) = notif_buffer.pop_front() else { + break; + }; + let notification = ajj::serde_json::json!{ + { + "jsonrpc": "2.0", + "method": "eth_subscription", + "params": { + "result": &item, + "subscription": id + }, + } + }; + let Ok(brv) = serde_json::value::to_raw_value(¬ification) else { + trace!(?item, "failed to serialize notification"); + continue + }; + permit.send(brv); + } + } + notif_res = notifs.recv() => { + let _guard = span.enter(); + + let notif = match notif_res { + Ok(notif) => notif, + Err(RecvError::Lagged(skipped)) => { + trace!(skipped, "missed notifications"); + continue; + }, + Err(e) =>{ + trace!(?e, "notification stream closed"); + break; + } + }; + + let output = filter.filter_notification_for_sub(¬if); + + trace!(count = output.len(), "Filter applied to notification"); + if !output.is_empty() { + notif_buffer.extend(output); + } + } + } + } + } + + /// Spawn on the ajj [`HandlerCtx`]. + fn spawn(self, ctx: &HandlerCtx) { + ctx.spawn_graceful_with_ctx(|ctx, ajj_cancel| self.task_future(ctx, ajj_cancel)); + } +} + +/// Task to clean up cancelled subscriptions. +/// +/// This task runs on a separate thread to avoid [`DashMap::retain`] deadlocks. +#[derive(Debug)] +struct SubCleanerTask { + inner: Weak, + interval: Duration, +} + +impl SubCleanerTask { + /// Create a new subscription cleaner task. + const fn new(inner: Weak, interval: Duration) -> Self { + Self { inner, interval } + } + + /// Run the task. This task runs on a separate thread, which ensures that + /// [`DashMap::retain`]'s deadlock condition is not met. See [`DashMap`] + /// documentation for more information. + fn spawn(self) { + std::thread::spawn(move || { + loop { + std::thread::sleep(self.interval); + if let Some(inner) = self.inner.upgrade() { + inner.tasks.retain(|_, task| !task.is_cancelled()); + } + } + }); + } +} diff --git a/crates/rpc-storage/src/lib.rs b/crates/rpc-storage/src/lib.rs new file mode 100644 index 0000000..af4d93a --- /dev/null +++ b/crates/rpc-storage/src/lib.rs @@ -0,0 +1,65 @@ +#![doc = include_str!("../README.md")] +#![warn( + missing_copy_implementations, + missing_debug_implementations, + missing_docs, + unreachable_pub, + clippy::missing_const_for_fn, + rustdoc::all +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![deny(unused_must_use, rust_2018_idioms)] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod config; +pub use config::StorageRpcConfig; +mod ctx; +pub use ctx::StorageRpcCtx; +mod resolve; +pub use resolve::BlockTags; +mod eth; +pub use eth::EthError; +mod gas_oracle; +mod interest; +pub use interest::NewBlockNotification; +mod debug; +pub use debug::DebugError; +mod signet; +pub use signet::error::SignetError; + +/// Instantiate the `eth` API router. +pub fn eth() -> ajj::Router> +where + H: signet_hot::HotKv + Send + Sync + 'static, + ::Error: trevm::revm::database::DBErrorMarker, +{ + eth::eth() +} + +/// Instantiate the `debug` API router. +pub fn debug() -> ajj::Router> +where + H: signet_hot::HotKv + Send + Sync + 'static, + ::Error: trevm::revm::database::DBErrorMarker, +{ + debug::debug() +} + +/// Instantiate the `signet` API router. +pub fn signet() -> ajj::Router> +where + H: signet_hot::HotKv + Send + Sync + 'static, + ::Error: trevm::revm::database::DBErrorMarker, +{ + signet::signet() +} + +/// Instantiate a combined router with `eth`, `debug`, and `signet` +/// namespaces. +pub fn router() -> ajj::Router> +where + H: signet_hot::HotKv + Send + Sync + 'static, + ::Error: trevm::revm::database::DBErrorMarker, +{ + ajj::Router::new().merge(eth::eth()).merge(debug::debug()).merge(signet::signet()) +} diff --git a/crates/rpc-storage/src/resolve.rs b/crates/rpc-storage/src/resolve.rs new file mode 100644 index 0000000..f5f6ebe --- /dev/null +++ b/crates/rpc-storage/src/resolve.rs @@ -0,0 +1,86 @@ +//! Block tag tracking and BlockId resolution. +//! +//! [`BlockTags`] holds externally-updated atomic values for Latest, Safe, +//! and Finalized block numbers. The RPC context owner is responsible for +//! updating these as the chain progresses. + +use alloy::primitives::B256; +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +/// Externally-updated block tag tracker. +/// +/// Each tag is an `Arc` that the caller updates as the chain +/// progresses. The RPC layer reads these atomically for tag resolution. +/// +/// # Example +/// +/// ``` +/// use signet_rpc_storage::BlockTags; +/// +/// let tags = BlockTags::new(100, 95, 90); +/// assert_eq!(tags.latest(), 100); +/// +/// tags.set_latest(101); +/// assert_eq!(tags.latest(), 101); +/// ``` +#[derive(Debug, Clone)] +pub struct BlockTags { + latest: Arc, + safe: Arc, + finalized: Arc, +} + +impl BlockTags { + /// Create new block tags with initial values. + pub fn new(latest: u64, safe: u64, finalized: u64) -> Self { + Self { + latest: Arc::new(AtomicU64::new(latest)), + safe: Arc::new(AtomicU64::new(safe)), + finalized: Arc::new(AtomicU64::new(finalized)), + } + } + + /// Get the latest block number. + pub fn latest(&self) -> u64 { + self.latest.load(Ordering::Acquire) + } + + /// Get the safe block number. + pub fn safe(&self) -> u64 { + self.safe.load(Ordering::Acquire) + } + + /// Get the finalized block number. + pub fn finalized(&self) -> u64 { + self.finalized.load(Ordering::Acquire) + } + + /// Set the latest block number. + pub fn set_latest(&self, n: u64) { + self.latest.store(n, Ordering::Release); + } + + /// Set the safe block number. + pub fn set_safe(&self, n: u64) { + self.safe.store(n, Ordering::Release); + } + + /// Set the finalized block number. + pub fn set_finalized(&self, n: u64) { + self.finalized.store(n, Ordering::Release); + } +} + +/// Error resolving a block identifier. +#[derive(Debug, thiserror::Error)] +pub enum ResolveError { + /// Cold storage error. + #[error(transparent)] + Cold(#[from] signet_cold::ColdStorageError), + /// Block hash not found. + #[error("block hash not found: {0}")] + HashNotFound(B256), +} diff --git a/crates/rpc-storage/src/signet/endpoints.rs b/crates/rpc-storage/src/signet/endpoints.rs new file mode 100644 index 0000000..c1c3f26 --- /dev/null +++ b/crates/rpc-storage/src/signet/endpoints.rs @@ -0,0 +1,107 @@ +//! Signet namespace RPC endpoint implementations. + +use crate::{ + ctx::StorageRpcCtx, + eth::helpers::{CfgFiller, await_handler, response_tri}, + signet::error::SignetError, +}; +use ajj::{HandlerCtx, ResponsePayload}; +use alloy::eips::{BlockId, eip1559::BaseFeeParams}; +use signet_bundle::{SignetBundleDriver, SignetCallBundle, SignetCallBundleResponse}; +use signet_hot::HotKv; +use signet_hot::model::HotKvRead; +use signet_types::SignedOrder; +use std::time::Duration; +use tokio::select; +use trevm::revm::database::DBErrorMarker; + +/// `signet_sendOrder` handler. +pub(super) async fn send_order( + hctx: HandlerCtx, + order: SignedOrder, + ctx: StorageRpcCtx, +) -> Result<(), String> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let Some(tx_cache) = ctx.tx_cache().cloned() else { + return Err(SignetError::TxCacheNotProvided.into_string()); + }; + + let task = |hctx: HandlerCtx| async move { + hctx.spawn(async move { tx_cache.forward_order(order).await.map_err(|e| e.to_string()) }); + Ok(()) + }; + + await_handler!(@option hctx.spawn_blocking_with_ctx(task)) +} + +/// `signet_callBundle` handler. +pub(super) async fn call_bundle( + hctx: HandlerCtx, + bundle: SignetCallBundle, + ctx: StorageRpcCtx, +) -> ResponsePayload +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + let timeout = bundle.bundle.timeout.unwrap_or(1000); + + let task = async move { + let id = bundle.state_block_number(); + let mut block_id: BlockId = id.into(); + + let pending = block_id.is_pending(); + if pending { + block_id = BlockId::latest(); + } + + let cold = ctx.cold(); + let block_num = response_tri!(ctx.resolve_block_id(block_id).await); + + let mut header = + response_tri!(cold.get_header_by_number(block_num).await.map_err(|e| e.to_string())); + + let header = + response_tri!(header.as_mut().ok_or_else(|| format!("block not found: {block_id}"))); + + // For pending blocks, synthesize the next-block header. + if pending { + header.parent_hash = header.hash_slow(); + header.number += 1; + header.timestamp += 12; + header.base_fee_per_gas = header.next_block_base_fee(BaseFeeParams::ethereum()); + header.gas_limit = ctx.config().rpc_gas_cap; + } + + // State at the resolved block number (before any pending header mutation). + let db = response_tri!(ctx.revm_state_at_height(block_num).map_err(|e| e.to_string())); + + let mut driver = SignetBundleDriver::from(&bundle); + + let trevm = signet_evm::signet_evm(db, ctx.constants().clone()) + .fill_cfg(&CfgFiller(ctx.chain_id())) + .fill_block(header); + + response_tri!(trevm.drive_bundle(&mut driver).map_err(|e| e.into_error())); + + ResponsePayload::Success(driver.into_response()) + }; + + let task = async move { + select! { + _ = tokio::time::sleep(Duration::from_millis(timeout)) => { + ResponsePayload::internal_error_message( + "timeout during bundle simulation".into(), + ) + } + result = task => { + result + } + } + }; + + await_handler!(@response_option hctx.spawn_blocking(task)) +} diff --git a/crates/rpc-storage/src/signet/error.rs b/crates/rpc-storage/src/signet/error.rs new file mode 100644 index 0000000..ad019f4 --- /dev/null +++ b/crates/rpc-storage/src/signet/error.rs @@ -0,0 +1,16 @@ +//! Error types for the signet namespace. + +/// Errors that can occur in the `signet` namespace. +#[derive(Debug, Clone, Copy, thiserror::Error)] +pub enum SignetError { + /// The transaction cache was not provided. + #[error("transaction cache not provided")] + TxCacheNotProvided, +} + +impl SignetError { + /// Convert to a string by value. + pub fn into_string(self) -> String { + self.to_string() + } +} diff --git a/crates/rpc-storage/src/signet/mod.rs b/crates/rpc-storage/src/signet/mod.rs new file mode 100644 index 0000000..f9c452e --- /dev/null +++ b/crates/rpc-storage/src/signet/mod.rs @@ -0,0 +1,19 @@ +//! Signet RPC methods and related code. + +mod endpoints; +use endpoints::{call_bundle, send_order}; +pub(crate) mod error; + +use crate::ctx::StorageRpcCtx; +use signet_hot::HotKv; +use signet_hot::model::HotKvRead; +use trevm::revm::database::DBErrorMarker; + +/// Instantiate a `signet` API router backed by storage. +pub(crate) fn signet() -> ajj::Router> +where + H: HotKv + Send + Sync + 'static, + ::Error: DBErrorMarker, +{ + ajj::Router::new().route("sendOrder", send_order::).route("callBundle", call_bundle::) +} diff --git a/crates/rpc-storage/tests/eth_rpc.rs b/crates/rpc-storage/tests/eth_rpc.rs new file mode 100644 index 0000000..e6d5655 --- /dev/null +++ b/crates/rpc-storage/tests/eth_rpc.rs @@ -0,0 +1,846 @@ +//! Integration tests for the `signet-rpc-storage` ETH RPC endpoints. +//! +//! Tests exercise the public router API via the axum service layer, using +//! in-memory storage backends (`MemKv` + `MemColdBackend`). + +use alloy::{ + consensus::{ + EthereumTxEnvelope, Header, Receipt as AlloyReceipt, SignableTransaction, Signed, TxLegacy, + TxType, + }, + primitives::{Address, B256, Log as PrimitiveLog, LogData, TxKind, U256, address, logs_bloom}, +}; +use axum::body::Body; +use http::Request; +use serde_json::{Value, json}; +use signet_cold::{BlockData, ColdStorageHandle, ColdStorageTask, mem::MemColdBackend}; +use signet_constants::SignetSystemConstants; +use signet_hot::{HotKv, db::UnsafeDbWrite, mem::MemKv}; +use signet_rpc_storage::{BlockTags, NewBlockNotification, StorageRpcConfig, StorageRpcCtx}; +use signet_storage::UnifiedStorage; +use signet_storage_types::Receipt; +use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; +use tower::ServiceExt; + +// --------------------------------------------------------------------------- +// Test helpers +// --------------------------------------------------------------------------- + +/// Everything needed to make RPC calls against the storage-backed router. +struct TestHarness { + app: axum::Router, + cold: ColdStorageHandle, + hot: MemKv, + tags: BlockTags, + #[allow(dead_code)] + notif_tx: broadcast::Sender, + ctx: StorageRpcCtx, + _cancel: CancellationToken, +} + +impl TestHarness { + /// Create a minimal harness with empty storage. + async fn new(latest: u64) -> Self { + let cancel = CancellationToken::new(); + let hot = MemKv::new(); + let cold = ColdStorageTask::spawn(MemColdBackend::new(), cancel.clone()); + let storage = UnifiedStorage::new(hot.clone(), cold.clone()); + let constants = SignetSystemConstants::test(); + let tags = BlockTags::new(latest, latest.saturating_sub(2), 0); + let (notif_tx, _) = broadcast::channel::(16); + let ctx = StorageRpcCtx::new( + storage, + constants, + tags.clone(), + None, + StorageRpcConfig::default(), + notif_tx.clone(), + ); + let app = signet_rpc_storage::eth::().into_axum("/").with_state(ctx.clone()); + + Self { app, cold, hot, tags, notif_tx, ctx, _cancel: cancel } + } + + /// Build an axum router for the debug namespace. + fn debug_app(&self) -> axum::Router { + signet_rpc_storage::debug::().into_axum("/").with_state(self.ctx.clone()) + } + + /// Build an axum router for the signet namespace. + #[allow(dead_code)] + fn signet_app(&self) -> axum::Router { + signet_rpc_storage::signet::().into_axum("/").with_state(self.ctx.clone()) + } +} + +/// Make a JSON-RPC call and return the `"result"` field. +/// +/// The `method` parameter is the short name (e.g. `"blockNumber"`), without +/// the `eth_` prefix. The router registers methods without namespace prefix. +/// +/// Panics if the response contains an `"error"` field. +async fn rpc_call(app: &axum::Router, method: &str, params: Value) -> Value { + let resp = rpc_call_raw(app, method, params).await; + if let Some(error) = resp.get("error") { + panic!("RPC error for {method}: {error}"); + } + resp["result"].clone() +} + +/// Make a JSON-RPC call and return the full response (including any error). +async fn rpc_call_raw(app: &axum::Router, method: &str, params: Value) -> Value { + let body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params, + }); + + let req = Request::builder() + .method("POST") + .uri("/") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap(); + + let response = app.clone().oneshot(req).await.unwrap(); + let bytes = axum::body::to_bytes(response.into_body(), 1024 * 1024).await.unwrap(); + serde_json::from_slice(&bytes).unwrap() +} + +// --------------------------------------------------------------------------- +// Test data builders +// --------------------------------------------------------------------------- + +/// Test address used for account state queries. +const TEST_ADDR: Address = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + +/// Test log-emitting contract address. +const LOG_ADDR: Address = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + +/// Test log topic. +const LOG_TOPIC: B256 = B256::repeat_byte(0xcc); + +/// Create a legacy transaction signed with a deterministic key. +/// +/// Uses alloy's signer to produce a valid ECDSA signature so that +/// `recover_sender` succeeds during RPC response building. +fn make_signed_tx(nonce: u64) -> (signet_storage_types::TransactionSigned, Address) { + make_signed_tx_with_gas_price(nonce, 1_000_000_000) +} + +/// Create a legacy transaction with a custom gas price. +fn make_signed_tx_with_gas_price( + nonce: u64, + gas_price: u128, +) -> (signet_storage_types::TransactionSigned, Address) { + use alloy::signers::{SignerSync, local::PrivateKeySigner}; + + let signer = PrivateKeySigner::from_signing_key( + alloy::signers::k256::ecdsa::SigningKey::from_slice( + &B256::repeat_byte((nonce as u8).wrapping_add(1)).0, + ) + .unwrap(), + ); + let sender = signer.address(); + + let tx = TxLegacy { + nonce, + gas_price, + gas_limit: 21_000, + to: TxKind::Call(Address::ZERO), + value: U256::from(1000), + ..Default::default() + }; + + let sig_hash = tx.signature_hash(); + let sig = signer.sign_hash_sync(&sig_hash).unwrap(); + let signed: signet_storage_types::TransactionSigned = + EthereumTxEnvelope::Legacy(Signed::new_unhashed(tx, sig)); + + (signed, sender) +} + +/// Build a [`BlockData`] from pre-signed transactions. +/// +/// Creates receipts with incrementing `cumulative_gas_used` and optionally +/// attaches logs to each receipt. +fn make_block( + block_num: u64, + txs: Vec, + logs_per_receipt: usize, +) -> BlockData { + let receipts: Vec = txs + .iter() + .enumerate() + .map(|(i, _)| { + let logs: Vec = (0..logs_per_receipt) + .map(|l| PrimitiveLog { + address: LOG_ADDR, + data: LogData::new_unchecked( + vec![LOG_TOPIC], + alloy::primitives::Bytes::from(vec![l as u8]), + ), + }) + .collect(); + + Receipt { + tx_type: TxType::Legacy, + inner: AlloyReceipt { + status: true.into(), + cumulative_gas_used: 21_000 * (i as u64 + 1), + logs, + }, + } + }) + .collect(); + + // Compute the logs bloom from all receipt logs so getLogs bloom check passes. + let all_logs: Vec<_> = receipts.iter().flat_map(|r| r.inner.logs.iter()).collect(); + let bloom = logs_bloom(all_logs); + + let gas_used = receipts.last().map(|r| r.inner.cumulative_gas_used).unwrap_or_default(); + + let header = Header { + number: block_num, + timestamp: 1_700_000_000 + block_num, + base_fee_per_gas: Some(1_000_000_000), + gas_limit: 30_000_000, + gas_used, + logs_bloom: bloom, + ..Default::default() + }; + + BlockData::new(header, txs, receipts, vec![], None) +} + +// --------------------------------------------------------------------------- +// Group 1: Simple queries +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_block_number() { + let h = TestHarness::new(42).await; + let result = rpc_call(&h.app, "blockNumber", json!([])).await; + assert_eq!(result, json!("0x2a")); +} + +#[tokio::test] +async fn test_chain_id() { + let h = TestHarness::new(0).await; + let result = rpc_call(&h.app, "chainId", json!([])).await; + let expected = format!("0x{:x}", SignetSystemConstants::test().ru_chain_id()); + assert_eq!(result, json!(expected)); +} + +// --------------------------------------------------------------------------- +// Group 2: Cold storage — block queries +// --------------------------------------------------------------------------- + +/// Shared setup: append a block with 2 signed transactions to cold storage. +async fn setup_cold_block(h: &TestHarness) -> (Vec, Vec
) { + let (tx0, sender0) = make_signed_tx(0); + let (tx1, sender1) = make_signed_tx(1); + + let hash0 = *tx0.tx_hash(); + let hash1 = *tx1.tx_hash(); + + let block = make_block(1, vec![tx0, tx1], 1); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + (vec![hash0, hash1], vec![sender0, sender1]) +} + +#[tokio::test] +async fn test_get_block_by_number_hashes() { + let h = TestHarness::new(0).await; + let (tx_hashes, _) = setup_cold_block(&h).await; + + let result = rpc_call(&h.app, "getBlockByNumber", json!(["0x1", false])).await; + + assert_eq!(result["number"], json!("0x1")); + let txs = result["transactions"].as_array().unwrap(); + assert_eq!(txs.len(), 2); + // When full=false, transactions are hashes (strings) + assert!(txs[0].is_string()); + assert_eq!(txs[0].as_str().unwrap(), format!("{:?}", tx_hashes[0])); +} + +#[tokio::test] +async fn test_get_block_by_number_full() { + let h = TestHarness::new(0).await; + let (tx_hashes, senders) = setup_cold_block(&h).await; + + let result = rpc_call(&h.app, "getBlockByNumber", json!(["0x1", true])).await; + + assert_eq!(result["number"], json!("0x1")); + let txs = result["transactions"].as_array().unwrap(); + assert_eq!(txs.len(), 2); + // When full=true, transactions are objects + assert!(txs[0].is_object()); + assert_eq!(txs[0]["hash"], json!(format!("{:?}", tx_hashes[0]))); + assert_eq!(txs[0]["from"], json!(format!("{:?}", senders[0]))); + assert_eq!(txs[0]["blockNumber"], json!("0x1")); + assert_eq!(txs[0]["transactionIndex"], json!("0x0")); + assert_eq!(txs[1]["transactionIndex"], json!("0x1")); +} + +#[tokio::test] +async fn test_get_block_by_hash() { + let h = TestHarness::new(0).await; + setup_cold_block(&h).await; + + // Get the block to learn its hash + let block = rpc_call(&h.app, "getBlockByNumber", json!(["0x1", false])).await; + let block_hash = block["hash"].as_str().unwrap().to_string(); + + let result = rpc_call(&h.app, "getBlockByHash", json!([block_hash, false])).await; + assert_eq!(result["number"], json!("0x1")); + assert_eq!(result["hash"], json!(block_hash)); +} + +#[tokio::test] +async fn test_get_block_tx_count() { + let h = TestHarness::new(0).await; + setup_cold_block(&h).await; + + let result = rpc_call(&h.app, "getBlockTransactionCountByNumber", json!(["0x1"])).await; + assert_eq!(result, json!("0x2")); +} + +#[tokio::test] +async fn test_get_block_header() { + let h = TestHarness::new(0).await; + setup_cold_block(&h).await; + + let result = rpc_call(&h.app, "getBlockHeaderByNumber", json!(["0x1"])).await; + assert_eq!(result["number"], json!("0x1")); + assert!(result["baseFeePerGas"].is_string()); +} + +#[tokio::test] +async fn test_get_block_not_found() { + let h = TestHarness::new(255).await; + let result = rpc_call(&h.app, "getBlockByNumber", json!(["0xff", false])).await; + assert!(result.is_null()); +} + +// --------------------------------------------------------------------------- +// Group 3: Cold storage — transaction queries +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_get_transaction_by_hash() { + let h = TestHarness::new(0).await; + let (tx_hashes, senders) = setup_cold_block(&h).await; + + let result = + rpc_call(&h.app, "getTransactionByHash", json!([format!("{:?}", tx_hashes[0])])).await; + + assert_eq!(result["hash"], json!(format!("{:?}", tx_hashes[0]))); + assert_eq!(result["from"], json!(format!("{:?}", senders[0]))); + assert_eq!(result["blockNumber"], json!("0x1")); + assert_eq!(result["transactionIndex"], json!("0x0")); +} + +#[tokio::test] +async fn test_get_raw_transaction_by_hash() { + let h = TestHarness::new(0).await; + let (tx_hashes, _) = setup_cold_block(&h).await; + + let result = + rpc_call(&h.app, "getRawTransactionByHash", json!([format!("{:?}", tx_hashes[0])])).await; + + // Raw transaction is a hex string + let hex = result.as_str().unwrap(); + assert!(hex.starts_with("0x")); + assert!(hex.len() > 4); +} + +#[tokio::test] +async fn test_get_tx_by_block_and_index() { + let h = TestHarness::new(0).await; + let (tx_hashes, senders) = setup_cold_block(&h).await; + + let result = + rpc_call(&h.app, "getTransactionByBlockNumberAndIndex", json!(["0x1", "0x0"])).await; + + assert_eq!(result["hash"], json!(format!("{:?}", tx_hashes[0]))); + assert_eq!(result["from"], json!(format!("{:?}", senders[0]))); +} + +#[tokio::test] +async fn test_get_transaction_receipt() { + let h = TestHarness::new(0).await; + let (tx_hashes, senders) = setup_cold_block(&h).await; + + let result = + rpc_call(&h.app, "getTransactionReceipt", json!([format!("{:?}", tx_hashes[0])])).await; + + assert_eq!(result["transactionHash"], json!(format!("{:?}", tx_hashes[0]))); + assert_eq!(result["from"], json!(format!("{:?}", senders[0]))); + assert_eq!(result["blockNumber"], json!("0x1")); + assert_eq!(result["status"], json!("0x1")); + assert_eq!(result["gasUsed"], json!("0x5208")); // 21000 +} + +#[tokio::test] +async fn test_get_block_receipts() { + let h = TestHarness::new(0).await; + setup_cold_block(&h).await; + + let result = rpc_call(&h.app, "getBlockReceipts", json!(["0x1"])).await; + + let receipts = result.as_array().unwrap(); + assert_eq!(receipts.len(), 2); + assert_eq!(receipts[0]["transactionIndex"], json!("0x0")); + assert_eq!(receipts[1]["transactionIndex"], json!("0x1")); + assert_eq!(receipts[0]["status"], json!("0x1")); + assert_eq!(receipts[1]["status"], json!("0x1")); +} + +// --------------------------------------------------------------------------- +// Group 4: Hot storage — account state +// --------------------------------------------------------------------------- + +/// Populate hot storage with a test account. +fn setup_hot_account(hot: &MemKv) { + use signet_storage_types::Account; + use trevm::revm::bytecode::Bytecode; + + let writer = hot.writer().unwrap(); + + let code = alloy::primitives::Bytes::from_static(&[0x60, 0x00, 0x60, 0x00, 0xf3]); + let bytecode = Bytecode::new_raw(code); + let code_hash = bytecode.hash_slow(); + + writer + .put_account( + &TEST_ADDR, + &Account { + nonce: 5, + balance: U256::from(1_000_000_000_000_000_000u128), + bytecode_hash: Some(code_hash), + }, + ) + .unwrap(); + + writer.put_storage(&TEST_ADDR, &U256::from(42), &U256::from(999)).unwrap(); + + writer.put_bytecode(&code_hash, &bytecode).unwrap(); + + writer.commit().unwrap(); +} + +#[tokio::test] +async fn test_get_balance() { + let h = TestHarness::new(1).await; + setup_hot_account(&h.hot); + + // Append a dummy block so tag resolution succeeds + let block = make_block(1, vec![], 0); + h.cold.append_block(block).await.unwrap(); + + let result = + rpc_call(&h.app, "getBalance", json!([format!("{:?}", TEST_ADDR), "latest"])).await; + + // 1 ETH = 10^18 + assert_eq!(result, json!("0xde0b6b3a7640000")); +} + +#[tokio::test] +async fn test_get_transaction_count() { + let h = TestHarness::new(1).await; + setup_hot_account(&h.hot); + + let block = make_block(1, vec![], 0); + h.cold.append_block(block).await.unwrap(); + + let result = + rpc_call(&h.app, "getTransactionCount", json!([format!("{:?}", TEST_ADDR), "latest"])) + .await; + + assert_eq!(result, json!("0x5")); +} + +#[tokio::test] +async fn test_get_storage_at() { + let h = TestHarness::new(1).await; + setup_hot_account(&h.hot); + + let block = make_block(1, vec![], 0); + h.cold.append_block(block).await.unwrap(); + + let slot = format!("{:#066x}", 42u64); + let result = + rpc_call(&h.app, "getStorageAt", json!([format!("{:?}", TEST_ADDR), slot, "latest"])).await; + + // 999 = 0x3e7, padded to 32 bytes + let expected = format!("{:#066x}", 999u64); + assert_eq!(result, json!(expected)); +} + +#[tokio::test] +async fn test_get_code() { + let h = TestHarness::new(1).await; + setup_hot_account(&h.hot); + + let block = make_block(1, vec![], 0); + h.cold.append_block(block).await.unwrap(); + + let result = rpc_call(&h.app, "getCode", json!([format!("{:?}", TEST_ADDR), "latest"])).await; + + assert_eq!(result, json!("0x60006000f3")); +} + +#[tokio::test] +async fn test_get_balance_unknown_account() { + let h = TestHarness::new(1).await; + + let block = make_block(1, vec![], 0); + h.cold.append_block(block).await.unwrap(); + + let unknown = Address::repeat_byte(0xff); + let result = rpc_call(&h.app, "getBalance", json!([format!("{:?}", unknown), "latest"])).await; + + assert_eq!(result, json!("0x0")); +} + +// --------------------------------------------------------------------------- +// Group 5: Logs +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_get_logs_by_block_hash() { + let h = TestHarness::new(0).await; + + // Create block with transactions that have logs + let (tx0, _) = make_signed_tx(0); + let block = make_block(1, vec![tx0], 2); // 2 logs per receipt + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + // Get the block hash + let block_result = rpc_call(&h.app, "getBlockByNumber", json!(["0x1", false])).await; + let block_hash = block_result["hash"].as_str().unwrap().to_string(); + + let result = rpc_call( + &h.app, + "getLogs", + json!([{ + "blockHash": block_hash, + "address": format!("{:?}", LOG_ADDR), + }]), + ) + .await; + + let logs = result.as_array().unwrap(); + assert_eq!(logs.len(), 2); + assert_eq!(logs[0]["address"], json!(format!("{:?}", LOG_ADDR))); + assert_eq!(logs[0]["blockNumber"], json!("0x1")); + assert_eq!(logs[0]["logIndex"], json!("0x0")); + assert_eq!(logs[1]["logIndex"], json!("0x1")); +} + +#[tokio::test] +async fn test_get_logs_by_range() { + let h = TestHarness::new(0).await; + + let (tx0, _) = make_signed_tx(0); + let block = make_block(1, vec![tx0], 1); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let result = rpc_call( + &h.app, + "getLogs", + json!([{ + "fromBlock": "0x1", + "toBlock": "0x1", + "topics": [format!("{:?}", LOG_TOPIC)], + }]), + ) + .await; + + let logs = result.as_array().unwrap(); + assert_eq!(logs.len(), 1); + assert!(logs[0]["topics"].as_array().unwrap().contains(&json!(format!("{:?}", LOG_TOPIC)))); +} + +#[tokio::test] +async fn test_get_logs_empty() { + let h = TestHarness::new(0).await; + + let (tx0, _) = make_signed_tx(0); + let block = make_block(1, vec![tx0], 0); // no logs + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let result = rpc_call( + &h.app, + "getLogs", + json!([{ + "fromBlock": "0x1", + "toBlock": "0x1", + "address": format!("{:?}", LOG_ADDR), + }]), + ) + .await; + + assert_eq!(result.as_array().unwrap().len(), 0); +} + +// --------------------------------------------------------------------------- +// Group 6: Edge cases & errors +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_not_supported() { + let h = TestHarness::new(0).await; + let resp = rpc_call_raw(&h.app, "syncing", json!([])).await; + assert!(resp.get("error").is_some()); + let msg = resp["error"]["message"].as_str().unwrap(); + assert!(msg.contains("not supported"), "unexpected error: {msg}"); +} + +#[tokio::test] +async fn test_send_raw_tx_no_cache() { + let h = TestHarness::new(0).await; + let resp = rpc_call_raw(&h.app, "sendRawTransaction", json!(["0x00"])).await; + assert!(resp.get("error").is_some()); +} + +// --------------------------------------------------------------------------- +// Group 7: Gas & Fee Queries +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_gas_price() { + let h = TestHarness::new(0).await; + + // Create a block with txs that have gas_price (2 gwei) > base_fee (1 gwei) + let (tx0, _) = make_signed_tx_with_gas_price(0, 2_000_000_000); + let block = make_block(1, vec![tx0], 0); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let result = rpc_call(&h.app, "gasPrice", json!([])).await; + + // tip = gas_price - base_fee = 2e9 - 1e9 = 1e9 + // gasPrice = tip + base_fee = 1e9 + 1e9 = 2e9 = 0x77359400 + assert_eq!(result, json!("0x77359400")); +} + +#[tokio::test] +async fn test_max_priority_fee_per_gas() { + let h = TestHarness::new(0).await; + + let (tx0, _) = make_signed_tx_with_gas_price(0, 2_000_000_000); + let block = make_block(1, vec![tx0], 0); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let result = rpc_call(&h.app, "maxPriorityFeePerGas", json!([])).await; + + // tip only = gas_price - base_fee = 1e9 = 0x3b9aca00 + assert_eq!(result, json!("0x3b9aca00")); +} + +#[tokio::test] +async fn test_gas_price_empty_blocks() { + let h = TestHarness::new(0).await; + + let block = make_block(1, vec![], 0); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let result = rpc_call(&h.app, "gasPrice", json!([])).await; + + // No txs means tip = 0, gasPrice = base_fee = 1e9 = 0x3b9aca00 + assert_eq!(result, json!("0x3b9aca00")); +} + +#[tokio::test] +async fn test_fee_history_basic() { + let h = TestHarness::new(0).await; + + for i in 1u64..=3 { + let (tx, _) = make_signed_tx_with_gas_price(i - 1, 2_000_000_000); + let block = make_block(i, vec![tx], 0); + h.cold.append_block(block).await.unwrap(); + } + h.tags.set_latest(3); + + // Request 2 blocks of fee history ending at block 3 + let result = rpc_call(&h.app, "feeHistory", json!(["0x2", "0x3", null])).await; + + // oldest_block = end_block + 1 - block_count = 3 + 1 - 2 = 2 + assert_eq!(result["oldestBlock"], json!("0x2")); + // base_fee_per_gas has block_count + 1 entries (includes next-block prediction) + let base_fees = result["baseFeePerGas"].as_array().unwrap(); + assert_eq!(base_fees.len(), 3); + // gas_used_ratio has block_count entries + let gas_ratios = result["gasUsedRatio"].as_array().unwrap(); + assert_eq!(gas_ratios.len(), 2); + // No reward field when no percentiles requested + assert!(result["reward"].is_null()); +} + +#[tokio::test] +async fn test_fee_history_with_rewards() { + let h = TestHarness::new(0).await; + + let (tx0, _) = make_signed_tx_with_gas_price(0, 2_000_000_000); + let block = make_block(1, vec![tx0], 0); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let result = rpc_call(&h.app, "feeHistory", json!(["0x1", "0x1", [25.0, 75.0]])).await; + + assert_eq!(result["oldestBlock"], json!("0x1")); + let rewards = result["reward"].as_array().unwrap(); + assert_eq!(rewards.len(), 1); + let block_rewards = rewards[0].as_array().unwrap(); + assert_eq!(block_rewards.len(), 2); +} + +// --------------------------------------------------------------------------- +// Group 8: Filters +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn test_new_block_filter_and_changes() { + let h = TestHarness::new(0).await; + + // Install a block filter at block 0 + let filter_id = rpc_call(&h.app, "newBlockFilter", json!([])).await; + let filter_id_str = filter_id.as_str().unwrap().to_string(); + + // Append a block + let (tx0, _) = make_signed_tx(0); + let block = make_block(1, vec![tx0], 0); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + // Poll for changes — should get block hash for block 1 + let changes = rpc_call(&h.app, "getFilterChanges", json!([filter_id_str])).await; + let hashes = changes.as_array().unwrap(); + assert_eq!(hashes.len(), 1); + assert!(hashes[0].is_string()); + + // Poll again with no new blocks — should be empty + let changes = rpc_call(&h.app, "getFilterChanges", json!([filter_id_str])).await; + let hashes = changes.as_array().unwrap(); + assert!(hashes.is_empty()); +} + +#[tokio::test] +async fn test_new_log_filter_and_changes() { + let h = TestHarness::new(0).await; + + // Install a log filter for LOG_ADDR with LOG_TOPIC + let filter_id = rpc_call( + &h.app, + "newFilter", + json!([{ + "address": format!("{:?}", LOG_ADDR), + "topics": [format!("{:?}", LOG_TOPIC)], + }]), + ) + .await; + let filter_id_str = filter_id.as_str().unwrap().to_string(); + + // Append a block with matching logs + let (tx0, _) = make_signed_tx(0); + let block = make_block(1, vec![tx0], 2); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + // Poll for changes — should get matching logs + let changes = rpc_call(&h.app, "getFilterChanges", json!([filter_id_str])).await; + let logs = changes.as_array().unwrap(); + assert_eq!(logs.len(), 2); + assert_eq!(logs[0]["address"], json!(format!("{:?}", LOG_ADDR))); +} + +#[tokio::test] +async fn test_uninstall_filter() { + let h = TestHarness::new(0).await; + + let filter_id = rpc_call(&h.app, "newBlockFilter", json!([])).await; + let filter_id_str = filter_id.as_str().unwrap().to_string(); + + // Uninstall + let result = rpc_call(&h.app, "uninstallFilter", json!([filter_id_str])).await; + assert_eq!(result, json!(true)); + + // Uninstall again — should return false + let result = rpc_call(&h.app, "uninstallFilter", json!([filter_id_str])).await; + assert_eq!(result, json!(false)); +} + +// --------------------------------------------------------------------------- +// Group 9: Debug namespace +// --------------------------------------------------------------------------- + +/// Set up hot storage with a genesis header and fund an address. +/// +/// The genesis header at block 0 is required so `revm_reader_at_height` +/// can validate height bounds. Without it, MemKv returns `NoBlocks`. +fn setup_hot_for_evm(hot: &MemKv, addr: Address, balance: U256) { + use signet_storage_types::{Account, SealedHeader}; + + let writer = hot.writer().unwrap(); + + // Write a genesis header so the hot storage tracks block 0. + let genesis = SealedHeader::new(Header::default()); + writer.put_header(&genesis).unwrap(); + + writer.put_account(&addr, &Account { nonce: 0, balance, bytecode_hash: None }).unwrap(); + writer.commit().unwrap(); +} + +#[tokio::test] +async fn test_trace_block_by_number_noop() { + let h = TestHarness::new(0).await; + + let (tx0, sender) = make_signed_tx(0); + setup_hot_for_evm(&h.hot, sender, U256::from(1_000_000_000_000_000_000u128)); + + let block = make_block(1, vec![tx0], 0); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let debug_app = h.debug_app(); + let result = + rpc_call(&debug_app, "traceBlockByNumber", json!(["0x1", {"tracer": "noopTracer"}])).await; + + let traces = result.as_array().unwrap(); + assert_eq!(traces.len(), 1); +} + +#[tokio::test] +async fn test_trace_transaction_noop() { + let h = TestHarness::new(0).await; + + let (tx0, sender) = make_signed_tx(0); + let tx_hash = *tx0.tx_hash(); + setup_hot_for_evm(&h.hot, sender, U256::from(1_000_000_000_000_000_000u128)); + + let block = make_block(1, vec![tx0], 0); + h.cold.append_block(block).await.unwrap(); + h.tags.set_latest(1); + + let debug_app = h.debug_app(); + let result = rpc_call( + &debug_app, + "traceTransaction", + json!([format!("{:?}", tx_hash), {"tracer": "noopTracer"}]), + ) + .await; + + // NoopFrame result is not null + assert!(!result.is_null()); +}