diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index c3fcab024..30461f2f1 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -442,8 +442,6 @@ pub(crate) struct EntryPointBuilderConfigs { pub(crate) struct EntryPointBuilderConfig { // Entry point address address: Address, - // Index offset for builders - index_offset: u64, // Builder configs builders: Vec, } @@ -453,8 +451,12 @@ pub(crate) struct EntryPointBuilderConfig { pub(crate) struct BuilderConfig { // Number of builders using this config count: u64, + // Builder index offset - defaults to 0 + index_offset: Option, // Submitter proxy to use for builders proxy: Option
, + // Optional filter to apply to the builders + filter_id: Option, } impl EntryPointBuilderConfigs { @@ -471,14 +473,13 @@ impl EntryPointBuilderConfigs { impl EntryPointBuilderConfig { pub fn builders(&self) -> Vec { - let mut index = self.index_offset; let mut builders = vec![]; for builder in &self.builders { builders.extend((0..builder.count).map(|i| BuilderSettings { - index: index + i, + index: builder.index_offset.unwrap_or(0) + i, submitter_proxy: builder.proxy, + filter_id: builder.filter_id.clone(), })); - index += builder.count; } builders } @@ -489,6 +490,7 @@ fn builder_settings_from_cli(index_offset: u64, count: u64) -> Vec { builder_index: u64, + builder_tag: String, settings: Settings, ep_providers: EP, bundle_providers: BP, event_sender: broadcast::Sender>, condition_not_met_notified: bool, metric: BuilderProposerMetric, + filter_id: Option, } #[derive(Debug)] @@ -343,19 +345,23 @@ where #[allow(clippy::too_many_arguments)] pub(crate) fn new( builder_index: u64, + builder_tag: String, ep_providers: EP, bundle_providers: BP, settings: Settings, event_sender: broadcast::Sender>, + filter_id: Option, ) -> Self { Self { builder_index, + builder_tag, ep_providers, bundle_providers, settings, event_sender, condition_not_met_notified: false, metric: BuilderProposerMetric::default(), + filter_id, } } @@ -380,7 +386,7 @@ where || op.uo.max_priority_fee_per_gas() < required_op_fees.max_priority_fee_per_gas { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op.uo), SkipReason::InsufficientFees { required_fees: required_op_fees, @@ -433,7 +439,7 @@ where "Failed to calculate required pre-verification gas for op: {e:?}, skipping" ); self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), op_hash, SkipReason::Other { reason: Arc::new(format!( @@ -454,7 +460,7 @@ where if op.uo.pre_verification_gas() < required_pvg { self.emit(BuilderEvent::skipped_op( - self.builder_index, + 
self.builder_tag.clone(), op_hash, SkipReason::InsufficientPreVerificationGas { base_fee, @@ -506,7 +512,7 @@ where entity_infos: _, } => { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), op_hash, SkipReason::Other { reason: Arc::new(format!("Failed to simulate op: {error:?}, skipping")), @@ -541,7 +547,7 @@ where Ok(simulation) => simulation, Err(error) => { self.emit(BuilderEvent::rejected_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op), OpRejectionReason::FailedRevalidation { error: error.clone(), @@ -566,7 +572,7 @@ where .contains(Timestamp::now(), TIME_RANGE_BUFFER) { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op), SkipReason::InvalidTimeRange { valid_range: simulation.valid_time_range, @@ -583,7 +589,7 @@ where >= self.settings.chain_spec.max_transaction_size_bytes { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op), SkipReason::TransactionSizeLimit, )); @@ -595,7 +601,7 @@ where gas_spent + op.computation_gas_limit(&self.settings.chain_spec, None); if required_gas > self.settings.max_bundle_gas { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op), SkipReason::GasLimit, )); @@ -606,14 +612,14 @@ where let mut new_expected_storage = context.expected_storage.clone(); if let Err(e) = new_expected_storage.merge(&simulation.expected_storage) { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op), SkipReason::ExpectedStorageConflict(e.to_string()), )); continue; } else if new_expected_storage.num_slots() > self.settings.max_expected_storage_slots { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op), SkipReason::ExpectedStorageLimit, )); @@ -630,7 +636,7 @@ where // batch, but don't reject them (remove them from pool). 
info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender()); self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op), SkipReason::AccessedOtherSender { other_sender }, )); @@ -697,7 +703,7 @@ where for (index, reason) in to_reject { self.emit(BuilderEvent::rejected_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&context.get_op_at(index)?.op), OpRejectionReason::ConditionNotMet(reason), )); @@ -839,7 +845,7 @@ where HandleOpsOut::Success => Ok(Some(gas_limit)), HandleOpsOut::FailedOp(index, message) => { self.emit(BuilderEvent::rejected_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&context.get_op_at(index)?.op), OpRejectionReason::FailedInBundle { message: Arc::new(message.clone()), @@ -876,6 +882,7 @@ where *self.ep_providers.entry_point().address(), self.settings.max_bundle_size, self.builder_index, + self.filter_id.clone(), ) .await .context("should get ops from pool")? 
@@ -1041,7 +1048,7 @@ where // iterate in reverse so that we can remove ops without affecting the index of the next op to remove for index in to_remove.into_iter().rev() { self.emit(BuilderEvent::rejected_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&context.get_op_at(index)?.op), OpRejectionReason::FailedInBundle { message: Arc::new("post op reverted leading to entry point revert".to_owned()), @@ -1143,7 +1150,7 @@ where .is_none() { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op.uo), SkipReason::UnsupportedAggregator(agg), )); @@ -1156,7 +1163,7 @@ where let gas = op.uo.computation_gas_limit(&self.settings.chain_spec, None); if gas_left < gas { self.emit(BuilderEvent::skipped_op( - self.builder_index, + self.builder_tag.clone(), self.op_hash(&op.uo), SkipReason::GasLimit, )); @@ -2882,13 +2889,14 @@ mod tests { entity_infos: EntityInfos::default(), aggregator: None, da_gas_data: Default::default(), + filter_id: None, }) .collect(); let mut pool_client = MockPool::new(); pool_client .expect_get_ops() - .returning(move |_, _, _| Ok(ops.clone())); + .returning(move |_, _, _, _| Ok(ops.clone())); let simulations_by_op: HashMap<_, _> = mock_ops .into_iter() @@ -2994,6 +3002,7 @@ mod tests { let mut proposer = BundleProposerImpl::new( 0, + "test".to_string(), ProvidersWithEntryPoint::new( Arc::new(provider), Arc::new(entry_point), @@ -3012,6 +3021,7 @@ mod tests { max_expected_storage_slots: MAX_EXPECTED_STORAGE_SLOTS, }, event_sender, + None, ); if notify_condition_not_met { diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index 34e19d00f..5475f4f7f 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -62,7 +62,7 @@ pub(crate) struct Settings { #[derive(Debug)] pub(crate) struct BundleSenderImpl { - builder_index: u64, + builder_tag: String, bundle_action_receiver: Option>, chain_spec: ChainSpec, sender_eoa: 
Address, @@ -143,7 +143,7 @@ where /// Loops forever, attempting to form and send a bundle on each new block, /// then waiting for one bundle to be mined or dropped before forming the /// next one. - #[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), builder_index = self.builder_index))] + #[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), tag = self.builder_tag))] async fn send_bundles_in_loop(mut self, task_spawner: TS) { // trigger for sending bundles let sender_trigger = BundleSenderTrigger::new( @@ -179,7 +179,7 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - builder_index: u64, + builder_tag: String, bundle_action_receiver: mpsc::Receiver, chain_spec: ChainSpec, sender_eoa: Address, @@ -192,7 +192,7 @@ where event_sender: broadcast::Sender>, ) -> Self { Self { - builder_index, + builder_tag: builder_tag.clone(), bundle_action_receiver: Some(bundle_action_receiver), chain_spec, sender_eoa, @@ -204,7 +204,7 @@ where event_sender, metrics: BuilderMetric::new_with_labels(&[ ("entry_point", entry_point.address().to_string()), - ("builder_index", builder_index.to_string()), + ("builder_tag", builder_tag), ]), entry_point, _uo_type: PhantomData, @@ -371,7 +371,7 @@ where } self.emit(BuilderEvent::transaction_mined( - self.builder_index, + self.builder_tag.clone(), tx_hash, nonce, block_number, @@ -381,7 +381,7 @@ where TrackerUpdate::LatestTxDropped { nonce } => { info!("Latest transaction dropped, starting new bundle attempt"); self.emit(BuilderEvent::latest_transaction_dropped( - self.builder_index, + self.builder_tag.clone(), nonce, )); self.metrics.bundle_txns_dropped.increment(1); @@ -391,7 +391,7 @@ where TrackerUpdate::NonceUsedForOtherTx { nonce } => { info!("Nonce used externally, starting new bundle attempt"); self.emit(BuilderEvent::nonce_used_for_other_transaction( - self.builder_index, + self.builder_tag.clone(), nonce, )); 
self.metrics.bundle_txns_nonce_used.increment(1); @@ -578,7 +578,7 @@ where let Some(bundle_tx) = self.get_bundle_tx(nonce, bundle).await? else { self.emit(BuilderEvent::formed_bundle( - self.builder_index, + self.builder_tag.clone(), None, nonce, fee_increase_count, @@ -602,7 +602,7 @@ where match send_result { Ok(tx_hash) => { self.emit(BuilderEvent::formed_bundle( - self.builder_index, + self.builder_tag.clone(), Some(BundleTxDetails { tx_hash, tx, @@ -1791,7 +1791,7 @@ mod tests { MockPool, > { BundleSenderImpl::new( - 0, + "any:0".to_string(), mpsc::channel(1000).1, ChainSpec::default(), Address::default(), diff --git a/crates/builder/src/emit.rs b/crates/builder/src/emit.rs index 9ab4933e3..ae24a0050 100644 --- a/crates/builder/src/emit.rs +++ b/crates/builder/src/emit.rs @@ -22,29 +22,26 @@ use rundler_utils::strs; /// Builder event #[derive(Clone, Debug)] pub struct BuilderEvent { - /// Builder index that emitted the event - pub builder_index: u64, + /// Builder tag that emitted the event + pub tag: String, /// Event kind pub kind: BuilderEventKind, } impl BuilderEvent { - pub(crate) fn new(builder_index: u64, kind: BuilderEventKind) -> Self { - Self { - builder_index, - kind, - } + pub(crate) fn new(tag: String, kind: BuilderEventKind) -> Self { + Self { tag, kind } } pub(crate) fn formed_bundle( - builder_index: u64, + tag: String, tx_details: Option, nonce: u64, fee_increase_count: u64, required_fees: Option, ) -> Self { Self::new( - builder_index, + tag, BuilderEventKind::FormedBundle { tx_details, nonce, @@ -55,13 +52,13 @@ impl BuilderEvent { } pub(crate) fn transaction_mined( - builder_index: u64, + tag: String, tx_hash: B256, nonce: u64, block_number: u64, ) -> Self { Self::new( - builder_index, + tag, BuilderEventKind::TransactionMined { tx_hash, nonce, @@ -70,36 +67,23 @@ impl BuilderEvent { ) } - pub(crate) fn latest_transaction_dropped(builder_index: u64, nonce: u64) -> Self { - Self::new( - builder_index, - 
BuilderEventKind::LatestTransactionDropped { nonce }, - ) + pub(crate) fn latest_transaction_dropped(tag: String, nonce: u64) -> Self { + Self::new(tag, BuilderEventKind::LatestTransactionDropped { nonce }) } - pub(crate) fn nonce_used_for_other_transaction(builder_index: u64, nonce: u64) -> Self { + pub(crate) fn nonce_used_for_other_transaction(tag: String, nonce: u64) -> Self { Self::new( - builder_index, + tag, BuilderEventKind::NonceUsedForOtherTransaction { nonce }, ) } - pub(crate) fn skipped_op(builder_index: u64, op_hash: B256, reason: SkipReason) -> Self { - Self::new( - builder_index, - BuilderEventKind::SkippedOp { op_hash, reason }, - ) + pub(crate) fn skipped_op(tag: String, op_hash: B256, reason: SkipReason) -> Self { + Self::new(tag, BuilderEventKind::SkippedOp { op_hash, reason }) } - pub(crate) fn rejected_op( - builder_index: u64, - op_hash: B256, - reason: OpRejectionReason, - ) -> Self { - Self::new( - builder_index, - BuilderEventKind::RejectedOp { op_hash, reason }, - ) + pub(crate) fn rejected_op(tag: String, op_hash: B256, reason: OpRejectionReason) -> Self { + Self::new(tag, BuilderEventKind::RejectedOp { op_hash, reason }) } } @@ -245,7 +229,7 @@ impl Display for BuilderEvent { f, concat!( "Bundle transaction sent!", - " Builder index: {:?}", + " Builder tag: {}", " Transaction hash: {:?}", " Nonce: {}", " Fee increases: {}", @@ -253,7 +237,7 @@ impl Display for BuilderEvent { " Required maxPriorityFeePerGas: {}", " Op hashes: {}", ), - self.builder_index, + self.tag, tx_details.tx_hash, nonce, fee_increase_count, @@ -266,13 +250,13 @@ impl Display for BuilderEvent { f, concat!( "Bundle was empty.", - " Builder index: {:?}", + " Builder tag: {}", " Nonce: {}", " Fee increases: {}", " Required maxFeePerGas: {}", " Required maxPriorityFeePerGas: {}", ), - self.builder_index, + self.tag, nonce, fee_increase_count, required_max_fee_per_gas, @@ -288,28 +272,40 @@ impl Display for BuilderEvent { f, concat!( "Transaction mined!", - " Builder 
index: {:?}", + " Builder tag: {}", " Transaction hash: {:?}", " Nonce: {}", " Block number: {}", ), - self.builder_index, tx_hash, nonce, block_number, + self.tag, tx_hash, nonce, block_number, ), BuilderEventKind::LatestTransactionDropped { nonce } => { write!( f, - "Latest transaction dropped. Higher fees are needed. Builder index: {:?} Nonce: {nonce}", - self.builder_index + "Latest transaction dropped. Higher fees are needed. Builder tag: {} Nonce: {nonce}", + self.tag ) } BuilderEventKind::NonceUsedForOtherTransaction { nonce } => { - write!(f, "Transaction failed because nonce was used by another transaction outside of this Rundler. Builder index: {:?} Nonce: {nonce}", self.builder_index) + write!( + f, + "Transaction failed because nonce was used by another transaction outside of this Rundler. Builder tag: {} Nonce: {nonce}", + self.tag + ) } BuilderEventKind::SkippedOp { op_hash, reason } => { - write!(f, "Op skipped in bundle (but remains in pool). Builder index: {:?} Op hash: {op_hash:?} Reason: {reason:?}", self.builder_index) + write!( + f, + "Op skipped in bundle (but remains in pool). Builder tag: {} Op hash: {op_hash:?} Reason: {reason:?}", + self.tag + ) } BuilderEventKind::RejectedOp { op_hash, reason } => { - write!(f, "Op rejected from bundle and removed from pool. Builder index: {:?} Op hash: {op_hash:?} Reason: {reason:?}", self.builder_index) + write!( + f, + "Op rejected from bundle and removed from pool. Builder tag: {} Op hash: {op_hash:?} Reason: {reason:?}", + self.tag + ) } } } diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index 83a63f916..37d04dc2f 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -102,6 +102,19 @@ pub struct BuilderSettings { pub index: u64, /// Optional submitter proxy to use for this builder pub submitter_proxy: Option
, + /// Optional filter id to apply to this builder + pub filter_id: Option, +} + +impl BuilderSettings { + /// Unique string tag for this builder + pub fn tag(&self) -> String { + format!( + "{}:{}", + self.filter_id.as_ref().map_or("any", |v| v), + self.index + ) + } } /// Builder settings for an entrypoint @@ -402,14 +415,16 @@ where let proposer = BundleProposerImpl::new( builder_settings.index, + builder_settings.tag(), ep_providers.clone(), BundleProposerProviders::new(self.pool.clone(), simulator, fee_estimator), proposer_settings, self.event_sender.clone(), + builder_settings.filter_id.clone(), ); let builder = BundleSenderImpl::new( - builder_settings.index, + builder_settings.tag(), send_bundle_rx, self.args.chain_spec.clone(), sender_eoa, diff --git a/crates/pool/proto/op_pool/op_pool.proto b/crates/pool/proto/op_pool/op_pool.proto index bcad6ad73..fb1631f88 100644 --- a/crates/pool/proto/op_pool/op_pool.proto +++ b/crates/pool/proto/op_pool/op_pool.proto @@ -178,6 +178,8 @@ message MempoolOp { bytes entry_point = 8; // The DA gas data for the UO DaGasUoData da_gas_data = 9; + // The filter ID to apply to the UserOperation + string filter_id = 10; } // Data associated with a user operation for DA gas calculations @@ -290,6 +292,8 @@ message GetOpsRequest { uint64 max_ops = 2; // The mempool shard num retrieve UserOperations from uint64 shard_index = 3; + // The filter ID to apply to the UserOperations + string filter_id = 4; } message GetOpsResponse { oneof result { diff --git a/crates/pool/src/mempool/mod.rs b/crates/pool/src/mempool/mod.rs index 38f840c12..ad4aba6f9 100644 --- a/crates/pool/src/mempool/mod.rs +++ b/crates/pool/src/mempool/mod.rs @@ -88,6 +88,7 @@ pub trait Mempool: Send + Sync { &self, max: usize, shard_index: u64, + filter_id: Option, ) -> MempoolResult>>; /// Returns the all operations from the pool up to a max size @@ -241,6 +242,7 @@ mod tests { }), }, da_gas_data: Default::default(), + filter_id: None, }; let entities = 
po.entities().collect::>(); diff --git a/crates/pool/src/mempool/paymaster.rs b/crates/pool/src/mempool/paymaster.rs index 2f4cc8d84..1d9f8340d 100644 --- a/crates/pool/src/mempool/paymaster.rs +++ b/crates/pool/src/mempool/paymaster.rs @@ -530,6 +530,7 @@ mod tests { account_is_staked: true, entity_infos: EntityInfos::default(), da_gas_data: rundler_types::da::DAGasUOData::Empty, + filter_id: None, } } diff --git a/crates/pool/src/mempool/pool.rs b/crates/pool/src/mempool/pool.rs index 3f09871d1..9a15ebebd 100644 --- a/crates/pool/src/mempool/pool.rs +++ b/crates/pool/src/mempool/pool.rs @@ -1595,6 +1595,7 @@ mod tests { sim_block_number: 0, account_is_staked: false, da_gas_data: Default::default(), + filter_id: None, } } diff --git a/crates/pool/src/mempool/uo_pool.rs b/crates/pool/src/mempool/uo_pool.rs index 6808aa017..4d342a746 100644 --- a/crates/pool/src/mempool/uo_pool.rs +++ b/crates/pool/src/mempool/uo_pool.rs @@ -23,7 +23,7 @@ use parking_lot::RwLock; use rundler_provider::{ DAGasOracleSync, EvmProvider, ProvidersWithEntryPointT, SimulationProvider, StateOverride, }; -use rundler_sim::{FeeUpdate, Prechecker, Simulator}; +use rundler_sim::{FeeUpdate, MempoolConfig, Prechecker, Simulator}; use rundler_types::{ pool::{ MempoolError, PaymasterMetadata, PoolOperation, Reputation, ReputationStatus, StakeStatus, @@ -60,6 +60,7 @@ pub(crate) struct UoPool { event_sender: broadcast::Sender>, ep_specific_metrics: UoPoolMetricsEPSpecific, metrics: UoPoolMetrics, + mempool_config: MempoolConfig, } struct UoPoolState { @@ -82,6 +83,7 @@ where event_sender: broadcast::Sender>, paymaster: PaymasterTracker, reputation: Arc, + mempool_config: MempoolConfig, ) -> Self { let ep = config.entry_point.to_string(); Self { @@ -104,6 +106,7 @@ where metrics: UoPoolMetrics::default(), ep_providers, pool_providers, + mempool_config, } } @@ -616,6 +619,7 @@ where )); } + let filter_id = self.mempool_config.match_filter(&op); let valid_time_range = sim_result.valid_time_range; let 
pool_op = PoolOperation { uo: op, @@ -628,6 +632,7 @@ where account_is_staked: sim_result.account_is_staked, entity_infos: sim_result.entity_infos, da_gas_data: precheck_ret.da_gas_data, + filter_id, }; // Check sender count in mempool. If sender has too many operations, must be staked @@ -816,6 +821,7 @@ where &self, max: usize, shard_index: u64, + filter_id: Option, ) -> MempoolResult>> { if shard_index >= self.config.num_shards { Err(anyhow::anyhow!("Invalid shard ID"))?; @@ -842,7 +848,7 @@ where senders.insert(op.uo.sender()) } else { true - } + } && filter_id == op.filter_id }) .take(max) .map(Into::into) @@ -1022,9 +1028,9 @@ mod tests { .add_operation(OperationOrigin::Local, op.op) .await .unwrap(); - check_ops(pool.best_operations(1, 0).unwrap(), uos); + check_ops(pool.best_operations(1, 0, None).unwrap(), uos); pool.remove_operations(&[hash]); - assert_eq!(pool.best_operations(1, 0).unwrap(), vec![]); + assert_eq!(pool.best_operations(1, 0, None).unwrap(), vec![]); } #[tokio::test] @@ -1045,9 +1051,9 @@ mod tests { .unwrap(); hashes.push(hash); } - check_ops(pool.best_operations(3, 0).unwrap(), uos); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos); pool.remove_operations(&hashes); - assert_eq!(pool.best_operations(3, 0).unwrap(), vec![]); + assert_eq!(pool.best_operations(3, 0, None).unwrap(), vec![]); } #[tokio::test] @@ -1066,9 +1072,9 @@ mod tests { .await .unwrap(); } - check_ops(pool.best_operations(3, 0).unwrap(), uos); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos); pool.clear_state(true, true, true); - assert_eq!(pool.best_operations(3, 0).unwrap(), vec![]); + assert_eq!(pool.best_operations(3, 0, None).unwrap(), vec![]); } #[tokio::test] @@ -1113,7 +1119,7 @@ mod tests { entrypoint, ) .await; - check_ops(pool.best_operations(3, 0).unwrap(), uos.clone()); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos.clone()); pool.on_chain_update(&ChainUpdate { latest_block_number: 1, @@ -1146,7 +1152,7 @@ mod tests { }) 
.await; - check_ops(pool.best_operations(3, 0).unwrap(), uos[1..].to_vec()); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos[1..].to_vec()); let paymaster_balance = pool.paymaster.paymaster_balance(paymaster).await.unwrap(); assert_eq!(paymaster_balance.confirmed_balance, U256::from(1110)); @@ -1197,7 +1203,7 @@ mod tests { let metadata = pool.paymaster.paymaster_balance(paymaster).await.unwrap(); assert_eq!(metadata.pending_balance, U256::from(850)); - check_ops(pool.best_operations(3, 0).unwrap(), uos.clone()); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos.clone()); // mine the first op with actual gas cost of 10 pool.on_chain_update(&ChainUpdate { @@ -1232,7 +1238,7 @@ mod tests { .await; check_ops( - pool.best_operations(3, 0).unwrap(), + pool.best_operations(3, 0, None).unwrap(), uos.clone()[1..].to_vec(), ); @@ -1265,7 +1271,7 @@ mod tests { }) .await; - check_ops(pool.best_operations(3, 0).unwrap(), uos); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos); let metadata = pool.paymaster.paymaster_balance(paymaster).await.unwrap(); assert_eq!(metadata.pending_balance, U256::from(840)); @@ -1279,7 +1285,7 @@ mod tests { create_op(Address::random(), 0, 1, None), ]) .await; - check_ops(pool.best_operations(3, 0).unwrap(), uos.clone()); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos.clone()); pool.on_chain_update(&ChainUpdate { latest_block_number: 1, @@ -1302,7 +1308,7 @@ mod tests { }) .await; - check_ops(pool.best_operations(3, 0).unwrap(), uos); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos); } #[tokio::test] @@ -1315,7 +1321,10 @@ mod tests { ]) .await; // staked, so include all ops - check_ops(pool.best_operations(3, 0).unwrap(), uos[0..2].to_vec()); + check_ops( + pool.best_operations(3, 0, None).unwrap(), + uos[0..2].to_vec(), + ); let rep = pool.dump_reputation(); assert_eq!(rep.len(), 1); @@ -1375,7 +1384,7 @@ mod tests { } check_ops( - pool.best_operations(4, 0).unwrap(), + 
pool.best_operations(4, 0, None).unwrap(), vec![ uos[0].clone(), uos[1].clone(), @@ -1425,7 +1434,7 @@ mod tests { .await .unwrap(); check_ops( - pool.best_operations(4, 0).unwrap(), + pool.best_operations(4, 0, None).unwrap(), vec![ uos[1].clone(), uos[2].clone(), @@ -1506,7 +1515,7 @@ mod tests { )) => {} _ => panic!("Expected InitCodeTooShort error"), } - assert_eq!(pool.best_operations(1, 0).unwrap(), vec![]); + assert_eq!(pool.best_operations(1, 0, None).unwrap(), vec![]); } #[tokio::test] @@ -1526,7 +1535,7 @@ mod tests { Err(MempoolError::SimulationViolation(SimulationViolation::DidNotRevert)) => {} _ => panic!("Expected DidNotRevert error"), } - assert_eq!(pool.best_operations(1, 0).unwrap(), vec![]); + assert_eq!(pool.best_operations(1, 0, None).unwrap(), vec![]); } #[tokio::test] @@ -1545,7 +1554,7 @@ mod tests { .unwrap_err(); assert!(matches!(err, MempoolError::OperationAlreadyKnown)); - check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op]); + check_ops(pool.best_operations(1, 0, None).unwrap(), vec![op.op]); } #[tokio::test] @@ -1569,7 +1578,7 @@ mod tests { assert!(matches!(err, MempoolError::ReplacementUnderpriced(_, _))); - check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op]); + check_ops(pool.best_operations(1, 0, None).unwrap(), vec![op.op]); } #[tokio::test] @@ -1625,7 +1634,7 @@ mod tests { .await .unwrap(); - check_ops(pool.best_operations(1, 0).unwrap(), vec![replacement]); + check_ops(pool.best_operations(1, 0, None).unwrap(), vec![replacement]); let paymaster_balance = pool.paymaster.paymaster_balance(paymaster).await.unwrap(); assert_eq!(paymaster_balance.pending_balance, U256::from(900)); @@ -1650,7 +1659,10 @@ mod tests { .await .unwrap(); - check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op.clone()]); + check_ops( + pool.best_operations(1, 0, None).unwrap(), + vec![op.op.clone()], + ); pool.on_chain_update(&ChainUpdate { latest_block_timestamp: 11.into(), @@ -1658,7 +1670,7 @@ mod tests { }) .await; - 
check_ops(pool.best_operations(1, 0).unwrap(), vec![]); + check_ops(pool.best_operations(1, 0, None).unwrap(), vec![]); } #[tokio::test] @@ -1689,7 +1701,7 @@ mod tests { pool.remove_op_by_id(&op.op.id()), Err(MempoolError::OperationDropTooSoon(_, _, _)) )); - check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op]); + check_ops(pool.best_operations(1, 0, None).unwrap(), vec![op.op]); } #[tokio::test] @@ -1709,7 +1721,7 @@ mod tests { }), Ok(None) )); - check_ops(pool.best_operations(1, 0).unwrap(), vec![op.op]); + check_ops(pool.best_operations(1, 0, None).unwrap(), vec![op.op]); } #[tokio::test] @@ -1730,7 +1742,7 @@ mod tests { .await; assert_eq!(pool.remove_op_by_id(&op.op.id()).unwrap().unwrap(), hash); - check_ops(pool.best_operations(1, 0).unwrap(), vec![]); + check_ops(pool.best_operations(1, 0, None).unwrap(), vec![]); } #[tokio::test] @@ -1804,7 +1816,7 @@ mod tests { ]) .await; // staked, so include all ops - check_ops(pool.best_operations(3, 0).unwrap(), uos); + check_ops(pool.best_operations(3, 0, None).unwrap(), uos); } #[tokio::test] @@ -1942,7 +1954,7 @@ mod tests { .await .unwrap(); - let best = pool.best_operations(10000, 0).unwrap(); + let best = pool.best_operations(10000, 0, None).unwrap(); assert_eq!(best.len(), 0); } @@ -2185,6 +2197,7 @@ mod tests { event_sender, paymaster, reputation, + MempoolConfig::default(), ) } diff --git a/crates/pool/src/server/local.rs b/crates/pool/src/server/local.rs index 5725dc51a..2afefd275 100644 --- a/crates/pool/src/server/local.rs +++ b/crates/pool/src/server/local.rs @@ -148,11 +148,13 @@ impl Pool for LocalPoolHandle { entry_point: Address, max_ops: u64, shard_index: u64, + filter_id: Option, ) -> PoolResult> { let req = ServerRequestKind::GetOps { entry_point, max_ops, shard_index, + filter_id, }; let resp = self.send(req).await?; match resp { @@ -388,10 +390,11 @@ impl LocalPoolServerRunner { entry_point: Address, max_ops: u64, shard_index: u64, + filter_id: Option, ) -> PoolResult> { let mempool = 
self.get_pool(entry_point)?; Ok(mempool - .best_operations(max_ops as usize, shard_index)? + .best_operations(max_ops as usize, shard_index, filter_id)? .iter() .map(|op| (**op).clone()) .collect()) @@ -608,8 +611,8 @@ impl LocalPoolServerRunner { entry_points: self.mempools.keys().copied().collect() }) }, - ServerRequestKind::GetOps { entry_point, max_ops, shard_index } => { - match self.get_ops(entry_point, max_ops, shard_index) { + ServerRequestKind::GetOps { entry_point, max_ops, shard_index, filter_id } => { + match self.get_ops(entry_point, max_ops, shard_index, filter_id) { Ok(ops) => Ok(ServerResponse::GetOps { ops }), Err(e) => Err(e), } @@ -711,6 +714,7 @@ enum ServerRequestKind { entry_point: Address, max_ops: u64, shard_index: u64, + filter_id: Option, }, GetOpByHash { hash: B256, diff --git a/crates/pool/src/server/remote/client.rs b/crates/pool/src/server/remote/client.rs index 5a8428711..c95648e8a 100644 --- a/crates/pool/src/server/remote/client.rs +++ b/crates/pool/src/server/remote/client.rs @@ -179,6 +179,7 @@ impl Pool for RemotePoolClient { entry_point: Address, max_ops: u64, shard_index: u64, + filter_id: Option, ) -> PoolResult> { let res = self .op_pool_client @@ -187,6 +188,7 @@ impl Pool for RemotePoolClient { entry_point: entry_point.to_vec(), max_ops, shard_index, + filter_id: filter_id.unwrap_or_default(), }) .await .map_err(anyhow::Error::from)? 
diff --git a/crates/pool/src/server/remote/protos.rs b/crates/pool/src/server/remote/protos.rs index 9e352c216..7d3d06ae2 100644 --- a/crates/pool/src/server/remote/protos.rs +++ b/crates/pool/src/server/remote/protos.rs @@ -429,6 +429,7 @@ impl From<&PoolOperation> for MempoolOp { sim_block_hash: op.sim_block_hash.to_proto_bytes(), account_is_staked: op.account_is_staked, da_gas_data: Some(DaGasUoData::from(&op.da_gas_data)), + filter_id: op.filter_id.clone().unwrap_or_default(), } } } @@ -494,6 +495,11 @@ impl TryUoFromProto for PoolOperation { let expected_code_hash = B256::from_slice(&op.expected_code_hash); let sim_block_hash = B256::from_slice(&op.sim_block_hash); + let filter_id = if op.filter_id.is_empty() { + None + } else { + Some(op.filter_id) + }; Ok(PoolOperation { uo, @@ -509,6 +515,7 @@ impl TryUoFromProto for PoolOperation { .da_gas_data .context("DA gas data should be set")? .try_into()?, + filter_id, }) } } diff --git a/crates/pool/src/server/remote/server.rs b/crates/pool/src/server/remote/server.rs index 5e64aade6..535111aa1 100644 --- a/crates/pool/src/server/remote/server.rs +++ b/crates/pool/src/server/remote/server.rs @@ -178,9 +178,15 @@ impl OpPool for OpPoolImpl { let req = request.into_inner(); let ep = self.get_entry_point(&req.entry_point)?; + let filter_id = if req.filter_id.is_empty() { + None + } else { + Some(req.filter_id) + }; + let resp = match self .local_pool - .get_ops(ep, req.max_ops, req.shard_index) + .get_ops(ep, req.max_ops, req.shard_index, filter_id) .await { Ok(ops) => GetOpsResponse { diff --git a/crates/pool/src/task.rs b/crates/pool/src/task.rs index e0cae0208..75480224c 100644 --- a/crates/pool/src/task.rs +++ b/crates/pool/src/task.rs @@ -15,7 +15,7 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; use futures::FutureExt; -use rundler_provider::{Providers, ProvidersWithEntryPointT}; +use rundler_provider::{EntryPoint, Providers, ProvidersWithEntryPointT}; 
use rundler_sim::{ gas::{self, FeeEstimatorImpl}, simulation::{self, UnsafeSimulator}, @@ -326,6 +326,14 @@ where ), ); + // There should only be one mempool config per entry point + let mempool_config = pool_config + .mempool_channel_configs + .values() + .find(|c| c.entry_point() == *ep_providers.entry_point().address()) + .cloned() + .unwrap_or_default(); + let uo_pool = UoPool::new( pool_config.clone(), ep_providers, @@ -333,6 +341,7 @@ where event_sender, paymaster, reputation, + mempool_config, ); Ok(Arc::new(uo_pool)) diff --git a/crates/rpc/src/eth/api.rs b/crates/rpc/src/eth/api.rs index bd3b88321..23b4b0af4 100644 --- a/crates/rpc/src/eth/api.rs +++ b/crates/rpc/src/eth/api.rs @@ -218,6 +218,7 @@ mod tests { account_is_staked: false, entity_infos: EntityInfos::default(), da_gas_data: rundler_types::da::DAGasUOData::Empty, + filter_id: None, }; let mut pool = MockPool::default(); diff --git a/crates/sim/src/simulation/mempool.rs b/crates/sim/src/simulation/mempool.rs index 543533a85..7dca68027 100644 --- a/crates/sim/src/simulation/mempool.rs +++ b/crates/sim/src/simulation/mempool.rs @@ -14,7 +14,7 @@ use std::{collections::HashMap, str::FromStr}; use alloy_primitives::{Address, B256, U256}; -use rundler_types::{Entity, EntityType, Opcode}; +use rundler_types::{Entity, EntityType, Opcode, UserOperation, UserOperationVariant}; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; @@ -30,6 +30,8 @@ pub struct MempoolConfig { pub(crate) entry_point: Address, /// Allowlist to match violations against. 
pub(crate) allowlist: Vec, + /// Mempool filters to tag operations + filters: Option>, } impl MempoolConfig { @@ -37,11 +39,23 @@ impl MempoolConfig { pub fn entry_point(&self) -> Address { self.entry_point } + + /// Match an operation against the mempool filters, returning the first ID that matches, or None + pub fn match_filter(&self, operation: &UserOperationVariant) -> Option { + if let Some(filters) = &self.filters { + filters + .iter() + .find(|f| f.apply(operation)) + .map(|f| f.id.clone()) + } else { + None + } + } } /// A collection of mempool configurations keyed by their ID. #[derive(Debug, Clone, Deserialize, Default)] -pub struct MempoolConfigs(HashMap); +pub struct MempoolConfigs(pub HashMap); impl MempoolConfigs { /// Get the mempool configs for a specific entry point address @@ -91,6 +105,33 @@ impl AllowEntity { } } +/// A mempool filter +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +struct MempoolFilter { + /// The filter ID + id: String, + /// The filter to apply + filter: Filter, +} + +impl MempoolFilter { + /// Apply the filter to an operation + fn apply(&self, operation: &UserOperationVariant) -> bool { + match &self.filter { + Filter::Aggregator(address) => operation.aggregator().is_some_and(|a| a == *address), + } + } +} + +/// A mempool filter +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +enum Filter { + /// Filter operations by aggregator address + Aggregator(Address), +} + /// An allowlist rule. 
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] #[serde(tag = "rule", rename_all = "camelCase")] @@ -501,6 +542,7 @@ mod tests { opcode: Opcode::GAS, }, )], + filters: None, }, ), ]); @@ -534,6 +576,7 @@ mod tests { opcode: Opcode::GAS, }, )], + filters: None, }, ), ]); @@ -579,6 +622,7 @@ mod tests { opcode: Opcode::GAS, }, )], + filters: None, }, ), ]); @@ -624,6 +668,7 @@ mod tests { }, ), ], + filters: None, }, ), ( @@ -653,6 +698,7 @@ mod tests { }, ), ], + filters: None, }, ), ]); diff --git a/crates/types/src/pool/traits.rs b/crates/types/src/pool/traits.rs index 5b9f125d7..a7eb83688 100644 --- a/crates/types/src/pool/traits.rs +++ b/crates/types/src/pool/traits.rs @@ -43,6 +43,7 @@ pub trait Pool: Send + Sync { entry_point: Address, max_ops: u64, shard_index: u64, + filter_id: Option, ) -> PoolResult>; /// Get an operation from the pool by hash diff --git a/crates/types/src/pool/types.rs b/crates/types/src/pool/types.rs index a78a89fc1..822eb2c29 100644 --- a/crates/types/src/pool/types.rs +++ b/crates/types/src/pool/types.rs @@ -122,6 +122,8 @@ pub struct PoolOperation { pub entity_infos: EntityInfos, /// The DA gas data for this operation pub da_gas_data: DAGasUOData, + /// The matched filter ID for this operation + pub filter_id: Option, } impl PoolOperation { diff --git a/docs/architecture/builder.md b/docs/architecture/builder.md index c54c60a45..2ae8c4d1c 100644 --- a/docs/architecture/builder.md +++ b/docs/architecture/builder.md @@ -157,14 +157,18 @@ Example: "entryPoints": [ { "address": "0x0000000071727De22E5E9d8BAf0edAc6f37da032", - "indexOffset": 0, "builders": [ { "count": 1, - "proxy": "0xA7BD3A9Eb1238842DDB86458aF7dd2a9e166747A" + "proxy": "0xA7BD3A9Eb1238842DDB86458aF7dd2a9e166747A", + "filterId": "my-mempool-filter" } ] } ] } ``` + +#### Affinity + +Builders may specify a `filterId` in their custom configuration in order to only receive user operations that match a [mempool filter](./pool.md#filtering).
Each mempool filter that is defined must have a matching builder - else the user operations matching that filter will not be mined. diff --git a/docs/architecture/pool.md b/docs/architecture/pool.md index cfe2d6991..2a0c37ad0 100644 --- a/docs/architecture/pool.md +++ b/docs/architecture/pool.md @@ -44,21 +44,11 @@ Upon receiving a chain update event, the `Pool` will update its internal state b The `Pool`'s cache depth is configurable, if a re-org occurs that is deeper than the cache, UOs will be unable to be returned to the pool. -## Mempool Sharding +## Mempool Config -The `Pool` supports a very simple sharding scheme in its `best_operations` interface. The `Pool` is configured with a `num_shards` config, and the caller of `best_operations` provides a `shard_index` parameter. - -User operations are assigned to a shard by their sender address modulo the number of shards. - -Callers can use this feature to ensure that multiple callers are returned a disjoint set of user operations by sender. Callers should ensure that there is exactly 1 caller assigned to each shard index, else risk bundle invalidations (> 1 assigned) or orphaned user operations (0 assigned). - -## Alternative Mempools (in preview) - -**NOTE: this feature presents known risks to the bundler, use at your own risk.** +Default operation of the mempool does not require an explicit configuration file. To use advanced mempool features like filtering and alternative mempool rules, users can specify a specific mempool configuration file using the `--mempool_config_path` CLI option. The schema for this JSON file can be found here: [MempoolConfigs](../../crates/sim/src/simulation/mempool.rs). -The `Pool` supports configuring [alternative mempools](https://eips.ethereum.org/EIPS/eip-4337#alternative-mempools) via a JSON configuration file. This feature is under development with the community and will be modified soon. - -See [here](https://hackmd.io/@dancoombs/BJYRz3h8n) for more details. 
+NOTE: only one mempool can be defined per entry point address at the moment. Example config: @@ -67,6 +57,14 @@ Example config: "0x0000000000000000000000000000000000000000000000000000000000000000": { "description": "Allow list", "chainIds": ["0x066eed"], + "filters": [ + { + "id": "bls-aggregator", + "filter": { + "aggregator": "0x9d3a231e887a495ce6c454e7a38ed5e734bd5de4" + } + } + ], "allowlist": [ { "description": "My Factory", @@ -78,6 +76,27 @@ Example config: } ``` -## P2P + +### Sharding + +The `Pool` supports a very simple sharding scheme in its `best_operations` interface. The `Pool` is configured with a `num_shards` config, and the caller of `best_operations` provides a `shard_index` parameter. + +User operations are assigned to a shard by their sender address modulo the number of shards. + +Callers can use this feature to ensure that multiple callers are returned a disjoint set of user operations by sender. Callers should ensure that there is exactly 1 caller assigned to each shard index, else risk bundle invalidations (> 1 assigned) or orphaned user operations (0 assigned). + +### Filtering + +Advanced use cases may require mempool filtering. These filters MUST be used in conjunction with a [builder affinity](./builder.md#affinity) setting of the same filter ID, else the UOs will not be eligible for bundling. + +These filters are used to tag each user operation with a filter ID as they enter the mempool. Builders can then match on this filter ID to limit the user operations they receive to only those matching the filter. + +Current filter implementations can be found in [MempoolFilter](../../crates/sim/src/simulation/mempool.rs). + +### Alternative Mempools (in preview) + +**NOTE: this feature presents known risks to the bundler, use at your own risk.** + +The `Pool` supports configuring [alternative mempools](https://eips.ethereum.org/EIPS/eip-4337#alternative-mempools) via a JSON configuration file.
This feature is under development with the community and will be modified soon. + +See [here](https://hackmd.io/@dancoombs/BJYRz3h8n) for more details. -P2P mempool implementation is under development. See [here](https://github.com/eth-infinitism/bundler-spec/blob/main/p2p-specs/p2p-interface.md) for spec details.