Skip to content

Commit

Permalink
Node state gossip
Browse files Browse the repository at this point in the history
Summary:
First iteration of gossip protocol to share node state
  • Loading branch information
muhamadazmy committed Nov 19, 2024
1 parent 1116010 commit 61fe276
Show file tree
Hide file tree
Showing 17 changed files with 637 additions and 379 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::TransportConnect;
use restate_core::{Metadata, MetadataWriter};
use restate_types::cluster::cluster_state::{AliveNode, NodeState};
use restate_types::cluster::cluster_state::{AliveNode, ClusterStateWatch, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
Expand All @@ -31,7 +31,6 @@ use restate_types::net::metadata::MetadataKind;
use restate_types::partition_table::PartitionTable;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state_refresher::ClusterStateWatcher;
use crate::cluster_controller::logs_controller::{
LogsBasedPartitionProcessorPlacementHints, LogsController,
};
Expand Down Expand Up @@ -93,7 +92,7 @@ pub(crate) struct Leader<T> {
log_trim_interval: Option<Interval>,
logs_controller: LogsController,
scheduler: Scheduler<T>,
cluster_state_watcher: ClusterStateWatcher,
cluster_state_watch: ClusterStateWatch,
logs: Live<Logs>,
log_trim_threshold: Lsn,
}
Expand Down Expand Up @@ -138,7 +137,7 @@ where
nodes_config: service.metadata.updateable_nodes_config(),
partition_table: service.metadata.updateable_partition_table(),
partition_table_watcher: service.metadata.watch(MetadataKind::PartitionTable),
cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(),
cluster_state_watch: service.cluster_state_watch.clone(),
find_logs_tail_interval,
log_trim_interval,
log_trim_threshold,
Expand Down Expand Up @@ -218,7 +217,7 @@ where
&self,
bifrost_admin: BifrostAdmin<'_>,
) -> Result<(), restate_bifrost::Error> {
let cluster_state = self.cluster_state_watcher.current();
let cluster_state = self.cluster_state_watch.borrow().clone();

let mut persisted_lsns_per_partition: BTreeMap<
PartitionId,
Expand Down
242 changes: 0 additions & 242 deletions crates/admin/src/cluster_controller/cluster_state_refresher.rs

This file was deleted.

2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
.map_err(|_| Status::aborted("Node is shutting down"))?;

let resp = ClusterStateResponse {
cluster_state: Some((*cluster_state).clone().into()),
cluster_state: Some(cluster_state.into()),
};
Ok(Response::new(resp))
}
Expand Down
1 change: 0 additions & 1 deletion crates/admin/src/cluster_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

mod cluster_controller_state;
pub mod cluster_state_refresher;
pub mod grpc_svc_handler;
mod logs_controller;
mod observed_cluster_state;
Expand Down
Loading

0 comments on commit 61fe276

Please sign in to comment.