Skip to content

Commit

Permalink
Add VerifyVoteExtension effect, host and app messages
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Jan 30, 2025
1 parent 4d1333f commit e27f8e3
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 6 deletions.
22 changes: 22 additions & 0 deletions code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,28 @@ where
reply_to.send(rx.await?)?;
}

HostMsg::VerifyVoteExtension {
height,
round,
value_id,
extension,
reply_to,
} => {
let (reply, rx) = oneshot::channel();

self.sender
.send(AppMsg::VerifyVoteExtension {
height,
round,
value_id,
extension,
reply,
})
.await?;

reply_to.send(rx.await?)?;
}

HostMsg::RestreamValue {
height,
round,
Expand Down
13 changes: 13 additions & 0 deletions code/crates/app-channel/src/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use bytes::Bytes;
use derive_where::derive_where;
use malachitebft_app::consensus::VoteExtensionError;
use tokio::sync::mpsc;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -73,6 +74,18 @@ pub enum AppMsg<Ctx: Context> {
reply: Reply<Option<Ctx::Extension>>,
},

/// Verify a vote extension
///
/// If the vote extension is deemed invalid, the vote it was part of
/// will be discarded altogether.
VerifyVoteExtension {
height: Ctx::Height,
round: Round,
value_id: ValueId<Ctx>,
extension: Ctx::Extension,
reply: Reply<Result<(), VoteExtensionError>>,
},

/// Requests the application to re-stream a proposal that it has already seen.
///
/// The application MUST re-publish again all the proposal parts pertaining
Expand Down
36 changes: 34 additions & 2 deletions code/crates/core-consensus/src/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use malachitebft_core_types::*;

use crate::input::RequestId;
use crate::types::SignedConsensusMsg;
use crate::ConsensusMsg;
use crate::{ConsensusMsg, VoteExtensionError};

/// Provides a way to construct the appropriate [`Resume`] value to
/// resume execution after handling an [`Effect`].
Expand Down Expand Up @@ -193,6 +193,21 @@ where
ThresholdParams,
resume::CertificateValidity,
),

/// Verify a vote extension
///
/// If the vote extension is deemed invalid, the vote it was part of
/// will be discarded altogether.
///
/// Resume with: [`resume::VoteExtensionValidity`]
VerifyVoteExtension(
Ctx::Height,
Round,
ValueId<Ctx>,
SignedExtension<Ctx>,
PublicKey<Ctx>,
resume::VoteExtensionValidity,
),
}

/// A value with which the consensus process can be resumed after yielding an [`Effect`].
Expand Down Expand Up @@ -223,14 +238,20 @@ where
/// Resume execution with the signed proposal
SignedProposal(SignedMessage<Ctx, Ctx::Proposal>),

/// An optional vote extension. See the [`Effect::ExtendVote`] effect for more information.
/// Resume with an optional vote extension.
/// See the [`Effect::ExtendVote`] effect for more information.
VoteExtension(Option<SignedExtension<Ctx>>),

/// Resume execution with the result of the verification of the [`SignedExtension`]
VoteExtensionValidity(Result<(), VoteExtensionError>),

/// Resume execution with the result of the verification of the [`CommitCertificate`]
CertificateValidity(Result<(), CertificateError<Ctx>>),
}

pub mod resume {
use crate::VoteExtensionError;

use super::*;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -309,4 +330,15 @@ pub mod resume {
Resume::CertificateValidity(value)
}
}

#[derive(Debug, Default)]
pub struct VoteExtensionValidity;

