diff --git a/Cargo.toml b/Cargo.toml index b1c9cb9..49e1e75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ path = "bin/submit_transaction.rs" [dependencies] zenith-types = "0.13" -alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp"] } +alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp", "node-bindings"] } alloy-rlp = { version = "0.3.4" } aws-config = "1.1.7" @@ -51,3 +51,5 @@ async-trait = "0.1.80" oauth2 = "4.4.2" metrics = "0.24.1" metrics-exporter-prometheus = "0.16.0" +trevm = { version = "0.19.11", features = [ "concurrent-db" ]} +revm = { version = "19.4.0", features = [ "alloydb" ]} diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 24d32ed..7b9d855 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -114,6 +114,11 @@ impl InProgressBlock { coder.ingest(self.encode_raw()); coder } + + /// Returns the current set of transactions in the block. + pub fn transactions(&self) -> &Vec { + &self.transactions + } } /// BlockBuilder is a task that periodically builds a block then sends it for signing and submission. diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 3149dec..01034f6 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -2,5 +2,6 @@ pub mod block; pub mod bundler; pub mod metrics; pub mod oauth; +pub mod simulator; pub mod submit; pub mod tx_poller; diff --git a/src/tasks/simulator.rs b/src/tasks/simulator.rs new file mode 100644 index 0000000..7071ae9 --- /dev/null +++ b/src/tasks/simulator.rs @@ -0,0 +1,339 @@ +use super::bundler::Bundle; +use crate::tasks::block::InProgressBlock; +use alloy::consensus::TxEnvelope; +use alloy::primitives::U256; +use eyre::Result; +use revm::{db::CacheDB, DatabaseRef}; +use std::{convert::Infallible, sync::Arc}; +use tokio::{select, sync::mpsc::UnboundedReceiver, task::JoinSet}; +use trevm::{ + self, + db::sync::{Child, ConcurrentState, ConcurrentStateInfo}, + revm::{ + primitives::{EVMError, ResultAndState}, + Database, DatabaseCommit, EvmBuilder, + }, + BlockDriver, DbConnect, EvmFactory, NoopBlock, NoopCfg, TrevmBuilder, Tx, +}; +use zenith_types::ZenithEthBundle; + +pub struct Best { + pub tx: Arc, + pub result: ResultAndState, + pub score: Score, +} + +/// Binds a database and a simulation extension together +#[derive(Clone)] +pub struct SimulatorFactory { + pub db: Db, + pub ext: Ext, +} + +impl<'a, Db, Ext> SimulatorFactory +where + Ext: Send + Sync + Clone + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static, +{ + pub fn new(db: Db, ext: Ext) -> Self { + Self { db, ext } + } + + /// Spawns a trevm simulator. + /// Spawn does not guarantee that a thread is finished before the deadline. + /// This is intentional, so that it can maximize simulation time before the deadline. + /// This function will always return whatever the latest finished best was. + pub fn spawn( + self, + mut inbound_tx: UnboundedReceiver>, + _inbound_bundle: UnboundedReceiver>>, + evaluator: Arc, + deadline: tokio::time::Instant, + ) -> tokio::task::JoinHandle>> + where + T: Tx, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + { + let jh = tokio::spawn(async move { + // Spawn a join set to track all simulation threads + let mut join_set = JoinSet::new(); + + let mut best: Option> = None; + + let sleep = tokio::time::sleep_until(deadline); + tokio::pin!(sleep); + + loop { + select! { + _ = &mut sleep => break, + // Handle incoming + tx = inbound_tx.recv() => { + if let Some(inbound_tx) = tx { + // Setup the simulation environment + let sim = self.clone(); + let eval = evaluator.clone(); + let mut parent_db = Arc::new(sim.connect().unwrap()); + + // Kick off the work in a new thread + join_set.spawn(async move { + let result = sim.simulate_tx(inbound_tx, eval, parent_db.child()); + if let Some((best, db)) = result { + if let Ok(()) = parent_db.can_merge(&db) { + if let Ok(()) = parent_db.merge_child(db) { + tracing::info!("merged db"); + } + } + Some(best) + } else { + None + } + }); + } + } + Some(Ok(Some(candidate))) = join_set.join_next() => { + tracing::debug!(score = ?candidate.score, "job finished"); + if candidate.score > best.as_ref().map(|b| b.score).unwrap_or_default() { + tracing::info!(score = ?candidate.score, "new best candidate found"); + best = Some(candidate); + } + } + else => break, + } + } + + best + }); + + jh + } + + /// Simulates an inbound tx and applies its state if it's successfully simualted + pub fn simulate_tx( + self, + tx: Arc, + evaluator: Arc, + child_db: Child, + ) -> Option<(Best, Child)> + where + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + Db: Database + DatabaseRef + DatabaseCommit + Send + Sync + Clone + 'static, + { + let trevm_instance = EvmBuilder::default().with_db(child_db).build_trevm(); + + let result = trevm_instance + .fill_cfg(&NoopCfg) + .fill_block(&NoopBlock) + .fill_tx(tx.as_ref()) // Use as_ref() to get &SimTxEnvelope from Arc + .run(); + + match result { + Ok(t) => { + let hash = tx.0.tx_hash(); + tracing::info!(hash = ?hash, "simulated transaction"); + + let res = t.result_and_state(); + let score = evaluator(res); + tracing::debug!(score = ?score, "evaluated transaction score"); + + let result_and_state = res.clone(); + tracing::debug!(gas_used = result_and_state.result.gas_used(), "gas consumed"); + + // accept and return the updated_db with the execution score + let t = t.accept(); + + // take the db and return it wth the best + let db = t.1.into_db(); + + Some((Best { tx, result: result_and_state, score }, db)) + } + Err(e) => { + tracing::error!("Failed to run transaction: {:?}", e); + None + } + } + } + + /// Adds a given bundle to a given block, creates a [`trevm`] instance, + /// runs the [`BlockDriver`] with that instance, and then returns + /// the [`Child`] of the updated state. + pub fn apply_bundle( + &self, + mut block: InProgressBlock, + bundle: Arc, + child_db: Child, + ) -> Result> { + let trevm = EvmBuilder::default() + .with_db(child_db) + .build_trevm() + .fill_cfg(&NoopCfg) + .fill_block(&NoopBlock); + + block.ingest_bundle(Bundle { + id: bundle.replacement_uuid().unwrap_or_default().to_string(), + bundle: ZenithEthBundle { + bundle: bundle.bundle.clone(), + host_fills: bundle.host_fills.clone(), + }, + }); + + let result = block.run_txns(trevm); + match result { + Ok(t) => { + let db = t.into_db(); + Ok(db) + } + Err(t_error) => { + tracing::error!(err = ?t_error, "Failed to run block"); + eyre::bail!("Failed to run block"); + } + } + } + + /// Simulates an inbound bundle and applies its state if it's successfully simulated + pub fn simulate_bundle( + &self, + _bundle: Arc>, + _evaluator: Arc, + _trevm_instance: trevm::EvmNeedsCfg<'_, (), ConcurrentState>>>, + ) -> Option>> + where + T: Tx + Send + Sync + 'static, + F: Fn(&ResultAndState) -> U256 + Send + Sync + 'static, + { + todo!("implement bundle handling") + } +} + +/// Wraps a Db into an EvmFactory compatible [`Database`] +impl<'a, Db, Ext> DbConnect<'a> for SimulatorFactory +where + Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static, + Ext: Sync + Clone, +{ + type Database = ConcurrentState; + type Error = Infallible; + + fn connect(&'a self) -> Result { + let inner = ConcurrentState::new(self.db.clone(), ConcurrentStateInfo::default()); + Ok(inner) + } +} + +/// Makes a SimulatorFactory capable of creating and configuring trevm instances +impl<'a, Db, Ext> EvmFactory<'a> for SimulatorFactory +where + Db: Database + DatabaseRef + DatabaseCommit + Sync + Send + Clone + 'static, + Ext: Sync + Clone, +{ + type Ext = (); + + /// Create makes a [`ConcurrentState`] database by calling connect + fn create(&'a self) -> Result, Self::Error> { + let db = self.connect()?; + let trevm = trevm::revm::EvmBuilder::default().with_db(db).build_trevm(); + Ok(trevm) + } +} + +/// A trait for extracting transactions from +pub trait BlockExtractor: Send + Sync + 'static { + type Driver: BlockDriver: core::error::Error>; + + /// Instantiate an configure a new [`trevm`] instance. + fn trevm(&self, db: Db) -> trevm::EvmNeedsBlock<'static, Ext, Db>; + + /// Extracts transactions from the source. + /// + /// Extraction is infallible. Worst case it should return a no-op driver. + fn extract(&mut self, bytes: &[u8]) -> Self::Driver; +} + +#[derive(Clone)] +pub struct SimTxEnvelope(pub TxEnvelope); + +impl From<&[u8]> for SimTxEnvelope { + fn from(bytes: &[u8]) -> Self { + let tx: TxEnvelope = alloy_rlp::Decodable::decode(&mut bytes.as_ref()).unwrap(); + SimTxEnvelope(tx) + } +} + +impl From for Vec { + fn from(tx: SimTxEnvelope) -> Vec { + alloy_rlp::encode(tx.0) + } +} + +impl Tx for SimTxEnvelope { + fn fill_tx_env(&self, tx_env: &mut revm::primitives::TxEnv) { + tracing::info!("fillng tx env {:?}", tx_env); + let revm::primitives::TxEnv { .. } = tx_env; + } +} + +impl BlockDriver for InProgressBlock { + type Block = NoopBlock; + + type Error = Error; + + fn block(&self) -> &Self::Block { + &NoopBlock + } + + /// Loops through the transactions in the block and runs them, accepting the state at the end + /// if it was successful and returning and erroring out otherwise. + fn run_txns<'a, Db: Database + DatabaseCommit>( + &mut self, + mut trevm: trevm::EvmNeedsTx<'a, Ext, Db>, + ) -> trevm::RunTxResult<'a, Ext, Db, Self> { + for tx in self.transactions().iter() { + if tx.recover_signer().is_ok() { + let sim_tx = SimTxEnvelope(tx.clone()); + let t = match trevm.run_tx(&sim_tx) { + Ok(t) => t, + Err(e) => { + if e.is_transaction_error() { + return Ok(e.discard_error()); + } else { + return Err(e.err_into()); + } + } + }; + (_, trevm) = t.accept(); + } + } + Ok(trevm) + } + + fn post_block( + &mut self, + _trevm: &trevm::EvmNeedsBlock<'_, Ext, Db>, + ) -> Result<(), Self::Error> { + Ok(()) + } +} + +pub struct Error(EVMError); + +impl From> for Error +where + Db: Database, +{ + fn from(e: EVMError) -> Self { + Self(e) + } +} + +impl core::error::Error for Error {} + +impl core::fmt::Debug for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "Error") + } +} + +impl core::fmt::Display for Error { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "Error") + } +} diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 8adf084..ca7d920 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -181,6 +181,7 @@ impl SubmitTask { self.send_transaction(resp, tx).await } + /// Send the transaction to the network async fn send_transaction( &self, resp: &SignResponse, diff --git a/tests/simulator_test.rs b/tests/simulator_test.rs new file mode 100644 index 0000000..6bf6e30 --- /dev/null +++ b/tests/simulator_test.rs @@ -0,0 +1,111 @@ +use alloy::consensus::{SignableTransaction as _, TxEip1559, TxEnvelope}; +use alloy::eips::BlockId; +use alloy::primitives::U256; +use alloy::providers::{Provider, ProviderBuilder, RootProvider}; +use alloy::rpc::client::RpcClient; +use alloy::signers::local::PrivateKeySigner; +use alloy::signers::SignerSync as _; +use alloy::transports::http::{Client, Http}; +use builder::tasks::simulator::{SimTxEnvelope, SimulatorFactory}; +use revm::db::{AlloyDB, CacheDB}; +use revm::primitives::{Address, TxKind}; +use std::str::FromStr; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::time::{Duration, Instant}; +use trevm::revm::primitives::{Account, ExecutionResult, ResultAndState}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_spawn() { + // Create test identity + let test_wallet = PrivateKeySigner::random(); + + // Plumb the transaction pipeline + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::>(); + let (_bundle_sender, bundle_receiver) = mpsc::unbounded_channel::>>(); + let deadline = Instant::now() + Duration::from_secs(2); + + // Create a provider + let root_provider = + new_rpc_provider("https://sepolia.gateway.tenderly.co".to_string()).unwrap(); + let latest = root_provider.get_block_number().await.unwrap(); + + // Create an alloyDB from the provider at the latest height + let alloy_db = AlloyDB::new(Arc::new(root_provider.clone()), BlockId::from(latest)).unwrap(); + let db = CacheDB::new(Arc::new(alloy_db)); + + // Define trevm extension, if any + let ext = (); + + // Define the evaluator function + let evaluator = Arc::new(test_evaluator); + + // Create a simulation factory with the provided DB + let sim_factory = SimulatorFactory::new(db, ext); + let handle = + sim_factory.spawn::(tx_receiver, bundle_receiver, evaluator, deadline); + + // Send some transactions + for _ in 0..5 { + let test_tx = Arc::new(SimTxEnvelope(new_test_tx(&test_wallet).unwrap())); + tx_sender.send(test_tx).unwrap(); + } + + // Wait for the handle to complete + let best = handle.await.unwrap(); + + // Check the result + assert!(best.is_some()); + let result = best.unwrap(); + assert_ne!(result.score, U256::from(0)); + + println!("Best: {:?}", result.score); +} + +/// An example of a simple evaluator function for use in testing +fn test_evaluator(state: &ResultAndState) -> U256 { + // log the transaction results + match &state.result { + ExecutionResult::Success { .. } => println!("Execution was successful."), + ExecutionResult::Revert { .. } => println!("Execution reverted."), + ExecutionResult::Halt { .. } => println!("Execution halted."), + } + + // return the target account balance + let target_addr = Address::from_str("0x0000000000000000000000000000000000000000").unwrap(); + let default_account = Account::default(); + let target_account = state.state.get(&target_addr).unwrap_or(&default_account); + tracing::info!(balance = ?target_account.info.balance, "target account balance"); + + target_account.info.balance +} + +// Returns a new signed test transaction with default values +fn new_test_tx(wallet: &PrivateKeySigner) -> eyre::Result { + let tx = TxEip1559 { + chain_id: 17001, + nonce: 1, + gas_limit: 50000, + to: TxKind::Call(Address::from_str("0x0000000000000000000000000000000000000000").unwrap()), + value: U256::from(1_f64), + input: alloy::primitives::bytes!(""), + ..Default::default() + }; + let signature = wallet.sign_hash_sync(&tx.signature_hash())?; + Ok(TxEnvelope::Eip1559(tx.into_signed(signature))) +} + +/// Returns a new RPC provider from a given URL +pub fn new_rpc_provider(url: String) -> eyre::Result>> { + let url = url.parse().unwrap(); + let root_provider = ProviderBuilder::new().on_client(RpcClient::new_http(url)); + Ok(root_provider) +} + +/// Returns a provider based on a local Anvil instance that it creates +pub fn new_anvil_provider() -> eyre::Result>> { + let anvil = + alloy::node_bindings::Anvil::new().block_time(1).chain_id(17003).try_spawn().unwrap(); + let root_provider = ProviderBuilder::new().on_http(anvil.endpoint_url()); + Ok(root_provider) +}