Skip to content

Commit

Permalink
wip, nonuniform load
Browse files Browse the repository at this point in the history
  • Loading branch information
OakenKnight committed Jan 31, 2025
1 parent 0aa4d13 commit d6c56f4
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 49 deletions.
21 changes: 4 additions & 17 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ impl Default for MempoolLoadType {
Self::UniformLoad(UniformLoadConfig::default())
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(from = "nonuniformload::RawConfig", default)]
pub struct NonUniformLoadConfig {
/// Base transaction count
Expand Down Expand Up @@ -424,19 +424,6 @@ impl NonUniformLoadConfig {
}
}

impl Default for NonUniformLoadConfig {
fn default() -> Self {
Self {
base_count: Default::default(),
base_size: Default::default(),
count_variation: Default::default(),
size_variation: Default::default(),
spike_probability: Default::default(),
spike_multiplier: Default::default(),
sleep_interval: Default::default(),
}
}
}
mod nonuniformload {
#[derive(serde::Deserialize)]
pub struct RawConfig {
Expand Down Expand Up @@ -495,9 +482,9 @@ pub struct UniformLoadConfig {
impl UniformLoadConfig {
fn new(interval: Duration, count: usize, size: usize) -> Self {
Self {
interval: interval,
count: count,
size: size,
interval,
count,
size,
}
}
pub fn interval(&self) -> Duration {
Expand Down
74 changes: 44 additions & 30 deletions code/crates/starknet/host/src/mempool_load.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::thread::sleep;
use std::time::Duration;

use async_trait::async_trait;
use ractor::{concurrency::JoinHandle, Actor, ActorProcessingErr, ActorRef};
use rand::seq::IteratorRandom;
use rand::{Rng, RngCore};
use tracing::debug;

Expand All @@ -28,7 +30,7 @@ pub struct State {
ticker: JoinHandle<()>,
}

#[derive(Debug)]
#[derive(Debug, Default)]
pub struct Params {
pub load_type: MempoolLoadType,
}
Expand All @@ -39,14 +41,6 @@ pub struct MempoolLoad {
span: tracing::Span,
}

impl Default for Params {
fn default() -> Self {
Self {
load_type: MempoolLoadType::default(),
}
}
}

impl MempoolLoad {
pub fn new(params: Params, network: MempoolNetworkRef, span: tracing::Span) -> Self {
Self {
Expand Down Expand Up @@ -100,29 +94,49 @@ impl Actor for MempoolLoad {
debug!("starting ticker");

let ticker = match self.params.load_type.clone() {
MempoolLoadType::UniformLoad(uniform_load_config) => {
// debug!("entered uniform load branch");
tokio::spawn(ticker(
uniform_load_config.interval(),
myself.clone(),
move || Msg::GenerateTransactions {
count: uniform_load_config.count(),
size: uniform_load_config.size(),
},
))
}
MempoolLoadType::UniformLoad(uniform_load_config) => tokio::spawn(ticker(
uniform_load_config.interval(),
myself.clone(),
move || Msg::GenerateTransactions {
count: uniform_load_config.count(),
size: uniform_load_config.size(),
},
)),
MempoolLoadType::NoLoad => tokio::spawn(async {}),
MempoolLoadType::NonUniformLoad(non_uniform_load) => {
// debug!("entered nonuniform load branch");

MempoolLoadType::NonUniformLoad(params) => tokio::spawn(async move {
// loop {
let mut rng = rand::thread_rng();
let interval = Duration::from_secs(rng.gen_range(1..10));
let count = rng.gen_range(500..=10000) as usize;
let size = rng.gen_range(128..=512) as usize;
tokio::spawn(ticker(interval, myself.clone(), move || {
Msg::GenerateTransactions { count, size }
}))
}
// Determine if this iteration should generate a spike
let is_spike = rng.gen_bool(params.spike_probability());

// Vary transaction count and size
let count_variation = rng.gen_range(params.count_variation());
let size_variation = rng.gen_range(params.size_variation());

let count = if is_spike {
(params.base_count() + count_variation as usize) * params.spike_multiplier()
} else {
params.base_count() + count_variation as usize
};

let size = params.base_size() + size_variation as usize;

// Create and send the message
let msg = Msg::GenerateTransactions {
count: count.max(1), // Ensure count is at least 1
size: size.max(1), // Ensure size is at least 1
};

if let Err(er) = myself.cast(msg) {
tracing::error!(?er, ?myself, "Failed to send tick message");
// break;
}
// Random sleep between 100ms and 1s
let sleep_duration =
Duration::from_millis(params.sleep_interval().choose(&mut rng).unwrap());
sleep(sleep_duration);
// }
}),
};
Ok(State { ticker })
}
Expand Down
13 changes: 11 additions & 2 deletions code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use tracing::{debug, error, error_span, info, Instrument, Span};

use malachitebft_config::{
Config as NodeConfig, Config, DiscoveryConfig, LoggingConfig, MempoolLoadConfig,
MempoolLoadType, PubSubProtocol, SyncConfig, TestConfig, TransportProtocol,
MempoolLoadType, NonUniformLoadConfig, PubSubProtocol, SyncConfig, TestConfig,
TransportProtocol,
};
use malachitebft_core_consensus::{LocallyProposedValue, SignedConsensusMsg};
use malachitebft_core_types::{SignedVote, VotingPower};
Expand Down Expand Up @@ -737,7 +738,15 @@ pub fn make_node_config<S>(test: &Test<S>, i: usize) -> NodeConfig {
gossip_batch_size: 100,
},
mempool_load: MempoolLoadConfig {
load_type: MempoolLoadType::default(),
load_type: MempoolLoadType::NonUniformLoad(NonUniformLoadConfig::new(
100, // Base number of transactions
256, // Random variation in transaction count
1024..2048, // Base transaction size in bytes
0..512, // Random variation in transaction size
0.1, // 10% chance of spike
5, // 5x multiplier during spikes
100..500, // Sleep between 100ms and 500ms
)),
},
sync: SyncConfig {
enabled: true,
Expand Down

0 comments on commit d6c56f4

Please sign in to comment.