impl<Ctx: Context> Resumable<Ctx> for VoteExtensionValidity {
type Value = Result<(), VoteExtensionError>;

fn resume_with(self, value: Self::Value) -> Resume<Ctx> {
Resume::VoteExtensionValidity(value)
}
}
}
6 changes: 6 additions & 0 deletions code/crates/core-consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ pub struct ProposedValue<Ctx: Context> {
pub value: Ctx::Value,
pub validity: Validity,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum VoteExtensionError {
InvalidSignature,
InvalidVoteExtension,
}
41 changes: 39 additions & 2 deletions code/crates/engine/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use tracing::{debug, error, info, warn};

use malachitebft_codec as codec;
use malachitebft_config::TimeoutConfig;
use malachitebft_core_consensus::{Effect, PeerId, Resumable, Resume, SignedConsensusMsg};
use malachitebft_core_consensus::{
Effect, PeerId, Resumable, Resume, SignedConsensusMsg, VoteExtensionError,
};
use malachitebft_core_types::{
Context, Round, SigningProvider, SigningProviderExt, Timeout, TimeoutKind, ValidatorSet,
ValueId, ValueOrigin,
Expand Down Expand Up @@ -718,6 +720,23 @@ where
.map_err(|e| eyre!("Failed to get earliest block height: {e:?}").into())
}

async fn verify_vote_extension(
&self,
height: Ctx::Height,
round: Round,
value_id: ValueId<Ctx>,
extension: Ctx::Extension,
) -> Result<Result<(), VoteExtensionError>, ActorProcessingErr> {
ractor::call!(self.host, |reply_to| HostMsg::VerifyVoteExtension {
height,
round,
value_id,
extension,
reply_to
})
.map_err(|e| eyre!("Failed to verify vote extension: {e:?}").into())
}

async fn get_history_min_height(&self) -> Result<Ctx::Height, ActorProcessingErr> {
ractor::call!(self.host, |reply_to| HostMsg::GetHistoryMinHeight {
reply_to
Expand Down Expand Up @@ -886,9 +905,27 @@ where
}
}

Effect::VerifyVoteExtension(height, round, value_id, signed_extension, pk, r) => {
let valid = self.signing_provider.verify_signed_vote_extension(
&signed_extension.message,
&signed_extension.signature,
&pk,
);

if !valid {
return Ok(r.resume_with(Err(VoteExtensionError::InvalidSignature)));
}

let result = self
.verify_vote_extension(height, round, value_id, signed_extension.message)
.await?;

Ok(r.resume_with(result))
}

Effect::Publish(msg, r) => {
// Sync the WAL to disk before we broadcast the message
// NOTE: The message has already been append to the WAL by the `PersistMessage` effect.
// NOTE: The message has already been append to the WAL by the `WalAppendMessage` effect.
self.wal_flush(phase).await?;

// Notify any subscribers that we are about to publish a message
Expand Down
14 changes: 13 additions & 1 deletion code/crates/engine/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use derive_where::derive_where;
use ractor::{ActorRef, RpcReplyPort};

use malachitebft_core_consensus::PeerId;
use malachitebft_core_consensus::{PeerId, VoteExtensionError};
use malachitebft_core_types::{CommitCertificate, Context, Round, ValueId, VoteExtensions};
use malachitebft_sync::RawDecidedValue;

Expand Down Expand Up @@ -50,6 +50,18 @@ pub enum HostMsg<Ctx: Context> {
reply_to: RpcReplyPort<Option<Ctx::Extension>>,
},

/// Verify a vote extension
///
/// If the vote extension is deemed invalid, the vote it was part of
/// will be discarded altogether.
VerifyVoteExtension {
height: Ctx::Height,
round: Round,
value_id: ValueId<Ctx>,
extension: Ctx::Extension,
reply_to: RpcReplyPort<Result<(), VoteExtensionError>>,
},

/// Request to restream an existing block/value from Driver
RestreamValue {
height: Ctx::Height,
Expand Down
25 changes: 24 additions & 1 deletion code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use rand::SeedableRng;
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};

use malachitebft_core_consensus::PeerId;
use malachitebft_core_consensus::{PeerId, VoteExtensionError};
use malachitebft_core_types::{
CommitCertificate, Round, Validity, ValueId, ValueOrigin, VoteExtensions,
};
Expand Down Expand Up @@ -144,6 +144,16 @@ impl Host {
reply_to,
} => on_extend_vote(state, height, round, value_id, reply_to).await,

HostMsg::VerifyVoteExtension {
height,
round,
value_id,
extension,
reply_to,
} => {
on_verify_vote_extension(state, height, round, value_id, extension, reply_to).await
}

HostMsg::RestreamValue {
height,
round,
Expand Down Expand Up @@ -383,6 +393,19 @@ async fn on_extend_vote(
Ok(())
}

async fn on_verify_vote_extension(
_state: &mut HostState,
_height: Height,
_round: Round,
_value_id: ValueId<MockContext>,
_extension: Bytes,
reply_to: RpcReplyPort<Result<(), VoteExtensionError>>,
) -> Result<(), ActorProcessingErr> {
// TODO
reply_to.send(Ok(()))?;
Ok(())
}

/// If we have already built a block for this height and round, return it to consensus
/// This may happen when we are restarting after a crash and replaying the WAL.
async fn find_previously_built_value(
Expand Down
13 changes: 13 additions & 0 deletions code/examples/channel/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,19 @@ pub async fn run(state: &mut State, channels: &mut Channels<TestContext>) -> eyr
}
}

AppMsg::VerifyVoteExtension {
height: _,
round: _,
value_id: _,
extension: _,
reply,
} => {
// TODO
if reply.send(Ok(())).is_err() {
error!("Failed to send VerifyVoteExtension reply");
}
}

// On the receiving end of these proposal parts (ie. when we are not the proposer),
// we need to process these parts and re-assemble the full value.
// To this end, we store each part that we receive and assemble the full value once we
Expand Down

0 comments on commit e27f8e3

Please sign in to comment.