feat(spec/test): part streaming mbt - itf-trace parsing #814

Draft · wants to merge 4 commits into base: main
2 changes: 2 additions & 0 deletions code/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions code/crates/engine/src/util/streaming.rs
@@ -39,6 +39,7 @@ pub enum StreamContent<T> {
Data(T),

/// Fin must be set to true.
/// Q: Why not just leave out the bool then?
Fin(bool),
}

16 changes: 16 additions & 0 deletions code/crates/starknet/host/src/actor.rs
@@ -486,6 +486,11 @@ async fn on_get_decided_block(
Ok(())
}

/// Handles receiving proposal parts and assembling the proposal in the right
/// sequence once all parts have been collected.
///
/// For each proposal from a distinct peer, a separate stream is opened.
/// Bookkeeping of streams is done inside `HostState::part_streams_map` ((peerId + streamId) -> streamState)
async fn on_received_proposal_part(
state: &mut HostState,
part: StreamMessage<ProposalPart>,
@@ -495,6 +500,15 @@ async fn on_received_proposal_part(
// TODO - use state.host.receive_proposal() and move some of the logic below there
let sequence = part.sequence;

// When a part is inserted into the map, the stream tries to connect all received parts
// in the right order, starting from the beginning, and emits chunks of the parts
// sequence when it succeeds. If a part cannot be connected yet, it is buffered.
// E.g. buffered: 1 3 7
//   part 0 (first) arrives -> 0 and 1 are emitted
//   4 arrives -> gets buffered
//   2 arrives -> 2, 3 and 4 are emitted

// `insert` returns a connected sequence of parts, if any is emitted
let Some(parts) = state.part_streams_map.insert(from, part) else {
return Ok(());
};
@@ -512,6 +526,8 @@
return Ok(());
}

// Emitted parts are stored and simulated (if they carry transactions).
// Once the Fin part is stored, the proposal value is built from all of them.
for part in parts.parts {
debug!(
part.sequence = %sequence,
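The reordering behaviour described in the comments above can be illustrated with a minimal, self-contained sketch. This is not the actual `PartStreamsMap` API: `ReorderBuffer`, its `insert` signature, and the `BTreeMap`-based buffering are hypothetical stand-ins chosen only to mirror the buffered/emitted example in the comment.

```rust
use std::collections::BTreeMap;

/// Minimal reordering buffer: holds out-of-order parts and, on each insert,
/// emits the longest contiguous run starting at `next_sequence`.
#[derive(Default)]
struct ReorderBuffer<T> {
    buffer: BTreeMap<u64, T>,
    next_sequence: u64,
}

impl<T> ReorderBuffer<T> {
    /// Insert a part; returns the parts that became emittable, in order.
    fn insert(&mut self, sequence: u64, part: T) -> Vec<T> {
        self.buffer.insert(sequence, part);
        let mut emitted = Vec::new();
        // Drain consecutive sequences starting from the next expected one.
        while let Some(part) = self.buffer.remove(&self.next_sequence) {
            emitted.push(part);
            self.next_sequence += 1;
        }
        emitted
    }
}

fn main() {
    let mut buf = ReorderBuffer::default();
    assert!(buf.insert(1, "p1").is_empty()); // buffered: 1
    assert!(buf.insert(3, "p3").is_empty()); // buffered: 1 3
    assert_eq!(buf.insert(0, "p0"), vec!["p0", "p1"]); // 0 arrives -> 0 and 1 emitted
    assert!(buf.insert(4, "p4").is_empty()); // 4 arrives -> gets buffered
    assert_eq!(buf.insert(2, "p2"), vec!["p2", "p3", "p4"]); // 2 arrives -> 2, 3, 4 emitted
}
```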
1 change: 0 additions & 1 deletion code/crates/starknet/host/src/host/state.rs
@@ -175,7 +175,6 @@ impl HostState {

if let ProposalPart::Transactions(_txes) = &part {
debug!("Simulating tx execution and proof verification");

// Simulate Tx execution and proof verification (assumes success)
// TODO: Add config knob for invalid blocks
let num_txes = part.tx_count() as u32;
49 changes: 29 additions & 20 deletions code/crates/starknet/host/src/streaming.rs
@@ -1,3 +1,5 @@
//! Most of the algorithm logic is explained in `/malachite/code/crates/starknet/host/src/actor.rs`,
//! above `on_received_proposal_part`.
use std::cmp::Ordering;
use std::collections::{BTreeMap, BinaryHeap, HashSet};

@@ -8,63 +10,69 @@ use malachitebft_core_types::Round;
use malachitebft_engine::util::streaming::{Sequence, StreamId, StreamMessage};

use crate::types::{Address, Height, ProposalInit, ProposalPart};
use std::fmt::Debug;

struct MinSeq<T>(StreamMessage<T>);
// Wraps StreamMessage to implement a custom ordering for a BinaryHeap.
// The default BinaryHeap is a max-heap, so we implement Ord in reverse to turn it
// into a min-heap, which efficiently provides the available proposal part with the
// smallest sequence number.
#[derive_where(Debug)]
pub struct MinSeq<T: Debug>(pub StreamMessage<T>);

impl<T> PartialEq for MinSeq<T> {
impl<T: Debug> PartialEq for MinSeq<T> {
fn eq(&self, other: &Self) -> bool {
self.0.sequence == other.0.sequence
}
}

impl<T> Eq for MinSeq<T> {}
impl<T: Debug> Eq for MinSeq<T> {}

impl<T> Ord for MinSeq<T> {
impl<T: Debug> Ord for MinSeq<T> {
fn cmp(&self, other: &Self) -> Ordering {
other.0.sequence.cmp(&self.0.sequence)
}
}

impl<T> PartialOrd for MinSeq<T> {
impl<T: Debug> PartialOrd for MinSeq<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

struct MinHeap<T>(BinaryHeap<MinSeq<T>>);
#[derive_where(Debug)]
pub struct MinHeap<T: Debug>(pub BinaryHeap<MinSeq<T>>);

impl<T> Default for MinHeap<T> {
impl<T: Debug> Default for MinHeap<T> {
fn default() -> Self {
Self(BinaryHeap::new())
}
}

impl<T> MinHeap<T> {
impl<T: Debug> MinHeap<T> {
fn push(&mut self, msg: StreamMessage<T>) {
self.0.push(MinSeq(msg));
}

fn pop(&mut self) -> Option<StreamMessage<T>> {
pub fn pop(&mut self) -> Option<StreamMessage<T>> {
self.0.pop().map(|msg| msg.0)
}

fn peek(&self) -> Option<&StreamMessage<T>> {
self.0.peek().map(|msg| &msg.0)
}
}

#[derive_where(Default)]
struct StreamState<T> {
buffer: MinHeap<T>,
init_info: Option<ProposalInit>,
seen_sequences: HashSet<Sequence>,
next_sequence: Sequence,
total_messages: usize,
fin_received: bool,
emitted_messages: usize,
#[derive_where(Default, Debug)]
pub struct StreamState<T: Debug> {
pub buffer: MinHeap<T>,
pub init_info: Option<ProposalInit>,
pub seen_sequences: HashSet<Sequence>,
pub next_sequence: Sequence,
pub total_messages: usize,
pub fin_received: bool,
pub emitted_messages: usize,
}

impl<T> StreamState<T> {
impl<T: Debug> StreamState<T> {
fn has_emitted_all_messages(&self) -> bool {
self.fin_received && self.emitted_messages == self.total_messages
}
@@ -78,6 +86,7 @@ impl<T> StreamState<T> {
self.emitted_messages += 1;
}

// Emits all buffered parts that follow consecutively from the next expected sequence
fn emit_eligible_messages(&mut self, to_emit: &mut Vec<T>) {
while let Some(msg) = self.buffer.peek() {
if msg.sequence == self.next_sequence {
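The reversed-`Ord` min-heap trick used by `MinSeq`/`MinHeap` can be demonstrated standalone. The sketch below substitutes a plain `u64` for `StreamMessage<T>`, so it illustrates the technique rather than the PR's exact types.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// Wrapper that reverses the ordering so a BinaryHeap pops the smallest sequence first.
#[derive(Debug, PartialEq, Eq)]
struct MinSeq(u64);

impl Ord for MinSeq {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare `other` to `self`: this inverts the max-heap into a min-heap.
        other.0.cmp(&self.0)
    }
}

impl PartialOrd for MinSeq {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    for seq in [5, 1, 3] {
        heap.push(MinSeq(seq));
    }
    // Pops in ascending sequence order: 1, 3, 5.
    assert_eq!(heap.pop(), Some(MinSeq(1)));
    assert_eq!(heap.pop(), Some(MinSeq(3)));
    assert_eq!(heap.pop(), Some(MinSeq(5)));
}
```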
2 changes: 2 additions & 0 deletions code/crates/test/mbt/Cargo.toml
@@ -14,6 +14,8 @@ malachitebft-core-types = { workspace = true }
malachitebft-core-votekeeper = { workspace = true }
malachitebft-core-state-machine = { workspace = true }
malachitebft-test = { workspace = true }
malachitebft-starknet-host = { workspace = true }
malachitebft-engine = { workspace = true }

itf = { workspace = true }
rand = { workspace = true }
Empty file added code/crates/test/mbt/log.json
Empty file.
2 changes: 1 addition & 1 deletion code/crates/test/mbt/specs
23 changes: 23 additions & 0 deletions code/crates/test/mbt/src/deserializers.rs
@@ -2,6 +2,8 @@ use num_bigint::BigInt;
use num_traits::cast::ToPrimitive;
use serde::Deserialize;

use crate::streaming::Message;

pub(crate) fn minus_one_as_none<'de, D>(de: D) -> Result<Option<i64>, D::Error>
where
D: serde::Deserializer<'de>,
@@ -13,3 +15,24 @@ where
Some(i) => Ok(i.to_i64()),
}
}

// I don't know if this is the right way:
// the Quint specification has its own Option type, which is treated as an enum in Rust,
// so the message has to be extracted from it and converted to Rust's Option type.
#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(tag = "tag", content = "value")]
enum MessageOption {
Some(Message),
None,
}

pub(crate) fn quint_option_message<'de, D>(de: D) -> Result<Option<Message>, D::Error>
where
D: serde::Deserializer<'de>,
{
let opt = MessageOption::deserialize(de)?;
match opt {
MessageOption::Some(message) => Ok(Some(message)),
MessageOption::None => Ok(None),
}
}
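The deserializer above mirrors how Quint's `Option` shows up in the trace JSON as a tagged record. Below is a self-contained sketch of the same pattern, using a `String` payload instead of `Message` and assuming `serde_json` is available; the `{"tag": "None"}` shape for the `None` case is an assumption about the trace encoding.

```rust
use serde::Deserialize;

// Quint's Option arrives as an adjacently tagged enum:
//   Some: {"tag": "Some", "value": ...}
//   None: {"tag": "None"}
#[derive(Debug, Deserialize)]
#[serde(tag = "tag", content = "value")]
enum QuintOption<T> {
    Some(T),
    None,
}

impl<T> From<QuintOption<T>> for Option<T> {
    fn from(opt: QuintOption<T>) -> Self {
        match opt {
            QuintOption::Some(value) => Some(value),
            QuintOption::None => None,
        }
    }
}

fn main() {
    let some: QuintOption<String> =
        serde_json::from_str(r#"{"tag": "Some", "value": "hello"}"#).unwrap();
    assert_eq!(Option::from(some), Some("hello".to_string()));

    let none: QuintOption<String> = serde_json::from_str(r#"{"tag": "None"}"#).unwrap();
    assert_eq!(Option::<String>::from(none), None);
}
```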
1 change: 1 addition & 0 deletions code/crates/test/mbt/src/lib.rs
@@ -1,5 +1,6 @@
pub mod consensus;
pub mod deserializers;
pub mod streaming;
pub mod types;
pub mod utils;
pub mod votekeeper;
102 changes: 102 additions & 0 deletions code/crates/test/mbt/src/streaming.rs
@@ -0,0 +1,102 @@
use crate::deserializers as de;
use itf::de::{As, Integer};
use serde::Deserialize;
use std::{
cmp::Ordering,
collections::{BinaryHeap, HashSet},
};

pub type Sequence = i64;
pub type Payload = String;

#[derive(Clone, Debug, Deserialize)]
pub struct BufferRecord(#[serde(with = "As::<Integer>")] pub Sequence, Message);

impl PartialEq for BufferRecord {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

impl Eq for BufferRecord {}

impl PartialOrd for BufferRecord {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

// Buffer should act as a MinHeap, so the ordering is reversed
impl Ord for BufferRecord {
fn cmp(&self, other: &Self) -> Ordering {
other.0.cmp(&self.0)
}
}

#[derive(Debug, Deserialize, Clone)]
pub struct Buffer(pub BinaryHeap<BufferRecord>);

impl PartialEq for Buffer {
fn eq(&self, other: &Self) -> bool {
self.0.iter().eq(other.0.iter())
}
}

impl Eq for Buffer {}

impl Buffer {
fn push(&mut self, record: BufferRecord) {
self.0.push(record);
}

pub fn pop(&mut self) -> Option<BufferRecord> {
self.0.pop()
}

fn peek(&self) -> Option<&BufferRecord> {
self.0.peek()
}
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Hash)]
#[serde(tag = "tag")]
pub enum MessageType {
#[serde(rename = "INIT")]
Init,
#[serde(rename = "DATA")]
Data,
#[serde(rename = "FIN")]
Fin,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Hash)]
#[serde(rename_all = "camelCase")]
pub struct Message {
#[serde(with = "As::<Integer>")]
pub sequence: Sequence,
pub msg_type: MessageType,
pub payload: Payload,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamState {
pub buffer: Buffer,
#[serde(deserialize_with = "de::quint_option_message")]
pub init_message: Option<Message>,
pub received: HashSet<Message>,
#[serde(with = "As::<Integer>")]
pub next_sequence: Sequence,
#[serde(with = "As::<Integer>")]
pub total_messages: i32,
pub fin_received: bool,
pub emitted: Vec<Message>,
}

#[derive(Clone, Debug, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct State {
pub state: StreamState,
#[serde(deserialize_with = "de::quint_option_message")]
pub incoming_message: Option<Message>,
}
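To give a rough picture of the JSON these types decode, here is a standalone sketch that parses a trace-message-shaped fragment with `serde_json`. It simplifies `sequence` to a plain `i64` (the real struct goes through `As::<Integer>`, since ITF traces typically encode integers as `{"#bigint": "..."}`), so both the fragment and the simplified struct are illustrative assumptions rather than the exact wire format.

```rust
use serde::Deserialize;

// Same internal tagging scheme as `MessageType` above.
#[derive(Debug, PartialEq, Deserialize)]
#[serde(tag = "tag")]
enum MessageType {
    #[serde(rename = "INIT")]
    Init,
    #[serde(rename = "DATA")]
    Data,
    #[serde(rename = "FIN")]
    Fin,
}

// Simplified stand-in: `sequence` is a plain i64 instead of an ITF bigint.
#[derive(Debug, PartialEq, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Message {
    sequence: i64,
    msg_type: MessageType,
    payload: String,
}

fn main() {
    let json = r#"{ "sequence": 0, "msgType": { "tag": "INIT" }, "payload": "init" }"#;
    let msg: Message = serde_json::from_str(json).unwrap();
    assert_eq!(msg.msg_type, MessageType::Init);
    assert_eq!(msg.sequence, 0);
    assert_eq!(msg.payload, "init");
}
```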
1 change: 1 addition & 0 deletions code/crates/test/mbt/src/tests.rs
@@ -1,2 +1,3 @@
pub mod consensus;
pub mod streaming;
pub mod votekeeper;
2 changes: 1 addition & 1 deletion code/crates/test/mbt/src/tests/consensus.rs
@@ -25,7 +25,7 @@ fn test_itf() {
let quint_seed = quint_seed();

generate_test_traces(
"consensus/consensusTest.qnt",
"consensus/quint/tests/consensus/consensusTest.qnt",
&temp_path.to_string_lossy(),
quint_seed,
);
43 changes: 43 additions & 0 deletions code/crates/test/mbt/src/tests/streaming.rs
@@ -0,0 +1,43 @@
use glob::glob;

use crate::streaming::State;
use crate::utils::{generate_test_traces, quint_seed};

pub mod runner;
pub mod utils;

#[test]
fn test_itf() {
let temp_dir = tempfile::TempDir::with_prefix("informalsystems-malachitebft-part-streaming")
.expect("Failed to create temp dir");
let temp_path = temp_dir.path().to_owned();

if std::env::var("KEEP_TEMP").is_ok() {
std::mem::forget(temp_dir);
}

let quint_seed = quint_seed();

println!("{}", temp_path.to_string_lossy());
generate_test_traces(
"starknet/block-streaming/part_stream.qnt",
&temp_path.to_string_lossy(),
quint_seed,
);

for json_fixture in glob(&format!("{}/*.itf.json", temp_path.display()))
.expect("Failed to read glob pattern")
.flatten()
{
println!(
"🚀 Running trace {:?}",
json_fixture.file_name().unwrap().to_str().unwrap()
);

let json = std::fs::read_to_string(&json_fixture).unwrap();
let trace = itf::trace_from_str::<State>(&json).unwrap();

let streaming_runner = runner::StreamingRunner {};
trace.run_on(streaming_runner).unwrap();
}
}