Skip to content

Commit

Permalink
reworked configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
OakenKnight committed Jan 31, 2025
1 parent 9ecce10 commit 8efd171
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 52 deletions.
73 changes: 65 additions & 8 deletions code/crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,18 +342,75 @@ mod gossipsub {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "load_type", rename_all = "snake_case")]
pub enum MempoolLoadType {
UniformLoad {
count: usize,
size: usize,
},
#[default]
UniformLoad(UniformLoadConfig),
// #[default]
NoLoad,
NonUniformLoad,
}

impl Default for MempoolLoadType {
fn default() -> Self {
Self::UniformLoad(UniformLoadConfig::default())
}
}

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(from = "uniformload::RawConfig", default)]
pub struct UniformLoadConfig {
#[serde(with = "humantime_serde")]
interval: Duration,
count: usize,
size: usize,
}
impl UniformLoadConfig {
fn new(interval: Duration, count: usize, size: usize) -> Self {
Self {
interval: interval,
count: count,
size: size,
}
}
pub fn interval(&self) -> Duration {
self.interval
}

pub fn count(&self) -> usize {
self.count
}

pub fn size(&self) -> usize {
self.size
}
}
impl Default for UniformLoadConfig {
fn default() -> Self {
Self::new(Duration::from_secs(10), 1000, 256)
}
}

