Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(code/starknet): Add mempool load generator to the Starknet test app #821

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
22 changes: 22 additions & 0 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct Config {
/// Mempool configuration options
pub mempool: MempoolConfig,

/// Mempool load configuration options
pub mempool_load: MempoolLoadConfig,

/// Sync configuration options
pub sync: SyncConfig,

Expand Down Expand Up @@ -339,6 +342,25 @@ mod gossipsub {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MempoolLoadType {
UniformLoad {
count: usize,
size: usize,
},
#[default]
NoLoad,
NonUniformLoad,
}

/// Mempool configuration options
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct MempoolLoadConfig {
/// Mempool loading type
pub load_type: MempoolLoadType,
}

/// Mempool configuration options
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct MempoolConfig {
Expand Down
9 changes: 8 additions & 1 deletion code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ use crate::host::proposal::compute_proposal_signature;
use crate::host::state::HostState;
use crate::host::{Host as _, StarknetHost};
use crate::mempool::{MempoolMsg, MempoolRef};
use crate::mempool_load::MempoolLoadRef;
use crate::proto::Protobuf;
use crate::types::*;

pub struct Host {
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: tracing::Span,
Expand All @@ -41,6 +43,7 @@ impl Host {
home_dir: PathBuf,
host: StarknetHost,
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: tracing::Span,
Expand All @@ -51,7 +54,7 @@ impl Host {

let (actor_ref, _) = Actor::spawn(
None,
Self::new(mempool, network, metrics, span),
Self::new(mempool, mempool_load, network, metrics, span),
HostState::new(host, db_path, &mut StdRng::from_entropy()),
)
.await?;
Expand All @@ -61,12 +64,14 @@ impl Host {

pub fn new(
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: tracing::Span,
) -> Self {
Self {
mempool,
mempool_load,
network,
metrics,
span,
Expand All @@ -86,6 +91,7 @@ impl Actor for Host {
initial_state: Self::State,
) -> Result<Self::State, ActorProcessingErr> {
self.mempool.link(myself.get_cell());
self.mempool_load.link(myself.get_cell());

Ok(initial_state)
}
Expand Down Expand Up @@ -545,6 +551,7 @@ async fn on_received_proposal_part(
Ok(())
}

//TODO
async fn on_decided(
state: &mut HostState,
consensus: &ConsensusRef<MockContext>,
Expand Down
5 changes: 5 additions & 0 deletions code/crates/starknet/host/src/host/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use malachitebft_core_types::{CommitCertificate, Extension, Round, SignedExtensi

use crate::host::Host;
use crate::mempool::MempoolRef;
use crate::mempool_load::MempoolLoadRef;
// use crate::mempool_load::MempoolLoadRef;
OakenKnight marked this conversation as resolved.
Show resolved Hide resolved
use crate::part_store::PartStore;
use crate::types::*;

Expand All @@ -33,6 +35,7 @@ pub struct StarknetParams {
pub struct StarknetHost {
pub params: StarknetParams,
pub mempool: MempoolRef,
pub mempool_load: MempoolLoadRef,
pub address: Address,
pub private_key: PrivateKey,
pub validator_set: ValidatorSet,
Expand All @@ -43,13 +46,15 @@ impl StarknetHost {
pub fn new(
params: StarknetParams,
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
address: Address,
private_key: PrivateKey,
validator_set: ValidatorSet,
) -> Self {
Self {
params,
mempool,
mempool_load,
address,
private_key,
validator_set,
Expand Down
4 changes: 2 additions & 2 deletions code/crates/starknet/host/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ pub mod block_store;
pub mod codec;
pub mod host;
pub mod mempool;
pub mod mempool_load;
pub mod node;
pub mod spawn;
pub mod streaming;

pub mod utils;
pub use malachitebft_app::part_store;

pub mod proto {
pub use malachitebft_proto::*;
pub use malachitebft_starknet_p2p_proto::*;
Expand Down
163 changes: 163 additions & 0 deletions code/crates/starknet/host/src/mempool_load.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use crate::proto::Protobuf;
use async_trait::async_trait;
use malachitebft_config::MempoolLoadType;
use malachitebft_starknet_p2p_types::{Transaction, Transactions};
use malachitebft_test_mempool::types::MempoolTransactionBatch;
use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef};
use rand::{Rng, RngCore};
use std::time::Duration;
use tracing::debug;
OakenKnight marked this conversation as resolved.
Show resolved Hide resolved

use crate::{
mempool::network::{MempoolNetworkMsg, MempoolNetworkRef},
utils::ticker::ticker,
};

pub type MempoolLoadMsg = Msg;
pub type MempoolLoadRef = ActorRef<Msg>;
OakenKnight marked this conversation as resolved.
Show resolved Hide resolved
pub enum Msg {
GenerateTransactions { count: usize, size: usize },
}

#[derive(Debug)]
pub struct State {
ticker: JoinHandle<()>,
}

#[derive(Debug)]
pub struct Params {
pub load_type: MempoolLoadType,
}

pub struct MempoolLoad {
params: Params,
network: MempoolNetworkRef,
span: tracing::Span,
}

impl Default for Params {
fn default() -> Self {
OakenKnight marked this conversation as resolved.
Show resolved Hide resolved
Self {
load_type: MempoolLoadType::UniformLoad {
size: 555,
count: 127,
},
}
}
}

impl MempoolLoad {
pub fn new(params: Params, network: MempoolNetworkRef, span: tracing::Span) -> Self {
Self {
params,
network,
span,
}
}

pub async fn spawn(
params: Params,
network: MempoolNetworkRef,
span: tracing::Span,
) -> Result<MempoolLoadRef, ractor::SpawnErr> {
debug!("spawning actor mempool_load");

let actor = Self::new(params, network, span);
let (actor_ref, _) = Actor::spawn(None, actor, ()).await?;
Ok(actor_ref)
}

pub fn generate_transactions(count: usize, size: usize) -> Vec<Transaction> {
let mut transactions: Vec<Transaction> = Vec::with_capacity(count);
let mut rng = rand::thread_rng();

for _ in 0..count {
let mut tx_bytes = vec![0; size];
rng.fill_bytes(&mut tx_bytes);
let tx = Transaction::new(tx_bytes);
// debug!("transaction {:?}", tx.clone());

transactions.push(tx);
}
debug!("transactions generated {:?}", transactions.clone().len());

transactions
}
}

#[async_trait]
impl Actor for MempoolLoad {
type Msg = Msg;
type State = State;
type Arguments = ();

async fn pre_start(
&self,
myself: MempoolLoadRef,
_args: (),
) -> Result<State, ActorProcessingErr> {
debug!("starting ticker");

let ticker = match self.params.load_type {
MempoolLoadType::UniformLoad { count, size } => {
debug!("entered uniform load branch");

let interval = Duration::from_secs(1);
tokio::spawn(ticker(interval, myself.clone(), move || {
Msg::GenerateTransactions { count, size }
}))
}
MempoolLoadType::NoLoad => tokio::spawn(async {}),
MempoolLoadType::NonUniformLoad => {
debug!("entered nonuniform load branch");

let mut rng = rand::thread_rng();
let interval = Duration::from_secs(rng.gen_range(1..10));
let count = rng.gen_range(500..=10000) as usize;
let size = rng.gen_range(128..=512) as usize;
tokio::spawn(ticker(interval, myself.clone(), move || {
Msg::GenerateTransactions { count, size }
}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still uniform load but with random parameters. A non-uniform load would have spikes of high load relative to the normal.

Maybe we could do something like this (in pseudo-Rust):

MempoolLoadType::NonUniformLoad {
    /// Base transaction count
    base_count: usize,

    /// Base transaction size
    base_size: usize,

    /// How much the transaction count can vary
    count_variation: std::ops::Range<i32>,

    size_variation: std::ops::Range<i32>,

    /// Chance of generating a spike.
    /// e.g. 0.1 = 10% chance of spike
    spike_probability: f64,

    /// Multiplier for spike transactions
    /// e.g. 10 = 10x more transactions during spike
    spike_multiplier: usize,

    /// Range of intervals between generating load, in milliseconds
    sleep_interval: std::ops::Range<u64>,
}
tokio::spawn(move async {
// Determine if this iteration should generate a spike
let is_spike = rng.gen_bool(params.spike_probability);

// Vary transaction count and size
let count_variation = rng.gen_range(params.count_variation);
let size_variation = rng.gen_range(params.size_variation);

let count = if is_spike {
    (params.base_count + count_variation) * params.spike_multiplier
} else {
    params.base_count + count_variation
};

let size = params.base_size + size_variation;

// Create and send the message
let msg = HostMsg::GenerateTransactions {
    count: count.max(1), // Ensure count is at least 1
    size: size.max(1),   // Ensure size is at least 1
};

if myself.send(msg).is_err() {
    println!("Channel closed, stopping load generator");
    break;
}

// Random sleep between 100ms and 1s
let sleep_duration = Duration::from_millis(params.sleep_interval);
sleep(sleep_duration);

Note that this does not require ticker as the sleep duration is random.

}
};
Ok(State { ticker })
}

async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
state.ticker.abort();
Ok(())
}

#[tracing::instrument("host.mempool_load", parent = &self.span, skip_all)]
async fn handle(
&self,
_myself: MempoolLoadRef,
msg: Msg,
_state: &mut State,
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::GenerateTransactions { count, size } => {
debug!("entered message handler GenerateTransactions");

let mut tx_batch = Transactions::default();
let transactions = Self::generate_transactions(count, size);
debug!("broadcasting transactions {:?}", transactions.len());

for tx in transactions {
tx_batch.push(tx);
}
let tx_batch1 = std::mem::take(&mut tx_batch).to_any().unwrap();
let mempool_batch = MempoolTransactionBatch::new(tx_batch1);
debug!("broadcasting batch {:?}", tx_batch.len());

self.network
.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?;
Ok(())
}
}
}
}
29 changes: 28 additions & 1 deletion code/crates/starknet/host/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use malachitebft_engine::wal::{Wal, WalRef};
use tokio::task::JoinHandle;

use malachitebft_config::{
self as config, Config as NodeConfig, MempoolConfig, SyncConfig, TestConfig, TransportProtocol,
self as config, Config as NodeConfig, MempoolConfig, MempoolLoadConfig, SyncConfig, TestConfig,
TransportProtocol,
};
use malachitebft_core_consensus::ValuePayload;
use malachitebft_engine::consensus::{Consensus, ConsensusParams, ConsensusRef};
Expand All @@ -26,6 +27,7 @@ use crate::codec::ProtobufCodec;
use crate::host::{StarknetHost, StarknetParams};
use crate::mempool::network::{MempoolNetwork, MempoolNetworkRef};
use crate::mempool::{Mempool, MempoolRef};
use crate::mempool_load::{MempoolLoad, MempoolLoadRef, Params};
use crate::types::MockContext;
use crate::types::{Address, Height, PrivateKey, ValidatorSet};

Expand All @@ -50,6 +52,8 @@ pub async fn spawn_node_actor(
let mempool_network = spawn_mempool_network_actor(&cfg, &private_key, &registry, &span).await;
let mempool =
spawn_mempool_actor(mempool_network.clone(), &cfg.mempool, &cfg.test, &span).await;
let mempool_load =
spawn_mempool_load_actor(&cfg.mempool_load, mempool_network.clone(), &span).await;

// Spawn consensus gossip
let network = spawn_network_actor(&cfg, &private_key, &registry, &span).await;
Expand All @@ -62,6 +66,7 @@ pub async fn spawn_node_actor(
&private_key,
&initial_validator_set,
mempool.clone(),
mempool_load,
network.clone(),
metrics.clone(),
&span,
Expand Down Expand Up @@ -291,6 +296,25 @@ async fn spawn_mempool_actor(
.unwrap()
}

async fn spawn_mempool_load_actor(
mempool_load_config: &MempoolLoadConfig,
network: MempoolNetworkRef,
span: &tracing::Span,
) -> MempoolLoadRef {
// let params = mempool_load::Params::default();

// debug!("spawned mempool load actor with params {:?}", params);
MempoolLoad::spawn(
Params {
load_type: mempool_load_config.load_type,
},
network,
span.clone(),
)
.await
.unwrap()
}

async fn spawn_mempool_network_actor(
cfg: &NodeConfig,
private_key: &PrivateKey,
Expand Down Expand Up @@ -322,6 +346,7 @@ async fn spawn_host_actor(
private_key: &PrivateKey,
initial_validator_set: &ValidatorSet,
mempool: MempoolRef,
mempool_load: MempoolLoadRef,
network: NetworkRef<MockContext>,
metrics: Metrics,
span: &tracing::Span,
Expand All @@ -346,6 +371,7 @@ async fn spawn_host_actor(
let mock_host = StarknetHost::new(
mock_params,
mempool.clone(),
mempool_load.clone(),
*address,
*private_key,
initial_validator_set.clone(),
Expand All @@ -355,6 +381,7 @@ async fn spawn_host_actor(
home_dir.to_owned(),
mock_host,
mempool,
mempool_load,
network,
metrics,
span.clone(),
Expand Down
1 change: 1 addition & 0 deletions code/crates/starknet/host/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod ticker;
Loading
Loading