diff --git a/Cargo.lock b/Cargo.lock index ccb5359c..c5dbea1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1597,8 +1597,10 @@ dependencies = [ "arc-swap", "base-flashtypes", "base-primitives", + "base-reth-flashblocks", "base-reth-test-utils", "criterion", + "derive_more", "eyre", "futures-util", "jsonrpsee", @@ -1736,21 +1738,16 @@ dependencies = [ "alloy-signer-local", "alloy-sol-macro", "alloy-sol-types", - "base-flashtypes", - "base-reth-flashblocks", "chrono", - "derive_more", "eyre", "futures-util", "jsonrpsee", - "once_cell", "op-alloy-network", "op-alloy-rpc-types", "op-alloy-rpc-types-engine", "reth", "reth-db", "reth-e2e-test-utils", - "reth-exex", "reth-ipc", "reth-node-core", "reth-optimism-chainspec", @@ -1763,7 +1760,6 @@ dependencies = [ "reth-tracing", "serde_json", "tokio", - "tokio-stream", "tower", "tracing-subscriber 0.3.22", "url", diff --git a/crates/client/flashblocks/Cargo.toml b/crates/client/flashblocks/Cargo.toml index 82166911..96e8e585 100644 --- a/crates/client/flashblocks/Cargo.toml +++ b/crates/client/flashblocks/Cargo.toml @@ -11,6 +11,16 @@ repository.workspace = true [lints] workspace = true +[features] +default = [] +test-utils = [ + "dep:base-reth-test-utils", + "dep:reth-e2e-test-utils", + "dep:reth-optimism-node", + "dep:reth-provider", + "dep:derive_more", +] + [dependencies] # workspace base-flashtypes.workspace = true @@ -68,7 +78,15 @@ arc-swap.workspace = true metrics-derive.workspace = true rayon.workspace = true +# test-utils feature dependencies +base-reth-test-utils = { workspace = true, optional = true } +reth-e2e-test-utils = { workspace = true, optional = true } +reth-optimism-node = { workspace = true, optional = true } +reth-provider = { workspace = true, optional = true } +derive_more = { workspace = true, features = ["deref"], optional = true } + [dev-dependencies] +base-reth-flashblocks = { path = ".", features = ["test-utils"] } rstest.workspace = true rand.workspace = true eyre.workspace = true diff --git a/crates/client/flashblocks/src/lib.rs b/crates/client/flashblocks/src/lib.rs index 1f2f1ced..a1926a20 100644 --- a/crates/client/flashblocks/src/lib.rs +++ b/crates/client/flashblocks/src/lib.rs @@ -49,3 +49,6 @@ pub use extension::{FlashblocksCanonConfig, FlashblocksCanonExtension, Flashbloc mod rpc_extension; pub use rpc_extension::FlashblocksRpcExtension; + +#[cfg(feature = "test-utils")] +pub mod test_utils; diff --git a/crates/client/test-utils/src/flashblocks_harness.rs b/crates/client/flashblocks/src/test_utils/harness.rs similarity index 93% rename from crates/client/test-utils/src/flashblocks_harness.rs rename to crates/client/flashblocks/src/test_utils/harness.rs index a1acaa2c..5a51a1d3 100644 --- a/crates/client/test-utils/src/flashblocks_harness.rs +++ b/crates/client/flashblocks/src/test_utils/harness.rs @@ -3,6 +3,9 @@ use std::sync::Arc; use base_flashtypes::Flashblock; +use base_reth_test_utils::{ + OpAddOns, OpBuilder, TestHarness, default_launcher, init_silenced_tracing, +}; use derive_more::Deref; use eyre::Result; use futures_util::Future; @@ -10,14 +13,7 @@ use reth::builder::NodeHandle; use reth_e2e_test_utils::Adapter; use reth_optimism_node::OpNode; -use crate::{ - harness::TestHarness, - init_silenced_tracing, - node::{ - FlashblocksLocalNode, FlashblocksParts, LocalFlashblocksState, OpAddOns, OpBuilder, - default_launcher, - }, -}; +use super::{FlashblocksLocalNode, FlashblocksParts, LocalFlashblocksState}; /// Helper that exposes [`TestHarness`] conveniences plus Flashblocks helpers. #[derive(Debug, Deref)] diff --git a/crates/client/flashblocks/src/test_utils/mod.rs b/crates/client/flashblocks/src/test_utils/mod.rs new file mode 100644 index 00000000..fe769f89 --- /dev/null +++ b/crates/client/flashblocks/src/test_utils/mod.rs @@ -0,0 +1,275 @@ +//! Test utilities for flashblocks integration tests. +//! +//! This module provides test harnesses and helpers for testing flashblocks functionality. +//! It is gated behind the `test-utils` feature flag. + +mod harness; +use std::{ + fmt, + sync::{Arc, Mutex}, +}; + +use base_flashtypes::Flashblock; +use base_reth_test_utils::{ + LocalNode, LocalNodeProvider, OpAddOns, OpBuilder, default_launcher, init_silenced_tracing, +}; +use eyre::Result; +use futures_util::Future; +pub use harness::FlashblocksHarness; +use once_cell::sync::OnceCell; +use reth::builder::NodeHandle; +use reth_e2e_test_utils::Adapter; +use reth_exex::ExExEvent; +use reth_optimism_node::OpNode; +use reth_provider::CanonStateSubscriptions; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::StreamExt; + +use crate::{ + EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver, + FlashblocksState, +}; + +/// Convenience alias for the Flashblocks state backing the local node. +pub type LocalFlashblocksState = FlashblocksState; + +/// Components that allow tests to interact with the Flashblocks worker tasks. +#[derive(Clone)] +pub struct FlashblocksParts { + sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>, + state: Arc, +} + +impl fmt::Debug for FlashblocksParts { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlashblocksParts").finish_non_exhaustive() + } +} + +impl FlashblocksParts { + /// Clone the shared [`FlashblocksState`] handle. + pub fn state(&self) -> Arc { + self.state.clone() + } + + /// Send a flashblock to the background processor and wait until it is handled. + pub async fn send(&self, flashblock: Flashblock) -> Result<()> { + let (tx, rx) = oneshot::channel(); + self.sender.send((flashblock, tx)).await.map_err(|err| eyre::eyre!(err))?; + rx.await.map_err(|err| eyre::eyre!(err))?; + Ok(()) + } +} + +#[derive(Clone)] +struct FlashblocksNodeExtensions { + inner: Arc, +} + +struct FlashblocksNodeExtensionsInner { + sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>, + #[allow(clippy::type_complexity)] + receiver: Arc)>>>>, + fb_cell: Arc>>, + process_canonical: bool, +} + +impl FlashblocksNodeExtensions { + fn new(process_canonical: bool) -> Self { + let (sender, receiver) = mpsc::channel::<(Flashblock, oneshot::Sender<()>)>(100); + let inner = FlashblocksNodeExtensionsInner { + sender, + receiver: Arc::new(Mutex::new(Some(receiver))), + fb_cell: Arc::new(OnceCell::new()), + process_canonical, + }; + Self { inner: Arc::new(inner) } + } + + fn apply(&self, builder: OpBuilder) -> OpBuilder { + let fb_cell = self.inner.fb_cell.clone(); + let receiver = self.inner.receiver.clone(); + let process_canonical = self.inner.process_canonical; + + let fb_cell_for_exex = fb_cell.clone(); + + builder + .install_exex("flashblocks-canon", move |mut ctx| { + let fb_cell = fb_cell_for_exex.clone(); + let process_canonical = process_canonical; + async move { + let provider = ctx.provider().clone(); + let fb = init_flashblocks_state(&fb_cell, &provider); + Ok(async move { + while let Some(note) = ctx.notifications.try_next().await? { + if let Some(committed) = note.committed_chain() { + let hash = committed.tip().num_hash(); + if process_canonical { + // Many suites drive canonical updates manually to reproduce race conditions, so + // allowing this to be disabled keeps canonical replay deterministic. + let chain = Arc::unwrap_or_clone(committed); + for (_, block) in chain.into_blocks() { + fb.on_canonical_block_received(block); + } + } + let _ = ctx.events.send(ExExEvent::FinishedHeight(hash)); + } + } + Ok(()) + }) + } + }) + .extend_rpc_modules(move |ctx| { + let fb_cell = fb_cell.clone(); + let provider = ctx.provider().clone(); + let fb = init_flashblocks_state(&fb_cell, &provider); + + let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new( + ctx.provider().subscribe_to_canonical_state(), + ); + tokio::spawn(async move { + use tokio_stream::StreamExt; + while let Some(Ok(notification)) = canon_stream.next().await { + provider.canonical_in_memory_state().notify_canon_state(notification); + } + }); + let api_ext = EthApiExt::new( + ctx.registry.eth_api().clone(), + ctx.registry.eth_handlers().filter.clone(), + fb.clone(), + ); + ctx.modules.replace_configured(api_ext.into_rpc())?; + + // Register eth_subscribe subscription endpoint for flashblocks + // Uses replace_configured since eth_subscribe already exists from reth's standard module + // Pass eth_api to enable proxying standard subscription types to reth's implementation + let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone()); + ctx.modules.replace_configured(eth_pubsub.into_rpc())?; + + let fb_for_task = fb.clone(); + let mut receiver = receiver + .lock() + .expect("flashblock receiver mutex poisoned") + .take() + .expect("flashblock receiver should only be initialized once"); + tokio::spawn(async move { + while let Some((payload, tx)) = receiver.recv().await { + fb_for_task.on_flashblock_received(payload); + let _ = tx.send(()); + } + }); + + Ok(()) + }) + } + + fn wrap_launcher(&self, launcher: L) -> impl FnOnce(OpBuilder) -> LRet + where + L: FnOnce(OpBuilder) -> LRet, + { + let extensions = self.clone(); + move |builder| { + let builder = extensions.apply(builder); + launcher(builder) + } + } + + fn parts(&self) -> Result { + let state = self.inner.fb_cell.get().ok_or_else(|| { + eyre::eyre!("FlashblocksState should be initialized during node launch") + })?; + Ok(FlashblocksParts { sender: self.inner.sender.clone(), state: state.clone() }) + } +} + +fn init_flashblocks_state( + cell: &Arc>>, + provider: &LocalNodeProvider, +) -> Arc { + cell.get_or_init(|| { + let fb = Arc::new(FlashblocksState::new(provider.clone(), 5)); + fb.start(); + fb + }) + .clone() +} + +/// Local node wrapper that exposes helpers specific to Flashblocks tests. +pub struct FlashblocksLocalNode { + node: LocalNode, + parts: FlashblocksParts, +} + +impl fmt::Debug for FlashblocksLocalNode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlashblocksLocalNode") + .field("node", &self.node) + .field("parts", &self.parts) + .finish() + } +} + +impl FlashblocksLocalNode { + /// Launch a flashblocks-enabled node using the default launcher. + pub async fn new() -> Result { + Self::with_launcher(default_launcher).await + } + + /// Builds a flashblocks-enabled node with canonical block streaming disabled so tests can call + /// `FlashblocksState::on_canonical_block_received` at precise points. + pub async fn manual_canonical() -> Result { + Self::with_manual_canonical_launcher(default_launcher).await + } + + /// Launch a flashblocks-enabled node with a custom launcher and canonical processing enabled. + pub async fn with_launcher(launcher: L) -> Result + where + L: FnOnce(OpBuilder) -> LRet, + LRet: Future, OpAddOns>>>, + { + Self::with_launcher_inner(launcher, true).await + } + + /// Same as [`Self::with_launcher`] but leaves canonical processing to the caller. + pub async fn with_manual_canonical_launcher(launcher: L) -> Result + where + L: FnOnce(OpBuilder) -> LRet, + LRet: Future, OpAddOns>>>, + { + Self::with_launcher_inner(launcher, false).await + } + + async fn with_launcher_inner(launcher: L, process_canonical: bool) -> Result + where + L: FnOnce(OpBuilder) -> LRet, + LRet: Future, OpAddOns>>>, + { + init_silenced_tracing(); + let extensions = FlashblocksNodeExtensions::new(process_canonical); + let wrapped_launcher = extensions.wrap_launcher(launcher); + let node = LocalNode::new(wrapped_launcher).await?; + + let parts = extensions.parts()?; + Ok(Self { node, parts }) + } + + /// Access the shared Flashblocks state for assertions or manual driving. + pub fn flashblocks_state(&self) -> Arc { + self.parts.state() + } + + /// Send a flashblock through the background processor and await completion. + pub async fn send_flashblock(&self, flashblock: Flashblock) -> Result<()> { + self.parts.send(flashblock).await + } + + /// Split the wrapper into the underlying node plus flashblocks parts. + pub fn into_parts(self) -> (LocalNode, FlashblocksParts) { + (self.node, self.parts) + } + + /// Borrow the underlying [`LocalNode`]. + pub fn as_node(&self) -> &LocalNode { + &self.node + } +} diff --git a/crates/client/flashblocks/tests/eip7702_tests.rs b/crates/client/flashblocks/tests/eip7702_tests.rs index e4ab15c9..22025d15 100644 --- a/crates/client/flashblocks/tests/eip7702_tests.rs +++ b/crates/client/flashblocks/tests/eip7702_tests.rs @@ -11,9 +11,8 @@ use alloy_sol_types::SolCall; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; -use base_reth_test_utils::{ - Account, FlashblocksHarness, L1_BLOCK_INFO_DEPOSIT_TX, Minimal7702Account, SignerSync, -}; +use base_reth_flashblocks::test_utils::FlashblocksHarness; +use base_reth_test_utils::{Account, L1_BLOCK_INFO_DEPOSIT_TX, Minimal7702Account, SignerSync}; use eyre::Result; use op_alloy_network::ReceiptResponse; diff --git a/crates/client/flashblocks/tests/eth_call_erc20.rs b/crates/client/flashblocks/tests/eth_call_erc20.rs index de846fc4..08ac224c 100644 --- a/crates/client/flashblocks/tests/eth_call_erc20.rs +++ b/crates/client/flashblocks/tests/eth_call_erc20.rs @@ -19,9 +19,8 @@ use alloy_sol_types::{SolConstructor, SolValue}; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; -use base_reth_test_utils::{ - FlashblocksHarness, L1_BLOCK_INFO_DEPOSIT_TX, MockERC20, TransparentUpgradeableProxy, -}; +use base_reth_flashblocks::test_utils::FlashblocksHarness; +use base_reth_test_utils::{L1_BLOCK_INFO_DEPOSIT_TX, MockERC20, TransparentUpgradeableProxy}; use eyre::Result; struct Erc20TestSetup { harness: FlashblocksHarness, diff --git a/crates/client/flashblocks/tests/flashblocks_rpc.rs b/crates/client/flashblocks/tests/flashblocks_rpc.rs index 22ec2218..71bfc730 100644 --- a/crates/client/flashblocks/tests/flashblocks_rpc.rs +++ b/crates/client/flashblocks/tests/flashblocks_rpc.rs @@ -14,7 +14,8 @@ use alloy_rpc_types_eth::{TransactionInput, error::EthRpcErrorCode}; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; -use base_reth_test_utils::{DoubleCounter, FlashblocksHarness, L1_BLOCK_INFO_DEPOSIT_TX}; +use base_reth_flashblocks::test_utils::FlashblocksHarness; +use base_reth_test_utils::{DoubleCounter, L1_BLOCK_INFO_DEPOSIT_TX}; use eyre::Result; use futures_util::{SinkExt, StreamExt}; use op_alloy_network::{Optimism, ReceiptResponse, TransactionResponse}; diff --git a/crates/client/flashblocks/tests/state.rs b/crates/client/flashblocks/tests/state.rs index 8f998125..ac9da75d 100644 --- a/crates/client/flashblocks/tests/state.rs +++ b/crates/client/flashblocks/tests/state.rs @@ -11,10 +11,11 @@ use alloy_rpc_types_engine::PayloadId; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; -use base_reth_flashblocks::{FlashblocksAPI, FlashblocksState, PendingBlocksAPI}; +use base_reth_flashblocks::{ + FlashblocksAPI, FlashblocksState, PendingBlocksAPI, test_utils::FlashblocksHarness, +}; use base_reth_test_utils::{ - FlashblocksHarness, L1_BLOCK_INFO_DEPOSIT_TX, L1_BLOCK_INFO_DEPOSIT_TX_HASH, LocalNodeProvider, - TestAccounts, + L1_BLOCK_INFO_DEPOSIT_TX, L1_BLOCK_INFO_DEPOSIT_TX_HASH, LocalNodeProvider, TestAccounts, }; use op_alloy_consensus::OpDepositReceipt; use op_alloy_network::BlockResponse; diff --git a/crates/client/test-utils/Cargo.toml b/crates/client/test-utils/Cargo.toml index d66222f9..e0a4f931 100644 --- a/crates/client/test-utils/Cargo.toml +++ b/crates/client/test-utils/Cargo.toml @@ -12,10 +12,6 @@ description = "Common integration test utilities for node-reth crates" workspace = true [dependencies] -# Project -base-flashtypes.workspace = true -base-reth-flashblocks.workspace = true - # reth reth.workspace = true reth-optimism-node.workspace = true @@ -27,7 +23,6 @@ reth-primitives-traits.workspace = true reth-db.workspace = true reth-e2e-test-utils.workspace = true reth-node-core.workspace = true -reth-exex.workspace = true reth-tracing.workspace = true reth-rpc-layer.workspace = true reth-ipc.workspace = true @@ -54,7 +49,6 @@ op-alloy-network.workspace = true # tokio tokio.workspace = true -tokio-stream.workspace = true # async futures-util.workspace = true @@ -63,11 +57,9 @@ futures-util.workspace = true jsonrpsee.workspace = true # misc -derive_more = { workspace = true, features = ["deref"] } tracing-subscriber.workspace = true serde_json.workspace = true eyre.workspace = true -once_cell.workspace = true url.workspace = true chrono.workspace = true diff --git a/crates/client/test-utils/src/harness.rs b/crates/client/test-utils/src/harness.rs index 7a1a7e3c..1423ed0e 100644 --- a/crates/client/test-utils/src/harness.rs +++ b/crates/client/test-utils/src/harness.rs @@ -55,7 +55,7 @@ impl TestHarness { } /// Build a harness from an already-running [`LocalNode`]. - pub(crate) async fn from_node(node: LocalNode) -> Result { + pub async fn from_node(node: LocalNode) -> Result { let engine = node.engine_api()?; let accounts = TestAccounts::new(); diff --git a/crates/client/test-utils/src/lib.rs b/crates/client/test-utils/src/lib.rs index df8fc534..95013eee 100644 --- a/crates/client/test-utils/src/lib.rs +++ b/crates/client/test-utils/src/lib.rs @@ -21,16 +21,13 @@ pub use engine::{EngineAddress, EngineApi, EngineProtocol, HttpEngine, IpcEngine mod fixtures; pub use fixtures::{create_provider_factory, load_genesis}; -mod flashblocks_harness; -pub use flashblocks_harness::FlashblocksHarness; - mod harness; pub use harness::TestHarness; mod node; pub use node::{ - FlashblocksLocalNode, FlashblocksParts, LocalFlashblocksState, LocalNode, LocalNodeProvider, - OpAddOns, OpBuilder, OpComponentsBuilder, OpTypes, default_launcher, + LocalNode, LocalNodeProvider, OpAddOns, OpBuilder, OpComponentsBuilder, OpTypes, + default_launcher, }; mod tracing; diff --git a/crates/client/test-utils/src/node.rs b/crates/client/test-utils/src/node.rs index b943abe2..fd04f143 100644 --- a/crates/client/test-utils/src/node.rs +++ b/crates/client/test-utils/src/node.rs @@ -1,23 +1,12 @@ //! Local node setup with Base Sepolia chainspec -use std::{ - any::Any, - fmt, - net::SocketAddr, - sync::{Arc, Mutex}, -}; +use std::{any::Any, fmt, net::SocketAddr, sync::Arc}; use alloy_genesis::Genesis; use alloy_provider::RootProvider; use alloy_rpc_client::RpcClient; -use base_flashtypes::Flashblock; -use base_reth_flashblocks::{ - EthApiExt, EthApiOverrideServer, EthPubSub, EthPubSubApiServer, FlashblocksReceiver, - FlashblocksState, -}; use eyre::Result; use futures_util::Future; -use once_cell::sync::OnceCell; use op_alloy_network::Optimism; use reth::{ api::{FullNodeTypesAdapter, NodeTypesWithDBAdapter}, @@ -34,23 +23,18 @@ use reth_db::{ test_utils::{ERROR_DB_CREATION, TempDatabase, tempdir_path}, }; use reth_e2e_test_utils::{Adapter, TmpDB}; -use reth_exex::ExExEvent; use reth_node_core::{ args::DatadirArgs, dirs::{DataDirPath, MaybePlatformPath}, }; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_node::{OpNode, args::RollupArgs}; -use reth_provider::{CanonStateSubscriptions, providers::BlockchainProvider}; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::StreamExt; +use reth_provider::providers::BlockchainProvider; use crate::engine::EngineApi; /// Convenience alias for the local blockchain provider type. pub type LocalNodeProvider = BlockchainProvider>; -/// Convenience alias for the Flashblocks state backing the local node. -pub type LocalFlashblocksState = FlashblocksState; /// Handle to a launched local node along with the resources required to keep it alive. pub struct LocalNode { @@ -73,155 +57,6 @@ impl fmt::Debug for LocalNode { } } -/// Components that allow tests to interact with the Flashblocks worker tasks. -#[derive(Clone)] -pub struct FlashblocksParts { - sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>, - state: Arc, -} - -impl fmt::Debug for FlashblocksParts { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FlashblocksParts").finish_non_exhaustive() - } -} - -impl FlashblocksParts { - /// Clone the shared [`FlashblocksState`] handle. - pub fn state(&self) -> Arc { - self.state.clone() - } - - /// Send a flashblock to the background processor and wait until it is handled. - pub async fn send(&self, flashblock: Flashblock) -> Result<()> { - let (tx, rx) = oneshot::channel(); - self.sender.send((flashblock, tx)).await.map_err(|err| eyre::eyre!(err))?; - rx.await.map_err(|err| eyre::eyre!(err))?; - Ok(()) - } -} - -#[derive(Clone)] -struct FlashblocksNodeExtensions { - inner: Arc, -} - -struct FlashblocksNodeExtensionsInner { - sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>, - #[allow(clippy::type_complexity)] - receiver: Arc)>>>>, - fb_cell: Arc>>, - process_canonical: bool, -} - -impl FlashblocksNodeExtensions { - fn new(process_canonical: bool) -> Self { - let (sender, receiver) = mpsc::channel::<(Flashblock, oneshot::Sender<()>)>(100); - let inner = FlashblocksNodeExtensionsInner { - sender, - receiver: Arc::new(Mutex::new(Some(receiver))), - fb_cell: Arc::new(OnceCell::new()), - process_canonical, - }; - Self { inner: Arc::new(inner) } - } - - fn apply(&self, builder: OpBuilder) -> OpBuilder { - let fb_cell = self.inner.fb_cell.clone(); - let receiver = self.inner.receiver.clone(); - let process_canonical = self.inner.process_canonical; - - let fb_cell_for_exex = fb_cell.clone(); - - builder - .install_exex("flashblocks-canon", move |mut ctx| { - let fb_cell = fb_cell_for_exex.clone(); - let process_canonical = process_canonical; - async move { - let provider = ctx.provider().clone(); - let fb = init_flashblocks_state(&fb_cell, &provider); - Ok(async move { - while let Some(note) = ctx.notifications.try_next().await? { - if let Some(committed) = note.committed_chain() { - let hash = committed.tip().num_hash(); - if process_canonical { - // Many suites drive canonical updates manually to reproduce race conditions, so - // allowing this to be disabled keeps canonical replay deterministic. - let chain = Arc::unwrap_or_clone(committed); - for (_, block) in chain.into_blocks() { - fb.on_canonical_block_received(block); - } - } - let _ = ctx.events.send(ExExEvent::FinishedHeight(hash)); - } - } - Ok(()) - }) - } - }) - .extend_rpc_modules(move |ctx| { - let fb_cell = fb_cell.clone(); - let provider = ctx.provider().clone(); - let fb = init_flashblocks_state(&fb_cell, &provider); - - let mut canon_stream = tokio_stream::wrappers::BroadcastStream::new( - ctx.provider().subscribe_to_canonical_state(), - ); - tokio::spawn(async move { - use tokio_stream::StreamExt; - while let Some(Ok(notification)) = canon_stream.next().await { - provider.canonical_in_memory_state().notify_canon_state(notification); - } - }); - let api_ext = EthApiExt::new( - ctx.registry.eth_api().clone(), - ctx.registry.eth_handlers().filter.clone(), - fb.clone(), - ); - ctx.modules.replace_configured(api_ext.into_rpc())?; - - // Register eth_subscribe subscription endpoint for flashblocks - // Uses replace_configured since eth_subscribe already exists from reth's standard module - // Pass eth_api to enable proxying standard subscription types to reth's implementation - let eth_pubsub = EthPubSub::new(ctx.registry.eth_api().clone(), fb.clone()); - ctx.modules.replace_configured(eth_pubsub.into_rpc())?; - - let fb_for_task = fb.clone(); - let mut receiver = receiver - .lock() - .expect("flashblock receiver mutex poisoned") - .take() - .expect("flashblock receiver should only be initialized once"); - tokio::spawn(async move { - while let Some((payload, tx)) = receiver.recv().await { - fb_for_task.on_flashblock_received(payload); - let _ = tx.send(()); - } - }); - - Ok(()) - }) - } - - fn wrap_launcher(&self, launcher: L) -> impl FnOnce(OpBuilder) -> LRet - where - L: FnOnce(OpBuilder) -> LRet, - { - let extensions = self.clone(); - move |builder| { - let builder = extensions.apply(builder); - launcher(builder) - } - } - - fn parts(&self) -> Result { - let state = self.inner.fb_cell.get().ok_or_else(|| { - eyre::eyre!("FlashblocksState should be initialized during node launch") - })?; - Ok(FlashblocksParts { sender: self.inner.sender.clone(), state: state.clone() }) - } -} - /// Optimism node types used for the local harness. pub type OpTypes = FullNodeTypesAdapter>>; @@ -374,94 +209,3 @@ where _task_manager: tasks, }) } - -fn init_flashblocks_state( - cell: &Arc>>, - provider: &LocalNodeProvider, -) -> Arc { - cell.get_or_init(|| { - let fb = Arc::new(FlashblocksState::new(provider.clone(), 5)); - fb.start(); - fb - }) - .clone() -} - -/// Local node wrapper that exposes helpers specific to Flashblocks tests. -pub struct FlashblocksLocalNode { - node: LocalNode, - parts: FlashblocksParts, -} - -impl fmt::Debug for FlashblocksLocalNode { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("FlashblocksLocalNode") - .field("node", &self.node) - .field("parts", &self.parts) - .finish() - } -} - -impl FlashblocksLocalNode { - /// Launch a flashblocks-enabled node using the default launcher. - pub async fn new() -> Result { - Self::with_launcher(default_launcher).await - } - - /// Builds a flashblocks-enabled node with canonical block streaming disabled so tests can call - /// `FlashblocksState::on_canonical_block_received` at precise points. - pub async fn manual_canonical() -> Result { - Self::with_manual_canonical_launcher(default_launcher).await - } - - /// Launch a flashblocks-enabled node with a custom launcher and canonical processing enabled. - pub async fn with_launcher(launcher: L) -> Result - where - L: FnOnce(OpBuilder) -> LRet, - LRet: Future, OpAddOns>>>, - { - Self::with_launcher_inner(launcher, true).await - } - - /// Same as [`Self::with_launcher`] but leaves canonical processing to the caller. - pub async fn with_manual_canonical_launcher(launcher: L) -> Result - where - L: FnOnce(OpBuilder) -> LRet, - LRet: Future, OpAddOns>>>, - { - Self::with_launcher_inner(launcher, false).await - } - - async fn with_launcher_inner(launcher: L, process_canonical: bool) -> Result - where - L: FnOnce(OpBuilder) -> LRet, - LRet: Future, OpAddOns>>>, - { - let extensions = FlashblocksNodeExtensions::new(process_canonical); - let wrapped_launcher = extensions.wrap_launcher(launcher); - let node = LocalNode::new(wrapped_launcher).await?; - - let parts = extensions.parts()?; - Ok(Self { node, parts }) - } - - /// Access the shared Flashblocks state for assertions or manual driving. - pub fn flashblocks_state(&self) -> Arc { - self.parts.state() - } - - /// Send a flashblock through the background processor and await completion. - pub async fn send_flashblock(&self, flashblock: Flashblock) -> Result<()> { - self.parts.send(flashblock).await - } - - /// Split the wrapper into the underlying node plus flashblocks parts. - pub fn into_parts(self) -> (LocalNode, FlashblocksParts) { - (self.node, self.parts) - } - - /// Borrow the underlying [`LocalNode`]. - pub fn as_node(&self) -> &LocalNode { - &self.node - } -}