Skip to content

Commit

Permalink
Deprecate ClusterState types
Browse files Browse the repository at this point in the history
Summary:
This commit moves cluster.proto to deprecated_cluster.proto mainly but it also does
the following:
- Extract cluster configuration types to their own file.
- Extract partition processor types (Status, RunMode, etc...) to their own file.
  • Loading branch information
muhamadazmy committed Dec 26, 2024
1 parent 3da5356 commit 9ae265e
Show file tree
Hide file tree
Showing 33 changed files with 349 additions and 257 deletions.
8 changes: 8 additions & 0 deletions crates/admin/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.protoc_arg("--experimental_allow_proto3_optional")
.extern_path(".restate.common", "::restate_types::protobuf::common")
.extern_path(".restate.cluster", "::restate_types::protobuf::cluster")
.extern_path(
".restate.deprecated_cluster",
"::restate_types::protobuf::deprecated_cluster",
)
.extern_path(
".restate.cluster_configuration",
"::restate_types::protobuf::cluster_configuration",
)
.compile_protos(
&["./protobuf/cluster_ctrl_svc.proto"],
&["protobuf", "../types/protobuf"],
Expand Down
11 changes: 7 additions & 4 deletions crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
syntax = "proto3";

import "restate/common.proto";
import "restate/cluster.proto";
import "restate/cluster_configuration.proto";
import "restate/deprecated_cluster.proto";
import "google/protobuf/empty.proto";

package restate.cluster_ctrl;
Expand Down Expand Up @@ -43,17 +44,19 @@ service ClusterCtrlSvc {

message SetClusterConfigurationResponse {}
message SetClusterConfigurationRequest {
restate.cluster.ClusterConfiguration cluster_configuration = 1;
restate.cluster_configuration.ClusterConfiguration cluster_configuration = 1;
}

message GetClusterConfigurationRequest {}
message GetClusterConfigurationResponse {
restate.cluster.ClusterConfiguration cluster_configuration = 1;
restate.cluster_configuration.ClusterConfiguration cluster_configuration = 1;
}

message ClusterStateRequest {}

message ClusterStateResponse { restate.cluster.ClusterState cluster_state = 1; }
message ClusterStateResponse {
restate.deprecated_cluster.ClusterState cluster_state = 1;
}

message ListLogsRequest {}

Expand Down
15 changes: 9 additions & 6 deletions crates/admin/src/cluster_controller/cluster_state_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ use restate_core::network::{
use restate_core::{
Metadata, ShutdownError, TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind,
};
use restate_types::cluster::cluster_state::{
use restate_types::deprecated_cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, SuspectNode,
};
use restate_types::net::node::GetNodeState;
use restate_types::net::node::GetPartitionsProcessorsState;
use restate_types::time::MillisSinceEpoch;
use restate_types::Version;

pub struct ClusterStateRefresher<T> {
network_sender: Networking<T>,
get_state_router: RpcRouter<GetNodeState>,
get_state_router: RpcRouter<GetPartitionsProcessorsState>,
in_flight_refresh: Option<TaskHandle<anyhow::Result<()>>>,
cluster_state_update_rx: watch::Receiver<Arc<ClusterState>>,
cluster_state_update_tx: Arc<watch::Sender<Arc<ClusterState>>>,
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
}

fn start_refresh_task(
get_state_router: RpcRouter<GetNodeState>,
get_state_router: RpcRouter<GetPartitionsProcessorsState>,
network_sender: Networking<T>,
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
Expand Down Expand Up @@ -134,8 +134,11 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
async move {
match network_sender.node_connection(node_id).await {
Ok(connection) => {
let outgoing = Outgoing::new(node_id, GetNodeState::default())
.assign_connection(connection);
let outgoing = Outgoing::new(
node_id,
GetPartitionsProcessorsState::default(),
)
.assign_connection(connection);

(
node_id,
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::num::NonZeroU16;
use std::time::Duration;

use bytes::{Bytes, BytesMut};
use restate_types::protobuf::cluster::ClusterConfiguration;
use restate_types::protobuf::cluster_configuration::ClusterConfiguration;
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

Expand Down
8 changes: 5 additions & 3 deletions crates/admin/src/cluster_controller/observed_cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use std::collections::{HashMap, HashSet};

use xxhash_rust::xxh3::Xxh3Builder;

use restate_types::cluster::cluster_state::{ClusterState, NodeState, RunMode};
use restate_types::deprecated_cluster::cluster_state::{ClusterState, NodeState};
use restate_types::identifiers::PartitionId;
use restate_types::partition_processor::RunMode;
use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};

/// Represents the scheduler's observed state of the cluster. The scheduler will use this
Expand Down Expand Up @@ -135,10 +136,11 @@ mod tests {
};
use googletest::prelude::{empty, eq};
use googletest::{assert_that, elements_are, unordered_elements_are};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
use restate_types::deprecated_cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState,
};
use restate_types::identifiers::PartitionId;
use restate_types::partition_processor::{PartitionProcessorStatus, RunMode};
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
use std::collections::{BTreeMap, HashMap};
Expand Down
7 changes: 4 additions & 3 deletions crates/admin/src/cluster_controller/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,13 @@ mod tests {
};
use restate_core::network::{ForwardingHandler, Incoming, MessageCollectorMockConnector};
use restate_core::{Metadata, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
};
use restate_types::cluster_controller::{
SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
};
use restate_types::config::Configuration;
use restate_types::deprecated_cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState,
};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::net::codec::WireDecode;
Expand All @@ -615,6 +615,7 @@ mod tests {
use restate_types::nodes_config::{
LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState,
};
use restate_types::partition_processor::{PartitionProcessorStatus, RunMode};
use restate_types::partition_table::{PartitionTable, ReplicationStrategy};
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, PlainNodeId, Version};
Expand Down
12 changes: 7 additions & 5 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use restate_core::{
cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
TaskKind,
};
use restate_types::cluster::cluster_state::ClusterState;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::deprecated_cluster::cluster_state::ClusterState;
use restate_types::health::HealthStatus;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
Expand Down Expand Up @@ -841,16 +841,18 @@ mod tests {
};
use restate_core::test_env::NoOpMessageHandler;
use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::PartitionProcessorStatus;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::health::HealthStatus;
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::node::{GetNodeState, NodeStateResponse};
use restate_types::net::node::{
GetPartitionsProcessorsState, PartitionsProcessorsStateResponse,
};
use restate_types::net::partition_processor_manager::ControlProcessors;
use restate_types::net::AdvertisedAddress;
use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
use restate_types::partition_processor::PartitionProcessorStatus;
use restate_types::{GenerationalNodeId, Version};

#[test(restate_core::test)]
Expand Down Expand Up @@ -901,7 +903,7 @@ mod tests {
}

impl MessageHandler for NodeStateHandler {
type MessageType = GetNodeState;
type MessageType = GetPartitionsProcessorsState;

async fn on_message(&self, msg: Incoming<Self::MessageType>) {
if self.block_list.contains(&msg.peer()) {
Expand All @@ -915,7 +917,7 @@ mod tests {
};

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = msg.to_rpc_response(NodeStateResponse {
let response = msg.to_rpc_response(PartitionsProcessorsStateResponse {
partition_processor_state: Some(state),
});

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::TransportConnect;
use restate_core::{my_node_id, Metadata, MetadataWriter};
use restate_types::cluster::cluster_state::{AliveNode, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::deprecated_cluster::cluster_state::{AliveNode, NodeState};
use restate_types::identifiers::PartitionId;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::metadata::MetadataKind;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ mod tests {
use restate_test_util::{assert_eq, let_assert};
use restate_types::net::codec::WireDecode;
use restate_types::net::metadata::{GetMetadataRequest, MetadataMessage};
use restate_types::net::node::GetNodeState;
use restate_types::net::node::GetPartitionsProcessorsState;
use restate_types::net::{
AdvertisedAddress, ProtocolVersion, CURRENT_PROTOCOL_VERSION,
MIN_SUPPORTED_PROTOCOL_VERSION,
Expand Down Expand Up @@ -1013,7 +1013,7 @@ mod tests {
.await
.into_test_result()?;

let request = GetNodeState {};
let request = GetPartitionsProcessorsState {};
let partition_table_version = metadata.partition_table_version().next();
let header = Header::new(
metadata.nodes_config_version(),
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/worker_api/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use std::io;
use tokio::sync::{mpsc, oneshot};

use restate_types::{
cluster::cluster_state::PartitionProcessorStatus,
identifiers::{PartitionId, SnapshotId},
partition_processor::PartitionProcessorStatus,
};

use crate::ShutdownError;
Expand Down
18 changes: 9 additions & 9 deletions crates/node/src/roles/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ use restate_core::{
worker_api::ProcessorsManagerHandle,
ShutdownError, TaskCenter, TaskKind,
};
use restate_types::net::node::{GetNodeState, NodeStateResponse};
use restate_types::net::node::{GetPartitionsProcessorsState, PartitionsProcessorsStateResponse};

pub struct BaseRole {
processor_manager_handle: Option<ProcessorsManagerHandle>,
incoming_node_state: MessageStream<GetNodeState>,
processors_state_request_stream: MessageStream<GetPartitionsProcessorsState>,
}

impl BaseRole {
pub fn create(
router_builder: &mut MessageRouterBuilder,
processor_manager_handle: Option<ProcessorsManagerHandle>,
) -> Self {
let incoming_node_state = router_builder.subscribe_to_stream(2);
let processors_state_request_stream = router_builder.subscribe_to_stream(2);

Self {
processor_manager_handle,
incoming_node_state,
processors_state_request_stream,
}
}

Expand All @@ -56,17 +56,17 @@ impl BaseRole {
}

async fn run(mut self) -> anyhow::Result<()> {
while let Some(request) = self.incoming_node_state.next().await {
while let Some(request) = self.processors_state_request_stream.next().await {
// handle request
self.handle_get_node_state(request).await?;
self.handle_get_partitions_processors_state(request).await?;
}

Ok(())
}

async fn handle_get_node_state(
async fn handle_get_partitions_processors_state(
&self,
msg: Incoming<GetNodeState>,
msg: Incoming<GetPartitionsProcessorsState>,
) -> Result<(), ShutdownError> {
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
Some(handle.get_state().await?)
Expand All @@ -76,7 +76,7 @@ impl BaseRole {

// only return error if Shutdown
if let Err(NetworkError::Shutdown(err)) = msg
.to_rpc_response(NodeStateResponse {
.to_rpc_response(PartitionsProcessorsStateResponse {
partition_processor_state: partition_state,
})
.try_send()
Expand Down
8 changes: 5 additions & 3 deletions crates/types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,18 @@ fn build_restate_proto(out_dir: &Path) -> std::io::Result<()> {
)
.enum_attribute("Message.body", "#[derive(::derive_more::IsVariant)]")
.btree_map([
".restate.cluster.ClusterState",
".restate.cluster.AliveNode",
".restate.deprecated_cluster.ClusterState",
".restate.deprecated_cluster.AliveNode",
])
.file_descriptor_set_path(out_dir.join("common_descriptor.bin"))
// allow older protobuf compiler to be used
.protoc_arg("--experimental_allow_proto3_optional")
.compile_protos(
&[
"./protobuf/restate/common.proto",
"./protobuf/restate/cluster.proto",
"./protobuf/restate/partition_processor.proto",
"./protobuf/restate/cluster_configuration.proto",
"./protobuf/restate/deprecated_cluster.proto",
"./protobuf/restate/log_server_common.proto",
"./protobuf/restate/node.proto",
],
Expand Down
Loading

0 comments on commit 9ae265e

Please sign in to comment.