mod uniformload {
use std::time::Duration;

#[derive(serde::Deserialize)]
pub struct RawConfig {
#[serde(default)]
#[serde(with = "humantime_serde")]
interval: Duration,
#[serde(default)]
count: usize,
#[serde(default)]
size: usize,
}

impl From<RawConfig> for super::UniformLoadConfig {
fn from(raw: RawConfig) -> Self {
super::UniformLoadConfig::new(raw.interval, raw.count, raw.size)
}
}
}
/// Mempool configuration options
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct MempoolLoadConfig {
Expand Down Expand Up @@ -436,7 +493,7 @@ pub struct TimeoutConfig {
#[serde(with = "humantime_serde")]
pub timeout_propose_delta: Duration,

/// How long we wait after receiving +2/3 prevotes for "anything" (ie. not a single block or nil)
/// How long we wait after receiving +2/3 prevotes for anything (ie. not a single block or nil)
#[serde(with = "humantime_serde")]
pub timeout_prevote: Duration,

Expand Down
50 changes: 27 additions & 23 deletions code/crates/starknet/host/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytesize::ByteSize;
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use rand::RngCore;
use tracing::{debug, info, trace};

use malachitebft_test_mempool::types::MempoolTransactionBatch;
Expand Down Expand Up @@ -216,14 +215,15 @@ impl Actor for Mempool {
reply.send(txes)?;
}

Msg::Update { .. } => {
Msg::Update { tx_hashes } => {
// Clear all transactions from the mempool, given that we consume
// the full mempool when proposing a block.
//
// This reduces the mempool protocol overhead and allow us to
// observe issues strictly related to consensus.
// It also bumps performance, as we reduce the mempool's background traffic.
state.transactions.clear();
// state.transactions.clear();
tx_hashes.iter().for_each(|hash| state.remove_tx(hash));
}
}

Expand Down Expand Up @@ -254,35 +254,39 @@ fn generate_and_broadcast_txes(
let gossip_enabled = gossip_batch_size > 0;

// Start with transactions already in the mempool
let mut transactions = std::mem::take(&mut state.transactions)
let transactions = std::mem::take(&mut state.transactions)
.into_values()
.take(count)
.collect::<Vec<_>>();

let initial_count = transactions.len();
// let initial_count = transactions.len();

let mut tx_batch = Transactions::default();
let mut rng = rand::thread_rng();
let mut tx_batch = Transactions::new(transactions.clone());
// let mut rng = rand::thread_rng();

for _ in initial_count..count {
// Generate transaction
let mut tx_bytes = vec![0; size];
rng.fill_bytes(&mut tx_bytes);
let tx = Transaction::new(tx_bytes);
if gossip_enabled && tx_batch.len() >= batch_size {
let tx_batch = std::mem::take(&mut tx_batch).to_any().unwrap();
let mempool_batch = MempoolTransactionBatch::new(tx_batch);
mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?;
}
debug!("reaped transactions: {:?}", transactions.len());
// for _ in initial_count..count {
// // Generate transaction
// let mut tx_bytes = vec![0; size];
// rng.fill_bytes(&mut tx_bytes);
// let tx = Transaction::new(tx_bytes);

if gossip_enabled {
tx_batch.push(tx.clone());
}
// if gossip_enabled {
// tx_batch.push(tx.clone());
// }

transactions.push(tx);
// transactions.push(tx);

// Gossip tx-es to peers in batches
if gossip_enabled && tx_batch.len() >= batch_size {
let tx_batch = std::mem::take(&mut tx_batch).to_any().unwrap();
let mempool_batch = MempoolTransactionBatch::new(tx_batch);
mempool_network.cast(MempoolNetworkMsg::BroadcastMsg(mempool_batch))?;
}
}
// // Gossip tx-es to peers in batches
// if gossip_enabled && tx_batch.len() >= batch_size {

// }
// }

Ok(transactions)
}
29 changes: 16 additions & 13 deletions code/crates/starknet/host/src/mempool_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct MempoolLoad {
impl Default for Params {
fn default() -> Self {
Self {
load_type: MempoolLoadType::NoLoad,
load_type: MempoolLoadType::default(),
}
}
}
Expand Down Expand Up @@ -80,7 +80,7 @@ impl MempoolLoad {

transactions.push(tx);
}
debug!("transactions generated {:?}", transactions.clone().len());
// debug!("transactions generated {:?}", transactions.clone().len());

transactions
}
Expand All @@ -100,17 +100,20 @@ impl Actor for MempoolLoad {
debug!("starting ticker");

let ticker = match self.params.load_type {
MempoolLoadType::UniformLoad { count, size } => {
debug!("entered uniform load branch");

let interval = Duration::from_secs(1);
tokio::spawn(ticker(interval, myself.clone(), move || {
Msg::GenerateTransactions { count, size }
}))
MempoolLoadType::UniformLoad(uniform_load_config) => {
// debug!("entered uniform load branch");
tokio::spawn(ticker(
uniform_load_config.interval(),
myself.clone(),
move || Msg::GenerateTransactions {
count: uniform_load_config.count(),
size: uniform_load_config.size(),
},
))
}
MempoolLoadType::NoLoad => tokio::spawn(async {}),
MempoolLoadType::NonUniformLoad => {
debug!("entered nonuniform load branch");
// debug!("entered nonuniform load branch");

let mut rng = rand::thread_rng();
let interval = Duration::from_secs(rng.gen_range(1..10));
Expand Down Expand Up @@ -142,13 +145,13 @@ impl Actor for MempoolLoad {
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::GenerateTransactions { count, size } => {
debug!("entered message handler GenerateTransactions");
// debug!("entered message handler GenerateTransactions");

let transactions = Self::generate_transactions(count, size);
debug!("broadcasting transactions {:?}", transactions.len());
// debug!("broadcasting transactions {:?}", transactions.len());

let tx_batch = Transactions::new(transactions).to_any().unwrap();
debug!("broadcasting batch {:?}", tx_batch.clone().value.len());
// debug!("broadcasting batch {:?}", tx_batch.clone().value.len());

let mempool_batch: MempoolTransactionBatch = MempoolTransactionBatch::new(tx_batch);

Expand Down
3 changes: 0 additions & 3 deletions code/crates/starknet/host/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,6 @@ async fn spawn_mempool_load_actor(
network: MempoolNetworkRef,
span: &tracing::Span,
) -> MempoolLoadRef {
// let params = mempool_load::Params::default();

// debug!("spawned mempool load actor with params {:?}", params);
MempoolLoad::spawn(
Params {
load_type: mempool_load_config.load_type,
Expand Down
2 changes: 1 addition & 1 deletion code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ pub fn make_node_config<S>(test: &Test<S>, i: usize) -> NodeConfig {
gossip_batch_size: 100,
},
mempool_load: MempoolLoadConfig {
load_type: MempoolLoadType::NonUniformLoad,
load_type: MempoolLoadType::default(),
},
sync: SyncConfig {
enabled: true,
Expand Down
2 changes: 1 addition & 1 deletion code/crates/test/cli/src/cmd/distributed_testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ fn generate_distributed_config(
gossip_batch_size: 0,
},
mempool_load: MempoolLoadConfig {
load_type: MempoolLoadType::NonUniformLoad,
load_type: MempoolLoadType::default(),
},
sync: SyncConfig {
enabled: false,
Expand Down
2 changes: 1 addition & 1 deletion code/crates/test/cli/src/new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub fn generate_config(
gossip_batch_size: 0,
},
mempool_load: MempoolLoadConfig {
load_type: MempoolLoadType::NonUniformLoad,
load_type: MempoolLoadType::default(),
},
sync: Default::default(),
metrics: MetricsConfig {
Expand Down
13 changes: 11 additions & 2 deletions code/examples/channel/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,19 @@ type = "gossipsub"
### Mempool Load Configuration Options ###
#######################################################

[mempool_load]
[mempool_load.load_type]
# Sets the type of generation for mempool transactions
# Valid options are "no_load", "uniform_load", "non_uniform_load"
load_type = "no_load"
load_type = "uniform_load"

# Only available when "uniform_load", mempool load interval
interval = "5s"

# Only available when "uniform_load", mempool load size
size = 256

# Only available when "uniform_load", mempool load count
count = 1000

#######################################################
### Sync Configuration Options ###
Expand Down

0 comments on commit 8efd171

Please sign in to comment.