Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add mempool filters and builder affinity #989

Open
wants to merge 1 commit into
base: danc/submit-proxy
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,6 @@ pub(crate) struct EntryPointBuilderConfigs {
pub(crate) struct EntryPointBuilderConfig {
// Entry point address
address: Address,
// Index offset for builders
index_offset: u64,
// Builder configs
builders: Vec<BuilderConfig>,
}
Expand All @@ -453,8 +451,12 @@ pub(crate) struct EntryPointBuilderConfig {
pub(crate) struct BuilderConfig {
// Number of builders using this config
count: u64,
// Builder index offset - defaults to 0
index_offset: Option<u64>,
// Submitter proxy to use for builders
proxy: Option<Address>,
// Optional filter to apply to the builders
filter_id: Option<String>,
}

impl EntryPointBuilderConfigs {
Expand All @@ -471,14 +473,13 @@ impl EntryPointBuilderConfigs {

impl EntryPointBuilderConfig {
pub fn builders(&self) -> Vec<BuilderSettings> {
let mut index = self.index_offset;
let mut builders = vec![];
for builder in &self.builders {
builders.extend((0..builder.count).map(|i| BuilderSettings {
index: index + i,
index: builder.index_offset.unwrap_or(0) + i,
submitter_proxy: builder.proxy,
filter_id: builder.filter_id.clone(),
}));
index += builder.count;
}
builders
}
Expand All @@ -489,6 +490,7 @@ fn builder_settings_from_cli(index_offset: u64, count: u64) -> Vec<BuilderSettin
.map(|i| BuilderSettings {
index: index_offset + i,
submitter_proxy: None,
filter_id: None,
})
.collect()
}
Expand Down
10 changes: 10 additions & 0 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,16 @@ async fn load_configs(

tracing::info!("Mempool configs: {:?}", mempool_configs);

// For now only allow one mempool defined per entry point
let mut entry_points = vec![];
for mempool_config in mempool_configs.0.values() {
let ep = mempool_config.entry_point();
if entry_points.contains(&ep) {
bail!("multiple mempool configs defined for entry point {:?}", ep);
}
entry_points.push(ep);
}

Some(mempool_configs)
} else {
None
Expand Down
44 changes: 27 additions & 17 deletions crates/builder/src/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ pub(crate) enum BundleProposerError {

pub(crate) struct BundleProposerImpl<EP, BP> {
builder_index: u64,
builder_tag: String,
settings: Settings,
ep_providers: EP,
bundle_providers: BP,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
condition_not_met_notified: bool,
metric: BuilderProposerMetric,
filter_id: Option<String>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -343,19 +345,23 @@ where
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
builder_index: u64,
builder_tag: String,
ep_providers: EP,
bundle_providers: BP,
settings: Settings,
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
filter_id: Option<String>,
) -> Self {
Self {
builder_index,
builder_tag,
ep_providers,
bundle_providers,
settings,
event_sender,
condition_not_met_notified: false,
metric: BuilderProposerMetric::default(),
filter_id,
}
}

Expand All @@ -380,7 +386,7 @@ where
|| op.uo.max_priority_fee_per_gas() < required_op_fees.max_priority_fee_per_gas
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op.uo),
SkipReason::InsufficientFees {
required_fees: required_op_fees,
Expand Down Expand Up @@ -433,7 +439,7 @@ where
"Failed to calculate required pre-verification gas for op: {e:?}, skipping"
);
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
op_hash,
SkipReason::Other {
reason: Arc::new(format!(
Expand All @@ -454,7 +460,7 @@ where

if op.uo.pre_verification_gas() < required_pvg {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
op_hash,
SkipReason::InsufficientPreVerificationGas {
base_fee,
Expand Down Expand Up @@ -506,7 +512,7 @@ where
entity_infos: _,
} => {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
op_hash,
SkipReason::Other {
reason: Arc::new(format!("Failed to simulate op: {error:?}, skipping")),
Expand Down Expand Up @@ -541,7 +547,7 @@ where
Ok(simulation) => simulation,
Err(error) => {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
OpRejectionReason::FailedRevalidation {
error: error.clone(),
Expand All @@ -566,7 +572,7 @@ where
.contains(Timestamp::now(), TIME_RANGE_BUFFER)
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::InvalidTimeRange {
valid_range: simulation.valid_time_range,
Expand All @@ -583,7 +589,7 @@ where
>= self.settings.chain_spec.max_transaction_size_bytes
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::TransactionSizeLimit,
));
Expand All @@ -595,7 +601,7 @@ where
gas_spent + op.computation_gas_limit(&self.settings.chain_spec, None);
if required_gas > self.settings.max_bundle_gas {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::GasLimit,
));
Expand All @@ -606,14 +612,14 @@ where
let mut new_expected_storage = context.expected_storage.clone();
if let Err(e) = new_expected_storage.merge(&simulation.expected_storage) {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::ExpectedStorageConflict(e.to_string()),
));
continue;
} else if new_expected_storage.num_slots() > self.settings.max_expected_storage_slots {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::ExpectedStorageLimit,
));
Expand All @@ -630,7 +636,7 @@ where
// batch, but don't reject them (remove them from pool).
info!("Excluding op from {:?} because it accessed the address of another sender in the bundle.", op.sender());
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op),
SkipReason::AccessedOtherSender { other_sender },
));
Expand Down Expand Up @@ -697,7 +703,7 @@ where

for (index, reason) in to_reject {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::ConditionNotMet(reason),
));
Expand Down Expand Up @@ -839,7 +845,7 @@ where
HandleOpsOut::Success => Ok(Some(gas_limit)),
HandleOpsOut::FailedOp(index, message) => {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::FailedInBundle {
message: Arc::new(message.clone()),
Expand Down Expand Up @@ -876,6 +882,7 @@ where
*self.ep_providers.entry_point().address(),
self.settings.max_bundle_size,
self.builder_index,
self.filter_id.clone(),
)
.await
.context("should get ops from pool")?
Expand Down Expand Up @@ -1041,7 +1048,7 @@ where
// iterate in reverse so that we can remove ops without affecting the index of the next op to remove
for index in to_remove.into_iter().rev() {
self.emit(BuilderEvent::rejected_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&context.get_op_at(index)?.op),
OpRejectionReason::FailedInBundle {
message: Arc::new("post op reverted leading to entry point revert".to_owned()),
Expand Down Expand Up @@ -1143,7 +1150,7 @@ where
.is_none()
{
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op.uo),
SkipReason::UnsupportedAggregator(agg),
));
Expand All @@ -1156,7 +1163,7 @@ where
let gas = op.uo.computation_gas_limit(&self.settings.chain_spec, None);
if gas_left < gas {
self.emit(BuilderEvent::skipped_op(
self.builder_index,
self.builder_tag.clone(),
self.op_hash(&op.uo),
SkipReason::GasLimit,
));
Expand Down Expand Up @@ -2882,13 +2889,14 @@ mod tests {
entity_infos: EntityInfos::default(),
aggregator: None,
da_gas_data: Default::default(),
filter_id: None,
})
.collect();

let mut pool_client = MockPool::new();
pool_client
.expect_get_ops()
.returning(move |_, _, _| Ok(ops.clone()));
.returning(move |_, _, _, _| Ok(ops.clone()));

let simulations_by_op: HashMap<_, _> = mock_ops
.into_iter()
Expand Down Expand Up @@ -2994,6 +3002,7 @@ mod tests {

let mut proposer = BundleProposerImpl::new(
0,
"test".to_string(),
ProvidersWithEntryPoint::new(
Arc::new(provider),
Arc::new(entry_point),
Expand All @@ -3012,6 +3021,7 @@ mod tests {
max_expected_storage_slots: MAX_EXPECTED_STORAGE_SLOTS,
},
event_sender,
None,
);

if notify_condition_not_met {
Expand Down
22 changes: 11 additions & 11 deletions crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub(crate) struct Settings {

#[derive(Debug)]
pub(crate) struct BundleSenderImpl<UO, P, E, T, C> {
builder_index: u64,
builder_tag: String,
bundle_action_receiver: Option<mpsc::Receiver<BundleSenderAction>>,
chain_spec: ChainSpec,
sender_eoa: Address,
Expand Down Expand Up @@ -143,7 +143,7 @@ where
/// Loops forever, attempting to form and send a bundle on each new block,
/// then waiting for one bundle to be mined or dropped before forming the
/// next one.
#[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), builder_index = self.builder_index))]
#[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), tag = self.builder_tag))]
async fn send_bundles_in_loop<TS: TaskSpawner>(mut self, task_spawner: TS) {
// trigger for sending bundles
let sender_trigger = BundleSenderTrigger::new(
Expand Down Expand Up @@ -179,7 +179,7 @@ where
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
builder_index: u64,
builder_tag: String,
bundle_action_receiver: mpsc::Receiver<BundleSenderAction>,
chain_spec: ChainSpec,
sender_eoa: Address,
Expand All @@ -192,7 +192,7 @@ where
event_sender: broadcast::Sender<WithEntryPoint<BuilderEvent>>,
) -> Self {
Self {
builder_index,
builder_tag: builder_tag.clone(),
bundle_action_receiver: Some(bundle_action_receiver),
chain_spec,
sender_eoa,
Expand All @@ -204,7 +204,7 @@ where
event_sender,
metrics: BuilderMetric::new_with_labels(&[
("entry_point", entry_point.address().to_string()),
("builder_index", builder_index.to_string()),
("builder_tag", builder_tag),
]),
entry_point,
_uo_type: PhantomData,
Expand Down Expand Up @@ -371,7 +371,7 @@ where
}

self.emit(BuilderEvent::transaction_mined(
self.builder_index,
self.builder_tag.clone(),
tx_hash,
nonce,
block_number,
Expand All @@ -381,7 +381,7 @@ where
TrackerUpdate::LatestTxDropped { nonce } => {
info!("Latest transaction dropped, starting new bundle attempt");
self.emit(BuilderEvent::latest_transaction_dropped(
self.builder_index,
self.builder_tag.clone(),
nonce,
));
self.metrics.bundle_txns_dropped.increment(1);
Expand All @@ -391,7 +391,7 @@ where
TrackerUpdate::NonceUsedForOtherTx { nonce } => {
info!("Nonce used externally, starting new bundle attempt");
self.emit(BuilderEvent::nonce_used_for_other_transaction(
self.builder_index,
self.builder_tag.clone(),
nonce,
));
self.metrics.bundle_txns_nonce_used.increment(1);
Expand Down Expand Up @@ -578,7 +578,7 @@ where

let Some(bundle_tx) = self.get_bundle_tx(nonce, bundle).await? else {
self.emit(BuilderEvent::formed_bundle(
self.builder_index,
self.builder_tag.clone(),
None,
nonce,
fee_increase_count,
Expand All @@ -602,7 +602,7 @@ where
match send_result {
Ok(tx_hash) => {
self.emit(BuilderEvent::formed_bundle(
self.builder_index,
self.builder_tag.clone(),
Some(BundleTxDetails {
tx_hash,
tx,
Expand Down Expand Up @@ -1791,7 +1791,7 @@ mod tests {
MockPool,
> {
BundleSenderImpl::new(
0,
"any:0".to_string(),
mpsc::channel(1000).1,
ChainSpec::default(),
Address::default(),
Expand Down
Loading
Loading