diff --git a/crates/admin/build.rs b/crates/admin/build.rs index fa2d555b95..7ab0ac102c 100644 --- a/crates/admin/build.rs +++ b/crates/admin/build.rs @@ -22,6 +22,14 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .extern_path(".restate.common", "::restate_types::protobuf::common") .extern_path(".restate.cluster", "::restate_types::protobuf::cluster") + .extern_path( + ".restate.deprecated_cluster", + "::restate_types::protobuf::deprecated_cluster", + ) + .extern_path( + ".restate.cluster_configuration", + "::restate_types::protobuf::cluster_configuration", + ) .compile_protos( &["./protobuf/cluster_ctrl_svc.proto"], &["protobuf", "../types/protobuf"], diff --git a/crates/admin/protobuf/cluster_ctrl_svc.proto b/crates/admin/protobuf/cluster_ctrl_svc.proto index a4342bd28b..20e5e832f0 100644 --- a/crates/admin/protobuf/cluster_ctrl_svc.proto +++ b/crates/admin/protobuf/cluster_ctrl_svc.proto @@ -10,7 +10,8 @@ syntax = "proto3"; import "restate/common.proto"; -import "restate/cluster.proto"; +import "restate/cluster_configuration.proto"; +import "restate/deprecated_cluster.proto"; import "google/protobuf/empty.proto"; package restate.cluster_ctrl; @@ -43,17 +44,19 @@ service ClusterCtrlSvc { message SetClusterConfigurationResponse {} message SetClusterConfigurationRequest { - restate.cluster.ClusterConfiguration cluster_configuration = 1; + restate.cluster_configuration.ClusterConfiguration cluster_configuration = 1; } message GetClusterConfigurationRequest {} message GetClusterConfigurationResponse { - restate.cluster.ClusterConfiguration cluster_configuration = 1; + restate.cluster_configuration.ClusterConfiguration cluster_configuration = 1; } message ClusterStateRequest {} -message ClusterStateResponse { restate.cluster.ClusterState cluster_state = 1; } +message ClusterStateResponse { + restate.deprecated_cluster.ClusterState cluster_state = 1; +} message ListLogsRequest {} diff --git a/crates/admin/src/cluster_controller/cluster_state_refresher.rs b/crates/admin/src/cluster_controller/cluster_state_refresher.rs index 1732d68eb4..a65b224f68 100644 --- a/crates/admin/src/cluster_controller/cluster_state_refresher.rs +++ b/crates/admin/src/cluster_controller/cluster_state_refresher.rs @@ -22,16 +22,16 @@ use restate_core::network::{ use restate_core::{ Metadata, ShutdownError, TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind, }; -use restate_types::cluster::cluster_state::{ +use restate_types::deprecated_cluster::cluster_state::{ AliveNode, ClusterState, DeadNode, NodeState, SuspectNode, }; -use restate_types::net::node::GetNodeState; +use restate_types::net::node::GetPartitionsProcessorsState; use restate_types::time::MillisSinceEpoch; use restate_types::Version; pub struct ClusterStateRefresher { network_sender: Networking, - get_state_router: RpcRouter, + get_state_router: RpcRouter, in_flight_refresh: Option>>, cluster_state_update_rx: watch::Receiver>, cluster_state_update_tx: Arc>>, @@ -99,7 +99,7 @@ impl ClusterStateRefresher { } fn start_refresh_task( - get_state_router: RpcRouter, + get_state_router: RpcRouter, network_sender: Networking, cluster_state_tx: Arc>>, ) -> Result>>, ShutdownError> { @@ -134,8 +134,11 @@ impl ClusterStateRefresher { async move { match network_sender.node_connection(node_id).await { Ok(connection) => { - let outgoing = Outgoing::new(node_id, GetNodeState::default()) - .assign_connection(connection); + let outgoing = Outgoing::new( + node_id, + GetPartitionsProcessorsState::default(), + ) + .assign_connection(connection); ( node_id, diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index f9c8202393..18ac5b5bce 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -12,7 +12,7 @@ use std::num::NonZeroU16; use std::time::Duration; use bytes::{Bytes, BytesMut}; -use restate_types::protobuf::cluster::ClusterConfiguration; +use restate_types::protobuf::cluster_configuration::ClusterConfiguration; use tonic::{async_trait, Request, Response, Status}; use tracing::info; diff --git a/crates/admin/src/cluster_controller/observed_cluster_state.rs b/crates/admin/src/cluster_controller/observed_cluster_state.rs index 2ec51276d0..ec41a2199f 100644 --- a/crates/admin/src/cluster_controller/observed_cluster_state.rs +++ b/crates/admin/src/cluster_controller/observed_cluster_state.rs @@ -12,8 +12,9 @@ use std::collections::{HashMap, HashSet}; use xxhash_rust::xxh3::Xxh3Builder; -use restate_types::cluster::cluster_state::{ClusterState, NodeState, RunMode}; +use restate_types::deprecated_cluster::cluster_state::{ClusterState, NodeState}; use restate_types::identifiers::PartitionId; +use restate_types::partition_processor::RunMode; use restate_types::{GenerationalNodeId, NodeId, PlainNodeId}; /// Represents the scheduler's observed state of the cluster. The scheduler will use this @@ -135,10 +136,11 @@ mod tests { }; use googletest::prelude::{empty, eq}; use googletest::{assert_that, elements_are, unordered_elements_are}; - use restate_types::cluster::cluster_state::{ - AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode, + use restate_types::deprecated_cluster::cluster_state::{ + AliveNode, ClusterState, DeadNode, NodeState, }; use restate_types::identifiers::PartitionId; + use restate_types::partition_processor::{PartitionProcessorStatus, RunMode}; use restate_types::time::MillisSinceEpoch; use restate_types::{GenerationalNodeId, PlainNodeId, Version}; use std::collections::{BTreeMap, HashMap}; diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs index e7ab524beb..6dd33922cb 100644 --- a/crates/admin/src/cluster_controller/scheduler.rs +++ b/crates/admin/src/cluster_controller/scheduler.rs @@ -600,13 +600,13 @@ mod tests { }; use restate_core::network::{ForwardingHandler, Incoming, MessageCollectorMockConnector}; use restate_core::{Metadata, TestCoreEnv, TestCoreEnvBuilder}; - use restate_types::cluster::cluster_state::{ - AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode, - }; use restate_types::cluster_controller::{ SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState, }; use restate_types::config::Configuration; + use restate_types::deprecated_cluster::cluster_state::{ + AliveNode, ClusterState, DeadNode, NodeState, + }; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY; use restate_types::net::codec::WireDecode; @@ -615,6 +615,7 @@ mod tests { use restate_types::nodes_config::{ LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState, }; + use restate_types::partition_processor::{PartitionProcessorStatus, RunMode}; use restate_types::partition_table::{PartitionTable, ReplicationStrategy}; use restate_types::time::MillisSinceEpoch; use restate_types::{GenerationalNodeId, PlainNodeId, Version}; diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601fc..a7127c27f5 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -47,8 +47,8 @@ use restate_core::{ cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter, TaskKind, }; -use restate_types::cluster::cluster_state::ClusterState; use restate_types::config::{AdminOptions, Configuration}; +use restate_types::deprecated_cluster::cluster_state::ClusterState; use restate_types::health::HealthStatus; use restate_types::identifiers::{PartitionId, SnapshotId}; use restate_types::live::Live; @@ -842,16 +842,18 @@ mod tests { }; use restate_core::test_env::NoOpMessageHandler; use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder}; - use restate_types::cluster::cluster_state::PartitionProcessorStatus; use restate_types::config::{AdminOptions, Configuration}; use restate_types::health::HealthStatus; use restate_types::identifiers::PartitionId; use restate_types::live::Live; use restate_types::logs::{LogId, Lsn, SequenceNumber}; - use restate_types::net::node::{GetNodeState, NodeStateResponse}; + use restate_types::net::node::{ + GetPartitionsProcessorsState, PartitionsProcessorsStateResponse, + }; use restate_types::net::partition_processor_manager::ControlProcessors; use restate_types::net::AdvertisedAddress; use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; + use restate_types::partition_processor::PartitionProcessorStatus; use restate_types::{GenerationalNodeId, Version}; #[test(restate_core::test)] @@ -902,7 +904,7 @@ mod tests { } impl MessageHandler for NodeStateHandler { - type MessageType = GetNodeState; + type MessageType = GetPartitionsProcessorsState; async fn on_message(&self, msg: Incoming) { if self.block_list.contains(&msg.peer()) { @@ -916,7 +918,7 @@ mod tests { }; let state = [(PartitionId::MIN, partition_processor_status)].into(); - let response = msg.to_rpc_response(NodeStateResponse { + let response = msg.to_rpc_response(PartitionsProcessorsStateResponse { partition_processor_state: Some(state), }); diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b55488393..6c75ba3863 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -21,8 +21,8 @@ use restate_bifrost::{Bifrost, BifrostAdmin}; use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::TransportConnect; use restate_core::{my_node_id, Metadata, MetadataWriter}; -use restate_types::cluster::cluster_state::{AliveNode, NodeState}; use restate_types::config::{AdminOptions, Configuration}; +use restate_types::deprecated_cluster::cluster_state::{AliveNode, NodeState}; use restate_types::identifiers::PartitionId; use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_types::net::metadata::MetadataKind; diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 6ce8dc4ed2..5f7dadbf55 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -766,7 +766,7 @@ mod tests { use restate_test_util::{assert_eq, let_assert}; use restate_types::net::codec::WireDecode; use restate_types::net::metadata::{GetMetadataRequest, MetadataMessage}; - use restate_types::net::node::GetNodeState; + use restate_types::net::node::GetPartitionsProcessorsState; use restate_types::net::{ AdvertisedAddress, ProtocolVersion, CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSION, @@ -1013,7 +1013,7 @@ mod tests { .await .into_test_result()?; - let request = GetNodeState {}; + let request = GetPartitionsProcessorsState {}; let partition_table_version = metadata.partition_table_version().next(); let header = Header::new( metadata.nodes_config_version(), diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs index 10fec1904c..0fa3d58c47 100644 --- a/crates/core/src/worker_api/partition_processor_manager.rs +++ b/crates/core/src/worker_api/partition_processor_manager.rs @@ -14,8 +14,8 @@ use std::io; use tokio::sync::{mpsc, oneshot}; use restate_types::{ - cluster::cluster_state::PartitionProcessorStatus, identifiers::{PartitionId, SnapshotId}, + partition_processor::PartitionProcessorStatus, }; use crate::ShutdownError; diff --git a/crates/node/src/roles/base.rs b/crates/node/src/roles/base.rs index e6140dea44..2a40bc3d0a 100644 --- a/crates/node/src/roles/base.rs +++ b/crates/node/src/roles/base.rs @@ -17,11 +17,11 @@ use restate_core::{ worker_api::ProcessorsManagerHandle, ShutdownError, TaskCenter, TaskKind, }; -use restate_types::net::node::{GetNodeState, NodeStateResponse}; +use restate_types::net::node::{GetPartitionsProcessorsState, PartitionsProcessorsStateResponse}; pub struct BaseRole { processor_manager_handle: Option, - incoming_node_state: MessageStream, + processors_state_request_stream: MessageStream, } impl BaseRole { @@ -29,11 +29,11 @@ impl BaseRole { router_builder: &mut MessageRouterBuilder, processor_manager_handle: Option, ) -> Self { - let incoming_node_state = router_builder.subscribe_to_stream(2); + let processors_state_request_stream = router_builder.subscribe_to_stream(2); Self { processor_manager_handle, - incoming_node_state, + processors_state_request_stream, } } @@ -56,17 +56,17 @@ impl BaseRole { } async fn run(mut self) -> anyhow::Result<()> { - while let Some(request) = self.incoming_node_state.next().await { + while let Some(request) = self.processors_state_request_stream.next().await { // handle request - self.handle_get_node_state(request).await?; + self.handle_get_partitions_processors_state(request).await?; } Ok(()) } - async fn handle_get_node_state( + async fn handle_get_partitions_processors_state( &self, - msg: Incoming, + msg: Incoming, ) -> Result<(), ShutdownError> { let partition_state = if let Some(ref handle) = self.processor_manager_handle { Some(handle.get_state().await?) @@ -76,7 +76,7 @@ impl BaseRole { // only return error if Shutdown if let Err(NetworkError::Shutdown(err)) = msg - .to_rpc_response(NodeStateResponse { + .to_rpc_response(PartitionsProcessorsStateResponse { partition_processor_state: partition_state, }) .try_send() diff --git a/crates/types/build.rs b/crates/types/build.rs index 3fc94b90ea..b3624f6a51 100644 --- a/crates/types/build.rs +++ b/crates/types/build.rs @@ -92,8 +92,8 @@ fn build_restate_proto(out_dir: &Path) -> std::io::Result<()> { ) .enum_attribute("Message.body", "#[derive(::derive_more::IsVariant)]") .btree_map([ - ".restate.cluster.ClusterState", - ".restate.cluster.AliveNode", + ".restate.deprecated_cluster.ClusterState", + ".restate.deprecated_cluster.AliveNode", ]) .file_descriptor_set_path(out_dir.join("common_descriptor.bin")) // allow older protobuf compiler to be used @@ -101,7 +101,9 @@ fn build_restate_proto(out_dir: &Path) -> std::io::Result<()> { .compile_protos( &[ "./protobuf/restate/common.proto", - "./protobuf/restate/cluster.proto", + "./protobuf/restate/partition_processor.proto", + "./protobuf/restate/cluster_configuration.proto", + "./protobuf/restate/deprecated_cluster.proto", "./protobuf/restate/log_server_common.proto", "./protobuf/restate/node.proto", ], diff --git a/crates/types/protobuf/restate/cluster.proto b/crates/types/protobuf/restate/cluster.proto deleted file mode 100644 index d5dfb65e53..0000000000 --- a/crates/types/protobuf/restate/cluster.proto +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -syntax = "proto3"; - -import "restate/common.proto"; -import "google/protobuf/empty.proto"; -import "google/protobuf/duration.proto"; -import "google/protobuf/timestamp.proto"; - -package restate.cluster; - -message ClusterState { - google.protobuf.Duration last_refreshed = 1; - restate.common.Version nodes_config_version = 2; - restate.common.Version partition_table_version = 3; - map nodes = 4; - restate.common.Version logs_metadata_version = 5; -} - -message NodeState { - oneof state { - AliveNode alive = 1; - DeadNode dead = 2; - SuspectNode suspect = 3; - } -} - -message SuspectNode { - restate.common.NodeId generational_node_id = 1; - google.protobuf.Timestamp last_attempt = 2; -} - -message AliveNode { - restate.common.NodeId generational_node_id = 1; - google.protobuf.Timestamp last_heartbeat_at = 2; - // partition id is u16 but protobuf doesn't support u16. This must be a value - // that's safe to convert to u16 - map partitions = 3; -} - -message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; } - -enum RunMode { - RunMode_UNKNOWN = 0; - LEADER = 1; - FOLLOWER = 2; -} - -enum ReplayStatus { - ReplayStatus_UNKNOWN = 0; - STARTING = 1; - ACTIVE = 2; - CATCHING_UP = 3; -} - -message PartitionProcessorStatus { - google.protobuf.Timestamp updated_at = 1; - RunMode planned_mode = 2; - RunMode effective_mode = 3; - optional restate.common.LeaderEpoch last_observed_leader_epoch = 4; - optional restate.common.NodeId last_observed_leader_node = 5; - optional restate.common.Lsn last_applied_log_lsn = 6; - optional google.protobuf.Timestamp last_record_applied_at = 7; - uint64 num_skipped_records = 8; - ReplayStatus replay_status = 9; - optional restate.common.Lsn last_persisted_log_lsn = 10; - optional restate.common.Lsn last_archived_log_lsn = 12; - // Set if replay_status is CATCHING_UP - optional restate.common.Lsn target_tail_lsn = 11; -} - -enum NodeSetSelectionStrategyKind { - NodeSetSelectionStrategyKind_UNKNOWN = 0; - StrictFaultTolerantGreedy = 1; -} - -message NodeSetSelectionStrategy { NodeSetSelectionStrategyKind kind = 1; } - -message ReplicatedProviderConfig { - string replication_property = 1; - NodeSetSelectionStrategy nodeset_selection_strategy = 2; -} - -message DefaultProvider { - string provider = 1; - // only required if provider = "replicated" - optional ReplicatedProviderConfig replicated_config = 2; -} - -enum ReplicationStrategyKind { - ReplicationStrategyKind_UNKNOWN = 0; - OnAllNodes = 1; - Factor = 2; -} - -message ReplicationStrategy { - ReplicationStrategyKind kind = 1; - // required if kind == "Factor" - optional uint32 factor = 2; -} - -message ClusterConfiguration { - uint32 num_partitions = 1; - ReplicationStrategy replication_strategy = 2; - DefaultProvider default_provider = 3; -} diff --git a/crates/types/protobuf/restate/cluster_configuration.proto b/crates/types/protobuf/restate/cluster_configuration.proto new file mode 100644 index 0000000000..00e77baf32 --- /dev/null +++ b/crates/types/protobuf/restate/cluster_configuration.proto @@ -0,0 +1,53 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "restate/common.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; + +package restate.cluster_configuration; + +enum NodeSetSelectionStrategyKind { + NodeSetSelectionStrategyKind_UNKNOWN = 0; + StrictFaultTolerantGreedy = 1; +} + +message NodeSetSelectionStrategy { NodeSetSelectionStrategyKind kind = 1; } + +message ReplicatedProviderConfig { + string replication_property = 1; + NodeSetSelectionStrategy nodeset_selection_strategy = 2; +} + +message DefaultProvider { + string provider = 1; + // only required if provider = "replicated" + optional ReplicatedProviderConfig replicated_config = 2; +} + +enum ReplicationStrategyKind { + ReplicationStrategyKind_UNKNOWN = 0; + OnAllNodes = 1; + Factor = 2; +} + +message ReplicationStrategy { + ReplicationStrategyKind kind = 1; + // required if kind == "Factor" + optional uint32 factor = 2; +} + +message ClusterConfiguration { + uint32 num_partitions = 1; + ReplicationStrategy replication_strategy = 2; + DefaultProvider default_provider = 3; +} diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index eb8488b521..9c86f67585 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -71,8 +71,9 @@ enum TargetName { PARTITION_PROCESSOR_RPC = 52; PARTITION_PROCESSOR_RPC_RESPONSE = 53; // Node - NODE_GET_NODE_STATE_REQUEST = 60; - NODE_GET_NODE_STATE_RESPONSE = 61; + NODE_GET_PARTITIONS_PROCESSORS_STATE_REQUEST = 60; + NODE_GET_PARTITIONS_PROCESSORS_STATE_RESPONSE = 61; + // Remote Scanner REMOTE_QUERY_SCANNER_OPEN = 80; REMOTE_QUERY_SCANNER_OPENED = 81; diff --git a/crates/types/protobuf/restate/deprecated_cluster.proto b/crates/types/protobuf/restate/deprecated_cluster.proto new file mode 100644 index 0000000000..3858b905f2 --- /dev/null +++ b/crates/types/protobuf/restate/deprecated_cluster.proto @@ -0,0 +1,51 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "restate/common.proto"; +import "restate/partition_processor.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; + +package restate.deprecated_cluster; + +message ClusterState { + google.protobuf.Duration last_refreshed = 1; + restate.common.Version nodes_config_version = 2; + restate.common.Version partition_table_version = 3; + map nodes = 4; + restate.common.Version logs_metadata_version = 5; +} + +message NodeState { + oneof state { + AliveNode alive = 1; + DeadNode dead = 2; + SuspectNode suspect = 3; + } +} + +message SuspectNode { + restate.common.NodeId generational_node_id = 1; + google.protobuf.Timestamp last_attempt = 2; +} + +message AliveNode { + restate.common.NodeId generational_node_id = 1; + google.protobuf.Timestamp last_heartbeat_at = 2; + // partition id is u16 but protobuf doesn't support u16. This must be a value + // that's safe to convert to u16 + map partitions = + 3; +} + +message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; } diff --git a/crates/types/protobuf/restate/partition_processor.proto b/crates/types/protobuf/restate/partition_processor.proto new file mode 100644 index 0000000000..ad75b9d6ad --- /dev/null +++ b/crates/types/protobuf/restate/partition_processor.proto @@ -0,0 +1,47 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "restate/common.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; + +package restate.partition_processor; + +enum RunMode { + RunMode_UNKNOWN = 0; + LEADER = 1; + FOLLOWER = 2; +} + +enum ReplayStatus { + ReplayStatus_UNKNOWN = 0; + STARTING = 1; + ACTIVE = 2; + CATCHING_UP = 3; +} + +message PartitionProcessorStatus { + google.protobuf.Timestamp updated_at = 1; + RunMode planned_mode = 2; + RunMode effective_mode = 3; + optional restate.common.LeaderEpoch last_observed_leader_epoch = 4; + optional restate.common.NodeId last_observed_leader_node = 5; + optional restate.common.Lsn last_applied_log_lsn = 6; + optional google.protobuf.Timestamp last_record_applied_at = 7; + uint64 num_skipped_records = 8; + ReplayStatus replay_status = 9; + optional restate.common.Lsn last_persisted_log_lsn = 10; + optional restate.common.Lsn last_archived_log_lsn = 12; + // Set if replay_status is CATCHING_UP + optional restate.common.Lsn target_tail_lsn = 11; +} diff --git a/crates/types/src/cluster_controller.rs b/crates/types/src/cluster_controller.rs index ad213251a7..6ec6a50f4b 100644 --- a/crates/types/src/cluster_controller.rs +++ b/crates/types/src/cluster_controller.rs @@ -14,8 +14,8 @@ use std::ops::RangeInclusive; use serde_with::serde_as; use xxhash_rust::xxh3::Xxh3Builder; -use crate::cluster::cluster_state::RunMode; use crate::identifiers::{PartitionId, PartitionKey}; +use crate::partition_processor::RunMode; use crate::partition_table::PartitionTable; use crate::{flexbuffers_storage_encode_decode, PlainNodeId, Version, Versioned}; diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/deprecated_cluster/cluster_state.rs similarity index 56% rename from crates/types/src/cluster/cluster_state.rs rename to crates/types/src/deprecated_cluster/cluster_state.rs index b6313ae984..3fe8c7cab8 100644 --- a/crates/types/src/cluster/cluster_state.rs +++ b/crates/types/src/deprecated_cluster/cluster_state.rs @@ -12,17 +12,16 @@ use std::collections::BTreeMap; use std::time::Instant; use prost_dto::IntoProto; -use serde::{Deserialize, Serialize}; -use crate::identifiers::{LeaderEpoch, PartitionId}; -use crate::logs::Lsn; +use crate::identifiers::PartitionId; +use crate::partition_processor::PartitionProcessorStatus; use crate::time::MillisSinceEpoch; use crate::{GenerationalNodeId, PlainNodeId, Version}; /// A container for health information about every node and partition in the /// cluster. #[derive(Debug, Clone, IntoProto)] -#[proto(target = "crate::protobuf::cluster::ClusterState")] +#[proto(target = "crate::protobuf::deprecated_cluster::ClusterState")] pub struct ClusterState { #[into_proto(map = "instant_to_proto")] pub last_refreshed: Option, @@ -75,7 +74,10 @@ fn instant_to_proto(t: Instant) -> prost_types::Duration { } #[derive(Debug, Clone, IntoProto)] -#[proto(target = "crate::protobuf::cluster::NodeState", oneof = "state")] +#[proto( + target = "crate::protobuf::deprecated_cluster::NodeState", + oneof = "state" +)] pub enum NodeState { Alive(AliveNode), Dead(DeadNode), @@ -83,7 +85,7 @@ pub enum NodeState { } #[derive(Debug, Clone, IntoProto)] -#[proto(target = "crate::protobuf::cluster::AliveNode")] +#[proto(target = "crate::protobuf::deprecated_cluster::AliveNode")] pub struct AliveNode { #[proto(required)] pub last_heartbeat_at: MillisSinceEpoch, @@ -93,13 +95,13 @@ pub struct AliveNode { } #[derive(Debug, Clone, IntoProto)] -#[proto(target = "crate::protobuf::cluster::DeadNode")] +#[proto(target = "crate::protobuf::deprecated_cluster::DeadNode")] pub struct DeadNode { pub last_seen_alive: Option, } #[derive(Debug, Clone, IntoProto)] -#[proto(target = "crate::protobuf::cluster::SuspectNode")] +#[proto(target = "crate::protobuf::deprecated_cluster::SuspectNode")] /// As the name implies, SuspectNode is both dead and alive /// until we receive a heartbeat pub struct SuspectNode { @@ -108,68 +110,3 @@ pub struct SuspectNode { #[proto(required)] pub last_attempt: MillisSinceEpoch, } - -#[derive( - Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display, -)] -#[proto(target = "crate::protobuf::cluster::RunMode")] -pub enum RunMode { - Leader, - Follower, -} - -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)] -#[proto(target = "crate::protobuf::cluster::ReplayStatus")] -pub enum ReplayStatus { - Starting, - Active, - CatchingUp, -} - -#[derive(Debug, Clone, Serialize, Deserialize, IntoProto)] -#[proto(target = "crate::protobuf::cluster::PartitionProcessorStatus")] -pub struct PartitionProcessorStatus { - #[proto(required)] - pub updated_at: MillisSinceEpoch, - pub planned_mode: RunMode, - pub effective_mode: RunMode, - pub last_observed_leader_epoch: Option, - pub last_observed_leader_node: Option, - pub last_applied_log_lsn: Option, - pub last_record_applied_at: Option, - pub num_skipped_records: u64, - pub replay_status: ReplayStatus, - pub last_persisted_log_lsn: Option, - pub last_archived_log_lsn: Option, - // Set if replay_status is CatchingUp - pub target_tail_lsn: Option, -} - -impl Default for PartitionProcessorStatus { - fn default() -> Self { - Self { - updated_at: MillisSinceEpoch::now(), - planned_mode: RunMode::Follower, - effective_mode: RunMode::Follower, - last_observed_leader_epoch: None, - last_observed_leader_node: None, - last_applied_log_lsn: None, - last_record_applied_at: None, - num_skipped_records: 0, - replay_status: ReplayStatus::Starting, - last_persisted_log_lsn: None, - last_archived_log_lsn: None, - target_tail_lsn: None, - } - } -} - -impl PartitionProcessorStatus { - pub fn is_effective_leader(&self) -> bool { - self.effective_mode == RunMode::Leader - } - - pub fn new() -> Self { - Self::default() - } -} diff --git a/crates/types/src/cluster/mod.rs b/crates/types/src/deprecated_cluster/mod.rs similarity index 100% rename from crates/types/src/cluster/mod.rs rename to crates/types/src/deprecated_cluster/mod.rs diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index ce3bfb1e44..8e1db8003b 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -17,8 +17,9 @@ mod node_id; mod version; pub mod art; -pub mod cluster; +pub mod deprecated_cluster; pub mod health; +pub mod partition_processor; pub mod cluster_controller; pub mod config; diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 1668910b57..12923e432b 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -26,7 +26,7 @@ use super::builder::LogsBuilder; use super::LogletId; use crate::config::Configuration; use crate::logs::{LogId, Lsn, SequenceNumber}; -use crate::protobuf::cluster::{ +use crate::protobuf::cluster_configuration::{ NodeSetSelectionStrategy as ProtoNodeSetSelectionStrategy, NodeSetSelectionStrategyKind, }; use crate::replicated_loglet::{ReplicatedLogletParams, ReplicationProperty}; @@ -213,11 +213,11 @@ impl DefaultProvider { } } -impl From for crate::protobuf::cluster::DefaultProvider { +impl From for crate::protobuf::cluster_configuration::DefaultProvider { fn from(value: DefaultProvider) -> Self { - use crate::protobuf::cluster; + use crate::protobuf::cluster_configuration; - let mut result = crate::protobuf::cluster::DefaultProvider::default(); + let mut result = crate::protobuf::cluster_configuration::DefaultProvider::default(); match value { DefaultProvider::Local => result.provider = ProviderKind::Local.to_string(), @@ -225,7 +225,7 @@ impl From for crate::protobuf::cluster::DefaultProvider { DefaultProvider::InMemory => result.provider = ProviderKind::InMemory.to_string(), DefaultProvider::Replicated(config) => { result.provider = ProviderKind::Replicated.to_string(); - result.replicated_config = Some(cluster::ReplicatedProviderConfig { + result.replicated_config = Some(cluster_configuration::ReplicatedProviderConfig { replication_property: config.replication_property.to_string(), nodeset_selection_strategy: Some(config.nodeset_selection_strategy.into()), }) @@ -236,9 +236,11 @@ impl From for crate::protobuf::cluster::DefaultProvider { } } -impl TryFrom for DefaultProvider { +impl TryFrom for DefaultProvider { type Error = anyhow::Error; - fn try_from(value: crate::protobuf::cluster::DefaultProvider) -> Result { + fn try_from( + value: crate::protobuf::cluster_configuration::DefaultProvider, + ) -> Result { let provider_kind: ProviderKind = value.provider.parse()?; match provider_kind { diff --git a/crates/types/src/net/node.rs b/crates/types/src/net/node.rs index fbf308f191..070d8bb918 100644 --- a/crates/types/src/net/node.rs +++ b/crates/types/src/net/node.rs @@ -14,21 +14,21 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use super::TargetName; -use crate::{cluster::cluster_state::PartitionProcessorStatus, identifiers::PartitionId}; +use crate::{identifiers::PartitionId, partition_processor::PartitionProcessorStatus}; super::define_rpc! { - @request=GetNodeState, - @response=NodeStateResponse, - @request_target=TargetName::NodeGetNodeStateRequest, - @response_target=TargetName::NodeGetNodeStateResponse, + @request=GetPartitionsProcessorsState, + @response=PartitionsProcessorsStateResponse, + @request_target=TargetName::NodeGetPartitionsProcessorsStateRequest, + @response_target=TargetName::NodeGetPartitionsProcessorsStateResponse, } #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] -pub struct GetNodeState {} +pub struct GetPartitionsProcessorsState {} #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeStateResponse { +pub struct PartitionsProcessorsStateResponse { /// State of paritions processor per parition. Is set to None if this node is not a `Worker` node #[serde_as(as = "Option>")] pub partition_processor_state: Option>, diff --git a/crates/types/src/net/partition_processor_manager.rs b/crates/types/src/net/partition_processor_manager.rs index a60c158e9d..83bff00300 100644 --- a/crates/types/src/net/partition_processor_manager.rs +++ b/crates/types/src/net/partition_processor_manager.rs @@ -10,10 +10,10 @@ use serde::{Deserialize, Serialize}; -use crate::cluster::cluster_state::RunMode; use crate::identifiers::{PartitionId, SnapshotId}; use crate::net::define_rpc; use crate::net::{define_message, TargetName}; +use crate::partition_processor::RunMode; use crate::Version; define_message! { diff --git a/crates/types/src/partition_processor.rs b/crates/types/src/partition_processor.rs new file mode 100644 index 0000000000..b2dc950923 --- /dev/null +++ b/crates/types/src/partition_processor.rs @@ -0,0 +1,82 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use prost_dto::IntoProto; +use serde::{Deserialize, Serialize}; + +use crate::identifiers::LeaderEpoch; +use crate::logs::Lsn; +use crate::time::MillisSinceEpoch; +use crate::GenerationalNodeId; + +#[derive( + Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display, +)] +#[proto(target = "crate::protobuf::partition_processor::RunMode")] +pub enum RunMode { + Leader, + Follower, +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)] +#[proto(target = "crate::protobuf::partition_processor::ReplayStatus")] +pub enum ReplayStatus { + Starting, + Active, + CatchingUp, +} + +#[derive(Debug, Clone, Serialize, Deserialize, IntoProto)] +#[proto(target = "crate::protobuf::partition_processor::PartitionProcessorStatus")] +pub struct PartitionProcessorStatus { + #[proto(required)] + pub updated_at: MillisSinceEpoch, + pub planned_mode: RunMode, + pub effective_mode: RunMode, + pub last_observed_leader_epoch: Option, + pub last_observed_leader_node: Option, + pub last_applied_log_lsn: Option, + pub last_record_applied_at: Option, + pub num_skipped_records: u64, + pub replay_status: ReplayStatus, + pub last_persisted_log_lsn: Option, + pub last_archived_log_lsn: Option, + // Set if replay_status is CatchingUp + pub target_tail_lsn: Option, +} + +impl Default for PartitionProcessorStatus { + fn default() -> Self { + Self { + updated_at: MillisSinceEpoch::now(), + planned_mode: RunMode::Follower, + effective_mode: RunMode::Follower, + last_observed_leader_epoch: None, + last_observed_leader_node: None, + last_applied_log_lsn: None, + last_record_applied_at: None, + num_skipped_records: 0, + replay_status: ReplayStatus::Starting, + last_persisted_log_lsn: None, + last_archived_log_lsn: None, + target_tail_lsn: None, + } + } +} + +impl PartitionProcessorStatus { + pub fn is_effective_leader(&self) -> bool { + self.effective_mode == RunMode::Leader + } + + pub fn new() -> Self { + Self::default() + } +} diff --git a/crates/types/src/partition_table.rs b/crates/types/src/partition_table.rs index 99454a173f..699f998b59 100644 --- a/crates/types/src/partition_table.rs +++ b/crates/types/src/partition_table.rs @@ -19,7 +19,7 @@ use anyhow::Context; use regex::Regex; use crate::identifiers::{PartitionId, PartitionKey}; -use crate::protobuf::cluster::ReplicationStrategy as ProtoReplicationStrategy; +use crate::protobuf::cluster_configuration::ReplicationStrategy as ProtoReplicationStrategy; use crate::{flexbuffers_storage_encode_decode, Version, Versioned}; static REPLICATION_STRATEGY_FACTOR_PATTERN: LazyLock = LazyLock::new(|| { @@ -440,7 +440,7 @@ impl TryFrom for ReplicationStrategy { type Error = anyhow::Error; fn try_from(value: ProtoReplicationStrategy) -> Result { - use crate::protobuf::cluster::ReplicationStrategyKind; + use crate::protobuf::cluster_configuration::ReplicationStrategyKind; if value.kind == i32::from(ReplicationStrategyKind::OnAllNodes) { Ok(Self::OnAllNodes) @@ -495,7 +495,7 @@ impl FromStr for ReplicationStrategy { impl From for ProtoReplicationStrategy { fn from(value: ReplicationStrategy) -> Self { - use crate::protobuf::cluster::ReplicationStrategyKind; + use crate::protobuf::cluster_configuration::ReplicationStrategyKind; let mut result = Self::default(); match value { diff --git a/crates/types/src/protobuf.rs b/crates/types/src/protobuf.rs index 1be1d15a90..124882d1be 100644 --- a/crates/types/src/protobuf.rs +++ b/crates/types/src/protobuf.rs @@ -45,8 +45,8 @@ pub mod common { } } -pub mod cluster { - include!(concat!(env!("OUT_DIR"), "/restate.cluster.rs")); +pub mod partition_processor { + include!(concat!(env!("OUT_DIR"), "/restate.partition_processor.rs")); impl std::fmt::Display for RunMode { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -60,6 +60,17 @@ pub mod cluster { } } +pub mod deprecated_cluster { + include!(concat!(env!("OUT_DIR"), "/restate.deprecated_cluster.rs")); +} + +pub mod cluster_configuration { + include!(concat!( + env!("OUT_DIR"), + "/restate.cluster_configuration.rs" + )); +} + pub mod node { use opentelemetry::global; use opentelemetry::propagation::{Extractor, Injector}; diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 5659a7f25e..9fa9248850 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -40,7 +40,7 @@ use restate_storage_api::service_status_table::{ ReadOnlyVirtualObjectStatusTable, VirtualObjectStatus, }; use restate_storage_api::{StorageError, Transaction}; -use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode}; +use restate_types::partition_processor::{PartitionProcessorStatus, ReplayStatus, RunMode}; use restate_types::config::WorkerOptions; use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::identifiers::{ diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 01a9941630..9ddfe92dfb 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -44,8 +44,6 @@ use restate_invoker_impl::{BuildError, ChannelStatusReader}; use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError}; use restate_partition_store::snapshots::PartitionSnapshotMetadata; use restate_partition_store::PartitionStoreManager; -use restate_types::cluster::cluster_state::ReplayStatus; -use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode}; use restate_types::config::Configuration; use restate_types::epoch::EpochMetadata; use restate_types::health::HealthStatus; @@ -60,6 +58,7 @@ use restate_types::net::partition_processor::{ use restate_types::net::partition_processor_manager::{ ControlProcessor, ControlProcessors, ProcessorCommand, }; +use restate_types::partition_processor::{PartitionProcessorStatus, ReplayStatus, RunMode}; use restate_types::partition_table::PartitionTable; use restate_types::protobuf::common::WorkerStatus; use restate_types::GenerationalNodeId; diff --git a/crates/worker/src/partition_processor_manager/processor_state.rs b/crates/worker/src/partition_processor_manager/processor_state.rs index e7924a48ba..8dda13d4d4 100644 --- a/crates/worker/src/partition_processor_manager/processor_state.rs +++ b/crates/worker/src/partition_processor_manager/processor_state.rs @@ -19,11 +19,11 @@ use ulid::Ulid; use restate_core::network::Incoming; use restate_core::{TaskCenter, TaskKind}; use restate_invoker_impl::ChannelStatusReader; -use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode}; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcRequest, }; +use restate_types::partition_processor::{PartitionProcessorStatus, ReplayStatus, RunMode}; use restate_types::time::MillisSinceEpoch; use crate::partition::PartitionProcessorControlCommand; diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 8466474a53..66725b4fa0 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -20,11 +20,11 @@ use restate_invoker_impl::Service as InvokerService; use restate_partition_store::snapshots::LocalPartitionSnapshot; use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; use restate_service_protocol::codec::ProtobufRawEntryCodec; -use restate_types::cluster::cluster_state::PartitionProcessorStatus; use restate_types::config::{Configuration, WorkerOptions}; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::live::Live; use restate_types::logs::Lsn; +use restate_types::partition_processor::PartitionProcessorStatus; use restate_types::schema::Schema; use crate::invoker_integration::EntryEnricher; diff --git a/server/tests/snapshots.rs b/server/tests/snapshots.rs index 5f0fdc5846..f4cc94cb3b 100644 --- a/server/tests/snapshots.rs +++ b/server/tests/snapshots.rs @@ -34,7 +34,7 @@ use restate_local_cluster_runner::{ }; use restate_types::config::{LogFormat, MetadataStoreClient}; use restate_types::net::AdvertisedAddress; -use restate_types::protobuf::cluster::node_state::State; +use restate_types::protobuf::deprecated_cluster::node_state::State; use restate_types::{config::Configuration, nodes_config::Role}; mod common; diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs index 620a2f488b..d60522dae8 100644 --- a/tools/restatectl/src/commands/cluster/config.rs +++ b/tools/restatectl/src/commands/cluster/config.rs @@ -17,7 +17,7 @@ use cling::prelude::*; use restate_types::{ logs::metadata::DefaultProvider, partition_table::ReplicationStrategy, - protobuf::cluster::ClusterConfiguration, + protobuf::cluster_configuration::ClusterConfiguration, }; #[derive(Run, Subcommand, Clone)] diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs index 5fc3adb5eb..a6e08d7ee7 100644 --- a/tools/restatectl/src/commands/partition/list.rs +++ b/tools/restatectl/src/commands/partition/list.rs @@ -24,8 +24,9 @@ use restate_cli_util::ui::console::StyledTable; use restate_cli_util::ui::Tense; use restate_types::logs::metadata::{Chain, Logs}; use restate_types::logs::{LogId, Lsn}; -use restate_types::protobuf::cluster::{ - node_state, DeadNode, PartitionProcessorStatus, ReplayStatus, RunMode, SuspectNode, +use restate_types::protobuf::deprecated_cluster::{node_state, DeadNode, SuspectNode}; +use restate_types::protobuf::partition_processor::{ + PartitionProcessorStatus, ReplayStatus, RunMode, }; use restate_types::storage::StorageCodec; use restate_types::{GenerationalNodeId, PlainNodeId, Version};