From 26b45228969280858c0b822feda0fc03437d5a65 Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 18 Feb 2025 12:28:58 -0700 Subject: [PATCH 01/15] wip: impl EvmFactory for Simulator --- Cargo.toml | 2 + src/tasks/mod.rs | 1 + src/tasks/simulator.rs | 336 ++++++++++++++++++++++++++++++++++++++++ tests/simulator_test.rs | 66 ++++++++ 4 files changed, 405 insertions(+) create mode 100644 src/tasks/simulator.rs create mode 100644 tests/simulator_test.rs diff --git a/Cargo.toml b/Cargo.toml index b1c9cb9..75912e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,3 +51,5 @@ async-trait = "0.1.80" oauth2 = "4.4.2" metrics = "0.24.1" metrics-exporter-prometheus = "0.16.0" +trevm = "0.19.3" +revm = "19.5.0" diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 3149dec..3816934 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -4,3 +4,4 @@ pub mod metrics; pub mod oauth; pub mod submit; pub mod tx_poller; +pub mod simulator; diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs new file mode 100644 index 0000000..609e01c --- /dev/null +++ b/src/tasks/simulator.rs @@ -0,0 +1,336 @@ +use alloy::consensus::TxEnvelope; +use alloy::primitives::U256; +use revm::db::EmptyDBTyped; +use revm::{DatabaseRef, InMemoryDB}; +use revm::db::CacheDB; +use trevm::db::ConcurrentStateInfo; +use std::{ + convert::Infallible, + sync::{Arc, Weak}, +}; +use tokio::{sync::mpsc::UnboundedReceiver, task::JoinSet}; +use trevm::{ + self, db::ConcurrentState, revm::primitives::ResultAndState, Block, Cfg, EvmFactory, Tx, +}; +use trevm::{ + revm::{primitives::EVMError, Database, DatabaseCommit}, + BlockDriver, DbConnect, NoopBlock, NoopCfg, TrevmBuilder, +}; + +/// Defines the SimulatorDatabase type for ease of use and clarity +// pub type SimulatorDatabase = ConcurrentState>; + +/// Evm context inner +#[derive(Debug, Clone)] +pub struct EvmCtxInner { + evm_factory: Ef, + cfg: C, + block: B, +} + +/// Evm evaluation context +#[derive(Debug, Clone)] +pub struct EvmCtx(Arc>); + +/// Evm simulation pool +#[derive(Debug, Clone)] +pub struct EvmPool { + evm: EvmCtx, +} + +impl EvmPool +where + Ef: for<'a> EvmFactory<'a> + Send + 'static, + C: Cfg + 'static, + B: Block + 'static, +{ + /// Creates a new Evm Pool from the given factory and configs + pub fn new(evm_factory: Ef, cfg: C, block: B) -> EvmPool + where + Ef: for<'a> EvmFactory<'a> + Send + 'static, + C: Cfg + 'static, + B: Block + 'static, + { + let inner = EvmCtxInner { evm_factory, cfg, block }; + let evm = EvmCtx(Arc::new(inner)); + println!("evm factory - making new evm"); + EvmPool { evm } + } + + /// Obtains a weak reference to the evm that can be upgrade to run threaded simulation + fn weak_evm(&self) -> Weak> { + println!("obtaining weak evm"); + Arc::downgrade(&self.evm.0) + } +} + +fn eval_fn( + evm: Weak>, + tx: Weak, + evaluator: F, +) -> Option> +where + Ef: for<'a> EvmFactory<'a> + Send + 'static, + C: Cfg + 'static, + B: Block + 'static, + T: Tx + 'static, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, +{ + println!("eval_fn running - evm: {:?}", evm); + + // If none, then simulation is over. + let evm = evm.upgrade()?; + println!("evm upgraded"); + // If none, tx can be skipped + let tx = tx.upgrade()?; + println!("tx upgraded"); + + // If none, then tx errored, and can be skipped. + let result = evm.evm_factory.run(&evm.cfg, &evm.block, tx.as_ref()).ok()?; + // Best never comes back because run never returns. Why not? + + // TODO: Result is never returning - but eval_fn is definitely running. + // So what is happening? + println!("result: {:?}", &result); + + let score = evaluator(&result); + println!("score: {}", score); + + Some(Best { tx, result, score }) +} + +pub struct Best { + pub tx: Arc, + pub result: ResultAndState, + pub score: Score, +} + +impl EvmPool +where + Ef: for<'a> EvmFactory<'a> + Send + 'static, + C: Cfg + 'static, + B: Block + 'static, +{ + /// Spawn a task that will evaluate the best candidate from a channel of + /// candidates. + pub fn spawn( + self, + mut inbound_tx: UnboundedReceiver>, + evaluator: F, + deadline: tokio::time::Instant, + ) -> tokio::task::JoinHandle>> + where + T: Tx + 'static, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static + Clone, + { + tokio::spawn(async move { + let mut futs = JoinSet::new(); + let sleep = tokio::time::sleep_until(deadline); + tokio::pin!(sleep); + + let mut best: Option> = None; + + loop { + tokio::select! { + biased; + _ = &mut sleep => break, + tx = inbound_tx.recv() => { + let tx = match tx { + Some(tx) => tx, + None => break, + }; + + println!("receiving transaction"); + + let weak_tx = Arc::downgrade(&tx); + let evm = self.weak_evm(); + let eval = evaluator.clone(); + futs.spawn_blocking(|| eval_fn(evm, weak_tx, eval)); + } + Some(Ok(Some(candidate))) = futs.join_next() => { + println!("candidate used gas: {:?}", candidate.result.result.gas_used()); + if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() { + best = Some(candidate); + } + } + } + } + best + }) + } +} + +/// +/// Simulator Factory +/// + +#[derive(Clone)] +pub struct SimulatorFactory { + pub db: Db, + pub ext: Ext, +} + +impl SimulatorFactory { + pub fn new(db: Db, ext: Ext) -> Self { + Self { db, ext } + } +} + +impl<'a, Db, Ext> DbConnect<'a> for SimulatorFactory +where + Db: Database + DatabaseRef + DatabaseCommit + Clone + Sync, + Ext: Sync, +{ + type Database = ConcurrentState>; + type Error = Infallible; + + fn connect(&'a self) -> Result { + println!("db connect called"); + todo!() + } +} + +impl<'a, Db, Ext> EvmFactory<'a> for SimulatorFactory +where + Db: Database + DatabaseRef + DatabaseCommit + Clone + Sync + Send + 'static, + Ext: Sync + Clone, +{ + type Ext = (); + + fn create(&'a self) -> Result, Self::Error> { + let empty_db = EmptyDBTyped::default(); + let inner_cache = CacheDB::new(empty_db); + let outer_cache = CacheDB::new(inner_cache); + let concurrent_db = ConcurrentState::new(outer_cache, ConcurrentStateInfo::default()); + let t = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); + Ok(t) + } +} + +/// +/// Extractor +/// + +/// A trait for extracting transactions from a block. +pub trait BlockExtractor: Send + Sync + 'static { + type Driver: BlockDriver: core::error::Error>; + + /// Instantiate an configure a new [`trevm`] instance. + fn trevm(&self, db: Db) -> trevm::EvmNeedsBlock<'static, Ext, Db>; + + /// Extracts transactions from the source. + /// + /// Extraction is infallible. Worst case it should return a no-op driver. + fn extract(&mut self, bytes: &[u8]) -> Self::Driver; +} + +/// An implementation of BlockExtractor for Simulation purposes +pub struct SimulatorExtractor {} + +impl BlockExtractor<(), Db> for SimulatorExtractor +where + Db: Database + DatabaseCommit + Send + 'static, +{ + type Driver = SimBundle; + + fn trevm(&self, db: Db) -> trevm::EvmNeedsBlock<'static, (), Db> { + trevm::revm::EvmBuilder::default().with_db(db).build_trevm().fill_cfg(&NoopCfg) + } + + fn extract(&mut self, bytes: &[u8]) -> Self::Driver { + #[allow(clippy::useless_asref)] + let txs: Vec = + alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap_or_default(); + SimBundle(txs, NoopBlock) + } +} + +pub fn create_simulator_extractor() -> SimulatorExtractor +where + Db: Database + DatabaseCommit + Send + 'static, +{ + SimulatorExtractor {} +} + +/// +/// Bundle Driver +/// + +pub struct SimBundle(Vec, NoopBlock); + +pub struct SimTxEnvelope(pub TxEnvelope); + +impl Tx for SimTxEnvelope { + fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { + println!("fillng tx env {:?}", tx_env); + let revm::primitives::TxEnv { .. } = tx_env; + } +} + +impl BlockDriver for SimBundle { + type Block = NoopBlock; + type Error = Error; + + fn block(&self) -> &Self::Block { + &NoopBlock + } + + fn run_txns<'a, Db: Database + DatabaseCommit>( + &mut self, + mut trevm: trevm::EvmNeedsTx<'a, Ext, Db>, + ) -> trevm::RunTxResult<'a, Ext, Db, Self> { + for tx in self.0.iter() { + if tx.recover_signer().is_ok() { + let sim_tx = SimTxEnvelope(tx.clone()); + let t = match trevm.run_tx(&sim_tx) { + Ok(t) => t, + Err(e) => { + if e.is_transaction_error() { + return Ok(e.discard_error()); + } else { + return Err(e.err_into()); + } + } + }; + (_, trevm) = t.accept(); + } + } + Ok(trevm) + } + + fn post_block( + &mut self, + _trevm: &trevm::EvmNeedsBlock<'_, Ext, Db>, + ) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// +/// Impls +/// + +pub struct Error(EVMError); + +impl From> for Error +where + Db: Database, +{ + fn from(e: EVMError) -> Self { + Self(e) + } +} + +impl core::error::Error for Error {} + +impl core::fmt::Debug for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "Error") + } +} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "Error") + } +} diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs new file mode 100644 index 0000000..3335b78 --- /dev/null +++ b/tests/simulator_test.rs @@ -0,0 +1,66 @@ +use std::str::FromStr; +use std::sync::Arc; +use alloy::consensus::{SignableTransaction as _, TxEip1559, TxEnvelope}; +use alloy::signers::local::PrivateKeySigner; +use alloy::signers::SignerSync as _; +use builder::tasks::simulator::{EvmPool, SimTxEnvelope, SimulatorFactory}; +use revm::db::CacheDB; +use revm::primitives::{Address, TxKind}; +use revm::InMemoryDB; +use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; +use alloy::primitives::U256; +use trevm::{NoopBlock, NoopCfg}; +use trevm::revm::primitives::ResultAndState; + +#[tokio::test] +async fn test_spawn() { + let test_wallet = PrivateKeySigner::random(); + + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); + let deadline = Instant::now() + Duration::from_secs(2); + + let db = CacheDB::new(InMemoryDB::default()); + let ext = (); + + let evm_factory = SimulatorFactory::new(db, ext); + let evm_pool = EvmPool::new(evm_factory, NoopCfg, NoopBlock); + + // Start the evm pool + let handle = evm_pool.spawn(tx_receiver, mock_evaluator, deadline); + + // Send some transactions + for _ in 0..5 { + let test_tx = Arc::new(SimTxEnvelope(new_test_tx(&test_wallet).unwrap())); + println!("sending tx in {:?}", test_tx.0); + tx_sender.send(test_tx).unwrap(); + } + + // Wait for the handle to complete + let best = handle.await.unwrap(); + + // Check the result + assert!(best.is_some()); + assert_eq!(best.unwrap().score, U256::from(1)); +} + +fn mock_evaluator(_state: &ResultAndState) -> U256 { + U256::from(1) +} + +// Returns a new signed test transaction with default values +fn new_test_tx(wallet: &PrivateKeySigner) -> eyre::Result { + let tx = TxEip1559 { + chain_id: 17001, + nonce: 1, + gas_limit: 50000, + to: TxKind::Call( + Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), + ), + value: U256::from(1_f64), + input: alloy::primitives::bytes!(""), + ..Default::default() + }; + let signature = wallet.sign_hash_sync(&tx.signature_hash())?; + Ok(TxEnvelope::Eip1559(tx.into_signed(signature))) +} From 1af55a6a3457b8d4c3faa054fed25f1eaf3f47a8 Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 18 Feb 2025 16:33:43 -0700 Subject: [PATCH 02/15] wip: basic simulator test is running but not passing --- src/tasks/simulator.rs | 27 +++++++++++++++++++-------- tests/simulator_test.rs | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 609e01c..d90f12f 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -81,6 +81,7 @@ where // If none, then simulation is over. let evm = evm.upgrade()?; println!("evm upgraded"); + // If none, tx can be skipped let tx = tx.upgrade()?; println!("tx upgraded"); @@ -133,7 +134,10 @@ where loop { tokio::select! { biased; - _ = &mut sleep => break, + _ = &mut sleep => { + println!("simulation deadline exceeded"); + break + }, tx = inbound_tx.recv() => { let tx = match tx { Some(tx) => tx, @@ -176,17 +180,21 @@ impl SimulatorFactory { } } +// Wraps a Db into an EvmFactory compatible [`Database`] impl<'a, Db, Ext> DbConnect<'a> for SimulatorFactory where Db: Database + DatabaseRef + DatabaseCommit + Clone + Sync, Ext: Sync, { - type Database = ConcurrentState>; + type Database = ConcurrentState>; type Error = Infallible; fn connect(&'a self) -> Result { - println!("db connect called"); - todo!() + println!("connect - function called"); + let cache: CacheDB= CacheDB::new(self.db.clone()); + let concurrent_db = ConcurrentState::new(cache, ConcurrentStateInfo::default()); + println!("connect - concurrent db created"); + Ok(concurrent_db) } } @@ -198,11 +206,14 @@ where type Ext = (); fn create(&'a self) -> Result, Self::Error> { - let empty_db = EmptyDBTyped::default(); - let inner_cache = CacheDB::new(empty_db); - let outer_cache = CacheDB::new(inner_cache); - let concurrent_db = ConcurrentState::new(outer_cache, ConcurrentStateInfo::default()); + println!("create - function called"); + // let db = InMemoryDB::new(EmptyDBTyped::new()); + // let cache = CacheDB::new(db); + let cache = CacheDB::new(self.db.clone()); + let concurrent_db = ConcurrentState::new(cache, ConcurrentStateInfo::default()); + println!("create - cloned database"); let t = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); + println!("create - trevm created {:?}", t); Ok(t) } } diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 3335b78..103c82c 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -18,7 +18,7 @@ async fn test_spawn() { let test_wallet = PrivateKeySigner::random(); let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); - let deadline = Instant::now() + Duration::from_secs(2); + let deadline = Instant::now() + Duration::from_secs(5); let db = CacheDB::new(InMemoryDB::default()); let ext = (); From 1afb397c3f64789f87b021e7b89a98f31c16bf72 Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 19 Feb 2025 16:10:15 -0700 Subject: [PATCH 03/15] wip: adding alloydb and rpc data source support --- Cargo.toml | 2 +- src/tasks/simulator.rs | 10 +++------- tests/simulator_test.rs | 33 ++++++++++++++++++++++----------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 75912e3..10f9e8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,4 +52,4 @@ oauth2 = "4.4.2" metrics = "0.24.1" metrics-exporter-prometheus = "0.16.0" trevm = "0.19.3" -revm = "19.5.0" +revm = { version = "19.4.0", features = [ "alloydb" ]} diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index d90f12f..bcb2abb 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -1,8 +1,6 @@ use alloy::consensus::TxEnvelope; use alloy::primitives::U256; -use revm::db::EmptyDBTyped; -use revm::{DatabaseRef, InMemoryDB}; -use revm::db::CacheDB; +use revm::{db::CacheDB, DatabaseRef}; use trevm::db::ConcurrentStateInfo; use std::{ convert::Infallible, @@ -207,10 +205,8 @@ where fn create(&'a self) -> Result, Self::Error> { println!("create - function called"); - // let db = InMemoryDB::new(EmptyDBTyped::new()); - // let cache = CacheDB::new(db); let cache = CacheDB::new(self.db.clone()); - let concurrent_db = ConcurrentState::new(cache, ConcurrentStateInfo::default()); + let concurrent_db: ConcurrentState> = ConcurrentState::new(cache, ConcurrentStateInfo::default()); println!("create - cloned database"); let t = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); println!("create - trevm created {:?}", t); @@ -273,7 +269,7 @@ pub struct SimTxEnvelope(pub TxEnvelope); impl Tx for SimTxEnvelope { fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { - println!("fillng tx env {:?}", tx_env); + println!("fillng tx env {:?}", tx_env); // Possible cause let revm::primitives::TxEnv { .. } = tx_env; } } diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 103c82c..cbb5707 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -1,29 +1,42 @@ -use std::str::FromStr; -use std::sync::Arc; use alloy::consensus::{SignableTransaction as _, TxEip1559, TxEnvelope}; +use alloy::eips::BlockId; +use alloy::primitives::U256; +use alloy::providers::{Provider, ProviderBuilder}; +use alloy::rpc::client::RpcClient; use alloy::signers::local::PrivateKeySigner; use alloy::signers::SignerSync as _; use builder::tasks::simulator::{EvmPool, SimTxEnvelope, SimulatorFactory}; -use revm::db::CacheDB; +use revm::db::{AlloyDB, CacheDB}; use revm::primitives::{Address, TxKind}; -use revm::InMemoryDB; +use std::str::FromStr; +use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; -use alloy::primitives::U256; -use trevm::{NoopBlock, NoopCfg}; use trevm::revm::primitives::ResultAndState; +use trevm::{NoopBlock, NoopCfg}; #[tokio::test] async fn test_spawn() { + // Create test identity let test_wallet = PrivateKeySigner::random(); + // Plumb the transaction pipeline let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); let deadline = Instant::now() + Duration::from_secs(5); - let db = CacheDB::new(InMemoryDB::default()); + // Create an RPC provider for a data source + let url = "https://eth.merkle.io".parse().unwrap(); + let rpc_client = RpcClient::new_http(url); + let root_provider = ProviderBuilder::new().on_client(rpc_client.clone()); + + // TODO: Add a sanity check that the root_provider has real access and block numbers + + let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap(); + let latest = root_provider.get_block_number().await.unwrap(); + let alloy_db = Arc::new(AlloyDB::with_runtime(root_provider.clone(), BlockId::from(latest), runtime)); let ext = (); - let evm_factory = SimulatorFactory::new(db, ext); + let evm_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); let evm_pool = EvmPool::new(evm_factory, NoopCfg, NoopBlock); // Start the evm pool @@ -54,9 +67,7 @@ fn new_test_tx(wallet: &PrivateKeySigner) -> eyre::Result { chain_id: 17001, nonce: 1, gas_limit: 50000, - to: TxKind::Call( - Address::from_str("0x0000000000000000000000000000000000000000").unwrap(), - ), + to: TxKind::Call(Address::from_str("0x0000000000000000000000000000000000000000").unwrap()), value: U256::from(1_f64), input: alloy::primitives::bytes!(""), ..Default::default() From 9f2522f6014ed07246d25e0bab5af6e4c7982d1b Mon Sep 17 00:00:00 2001 From: dylan Date: Thu, 20 Feb 2025 13:00:11 -0700 Subject: [PATCH 04/15] wip: makes eval_fn take Arc instead of Weak --- src/tasks/simulator.rs | 16 ++-------------- tests/simulator_test.rs | 24 +++++++++++++----------- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index bcb2abb..98f6da9 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -15,9 +15,6 @@ use trevm::{ BlockDriver, DbConnect, NoopBlock, NoopCfg, TrevmBuilder, }; -/// Defines the SimulatorDatabase type for ease of use and clarity -// pub type SimulatorDatabase = ConcurrentState>; - /// Evm context inner #[derive(Debug, Clone)] pub struct EvmCtxInner { @@ -64,7 +61,7 @@ where fn eval_fn( evm: Weak>, - tx: Weak, + tx: Arc, evaluator: F, ) -> Option> where @@ -80,16 +77,8 @@ where let evm = evm.upgrade()?; println!("evm upgraded"); - // If none, tx can be skipped - let tx = tx.upgrade()?; - println!("tx upgraded"); - // If none, then tx errored, and can be skipped. let result = evm.evm_factory.run(&evm.cfg, &evm.block, tx.as_ref()).ok()?; - // Best never comes back because run never returns. Why not? - - // TODO: Result is never returning - but eval_fn is definitely running. - // So what is happening? println!("result: {:?}", &result); let score = evaluator(&result); @@ -144,10 +133,9 @@ where println!("receiving transaction"); - let weak_tx = Arc::downgrade(&tx); let evm = self.weak_evm(); let eval = evaluator.clone(); - futs.spawn_blocking(|| eval_fn(evm, weak_tx, eval)); + futs.spawn_blocking(|| eval_fn(evm, tx, eval)); } Some(Ok(Some(candidate))) = futs.join_next() => { println!("candidate used gas: {:?}", candidate.result.result.gas_used()); diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index cbb5707..4065318 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -15,25 +15,27 @@ use tokio::time::{Duration, Instant}; use trevm::revm::primitives::ResultAndState; use trevm::{NoopBlock, NoopCfg}; -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_spawn() { // Create test identity let test_wallet = PrivateKeySigner::random(); // Plumb the transaction pipeline let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); - let deadline = Instant::now() + Duration::from_secs(5); + let deadline = Instant::now() + Duration::from_secs(2); - // Create an RPC provider for a data source - let url = "https://eth.merkle.io".parse().unwrap(); - let rpc_client = RpcClient::new_http(url); - let root_provider = ProviderBuilder::new().on_client(rpc_client.clone()); + // Create an RPC provider from the rollup + let url = "https://rpc.havarti.signet.sh ".parse().unwrap(); + let root_provider = ProviderBuilder::new().on_client(RpcClient::new_http(url)); + + let block_number = root_provider.get_block_number().await.unwrap(); + println!("block number {}", block_number); - // TODO: Add a sanity check that the root_provider has real access and block numbers - - let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap(); let latest = root_provider.get_block_number().await.unwrap(); - let alloy_db = Arc::new(AlloyDB::with_runtime(root_provider.clone(), BlockId::from(latest), runtime)); + + let db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::from(latest)).unwrap(); + let alloy_db = Arc::new(db); + let ext = (); let evm_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); @@ -45,7 +47,7 @@ async fn test_spawn() { // Send some transactions for _ in 0..5 { let test_tx = Arc::new(SimTxEnvelope(new_test_tx(&test_wallet).unwrap())); - println!("sending tx in {:?}", test_tx.0); + println!("dispatching tx {:?}", test_tx.0); tx_sender.send(test_tx).unwrap(); } From 4c31fe2a5b646cdb326e1ba63678bb6f43d718db Mon Sep 17 00:00:00 2001 From: Anna Carroll Date: Thu, 20 Feb 2025 17:09:13 -0600 Subject: [PATCH 05/15] fmt --- src/tasks/mod.rs | 2 +- src/tasks/simulator.rs | 15 ++++++++------- tests/simulator_test.rs | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 3816934..01034f6 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -2,6 +2,6 @@ pub mod block; pub mod bundler; pub mod metrics; pub mod oauth; +pub mod simulator; pub mod submit; pub mod tx_poller; -pub mod simulator; diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 98f6da9..43777e0 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -1,12 +1,12 @@ use alloy::consensus::TxEnvelope; use alloy::primitives::U256; use revm::{db::CacheDB, DatabaseRef}; -use trevm::db::ConcurrentStateInfo; use std::{ convert::Infallible, sync::{Arc, Weak}, }; use tokio::{sync::mpsc::UnboundedReceiver, task::JoinSet}; +use trevm::db::ConcurrentStateInfo; use trevm::{ self, db::ConcurrentState, revm::primitives::ResultAndState, Block, Cfg, EvmFactory, Tx, }; @@ -72,14 +72,14 @@ where F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, { println!("eval_fn running - evm: {:?}", evm); - + // If none, then simulation is over. let evm = evm.upgrade()?; println!("evm upgraded"); // If none, then tx errored, and can be skipped. let result = evm.evm_factory.run(&evm.cfg, &evm.block, tx.as_ref()).ok()?; - println!("result: {:?}", &result); + println!("result: {:?}", &result); let score = evaluator(&result); println!("score: {}", score); @@ -177,7 +177,7 @@ where fn connect(&'a self) -> Result { println!("connect - function called"); - let cache: CacheDB= CacheDB::new(self.db.clone()); + let cache: CacheDB = CacheDB::new(self.db.clone()); let concurrent_db = ConcurrentState::new(cache, ConcurrentStateInfo::default()); println!("connect - concurrent db created"); Ok(concurrent_db) @@ -194,7 +194,8 @@ where fn create(&'a self) -> Result, Self::Error> { println!("create - function called"); let cache = CacheDB::new(self.db.clone()); - let concurrent_db: ConcurrentState> = ConcurrentState::new(cache, ConcurrentStateInfo::default()); + let concurrent_db: ConcurrentState> = + ConcurrentState::new(cache, ConcurrentStateInfo::default()); println!("create - cloned database"); let t = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); println!("create - trevm created {:?}", t); @@ -257,8 +258,8 @@ pub struct SimTxEnvelope(pub TxEnvelope); impl Tx for SimTxEnvelope { fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { - println!("fillng tx env {:?}", tx_env); // Possible cause - let revm::primitives::TxEnv { .. } = tx_env; + println!("fillng tx env {:?}", tx_env); // Possible cause + let revm::primitives::TxEnv { .. } = tx_env; } } diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 4065318..48709a8 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -27,7 +27,7 @@ async fn test_spawn() { // Create an RPC provider from the rollup let url = "https://rpc.havarti.signet.sh ".parse().unwrap(); let root_provider = ProviderBuilder::new().on_client(RpcClient::new_http(url)); - + let block_number = root_provider.get_block_number().await.unwrap(); println!("block number {}", block_number); From 3d2b025b7f9282a23a0ac215bca06f88997e51a6 Mon Sep 17 00:00:00 2001 From: Anna Carroll Date: Thu, 20 Feb 2025 17:09:26 -0600 Subject: [PATCH 06/15] impl mock evaluator --- src/tasks/simulator.rs | 5 ++++- tests/simulator_test.rs | 20 ++++++++++++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 43777e0..5814680 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -139,7 +139,10 @@ where } Some(Ok(Some(candidate))) = futs.join_next() => { println!("candidate used gas: {:?}", candidate.result.result.gas_used()); - if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() { + // TODO: think about equality statement here. + // if using ">" then no candidate will be returned if every score is zero + // if using ">=" then the candidate will be replaced every time if every score is zero + if candidate.score >= best.as_ref().map(|b| b.score).unwrap_or_default() { best = Some(candidate); } } diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 48709a8..7212788 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -12,7 +12,7 @@ use std::str::FromStr; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; -use trevm::revm::primitives::ResultAndState; +use trevm::revm::primitives::{Account, ExecutionResult, ResultAndState}; use trevm::{NoopBlock, NoopCfg}; #[tokio::test(flavor = "multi_thread")] @@ -56,11 +56,23 @@ async fn test_spawn() { // Check the result assert!(best.is_some()); - assert_eq!(best.unwrap().score, U256::from(1)); + assert_eq!(best.unwrap().score, U256::from(0)); } -fn mock_evaluator(_state: &ResultAndState) -> U256 { - U256::from(1) +fn mock_evaluator(state: &ResultAndState) -> U256 { + // log the transaction results + match &state.result { + ExecutionResult::Success { .. } => println!("Execution was successful."), + ExecutionResult::Revert { .. } => println!("Execution reverted."), + ExecutionResult::Halt { .. } => println!("Execution halted."), + } + + // return the target account balance + let target_addr = Address::from_str("0x0000000000000000000000000000000000000000").unwrap(); + let default_account = Account::default(); + let target_account = state.state.get(&target_addr).unwrap_or(&default_account); + println!("target account balance: {:?}", target_account.info.balance); + target_account.info.balance } // Returns a new signed test transaction with default values From ba647f739029a14142227448d21cd43361043024 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 24 Feb 2025 11:49:26 -0700 Subject: [PATCH 07/15] WIP: integrating trevm --- src/tasks/simulator.rs | 290 ++++++++++++++++++++++++++-------------- tests/simulator_test.rs | 18 ++- 2 files changed, 199 insertions(+), 109 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 5814680..23b9bc5 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -1,11 +1,12 @@ use alloy::consensus::TxEnvelope; use alloy::primitives::U256; -use revm::{db::CacheDB, DatabaseRef}; +use alloy_rlp::Encodable; +use revm::{db::CacheDB, primitives::Bytes, DatabaseRef}; use std::{ convert::Infallible, sync::{Arc, Weak}, }; -use tokio::{sync::mpsc::UnboundedReceiver, task::JoinSet}; +use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; use trevm::db::ConcurrentStateInfo; use trevm::{ self, db::ConcurrentState, revm::primitives::ResultAndState, Block, Cfg, EvmFactory, Tx, @@ -48,44 +49,43 @@ where { let inner = EvmCtxInner { evm_factory, cfg, block }; let evm = EvmCtx(Arc::new(inner)); - println!("evm factory - making new evm"); EvmPool { evm } } /// Obtains a weak reference to the evm that can be upgrade to run threaded simulation fn weak_evm(&self) -> Weak> { - println!("obtaining weak evm"); Arc::downgrade(&self.evm.0) } } -fn eval_fn( - evm: Weak>, - tx: Arc, - evaluator: F, -) -> Option> -where - Ef: for<'a> EvmFactory<'a> + Send + 'static, - C: Cfg + 'static, - B: Block + 'static, - T: Tx + 'static, - F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, -{ - println!("eval_fn running - evm: {:?}", evm); - - // If none, then simulation is over. - let evm = evm.upgrade()?; - println!("evm upgraded"); - - // If none, then tx errored, and can be skipped. - let result = evm.evm_factory.run(&evm.cfg, &evm.block, tx.as_ref()).ok()?; - println!("result: {:?}", &result); - - let score = evaluator(&result); - println!("score: {}", score); - - Some(Best { tx, result, score }) -} +// fn eval_fn( +// evm: Weak>, +// tx: Arc, +// evaluator: F, +// ) -> Option> +// where +// Ef: for<'a> EvmFactory<'a> + Send + 'static, +// C: Cfg + 'static, +// B: Block + 'static, +// T: Tx + 'static, +// F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, +// { +// tracing::info!("eval_fn running - evm: {:?}", evm); + +// // If none, then simulation is over. +// let evm = evm.upgrade()?; +// tracing::info!("evm upgraded"); + +// // If none, then tx errored, and can be skipped. +// let result = evm.evm_factory.run(&evm.cfg, &evm.block, tx.as_ref()).ok()?; +// result.state. +// tracing::info!("result: {:?}", &result); + +// let score = evaluator(&result); +// tracing::info!("score: {}", score); + +// Some(Best { tx, result, score }) +// } pub struct Best { pub tx: Arc, @@ -93,80 +93,145 @@ pub struct Best { pub score: Score, } -impl EvmPool +// impl EvmPool +// where +// Ef: for<'a> EvmFactory<'a> + Send + 'static, +// C: Cfg + 'static, +// B: Block + 'static, +// { +// /// Spawn a task that will evaluate the best candidate from a channel of +// /// candidates. +// pub fn spawn( +// self, +// mut inbound_tx: UnboundedReceiver>, +// evaluator: Box U256 + Send + Sync>, +// deadline: tokio::time::Instant, +// ) -> tokio::task::JoinHandle>> +// where +// T: Tx + 'static, +// { +// tokio::spawn(async move { +// let mut futs = JoinSet::new(); +// let sleep = tokio::time::sleep_until(deadline); +// tokio::pin!(sleep); + +// let mut best: Option> = None; + +// loop { +// tokio::select! { +// biased; +// _ = &mut sleep => { +// tracing::info!("simulation deadline exceeded"); +// break +// }, +// tx = inbound_tx.recv() => { +// let tx = match tx { +// Some(tx) => tx, +// None => break, +// }; + +// tracing::info!("receiving transaction"); + +// let evm = self.weak_evm(); +// let eval = evaluator; +// let eval = Arc::new(evaluator.as_ref()); +// } +// Some(Ok(Some(candidate))) = futs.join_next() => { +// tracing::info!("candidate used gas: {:?}", candidate.result.result.gas_used()); +// // TODO: think about equality statement here. +// // if using ">" then no candidate will be returned if every score is zero +// // if using ">=" then the candidate will be replaced every time if every score is zero +// if candidate.score >= best.as_ref().map(|b| b.score).unwrap_or_default() { +// best = Some(candidate); +// } +// } +// } +// } +// best +// }) +// } +// } + +/// SimBlock wraps an array of SimBundles +pub struct SimBlock(Vec); + +/// +/// Simulator Factory +/// + +/// Binds a database and a simulation extension together +#[derive(Clone)] +pub struct SimulatorFactory { + pub db: Db, + pub ext: Ext, +} + +type EvalFn = Arc U256 + Send + Sync>; + + +/// Creates a new SimulatorFactory from the given Database and Extension +impl SimulatorFactory where - Ef: for<'a> EvmFactory<'a> + Send + 'static, - C: Cfg + 'static, - B: Block + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Clone + Send + Sync + 'static, { - /// Spawn a task that will evaluate the best candidate from a channel of - /// candidates. - pub fn spawn( + pub fn new(db: Db, ext: Ext) -> Self { + Self { db, ext } + } + + /// Spawns a trevm simulator + pub fn spawn_trevm( self, mut inbound_tx: UnboundedReceiver>, - evaluator: F, + mut inbound_bundle: UnboundedReceiver>, deadline: tokio::time::Instant, - ) -> tokio::task::JoinHandle>> + ) -> tokio::task::JoinHandle>> where - T: Tx + 'static, - F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static + Clone, + T: Tx + Send + Sync + 'static, + Db: Send + Sync + 'static, { tokio::spawn(async move { - let mut futs = JoinSet::new(); + let evaluator: Arc U256 + Send + Sync> = Arc::new(|result| { + // ... your logic ... + U256::from(1) + }); + + + // let mut futs = JoinSet::new(); let sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); - let mut best: Option> = None; + let mut best: Option> = None; + + let mut extractor = SimulatorExtractor { + db: self.db.clone(), + }; + + let t = extractor.trevm(self.db); loop { - tokio::select! { - biased; - _ = &mut sleep => { - println!("simulation deadline exceeded"); - break + select! { + _ = sleep => { + break; }, tx = inbound_tx.recv() => { let tx = match tx { Some(tx) => tx, None => break, }; - - println!("receiving transaction"); - - let evm = self.weak_evm(); - let eval = evaluator.clone(); - futs.spawn_blocking(|| eval_fn(evm, tx, eval)); - } - Some(Ok(Some(candidate))) = futs.join_next() => { - println!("candidate used gas: {:?}", candidate.result.result.gas_used()); - // TODO: think about equality statement here. - // if using ">" then no candidate will be returned if every score is zero - // if using ">=" then the candidate will be replaced every time if every score is zero - if candidate.score >= best.as_ref().map(|b| b.score).unwrap_or_default() { - best = Some(candidate); - } + let trevm = extractor.trevm(); + tracing::info(tx = ?tx); + todo!(); + }, + b = inbound_bundle.recv() => { + todo!(); } } } + best }) } -} - -/// -/// Simulator Factory -/// -#[derive(Clone)] -pub struct SimulatorFactory { - pub db: Db, - pub ext: Ext, -} - -impl SimulatorFactory { - pub fn new(db: Db, ext: Ext) -> Self { - Self { db, ext } - } } // Wraps a Db into an EvmFactory compatible [`Database`] @@ -179,10 +244,9 @@ where type Error = Infallible; fn connect(&'a self) -> Result { - println!("connect - function called"); let cache: CacheDB = CacheDB::new(self.db.clone()); let concurrent_db = ConcurrentState::new(cache, ConcurrentStateInfo::default()); - println!("connect - concurrent db created"); + tracing::info!("created concurrent database"); Ok(concurrent_db) } } @@ -194,14 +258,11 @@ where { type Ext = (); + /// Create makes a [`ConcurrentState`] database by calling connect fn create(&'a self) -> Result, Self::Error> { - println!("create - function called"); - let cache = CacheDB::new(self.db.clone()); - let concurrent_db: ConcurrentState> = - ConcurrentState::new(cache, ConcurrentStateInfo::default()); - println!("create - cloned database"); + let concurrent_db = self.connect()?; let t = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); - println!("create - trevm created {:?}", t); + tracing::info!("created trevm"); Ok(t) } } @@ -224,11 +285,16 @@ pub trait BlockExtractor: Send + Sync + 'sta } /// An implementation of BlockExtractor for Simulation purposes -pub struct SimulatorExtractor {} +#[derive(Clone)] +pub struct SimulatorExtractor { + db: Db, +} -impl BlockExtractor<(), Db> for SimulatorExtractor +/// SimulatorExtractor implements a block extractor and trevm block driver +/// for simulating and successively applying state updates from transactions. +impl BlockExtractor<(), Db> for SimulatorExtractor where - Db: Database + DatabaseCommit + Send + 'static, + Db: Database + DatabaseCommit + Send + Sync + 'static, { type Driver = SimBundle; @@ -237,6 +303,7 @@ where } fn extract(&mut self, bytes: &[u8]) -> Self::Driver { + // TODO: Should this use SimBundle instead of Vec? #[allow(clippy::useless_asref)] let txs: Vec = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap_or_default(); @@ -244,24 +311,43 @@ where } } -pub fn create_simulator_extractor() -> SimulatorExtractor -where - Db: Database + DatabaseCommit + Send + 'static, -{ - SimulatorExtractor {} -} - -/// -/// Bundle Driver -/// - pub struct SimBundle(Vec, NoopBlock); pub struct SimTxEnvelope(pub TxEnvelope); +impl SimTxEnvelope { + /// Converts bytes into a SimTxEnvelope + pub fn to_tx(bytes: &[u8]) -> Option { + let tx: TxEnvelope = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).ok()?; + Some(SimTxEnvelope(tx)) + } + + /// Converts a SimTxEnvelope into bytes + pub fn to_bytes(&self) -> Vec { + let mut out = Vec::new(); + self.0.encode(&mut out); + out + } +} + +impl From<&[u8]> for SimTxEnvelope { + fn from(bytes: &[u8]) -> Self { + let tx: TxEnvelope = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap(); + SimTxEnvelope(tx) + } +} + +impl From<&SimTxEnvelope> for Vec { + fn from(tx: &SimTxEnvelope) -> Self { + let mut out = Vec::new(); + tx.0.encode(&mut out); + out + } +} + impl Tx for SimTxEnvelope { fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { - println!("fillng tx env {:?}", tx_env); // Possible cause + tracing::info!("fillng tx env {:?}", tx_env); // Possible cause let revm::primitives::TxEnv { .. } = tx_env; } } diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 7212788..8c46bef 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -5,7 +5,7 @@ use alloy::providers::{Provider, ProviderBuilder}; use alloy::rpc::client::RpcClient; use alloy::signers::local::PrivateKeySigner; use alloy::signers::SignerSync as _; -use builder::tasks::simulator::{EvmPool, SimTxEnvelope, SimulatorFactory}; +use builder::tasks::simulator::{EvmPool, SimBundle, SimTxEnvelope, SimulatorFactory}; use revm::db::{AlloyDB, CacheDB}; use revm::primitives::{Address, TxKind}; use std::str::FromStr; @@ -22,6 +22,8 @@ async fn test_spawn() { // Plumb the transaction pipeline let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); + let (inbound_tx, inbound_tx_receiver) = mpsc::unbounded_channel::>(); + let (inbound_bundle, inbound_bundle_receiver) = mpsc::unbounded_channel::>(); let deadline = Instant::now() + Duration::from_secs(2); // Create an RPC provider from the rollup @@ -29,25 +31,27 @@ async fn test_spawn() { let root_provider = ProviderBuilder::new().on_client(RpcClient::new_http(url)); let block_number = root_provider.get_block_number().await.unwrap(); - println!("block number {}", block_number); + assert_ne!(block_number, 0, "root provider is reporting block number 0"); let latest = root_provider.get_block_number().await.unwrap(); + assert!(latest > 0); let db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::from(latest)).unwrap(); let alloy_db = Arc::new(db); let ext = (); - let evm_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); - let evm_pool = EvmPool::new(evm_factory, NoopCfg, NoopBlock); + let evaluator: Arc U256 + Send + Sync> = Arc::new(|result| { + U256::from(1) + }); - // Start the evm pool - let handle = evm_pool.spawn(tx_receiver, mock_evaluator, deadline); + let sim_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); + sim_factory.spawn_trevm(inbound_tx_receiver, inbound_bundle_receiver, deadline); // Send some transactions for _ in 0..5 { let test_tx = Arc::new(SimTxEnvelope(new_test_tx(&test_wallet).unwrap())); - println!("dispatching tx {:?}", test_tx.0); + tracing::debug!("dispatching tx {:?}", test_tx.0); tx_sender.send(test_tx).unwrap(); } From b903212ae2796e32e6e5f35cd9066f420c42e085 Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 25 Feb 2025 15:50:24 -0700 Subject: [PATCH 08/15] WIP: adding trevm simulation --- src/tasks/simulator.rs | 238 ++++++++-------------------------------- tests/simulator_test.rs | 16 ++- 2 files changed, 55 insertions(+), 199 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 23b9bc5..87a3a1d 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -1,163 +1,27 @@ use alloy::consensus::TxEnvelope; use alloy::primitives::U256; use alloy_rlp::Encodable; -use revm::{db::CacheDB, primitives::Bytes, DatabaseRef}; -use std::{ - convert::Infallible, - sync::{Arc, Weak}, -}; -use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; +use revm::db::AlloyDB; +use revm::EvmBuilder; +use revm::{db::CacheDB, DatabaseRef}; +use std::{convert::Infallible, sync::Arc}; +use tokio::{select, sync::mpsc::UnboundedReceiver}; use trevm::db::ConcurrentStateInfo; -use trevm::{ - self, db::ConcurrentState, revm::primitives::ResultAndState, Block, Cfg, EvmFactory, Tx, -}; +use trevm::EvmNeedsBlock; +use trevm::{self, db::ConcurrentState, revm::primitives::ResultAndState, EvmFactory, Tx}; use trevm::{ revm::{primitives::EVMError, Database, DatabaseCommit}, BlockDriver, DbConnect, NoopBlock, NoopCfg, TrevmBuilder, }; -/// Evm context inner -#[derive(Debug, Clone)] -pub struct EvmCtxInner { - evm_factory: Ef, - cfg: C, - block: B, -} - -/// Evm evaluation context -#[derive(Debug, Clone)] -pub struct EvmCtx(Arc>); - -/// Evm simulation pool -#[derive(Debug, Clone)] -pub struct EvmPool { - evm: EvmCtx, -} - -impl EvmPool -where - Ef: for<'a> EvmFactory<'a> + Send + 'static, - C: Cfg + 'static, - B: Block + 'static, -{ - /// Creates a new Evm Pool from the given factory and configs - pub fn new(evm_factory: Ef, cfg: C, block: B) -> EvmPool - where - Ef: for<'a> EvmFactory<'a> + Send + 'static, - C: Cfg + 'static, - B: Block + 'static, - { - let inner = EvmCtxInner { evm_factory, cfg, block }; - let evm = EvmCtx(Arc::new(inner)); - EvmPool { evm } - } - - /// Obtains a weak reference to the evm that can be upgrade to run threaded simulation - fn weak_evm(&self) -> Weak> { - Arc::downgrade(&self.evm.0) - } -} - -// fn eval_fn( -// evm: Weak>, -// tx: Arc, -// evaluator: F, -// ) -> Option> -// where -// Ef: for<'a> EvmFactory<'a> + Send + 'static, -// C: Cfg + 'static, -// B: Block + 'static, -// T: Tx + 'static, -// F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, -// { -// tracing::info!("eval_fn running - evm: {:?}", evm); - -// // If none, then simulation is over. -// let evm = evm.upgrade()?; -// tracing::info!("evm upgraded"); - -// // If none, then tx errored, and can be skipped. -// let result = evm.evm_factory.run(&evm.cfg, &evm.block, tx.as_ref()).ok()?; -// result.state. -// tracing::info!("result: {:?}", &result); - -// let score = evaluator(&result); -// tracing::info!("score: {}", score); - -// Some(Best { tx, result, score }) -// } - pub struct Best { pub tx: Arc, pub result: ResultAndState, pub score: Score, } -// impl EvmPool -// where -// Ef: for<'a> EvmFactory<'a> + Send + 'static, -// C: Cfg + 'static, -// B: Block + 'static, -// { -// /// Spawn a task that will evaluate the best candidate from a channel of -// /// candidates. -// pub fn spawn( -// self, -// mut inbound_tx: UnboundedReceiver>, -// evaluator: Box U256 + Send + Sync>, -// deadline: tokio::time::Instant, -// ) -> tokio::task::JoinHandle>> -// where -// T: Tx + 'static, -// { -// tokio::spawn(async move { -// let mut futs = JoinSet::new(); -// let sleep = tokio::time::sleep_until(deadline); -// tokio::pin!(sleep); - -// let mut best: Option> = None; - -// loop { -// tokio::select! { -// biased; -// _ = &mut sleep => { -// tracing::info!("simulation deadline exceeded"); -// break -// }, -// tx = inbound_tx.recv() => { -// let tx = match tx { -// Some(tx) => tx, -// None => break, -// }; - -// tracing::info!("receiving transaction"); - -// let evm = self.weak_evm(); -// let eval = evaluator; -// let eval = Arc::new(evaluator.as_ref()); -// } -// Some(Ok(Some(candidate))) = futs.join_next() => { -// tracing::info!("candidate used gas: {:?}", candidate.result.result.gas_used()); -// // TODO: think about equality statement here. -// // if using ">" then no candidate will be returned if every score is zero -// // if using ">=" then the candidate will be replaced every time if every score is zero -// if candidate.score >= best.as_ref().map(|b| b.score).unwrap_or_default() { -// best = Some(candidate); -// } -// } -// } -// } -// best -// }) -// } -// } - /// SimBlock wraps an array of SimBundles -pub struct SimBlock(Vec); - -/// -/// Simulator Factory -/// +pub struct SimBlock(pub Vec); /// Binds a database and a simulation extension together #[derive(Clone)] @@ -166,11 +30,7 @@ pub struct SimulatorFactory { pub ext: Ext, } -type EvalFn = Arc U256 + Send + Sync>; - - -/// Creates a new SimulatorFactory from the given Database and Extension -impl SimulatorFactory +impl<'a, Db, Ext> SimulatorFactory where Db: Database + DatabaseRef + DatabaseCommit + Clone + Send + Sync + 'static, { @@ -179,59 +39,65 @@ where } /// Spawns a trevm simulator - pub fn spawn_trevm( + pub fn spawn( self, - mut inbound_tx: UnboundedReceiver>, + mut inbound_tx: UnboundedReceiver>, mut inbound_bundle: UnboundedReceiver>, + evaluator: Arc, deadline: tokio::time::Instant, ) -> tokio::task::JoinHandle>> where T: Tx + Send + Sync + 'static, - Db: Send + Sync + 'static, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + Ext: Send + Sync + 'static, + ::Error: Send, { - tokio::spawn(async move { - let evaluator: Arc U256 + Send + Sync> = Arc::new(|result| { - // ... your logic ... - U256::from(1) - }); - - - // let mut futs = JoinSet::new(); + let jh = tokio::spawn(async move { + let mut best: Option> = None; let sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); - let mut best: Option> = None; - - let mut extractor = SimulatorExtractor { - db: self.db.clone(), - }; - - let t = extractor.trevm(self.db); - loop { select! { - _ = sleep => { + _ = &mut sleep => { break; }, tx = inbound_tx.recv() => { - let tx = match tx { - Some(tx) => tx, - None => break, - }; - let trevm = extractor.trevm(); - tracing::info(tx = ?tx); - todo!(); - }, - b = inbound_bundle.recv() => { - todo!(); + println!("received tx"); + if let Some(inbound_tx) = tx { + self.handle_inbound_tx(inbound_tx, evaluator.clone()); + } else { + break; + } + } + bundle = inbound_bundle.recv() => { + println!("received bundle"); + if let Some(bundle) = bundle { + // TODO: Wire this up with proper type + // self.handle_inbound_bundle(bundle, evaluator); + } else { + break; + } } } } best - }) + }); + + jh + } + + /// simulates an inbound tx and applies its state if it's successfully simualted + pub fn handle_inbound_tx(&self, tx: Arc, evaluator: Arc) { + println!("received tx"); } + /// Simulates an inbound bundle and applies its state if it's successfully simulated + pub fn handle_inbound_bundle(&self, bundle: Arc>, evaluator: Arc) { + println!("received tx"); + todo!() + } } // Wraps a Db into an EvmFactory compatible [`Database`] @@ -246,7 +112,6 @@ where fn connect(&'a self) -> Result { let cache: CacheDB = CacheDB::new(self.db.clone()); let concurrent_db = ConcurrentState::new(cache, ConcurrentStateInfo::default()); - tracing::info!("created concurrent database"); Ok(concurrent_db) } } @@ -262,7 +127,6 @@ where fn create(&'a self) -> Result, Self::Error> { let concurrent_db = self.connect()?; let t = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); - tracing::info!("created trevm"); Ok(t) } } @@ -286,13 +150,11 @@ pub trait BlockExtractor: Send + Sync + 'sta /// An implementation of BlockExtractor for Simulation purposes #[derive(Clone)] -pub struct SimulatorExtractor { - db: Db, -} +pub struct SimulatorExtractor {} /// SimulatorExtractor implements a block extractor and trevm block driver /// for simulating and successively applying state updates from transactions. -impl BlockExtractor<(), Db> for SimulatorExtractor +impl BlockExtractor<(), Db> for SimulatorExtractor where Db: Database + DatabaseCommit + Send + Sync + 'static, { @@ -391,10 +253,6 @@ impl BlockDriver for SimBundle { } } -/// -/// Impls -/// - pub struct Error(EVMError); impl From> for Error diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 8c46bef..15baca6 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -5,15 +5,16 @@ use alloy::providers::{Provider, ProviderBuilder}; use alloy::rpc::client::RpcClient; use alloy::signers::local::PrivateKeySigner; use alloy::signers::SignerSync as _; -use builder::tasks::simulator::{EvmPool, SimBundle, SimTxEnvelope, SimulatorFactory}; +use builder::tasks::simulator::{SimBundle, SimTxEnvelope, SimulatorFactory}; use revm::db::{AlloyDB, CacheDB}; use revm::primitives::{Address, TxKind}; +use revm::Database; +use trevm::Tx; use std::str::FromStr; use std::sync::Arc; use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; use trevm::revm::primitives::{Account, ExecutionResult, ResultAndState}; -use trevm::{NoopBlock, NoopCfg}; #[tokio::test(flavor = "multi_thread")] async fn test_spawn() { @@ -22,8 +23,7 @@ async fn test_spawn() { // Plumb the transaction pipeline let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); - let (inbound_tx, inbound_tx_receiver) = mpsc::unbounded_channel::>(); - let (inbound_bundle, inbound_bundle_receiver) = mpsc::unbounded_channel::>(); + let (bundle_sender, bundle_receiver) = mpsc::unbounded_channel::>(); let deadline = Instant::now() + Duration::from_secs(2); // Create an RPC provider from the rollup @@ -41,17 +41,15 @@ async fn test_spawn() { let ext = (); - let evaluator: Arc U256 + Send + Sync> = Arc::new(|result| { - U256::from(1) - }); + let evaluator = Arc::new(|_state: &ResultAndState| U256::from(1)); let sim_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); - sim_factory.spawn_trevm(inbound_tx_receiver, inbound_bundle_receiver, deadline); + let handle = sim_factory.spawn(tx_receiver, bundle_receiver, evaluator, deadline); // Send some transactions for _ in 0..5 { let test_tx = Arc::new(SimTxEnvelope(new_test_tx(&test_wallet).unwrap())); - tracing::debug!("dispatching tx {:?}", test_tx.0); + println!("dispatching tx {:?}", test_tx.0); tx_sender.send(test_tx).unwrap(); } From 401e250423589475d582b4f0093c12389ab72caf Mon Sep 17 00:00:00 2001 From: dylan Date: Thu, 27 Feb 2025 12:50:45 -0700 Subject: [PATCH 09/15] wip: trevm lifetimes in simulator task now figured out --- src/tasks/simulator.rs | 132 ++++++++++++++++++++++++---------------- tests/simulator_test.rs | 2 +- 2 files changed, 82 insertions(+), 52 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 87a3a1d..3336b2a 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -1,17 +1,17 @@ use alloy::consensus::TxEnvelope; use alloy::primitives::U256; -use alloy_rlp::Encodable; -use revm::db::AlloyDB; -use revm::EvmBuilder; use revm::{db::CacheDB, DatabaseRef}; use std::{convert::Infallible, sync::Arc}; use tokio::{select, sync::mpsc::UnboundedReceiver}; -use trevm::db::ConcurrentStateInfo; -use trevm::EvmNeedsBlock; -use trevm::{self, db::ConcurrentState, revm::primitives::ResultAndState, EvmFactory, Tx}; + use trevm::{ - revm::{primitives::EVMError, Database, DatabaseCommit}, - BlockDriver, DbConnect, NoopBlock, NoopCfg, TrevmBuilder, + self, + db::{ConcurrentState, ConcurrentStateInfo}, + revm::{ + primitives::{EVMError, ResultAndState}, + Database, DatabaseCommit, + }, + BlockDriver, DbConnect, EvmFactory, NoopBlock, NoopCfg, TrevmBuilder, Tx, }; pub struct Best { @@ -49,11 +49,12 @@ where where T: Tx + Send + Sync + 'static, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, - Ext: Send + Sync + 'static, + Ext: Send + Sync + Clone + 'static, ::Error: Send, { let jh = tokio::spawn(async move { let mut best: Option> = None; + let sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); @@ -63,20 +64,35 @@ where break; }, tx = inbound_tx.recv() => { - println!("received tx"); + tracing::debug!("received tx"); + if let Some(inbound_tx) = tx { - self.handle_inbound_tx(inbound_tx, evaluator.clone()); - } else { - break; + tracing::debug!("handling inbound tx"); + + let trevm_instance = match self.create() { + Ok(instance) => instance, + Err(e) => { + tracing::error!(e = ?e, "Failed to create trevm instance"); + continue + } + }; + + self.handle_inbound_tx(inbound_tx, evaluator.clone(), trevm_instance); } } bundle = inbound_bundle.recv() => { - println!("received bundle"); - if let Some(bundle) = bundle { - // TODO: Wire this up with proper type - // self.handle_inbound_bundle(bundle, evaluator); - } else { - break; + if let Some(_bundle) = bundle { + println!("handling inbound bundle"); + + let _trevm_instance = match self.create() { + Ok(instance) => instance, + Err(e) => { + tracing::error!(e = ?e, "Failed to create trevm instance"); + continue + } + }; + + todo!() } } } @@ -89,8 +105,37 @@ where } /// simulates an inbound tx and applies its state if it's successfully simualted - pub fn handle_inbound_tx(&self, tx: Arc, evaluator: Arc) { - println!("received tx"); + pub fn handle_inbound_tx( + &self, + tx: Arc, + evaluator: Arc, + trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, + ) -> Option> + where + T: Tx, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + { + let mut block_driver = SimBundle(vec![todo!()], NoopBlock); + + // Configure and run the transaction + let result = trevm_instance.fill_cfg(&NoopCfg).drive_block(&mut block_driver); + + match result { + Ok(result) => { + // TODO: Run the evaluator on the completed state, returning the Best block + // let score = evaluator(result); + // Some(Best { + // tx, + // result: result_and_state, + // score, + // }) + todo!() + } + Err(e) => { + tracing::error!("Failed to drive block: {:?}", e); + None + } + } } /// Simulates an inbound bundle and applies its state if it's successfully simulated @@ -103,8 +148,8 @@ where // Wraps a Db into an EvmFactory compatible [`Database`] impl<'a, Db, Ext> DbConnect<'a> for SimulatorFactory where - Db: Database + DatabaseRef + DatabaseCommit + Clone + Sync, - Ext: Sync, + Db: Database + DatabaseRef + DatabaseCommit + Clone + Sync + Send + 'static, + Ext: Sync + Clone, { type Database = ConcurrentState>; type Error = Infallible; @@ -169,29 +214,15 @@ where #[allow(clippy::useless_asref)] let txs: Vec = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap_or_default(); - SimBundle(txs, NoopBlock) + let sim_txs = txs.iter().map(|f| SimTxEnvelope(f.clone())).collect(); + SimBundle(sim_txs, NoopBlock) } } -pub struct SimBundle(Vec, NoopBlock); +pub struct SimBundle(Vec, NoopBlock); pub struct SimTxEnvelope(pub TxEnvelope); -impl SimTxEnvelope { - /// Converts bytes into a SimTxEnvelope - pub fn to_tx(bytes: &[u8]) -> Option { - let tx: TxEnvelope = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).ok()?; - Some(SimTxEnvelope(tx)) - } - - /// Converts a SimTxEnvelope into bytes - pub fn to_bytes(&self) -> Vec { - let mut out = Vec::new(); - self.0.encode(&mut out); - out - } -} - impl From<&[u8]> for SimTxEnvelope { fn from(bytes: &[u8]) -> Self { let tx: TxEnvelope = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap(); @@ -199,14 +230,6 @@ impl From<&[u8]> for SimTxEnvelope { } } -impl From<&SimTxEnvelope> for Vec { - fn from(tx: &SimTxEnvelope) -> Self { - let mut out = Vec::new(); - tx.0.encode(&mut out); - out - } -} - impl Tx for SimTxEnvelope { fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { tracing::info!("fillng tx env {:?}", tx_env); // Possible cause @@ -227,10 +250,17 @@ impl BlockDriver for SimBundle { mut trevm: trevm::EvmNeedsTx<'a, Ext, Db>, ) -> trevm::RunTxResult<'a, Ext, Db, Self> { for tx in self.0.iter() { - if tx.recover_signer().is_ok() { - let sim_tx = SimTxEnvelope(tx.clone()); + if tx.0.recover_signer().is_ok() { + let sim_tx = SimTxEnvelope(tx.0.clone()); let t = match trevm.run_tx(&sim_tx) { - Ok(t) => t, + Ok(t) => { + print!( + "successfully ran transaction - gas used {}", + t.result_and_state().result.gas_used() + ); + + t + } Err(e) => { if e.is_transaction_error() { return Ok(e.discard_error()); diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 15baca6..77c05fd 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -44,7 +44,7 @@ async fn test_spawn() { let evaluator = Arc::new(|_state: &ResultAndState| U256::from(1)); let sim_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); - let handle = sim_factory.spawn(tx_receiver, bundle_receiver, evaluator, deadline); + let handle = sim_factory.spawn::(tx_receiver, bundle_receiver, evaluator, deadline); // Send some transactions for _ in 0..5 { From dd24b5a687c840c516d24d8063ad9711209e049d Mon Sep 17 00:00:00 2001 From: dylan Date: Thu, 27 Feb 2025 19:55:12 -0700 Subject: [PATCH 10/15] wip: initial trevm integration working --- Cargo.toml | 2 +- src/tasks/simulator.rs | 41 +++++++++++++++++++---------------------- tests/simulator_test.rs | 39 ++++++++++++++++++++++++++------------- 3 files changed, 46 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 10f9e8f..1f176e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ path = "bin/submit_transaction.rs" [dependencies] zenith-types = "0.13" -alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp"] } +alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] } alloy-rlp = { version = "0.3.4" } aws-config = "1.1.7" diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 3336b2a..9aba7ed 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -53,9 +53,9 @@ where ::Error: Send, { let jh = tokio::spawn(async move { - let mut best: Option> = None; + let best: Option> = None; - let sleep = tokio::time::sleep_until(deadline); + let sleep: tokio::time::Sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); loop { @@ -77,14 +77,16 @@ where } }; - self.handle_inbound_tx(inbound_tx, evaluator.clone(), trevm_instance); + if let Some(result) = self.handle_inbound_tx::(inbound_tx, evaluator.clone(), trevm_instance) { + println!("simulation score: {}", result.score) + } } } bundle = inbound_bundle.recv() => { if let Some(_bundle) = bundle { println!("handling inbound bundle"); - let _trevm_instance = match self.create() { + let trevm_instance = match self.create() { Ok(instance) => instance, Err(e) => { tracing::error!(e = ?e, "Failed to create trevm instance"); @@ -104,35 +106,31 @@ where jh } - /// simulates an inbound tx and applies its state if it's successfully simualted + /// Simulates an inbound tx and applies its state if it's successfully simualted pub fn handle_inbound_tx( &self, - tx: Arc, + tx: Arc, evaluator: Arc, trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, - ) -> Option> + ) -> Option> where T: Tx, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, { - let mut block_driver = SimBundle(vec![todo!()], NoopBlock); - // Configure and run the transaction - let result = trevm_instance.fill_cfg(&NoopCfg).drive_block(&mut block_driver); + let result = trevm_instance + .fill_cfg(&NoopCfg) + .fill_block(&NoopBlock) + .fill_tx(tx.as_ref()) // Use as_ref() to get &SimTxEnvelope from Arc + .run(); match result { - Ok(result) => { - // TODO: Run the evaluator on the completed state, returning the Best block - // let score = evaluator(result); - // Some(Best { - // tx, - // result: result_and_state, - // score, - // }) - todo!() + Ok(success) => { + let score = evaluator(&success.result_and_state()); + Some(Best { tx, result: success.result_and_state().clone(), score }) } Err(e) => { - tracing::error!("Failed to drive block: {:?}", e); + tracing::error!("Failed to run transaction: {:?}", e); None } } @@ -210,7 +208,6 @@ where } fn extract(&mut self, bytes: &[u8]) -> Self::Driver { - // TODO: Should this use SimBundle instead of Vec? #[allow(clippy::useless_asref)] let txs: Vec = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap_or_default(); @@ -232,7 +229,7 @@ impl From<&[u8]> for SimTxEnvelope { impl Tx for SimTxEnvelope { fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { - tracing::info!("fillng tx env {:?}", tx_env); // Possible cause + tracing::info!("fillng tx env {:?}", tx_env); let revm::primitives::TxEnv { .. } = tx_env; } } diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 77c05fd..b5f9baf 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -1,10 +1,12 @@ use alloy::consensus::{SignableTransaction as _, TxEip1559, TxEnvelope}; use alloy::eips::BlockId; use alloy::primitives::U256; -use alloy::providers::{Provider, ProviderBuilder}; +use alloy::providers::{Provider, ProviderBuilder, RootProvider}; use alloy::rpc::client::RpcClient; use alloy::signers::local::PrivateKeySigner; use alloy::signers::SignerSync as _; +use alloy::transports::http::{Client, Http}; +use aws_sdk_kms::types::RotationsListEntry; use builder::tasks::simulator::{SimBundle, SimTxEnvelope, SimulatorFactory}; use revm::db::{AlloyDB, CacheDB}; use revm::primitives::{Address, TxKind}; @@ -23,33 +25,29 @@ async fn test_spawn() { // Plumb the transaction pipeline let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); - let (bundle_sender, bundle_receiver) = mpsc::unbounded_channel::>(); + let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::>(); let deadline = Instant::now() + Duration::from_secs(2); - // Create an RPC provider from the rollup - let url = "https://rpc.havarti.signet.sh ".parse().unwrap(); - let root_provider = ProviderBuilder::new().on_client(RpcClient::new_http(url)); - - let block_number = root_provider.get_block_number().await.unwrap(); - assert_ne!(block_number, 0, "root provider is reporting block number 0"); - + // Create a provider + let root_provider = new_rpc_provider("https://sepolia.gateway.tenderly.co".to_string()).unwrap(); let latest = root_provider.get_block_number().await.unwrap(); - assert!(latest > 0); let db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::from(latest)).unwrap(); let alloy_db = Arc::new(db); let ext = (); - let evaluator = Arc::new(|_state: &ResultAndState| U256::from(1)); + // Define the evaluator function + let evaluator = Arc::new(test_evaluator); + // Create a simulation factory let sim_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); let handle = sim_factory.spawn::(tx_receiver, bundle_receiver, evaluator, deadline); // Send some transactions for _ in 0..5 { let test_tx = Arc::new(SimTxEnvelope(new_test_tx(&test_wallet).unwrap())); - println!("dispatching tx {:?}", test_tx.0); + // println!("dispatching tx {:?}", test_tx.0); tx_sender.send(test_tx).unwrap(); } @@ -61,7 +59,8 @@ async fn test_spawn() { assert_eq!(best.unwrap().score, U256::from(0)); } -fn mock_evaluator(state: &ResultAndState) -> U256 { +/// An example of a simple evaluator function for use in testing +fn test_evaluator(state: &ResultAndState) -> U256 { // log the transaction results match &state.result { ExecutionResult::Success { .. } => println!("Execution was successful."), @@ -91,3 +90,17 @@ fn new_test_tx(wallet: &PrivateKeySigner) -> eyre::Result { let signature = wallet.sign_hash_sync(&tx.signature_hash())?; Ok(TxEnvelope::Eip1559(tx.into_signed(signature))) } + +/// Returns a new RPC provider from a given URL +pub fn new_rpc_provider(url: String) -> eyre::Result>> { + let url = url.parse().unwrap(); + let root_provider = ProviderBuilder::new().on_client(RpcClient::new_http(url)); + Ok(root_provider) +} + +/// Returns a provider based on a local Anvil instance that it creates +pub fn new_anvil_provider() -> eyre::Result>> { + let anvil = alloy::node_bindings::Anvil::new().block_time(1).chain_id(17003).try_spawn().unwrap(); + let root_provider = ProviderBuilder::new().on_http(anvil.endpoint_url()); + Ok(root_provider) +} \ No newline at end of file From 504293616009fdc5a8ad5a5c66854d0b97f6a769 Mon Sep 17 00:00:00 2001 From: dylan Date: Fri, 28 Feb 2025 14:58:37 -0700 Subject: [PATCH 11/15] wip: more wire up --- src/tasks/simulator.rs | 59 ++++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 16 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 9aba7ed..61150f2 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -2,7 +2,7 @@ use alloy::consensus::TxEnvelope; use alloy::primitives::U256; use revm::{db::CacheDB, DatabaseRef}; use std::{convert::Infallible, sync::Arc}; -use tokio::{select, sync::mpsc::UnboundedReceiver}; +use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; use trevm::{ self, @@ -55,6 +55,8 @@ where let jh = tokio::spawn(async move { let best: Option> = None; + let mut join_set = JoinSet::new(); + let sleep: tokio::time::Sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); @@ -67,19 +69,23 @@ where tracing::debug!("received tx"); if let Some(inbound_tx) = tx { - tracing::debug!("handling inbound tx"); - - let trevm_instance = match self.create() { - Ok(instance) => instance, - Err(e) => { - tracing::error!(e = ?e, "Failed to create trevm instance"); - continue + // set off a job that creates a new trevm with the concurrent database + // and then runs the simulation on that trevm instance + let simulation_handle = join_set.spawn(async move { + tracing::debug!("handling inbound tx"); + let trevm_instance = match self.create() { + Ok(instance) => instance, + Err(e) => { + tracing::error!(e = ?e, "Failed to create trevm instance"); + return + } + }; + + // simulate the transaction on the created trevm instance + if let Some(result) = self.simulate_tx::(inbound_tx, evaluator.clone(), trevm_instance) { + println!("simulation score: {}", result.score) } - }; - - if let Some(result) = self.handle_inbound_tx::(inbound_tx, evaluator.clone(), trevm_instance) { - println!("simulation score: {}", result.score) - } + }); } } bundle = inbound_bundle.recv() => { @@ -94,9 +100,21 @@ where } }; + if let Some(result) = self.simulate_bundle::(inbound_bundle, evaluator.clone(), trevm_instance) { + println!("simulation score: {}", result) + } + todo!() } } + Some(res) = join_set.join_next() => { + match res { + Ok(simulation_result) => { + println!("simulation result: {}") + }, + Err(e) => tracing::error!("Task failed: {}", e), + } + } } } @@ -107,7 +125,7 @@ where } /// Simulates an inbound tx and applies its state if it's successfully simualted - pub fn handle_inbound_tx( + pub fn simulate_tx( &self, tx: Arc, evaluator: Arc, @@ -117,7 +135,6 @@ where T: Tx, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, { - let result = trevm_instance .fill_cfg(&NoopCfg) .fill_block(&NoopBlock) @@ -137,8 +154,18 @@ where } /// Simulates an inbound bundle and applies its state if it's successfully simulated - pub fn handle_inbound_bundle(&self, bundle: Arc>, evaluator: Arc) { + pub fn simulate_bundle( + &self, + bundle: Arc>, + evaluator: Arc, + trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, + ) -> Option> + { println!("received tx"); + + let result = trevm_instance.fill_cfg(&NoopCfg); + let mut driver: &mut D = todo!(); // TODO: Make SimBundle mirror the SignetEthBundle type + result.drive_block(driver); todo!() } } From e72b8de9d7e3aa545935f281b8babd85e90ac454 Mon Sep 17 00:00:00 2001 From: dylan Date: Mon, 3 Mar 2025 11:11:39 -0700 Subject: [PATCH 12/15] wip: trevm simulation now working --- src/tasks/simulator.rs | 107 ++++++++++++++++++---------------------- tests/simulator_test.rs | 2 +- 2 files changed, 49 insertions(+), 60 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 61150f2..693111d 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -45,76 +45,47 @@ where mut inbound_bundle: UnboundedReceiver>, evaluator: Arc, deadline: tokio::time::Instant, - ) -> tokio::task::JoinHandle>> + ) -> tokio::task::JoinHandle>> where - T: Tx + Send + Sync + 'static, + T: Tx, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, Ext: Send + Sync + Clone + 'static, ::Error: Send, { let jh = tokio::spawn(async move { - let best: Option> = None; - + let mut best: Option> = None; let mut join_set = JoinSet::new(); - let sleep: tokio::time::Sleep = tokio::time::sleep_until(deadline); + let sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); loop { select! { - _ = &mut sleep => { - break; - }, + _ = &mut sleep => break, tx = inbound_tx.recv() => { - tracing::debug!("received tx"); - if let Some(inbound_tx) = tx { - // set off a job that creates a new trevm with the concurrent database - // and then runs the simulation on that trevm instance - let simulation_handle = join_set.spawn(async move { - tracing::debug!("handling inbound tx"); - let trevm_instance = match self.create() { + let eval = evaluator.clone(); + let sim = self.clone(); + + join_set.spawn(async move { + let trevm_instance = match sim.create() { Ok(instance) => instance, Err(e) => { tracing::error!(e = ?e, "Failed to create trevm instance"); - return + return None } }; - - // simulate the transaction on the created trevm instance - if let Some(result) = self.simulate_tx::(inbound_tx, evaluator.clone(), trevm_instance) { - println!("simulation score: {}", result.score) - } + sim.simulate_tx(inbound_tx, eval, trevm_instance) }); } } - bundle = inbound_bundle.recv() => { - if let Some(_bundle) = bundle { - println!("handling inbound bundle"); - - let trevm_instance = match self.create() { - Ok(instance) => instance, - Err(e) => { - tracing::error!(e = ?e, "Failed to create trevm instance"); - continue - } - }; - - if let Some(result) = self.simulate_bundle::(inbound_bundle, evaluator.clone(), trevm_instance) { - println!("simulation score: {}", result) - } - - todo!() - } - } - Some(res) = join_set.join_next() => { - match res { - Ok(simulation_result) => { - println!("simulation result: {}") - }, - Err(e) => tracing::error!("Task failed: {}", e), + Some(Ok(Some(candidate))) = join_set.join_next() => { + tracing::debug!(score = ?candidate.score, "job finished"); + if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() { + best = Some(candidate); } } + else => break, } } @@ -125,14 +96,13 @@ where } /// Simulates an inbound tx and applies its state if it's successfully simualted - pub fn simulate_tx( + pub fn simulate_tx( &self, tx: Arc, evaluator: Arc, trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, ) -> Option> where - T: Tx, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, { let result = trevm_instance @@ -162,10 +132,10 @@ where ) -> Option> { println!("received tx"); - - let result = trevm_instance.fill_cfg(&NoopCfg); - let mut driver: &mut D = todo!(); // TODO: Make SimBundle mirror the SignetEthBundle type - result.drive_block(driver); + // TODO: Implement bundle handling, making sure to respect bundle revert guarantees + // let result = trevm_instance.fill_cfg(&NoopCfg); + // let mut driver: &mut D = todo!(); // TODO: Make SimBundle mirror the SignetEthBundle type + // result.drive_block(driver); todo!() } } @@ -196,8 +166,8 @@ where /// Create makes a [`ConcurrentState`] database by calling connect fn create(&'a self) -> Result, Self::Error> { let concurrent_db = self.connect()?; - let t = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); - Ok(t) + let trevm_instance = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); + Ok(trevm_instance) } } @@ -239,12 +209,26 @@ where let txs: Vec = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap_or_default(); let sim_txs = txs.iter().map(|f| SimTxEnvelope(f.clone())).collect(); - SimBundle(sim_txs, NoopBlock) + SimBundle::new(sim_txs) } } -pub struct SimBundle(Vec, NoopBlock); +#[derive(Clone)] +pub struct SimBundle { + pub transactions: Vec, + pub block: NoopBlock, +} + +impl SimBundle { + pub fn new(transactions: Vec) -> Self { + Self { + transactions, + block: NoopBlock, + } + } +} +#[derive(Clone)] pub struct SimTxEnvelope(pub TxEnvelope); impl From<&[u8]> for SimTxEnvelope { @@ -254,6 +238,12 @@ impl From<&[u8]> for SimTxEnvelope { } } +impl From for Vec { + fn from(tx: SimTxEnvelope) -> Vec { + alloy_rlp::encode(tx.0) + } +} + impl Tx for SimTxEnvelope { fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { tracing::info!("fillng tx env {:?}", tx_env); @@ -266,14 +256,14 @@ impl BlockDriver for SimBundle { type Error = Error; fn block(&self) -> &Self::Block { - &NoopBlock + &self.block } fn run_txns<'a, Db: Database + DatabaseCommit>( &mut self, mut trevm: trevm::EvmNeedsTx<'a, Ext, Db>, ) -> trevm::RunTxResult<'a, Ext, Db, Self> { - for tx in self.0.iter() { + for tx in self.transactions.iter() { if tx.0.recover_signer().is_ok() { let sim_tx = SimTxEnvelope(tx.0.clone()); let t = match trevm.run_tx(&sim_tx) { @@ -282,7 +272,6 @@ impl BlockDriver for SimBundle { "successfully ran transaction - gas used {}", t.result_and_state().result.gas_used() ); - t } Err(e) => { diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index b5f9baf..136fe21 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -56,7 +56,7 @@ async fn test_spawn() { // Check the result assert!(best.is_some()); - assert_eq!(best.unwrap().score, U256::from(0)); + assert_ne!(best.unwrap().score, U256::from(0)); } /// An example of a simple evaluator function for use in testing From 13a9efb3771c525387951dbae7c22948a5619316 Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 4 Mar 2025 11:05:43 -0700 Subject: [PATCH 13/15] updates trevm version and uses new trevm child methods --- Cargo.toml | 2 +- src/tasks/simulator.rs | 64 ++++++++++++++++++++--------------------- tests/simulator_test.rs | 14 ++++----- 3 files changed, 39 insertions(+), 41 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1f176e7..49e1e75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,5 +51,5 @@ async-trait = "0.1.80" oauth2 = "4.4.2" metrics = "0.24.1" metrics-exporter-prometheus = "0.16.0" -trevm = "0.19.3" +trevm = { version = "0.19.11", features = [ "concurrent-db" ]} revm = { version = "19.4.0", features = [ "alloydb" ]} diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 693111d..d7fbaab 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -6,10 +6,10 @@ use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; use trevm::{ self, - db::{ConcurrentState, ConcurrentStateInfo}, + db::sync::{ConcurrentState, ConcurrentStateInfo}, revm::{ primitives::{EVMError, ResultAndState}, - Database, DatabaseCommit, + Database, DatabaseCommit, EvmBuilder, }, BlockDriver, DbConnect, EvmFactory, NoopBlock, NoopCfg, TrevmBuilder, Tx, }; @@ -42,7 +42,7 @@ where pub fn spawn( self, mut inbound_tx: UnboundedReceiver>, - mut inbound_bundle: UnboundedReceiver>, + _inbound_bundle: UnboundedReceiver>, evaluator: Arc, deadline: tokio::time::Instant, ) -> tokio::task::JoinHandle>> @@ -50,6 +50,7 @@ where T: Tx, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, Ext: Send + Sync + Clone + 'static, + Db: Database, ::Error: Send, { let jh = tokio::spawn(async move { @@ -67,21 +68,20 @@ where let eval = evaluator.clone(); let sim = self.clone(); + let parent_db = + Arc::new(ConcurrentState::new(self.db.clone(), ConcurrentStateInfo::default())); + join_set.spawn(async move { - let trevm_instance = match sim.create() { - Ok(instance) => instance, - Err(e) => { - tracing::error!(e = ?e, "Failed to create trevm instance"); - return None - } - }; - sim.simulate_tx(inbound_tx, eval, trevm_instance) + sim.simulate_tx(inbound_tx, eval, parent_db) }); } } Some(Ok(Some(candidate))) = join_set.join_next() => { - tracing::debug!(score = ?candidate.score, "job finished"); + tracing::debug!(score = ?candidate.0.score, "job finished"); + let (candidate, _) = candidate; + if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() { + println!("best score found: {}", candidate.score); best = Some(candidate); } } @@ -97,14 +97,17 @@ where /// Simulates an inbound tx and applies its state if it's successfully simualted pub fn simulate_tx( - &self, + self, tx: Arc, evaluator: Arc, - trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, - ) -> Option> + parent_db: Arc>, + ) -> Option<(Best, ConcurrentState>>)> where F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, { + let child_db = parent_db.child(); + let trevm_instance = EvmBuilder::default().with_db(child_db).build_trevm(); + let result = trevm_instance .fill_cfg(&NoopCfg) .fill_block(&NoopBlock) @@ -113,8 +116,10 @@ where match result { Ok(success) => { - let score = evaluator(&success.result_and_state()); - Some(Best { tx, result: success.result_and_state().clone(), score }) + let score = evaluator(success.result_and_state()); + let result_and_state = success.result_and_state().clone(); + let updated_db = success.into_db(); + Some((Best { tx, result: result_and_state, score }, updated_db)) } Err(e) => { tracing::error!("Failed to run transaction: {:?}", e); @@ -126,17 +131,12 @@ where /// Simulates an inbound bundle and applies its state if it's successfully simulated pub fn simulate_bundle( &self, - bundle: Arc>, - evaluator: Arc, - trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, - ) -> Option> - { + _bundle: Arc>, + _evaluator: Arc, + _trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, + ) -> Option> { println!("received tx"); - // TODO: Implement bundle handling, making sure to respect bundle revert guarantees - // let result = trevm_instance.fill_cfg(&NoopCfg); - // let mut driver: &mut D = todo!(); // TODO: Make SimBundle mirror the SignetEthBundle type - // result.drive_block(driver); - todo!() + todo!("implement bundle simulation") } } @@ -166,7 +166,8 @@ where /// Create makes a [`ConcurrentState`] database by calling connect fn create(&'a self) -> Result, Self::Error> { let concurrent_db = self.connect()?; - let trevm_instance = trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); + let trevm_instance = + trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); Ok(trevm_instance) } } @@ -221,10 +222,7 @@ pub struct SimBundle { impl SimBundle { pub fn new(transactions: Vec) -> Self { - Self { - transactions, - block: NoopBlock, - } + Self { transactions, block: NoopBlock } } } @@ -253,7 +251,7 @@ impl Tx for SimTxEnvelope { impl BlockDriver for SimBundle { type Block = NoopBlock; - type Error = Error; + type Error = Error; fn block(&self) -> &Self::Block { &self.block diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 136fe21..69042de 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -6,12 +6,9 @@ use alloy::rpc::client::RpcClient; use alloy::signers::local::PrivateKeySigner; use alloy::signers::SignerSync as _; use alloy::transports::http::{Client, Http}; -use aws_sdk_kms::types::RotationsListEntry; use builder::tasks::simulator::{SimBundle, SimTxEnvelope, SimulatorFactory}; use revm::db::{AlloyDB, CacheDB}; use revm::primitives::{Address, TxKind}; -use revm::Database; -use trevm::Tx; use std::str::FromStr; use std::sync::Arc; use tokio::sync::mpsc; @@ -29,7 +26,8 @@ async fn test_spawn() { let deadline = Instant::now() + Duration::from_secs(2); // Create a provider - let root_provider = new_rpc_provider("https://sepolia.gateway.tenderly.co".to_string()).unwrap(); + let root_provider = + new_rpc_provider("https://sepolia.gateway.tenderly.co".to_string()).unwrap(); let latest = root_provider.get_block_number().await.unwrap(); let db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::from(latest)).unwrap(); @@ -42,7 +40,8 @@ async fn test_spawn() { // Create a simulation factory let sim_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); - let handle = sim_factory.spawn::(tx_receiver, bundle_receiver, evaluator, deadline); + let handle = + sim_factory.spawn::(tx_receiver, bundle_receiver, evaluator, deadline); // Send some transactions for _ in 0..5 { @@ -100,7 +99,8 @@ pub fn new_rpc_provider(url: String) -> eyre::Result>> /// Returns a provider based on a local Anvil instance that it creates pub fn new_anvil_provider() -> eyre::Result>> { - let anvil = alloy::node_bindings::Anvil::new().block_time(1).chain_id(17003).try_spawn().unwrap(); + let anvil = + alloy::node_bindings::Anvil::new().block_time(1).chain_id(17003).try_spawn().unwrap(); let root_provider = ProviderBuilder::new().on_http(anvil.endpoint_url()); Ok(root_provider) -} \ No newline at end of file +} From 6e228657f0d8410ccf74e16f41c6ad657ba889b3 Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 5 Mar 2025 20:11:02 -0700 Subject: [PATCH 14/15] initial database integration working and merging --- src/tasks/simulator.rs | 96 +++++++++++++++++++++++++---------------- src/tasks/submit.rs | 1 + tests/simulator_test.rs | 11 ++--- 3 files changed, 65 insertions(+), 43 deletions(-) diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index d7fbaab..4a91023 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -6,7 +6,7 @@ use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; use trevm::{ self, - db::sync::{ConcurrentState, ConcurrentStateInfo}, + db::sync::{Child, ConcurrentState, ConcurrentStateInfo}, revm::{ primitives::{EVMError, ResultAndState}, Database, DatabaseCommit, EvmBuilder, @@ -32,7 +32,8 @@ pub struct SimulatorFactory { impl<'a, Db, Ext> SimulatorFactory where - Db: Database + DatabaseRef + DatabaseCommit + Clone + Send + Sync + 'static, + Ext: Send + Sync + Clone + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static, { pub fn new(db: Db, ext: Ext) -> Self { Self { db, ext } @@ -49,9 +50,6 @@ where where T: Tx, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, - Ext: Send + Sync + Clone + 'static, - Db: Database, - ::Error: Send, { let jh = tokio::spawn(async move { let mut best: Option> = None; @@ -63,23 +61,33 @@ where loop { select! { _ = &mut sleep => break, + // Handle incoming tx = inbound_tx.recv() => { if let Some(inbound_tx) = tx { - let eval = evaluator.clone(); + // Setup the simulation environment let sim = self.clone(); + let eval = evaluator.clone(); + let mut parent_db = Arc::new(sim.connect().unwrap()); - let parent_db = - Arc::new(ConcurrentState::new(self.db.clone(), ConcurrentStateInfo::default())); - + // Kick off the work in a new thread join_set.spawn(async move { - sim.simulate_tx(inbound_tx, eval, parent_db) + let result = sim.simulate_tx(inbound_tx, eval, parent_db.child()); + if let Some((best, db)) = result { + if let Ok(()) = parent_db.can_merge(&db) { + if let Ok(()) = parent_db.merge_child(db) { + println!("merged db"); + } + } + Some(best) + } else { + None + } }); } } Some(Ok(Some(candidate))) = join_set.join_next() => { - tracing::debug!(score = ?candidate.0.score, "job finished"); - let (candidate, _) = candidate; - + println!("job finished"); + tracing::debug!(score = ?candidate.score, "job finished"); if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() { println!("best score found: {}", candidate.score); best = Some(candidate); @@ -100,12 +108,13 @@ where self, tx: Arc, evaluator: Arc, - parent_db: Arc>, - ) -> Option<(Best, ConcurrentState>>)> + child_db: Child, + ) -> Option<(Best, Child)> where F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static, { - let child_db = parent_db.child(); + // take the first child, which must be resolved before this function ends let trevm_instance = EvmBuilder::default().with_db(child_db).build_trevm(); let result = trevm_instance @@ -115,11 +124,20 @@ where .run(); match result { - Ok(success) => { - let score = evaluator(success.result_and_state()); - let result_and_state = success.result_and_state().clone(); - let updated_db = success.into_db(); - Some((Best { tx, result: result_and_state, score }, updated_db)) + Ok(t) => { + // run the evaluation against the result and state + let res = t.result_and_state(); + let score = evaluator(res); + let result_and_state = res.clone(); + println!("gas used: {}", result_and_state.result.gas_used()); + + // accept and return the updated_db with the execution score + let t = t.accept(); + println!("execution logs: {:?} ", t.0.into_logs()); + + let db = t.1.into_db(); + + Some((Best { tx, result: result_and_state, score }, db)) } Err(e) => { tracing::error!("Failed to run transaction: {:?}", e); @@ -129,46 +147,48 @@ where } /// Simulates an inbound bundle and applies its state if it's successfully simulated - pub fn simulate_bundle( + pub fn simulate_bundle( &self, _bundle: Arc>, _evaluator: Arc, - _trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>, - ) -> Option> { - println!("received tx"); - todo!("implement bundle simulation") + _trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>>, + ) -> Option> + where + T: Tx + Send + Sync + 'static, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + { + todo!("implement bundle handling") } } -// Wraps a Db into an EvmFactory compatible [`Database`] +/// Wraps a Db into an EvmFactory compatible [`Database`] impl<'a, Db, Ext> DbConnect<'a> for SimulatorFactory where - Db: Database + DatabaseRef + DatabaseCommit + Clone + Sync + Send + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static, Ext: Sync + Clone, { - type Database = ConcurrentState>; + type Database = ConcurrentState; type Error = Infallible; fn connect(&'a self) -> Result { - let cache: CacheDB = CacheDB::new(self.db.clone()); - let concurrent_db = ConcurrentState::new(cache, ConcurrentStateInfo::default()); - Ok(concurrent_db) + let inner = ConcurrentState::new(self.db.clone(), ConcurrentStateInfo::default()); + Ok(inner) } } +/// Makes a SimulatorFactory capable of creating and configuring trevm instances impl<'a, Db, Ext> EvmFactory<'a> for SimulatorFactory where - Db: Database + DatabaseRef + DatabaseCommit + Clone + Sync + Send + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static, Ext: Sync + Clone, { type Ext = (); /// Create makes a [`ConcurrentState`] database by calling connect fn create(&'a self) -> Result, Self::Error> { - let concurrent_db = self.connect()?; - let trevm_instance = - trevm::revm::EvmBuilder::default().with_db(concurrent_db).build_trevm(); - Ok(trevm_instance) + let db = self.connect()?; + let trevm = trevm::revm::EvmBuilder::default().with_db(db).build_trevm(); + Ok(trevm) } } @@ -176,7 +196,7 @@ where /// Extractor /// -/// A trait for extracting transactions from a block. +/// A trait for extracting transactions from pub trait BlockExtractor: Send + Sync + 'static { type Driver: BlockDriver: core::error::Error>; diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 8adf084..ca7d920 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -181,6 +181,7 @@ impl SubmitTask { self.send_transaction(resp, tx).await } + /// Send the transaction to the network async fn send_transaction( &self, resp: &SignResponse, diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index 69042de..f73a1dc 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -30,23 +30,24 @@ async fn test_spawn() { new_rpc_provider("https://sepolia.gateway.tenderly.co".to_string()).unwrap(); let latest = root_provider.get_block_number().await.unwrap(); - let db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::from(latest)).unwrap(); - let alloy_db = Arc::new(db); + // Create an alloyDB from the provider at the latest height + let alloy_db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::from(latest)).unwrap(); + let db = CacheDB::new(Arc::new(alloy_db)); + // Define trevm extension, if any let ext = (); // Define the evaluator function let evaluator = Arc::new(test_evaluator); - // Create a simulation factory - let sim_factory = SimulatorFactory::new(CacheDB::new(alloy_db), ext); + // Create a simulation factory with the provided DB + let sim_factory = SimulatorFactory::new(db, ext); let handle = sim_factory.spawn::(tx_receiver, bundle_receiver, evaluator, deadline); // Send some transactions for _ in 0..5 { let test_tx = Arc::new(SimTxEnvelope(new_test_tx(&test_wallet).unwrap())); - // println!("dispatching tx {:?}", test_tx.0); tx_sender.send(test_tx).unwrap(); } From 5fadb46ccff9f54ea0d4f01589eb7f1e2c720d60 Mon Sep 17 00:00:00 2001 From: dylan Date: Thu, 6 Mar 2025 15:38:11 -0700 Subject: [PATCH 15/15] cleanup --- src/tasks/block.rs | 5 ++ src/tasks/simulator.rs | 141 ++++++++++++++++++++-------------------- tests/simulator_test.rs | 12 ++-- 3 files changed, 83 insertions(+), 75 deletions(-) diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 24d32ed..7b9d855 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -114,6 +114,11 @@ impl InProgressBlock { coder.ingest(self.encode_raw()); coder } + + /// Returns the current set of transactions in the block. + pub fn transactions(&self) -> &Vec { + &self.transactions + } } /// BlockBuilder is a task that periodically builds a block then sends it for signing and submission. diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs index 4a91023..7071ae9 100644 --- a/src/tasks/simulator.rs +++ b/src/tasks/simulator.rs @@ -1,9 +1,11 @@ +use super::bundler::Bundle; +use crate::tasks::block::InProgressBlock; use alloy::consensus::TxEnvelope; use alloy::primitives::U256; +use eyre::Result; use revm::{db::CacheDB, DatabaseRef}; use std::{convert::Infallible, sync::Arc}; use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; - use trevm::{ self, db::sync::{Child, ConcurrentState, ConcurrentStateInfo}, @@ -13,6 +15,7 @@ use trevm::{ }, BlockDriver, DbConnect, EvmFactory, NoopBlock, NoopCfg, TrevmBuilder, Tx, }; +use zenith_types::ZenithEthBundle; pub struct Best { pub tx: Arc, @@ -20,9 +23,6 @@ pub struct Best { pub score: Score, } -/// SimBlock wraps an array of SimBundles -pub struct SimBlock(pub Vec); - /// Binds a database and a simulation extension together #[derive(Clone)] pub struct SimulatorFactory { @@ -39,11 +39,14 @@ where Self { db, ext } } - /// Spawns a trevm simulator + /// Spawns a trevm simulator. + /// Spawn does not guarantee that a thread is finished before the deadline. + /// This is intentional, so that it can maximize simulation time before the deadline. + /// This function will always return whatever the latest finished best was. pub fn spawn( self, mut inbound_tx: UnboundedReceiver>, - _inbound_bundle: UnboundedReceiver>, + _inbound_bundle: UnboundedReceiver>>, evaluator: Arc, deadline: tokio::time::Instant, ) -> tokio::task::JoinHandle>> @@ -52,16 +55,18 @@ where F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, { let jh = tokio::spawn(async move { - let mut best: Option> = None; + // Spawn a join set to track all simulation threads let mut join_set = JoinSet::new(); + let mut best: Option> = None; + let sleep = tokio::time::sleep_until(deadline); tokio::pin!(sleep); loop { select! { _ = &mut sleep => break, - // Handle incoming + // Handle incoming tx = inbound_tx.recv() => { if let Some(inbound_tx) = tx { // Setup the simulation environment @@ -69,13 +74,13 @@ where let eval = evaluator.clone(); let mut parent_db = Arc::new(sim.connect().unwrap()); - // Kick off the work in a new thread + // Kick off the work in a new thread join_set.spawn(async move { let result = sim.simulate_tx(inbound_tx, eval, parent_db.child()); if let Some((best, db)) = result { if let Ok(()) = parent_db.can_merge(&db) { if let Ok(()) = parent_db.merge_child(db) { - println!("merged db"); + tracing::info!("merged db"); } } Some(best) @@ -86,10 +91,9 @@ where } } Some(Ok(Some(candidate))) = join_set.join_next() => { - println!("job finished"); tracing::debug!(score = ?candidate.score, "job finished"); if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() { - println!("best score found: {}", candidate.score); + tracing::info!(score = ?candidate.score, "new best candidate found"); best = Some(candidate); } } @@ -114,7 +118,6 @@ where F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static, { - // take the first child, which must be resolved before this function ends let trevm_instance = EvmBuilder::default().with_db(child_db).build_trevm(); let result = trevm_instance @@ -125,16 +128,20 @@ where match result { Ok(t) => { - // run the evaluation against the result and state + let hash = tx.0.tx_hash(); + tracing::info!(hash = ?hash, "simulated transaction"); + let res = t.result_and_state(); let score = evaluator(res); - let result_and_state = res.clone(); - println!("gas used: {}", result_and_state.result.gas_used()); + tracing::debug!(score = ?score, "evaluated transaction score"); + let result_and_state = res.clone(); + tracing::debug!(gas_used = result_and_state.result.gas_used(), "gas consumed"); + // accept and return the updated_db with the execution score let t = t.accept(); - println!("execution logs: {:?} ", t.0.into_logs()); + // take the db and return it wth the best let db = t.1.into_db(); Some((Best { tx, result: result_and_state, score }, db)) @@ -146,13 +153,49 @@ where } } + /// Adds a given bundle to a given block, creates a [`trevm`] instance, + /// runs the [`BlockDriver`] with that instance, and then returns + /// the [`Child`] of the updated state. + pub fn apply_bundle( + &self, + mut block: InProgressBlock, + bundle: Arc, + child_db: Child, + ) -> Result> { + let trevm = EvmBuilder::default() + .with_db(child_db) + .build_trevm() + .fill_cfg(&NoopCfg) + .fill_block(&NoopBlock); + + block.ingest_bundle(Bundle { + id: bundle.replacement_uuid().unwrap_or_default().to_string(), + bundle: ZenithEthBundle { + bundle: bundle.bundle.clone(), + host_fills: bundle.host_fills.clone(), + }, + }); + + let result = block.run_txns(trevm); + match result { + Ok(t) => { + let db = t.into_db(); + Ok(db) + } + Err(t_error) => { + tracing::error!(err = ?t_error, "Failed to run block"); + eyre::bail!("Failed to run block"); + } + } + } + /// Simulates an inbound bundle and applies its state if it's successfully simulated pub fn simulate_bundle( &self, _bundle: Arc>, _evaluator: Arc, _trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>>, - ) -> Option> + ) -> Option>> where T: Tx + Send + Sync + 'static, F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, @@ -192,10 +235,6 @@ where } } -/// -/// Extractor -/// - /// A trait for extracting transactions from pub trait BlockExtractor: Send + Sync + 'static { type Driver: BlockDriver: core::error::Error>; @@ -209,43 +248,6 @@ pub trait BlockExtractor: Send + Sync + 'sta fn extract(&mut self, bytes: &[u8]) -> Self::Driver; } -/// An implementation of BlockExtractor for Simulation purposes -#[derive(Clone)] -pub struct SimulatorExtractor {} - -/// SimulatorExtractor implements a block extractor and trevm block driver -/// for simulating and successively applying state updates from transactions. -impl BlockExtractor<(), Db> for SimulatorExtractor -where - Db: Database + DatabaseCommit + Send + Sync + 'static, -{ - type Driver = SimBundle; - - fn trevm(&self, db: Db) -> trevm::EvmNeedsBlock<'static, (), Db> { - trevm::revm::EvmBuilder::default().with_db(db).build_trevm().fill_cfg(&NoopCfg) - } - - fn extract(&mut self, bytes: &[u8]) -> Self::Driver { - #[allow(clippy::useless_asref)] - let txs: Vec = - alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap_or_default(); - let sim_txs = txs.iter().map(|f| SimTxEnvelope(f.clone())).collect(); - SimBundle::new(sim_txs) - } -} - -#[derive(Clone)] -pub struct SimBundle { - pub transactions: Vec, - pub block: NoopBlock, -} - -impl SimBundle { - pub fn new(transactions: Vec) -> Self { - Self { transactions, block: NoopBlock } - } -} - #[derive(Clone)] pub struct SimTxEnvelope(pub TxEnvelope); @@ -269,29 +271,26 @@ impl Tx for SimTxEnvelope { } } -impl BlockDriver for SimBundle { +impl BlockDriver for InProgressBlock { type Block = NoopBlock; + type Error = Error; fn block(&self) -> &Self::Block { - &self.block + &NoopBlock } + /// Loops through the transactions in the block and runs them, accepting the state at the end + /// if it was successful and returning and erroring out otherwise. fn run_txns<'a, Db: Database + DatabaseCommit>( &mut self, mut trevm: trevm::EvmNeedsTx<'a, Ext, Db>, ) -> trevm::RunTxResult<'a, Ext, Db, Self> { - for tx in self.transactions.iter() { - if tx.0.recover_signer().is_ok() { - let sim_tx = SimTxEnvelope(tx.0.clone()); + for tx in self.transactions().iter() { + if tx.recover_signer().is_ok() { + let sim_tx = SimTxEnvelope(tx.clone()); let t = match trevm.run_tx(&sim_tx) { - Ok(t) => { - print!( - "successfully ran transaction - gas used {}", - t.result_and_state().result.gas_used() - ); - t - } + Ok(t) => t, Err(e) => { if e.is_transaction_error() { return Ok(e.discard_error()); diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs index f73a1dc..6bf6e30 100644 --- a/tests/simulator_test.rs +++ b/tests/simulator_test.rs @@ -6,7 +6,7 @@ use alloy::rpc::client::RpcClient; use alloy::signers::local::PrivateKeySigner; use alloy::signers::SignerSync as _; use alloy::transports::http::{Client, Http}; -use builder::tasks::simulator::{SimBundle, SimTxEnvelope, SimulatorFactory}; +use builder::tasks::simulator::{SimTxEnvelope, SimulatorFactory}; use revm::db::{AlloyDB, CacheDB}; use revm::primitives::{Address, TxKind}; use std::str::FromStr; @@ -22,7 +22,7 @@ async fn test_spawn() { // Plumb the transaction pipeline let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); - let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::>(); + let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::>>(); let deadline = Instant::now() + Duration::from_secs(2); // Create a provider @@ -56,7 +56,10 @@ async fn test_spawn() { // Check the result assert!(best.is_some()); - assert_ne!(best.unwrap().score, U256::from(0)); + let result = best.unwrap(); + assert_ne!(result.score, U256::from(0)); + + println!("Best: {:?}", result.score); } /// An example of a simple evaluator function for use in testing @@ -72,7 +75,8 @@ fn test_evaluator(state: &ResultAndState) -> U256 { let target_addr = Address::from_str("0x0000000000000000000000000000000000000000").unwrap(); let default_account = Account::default(); let target_account = state.state.get(&target_addr).unwrap_or(&default_account); - println!("target account balance: {:?}", target_account.info.balance); + tracing::info!(balance = ?target_account.info.balance, "target account balance"); + target_account.info.balance }