diff --git a/crates/admin/src/cluster_controller/cluster_state_refresher.rs b/crates/admin/src/cluster_controller/cluster_state_refresher.rs
deleted file mode 100644
index 1732d68eb4..0000000000
--- a/crates/admin/src/cluster_controller/cluster_state_refresher.rs
+++ /dev/null
@@ -1,253 +0,0 @@
-// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
-// All rights reserved.
-//
-// Use of this software is governed by the Business Source License
-// included in the LICENSE file.
-//
-// As of the Change Date specified in that file, in accordance with
-// the Business Source License, use of this software will be governed
-// by the Apache License, Version 2.0.
-
-use std::collections::BTreeMap;
-use std::sync::Arc;
-use std::time::Instant;
-
-use tokio::sync::watch;
-use tracing::{debug, trace};
-
-use restate_core::network::rpc_router::RpcRouter;
-use restate_core::network::{
-    MessageRouterBuilder, NetworkError, Networking, Outgoing, TransportConnect,
-};
-use restate_core::{
-    Metadata, ShutdownError, TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind,
-};
-use restate_types::cluster::cluster_state::{
-    AliveNode, ClusterState, DeadNode, NodeState, SuspectNode,
-};
-use restate_types::net::node::GetNodeState;
-use restate_types::time::MillisSinceEpoch;
-use restate_types::Version;
-
-pub struct ClusterStateRefresher<T> {
-    network_sender: Networking<T>,
-    get_state_router: RpcRouter<GetNodeState>,
-    in_flight_refresh: Option<TaskHandle<anyhow::Result<()>>>,
-    cluster_state_update_rx: watch::Receiver<Arc<ClusterState>>,
-    cluster_state_update_tx: Arc<watch::Sender<Arc<ClusterState>>>,
-}
-
-impl<T: TransportConnect> ClusterStateRefresher<T> {
-    pub fn new(network_sender: Networking<T>, router_builder: &mut MessageRouterBuilder) -> Self {
-        let get_state_router = RpcRouter::new(router_builder);
-
-        let initial_state = ClusterState {
-            last_refreshed: None,
-            nodes_config_version: Version::INVALID,
-            partition_table_version: Version::INVALID,
-            logs_metadata_version: Version::INVALID,
-            nodes: BTreeMap::new(),
-        };
-        let (cluster_state_update_tx, cluster_state_update_rx) =
-            watch::channel(Arc::from(initial_state));
-
-        Self {
-            network_sender,
-            get_state_router,
-            in_flight_refresh: None,
-            cluster_state_update_rx,
-            cluster_state_update_tx: Arc::new(cluster_state_update_tx),
-        }
-    }
-
-    pub fn get_cluster_state(&self) -> Arc<ClusterState> {
-        Arc::clone(&self.cluster_state_update_rx.borrow())
-    }
-
-    pub fn cluster_state_watcher(&self) -> ClusterStateWatcher {
-        ClusterStateWatcher {
-            cluster_state_watcher: self.cluster_state_update_rx.clone(),
-        }
-    }
-
-    pub async fn next_cluster_state_update(&mut self) -> Arc<ClusterState> {
-        self.cluster_state_update_rx
-            .changed()
-            .await
-            .expect("sender should always exist");
-        Arc::clone(&self.cluster_state_update_rx.borrow_and_update())
-    }
-
-    pub fn schedule_refresh(&mut self) -> Result<(), ShutdownError> {
-        // if in-flight refresh is happening, then ignore.
-        if let Some(handle) = &self.in_flight_refresh {
-            if handle.is_finished() {
-                self.in_flight_refresh = None;
-            } else {
-                // still in flight.
-                return Ok(());
-            }
-        }
-
-        self.in_flight_refresh = Self::start_refresh_task(
-            self.get_state_router.clone(),
-            self.network_sender.clone(),
-            Arc::clone(&self.cluster_state_update_tx),
-        )?;
-
-        Ok(())
-    }
-
-    fn start_refresh_task(
-        get_state_router: RpcRouter<GetNodeState>,
-        network_sender: Networking<T>,
-        cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
-    ) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
-        let refresh = async move {
-            let last_state = Arc::clone(&cluster_state_tx.borrow());
-            let metadata = Metadata::current();
-            // make sure we have a partition table that equals or newer than last refresh
-            let partition_table_version = metadata
-                .wait_for_version(
-                    restate_core::MetadataKind::PartitionTable,
-                    last_state.partition_table_version,
-                )
-                .await?;
-            let _ = metadata
-                .wait_for_version(
-                    restate_core::MetadataKind::NodesConfiguration,
-                    last_state.nodes_config_version,
-                )
-                .await;
-            let nodes_config = metadata.nodes_config_snapshot();
-
-            let mut nodes = BTreeMap::new();
-            let mut join_set = tokio::task::JoinSet::new();
-            for (_, node_config) in nodes_config.iter() {
-                let node_id = node_config.current_generation;
-                let rpc_router = get_state_router.clone();
-                let network_sender = network_sender.clone();
-                join_set
-                    .build_task()
-                    .name("get-nodes-state")
-                    .spawn(
-                        async move {
-                            match network_sender.node_connection(node_id).await {
-                                Ok(connection) => {
-                                    let outgoing = Outgoing::new(node_id, GetNodeState::default())
-                                        .assign_connection(connection);
-
-                                    (
-                                        node_id,
-                                        rpc_router
-                                            .call_outgoing_timeout(
-                                                outgoing,
-                                                std::time::Duration::from_secs(1), // todo: make configurable
-                                            )
-                                            .await,
-                                    )
-                                }
-                                Err(network_error) => (node_id, Err(network_error)),
-                            }
-                        }
-                        .in_current_tc_as_task(TaskKind::InPlace, "get-nodes-state"),
-                    )
-                    .expect("to spawn task");
-            }
-            while let Some(Ok((node_id, result))) = join_set.join_next().await {
-                match result {
-                    Ok(response) => {
-                        let peer = response.peer();
-                        let msg = response.into_body();
-                        nodes.insert(
-                            node_id.as_plain(),
-                            NodeState::Alive(AliveNode {
-                                last_heartbeat_at: MillisSinceEpoch::now(),
-                                generational_node_id: peer,
-                                partitions: msg.partition_processor_state.unwrap_or_default(),
-                            }),
-                        );
-                    }
-                    Err(NetworkError::RemoteVersionMismatch(msg)) => {
-                        // When **this** node has just started, other peers might not have
-                        // learned about the new metadata version and then they can
-                        // return a RemoteVersionMismatch error.
-                        // In this case we are not sure about the peer state but it's
-                        // definitely not dead!
-                        // Hence we set it as Suspect node. This gives it enough time to update
-                        // its metadata, before we know the exact state
-                        debug!("Node {node_id} is marked as Suspect: {msg}");
-                        nodes.insert(
-                            node_id.as_plain(),
-                            NodeState::Suspect(SuspectNode {
-                                generational_node_id: node_id,
-                                last_attempt: MillisSinceEpoch::now(),
-                            }),
-                        );
-                    }
-                    Err(err) => {
-                        // todo: implement a more robust failure detector
-                        // This is a naive mechanism for failure detection and is just a stop-gap measure.
-                        // A single connection error or timeout will cause a node to be marked as dead.
-                        trace!("Node {node_id} is marked dead: {err}");
-                        let last_seen_alive = last_state.nodes.get(&node_id.as_plain()).and_then(
-                            |state| match state {
-                                NodeState::Alive(AliveNode {
-                                    last_heartbeat_at, ..
-                                }) => Some(*last_heartbeat_at),
-                                NodeState::Dead(DeadNode { last_seen_alive }) => *last_seen_alive,
-                                NodeState::Suspect(_) => None,
-                            },
-                        );
-
-                        nodes.insert(
-                            node_id.as_plain(),
-                            NodeState::Dead(DeadNode { last_seen_alive }),
-                        );
-                    }
-                };
-            }
-
-            let state = ClusterState {
-                last_refreshed: Some(Instant::now()),
-                nodes_config_version: nodes_config.version(),
-                partition_table_version,
-                nodes,
-                logs_metadata_version: metadata.logs_version(),
-            };
-
-            // publish the new state
-            cluster_state_tx.send(Arc::new(state))?;
-            Ok(())
-        };
-
-        let handle = TaskCenter::spawn_unmanaged(
-            restate_core::TaskKind::Disposable,
-            "cluster-state-refresh",
-            refresh,
-        )?;
-
-        // If this returned None, it means that the task completed or has been
-        // cancelled before we get to this point.
-        Ok(Some(handle))
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct ClusterStateWatcher {
-    cluster_state_watcher: watch::Receiver<Arc<ClusterState>>,
-}
-
-impl ClusterStateWatcher {
-    pub async fn next_cluster_state(&mut self) -> Result<Arc<ClusterState>, ShutdownError> {
-        self.cluster_state_watcher
-            .changed()
-            .await
-            .map_err(|_| ShutdownError)?;
-        Ok(Arc::clone(&self.cluster_state_watcher.borrow_and_update()))
-    }
-
-    pub fn current(&self) -> Arc<ClusterState> {
-        Arc::clone(&self.cluster_state_watcher.borrow())
-    }
-}
diff --git a/crates/admin/src/cluster_controller/mod.rs b/crates/admin/src/cluster_controller/mod.rs
index 5865d2ad8e..ba03bedab4 100644
--- a/crates/admin/src/cluster_controller/mod.rs
+++ b/crates/admin/src/cluster_controller/mod.rs
@@ -8,7 +8,6 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-pub mod cluster_state_refresher;
 pub mod grpc_svc_handler;
 mod logs_controller;
 mod observed_cluster_state;
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index a7e393196d..47ecc78f85 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -29,9 +29,9 @@ use restate_types::partition_table::{
     self, PartitionTable, PartitionTableBuilder, ReplicationStrategy,
 };
 use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams};
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::{mpsc, oneshot, watch};
 use tokio::time;
-use tokio::time::{Instant, Interval, MissedTickBehavior};
+use tokio::time::MissedTickBehavior;
 use tonic::codec::CompressionEncoding;
 use tracing::{debug, info};
 
@@ -47,7 +47,7 @@ use restate_core::{
     TaskKind,
 };
 use restate_types::cluster::cluster_state::ClusterState;
-use restate_types::config::{AdminOptions, Configuration};
+use restate_types::config::Configuration;
 use restate_types::health::HealthStatus;
 use restate_types::identifiers::{PartitionId, SnapshotId};
 use restate_types::live::Live;
@@ -57,7 +57,6 @@ use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
 use restate_types::protobuf::common::AdminStatus;
 use restate_types::{GenerationalNodeId, Version, Versioned};
 
-use super::cluster_state_refresher::ClusterStateRefresher;
 use super::grpc_svc_handler::ClusterCtrlSvcHandler;
 use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer;
 use crate::cluster_controller::logs_controller::{self, NodeSetSelectorHints};
@@ -75,7 +74,7 @@ pub enum Error {
 pub struct Service<T> {
     networking: Networking<T>,
     bifrost: Bifrost,
-    cluster_state_refresher: ClusterStateRefresher<T>,
+    cluster_state_watch: watch::Receiver<Arc<ClusterState>>,
     configuration: Live<Configuration>,
     metadata_writer: MetadataWriter,
     metadata_store_client: MetadataStoreClient,
@@ -84,7 +83,6 @@ pub struct Service<T> {
     command_tx: mpsc::Sender<ClusterControllerCommand>,
     command_rx: mpsc::Receiver<ClusterControllerCommand>,
     health_status: HealthStatus<AdminStatus>,
-    heartbeat_interval: Interval,
     observed_cluster_state: ObservedClusterState,
 }
 
@@ -94,7 +92,7 @@ where
 {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
-        mut configuration: Live<Configuration>,
+        configuration: Live<Configuration>,
         health_status: HealthStatus<AdminStatus>,
         bifrost: Bifrost,
         networking: Networking<T>,
@@ -102,18 +100,13 @@ where
         server_builder: &mut NetworkServerBuilder,
         metadata_writer: MetadataWriter,
         metadata_store_client: MetadataStoreClient,
+        cluster_state_watch: watch::Receiver<Arc<ClusterState>>,
     ) -> Self {
         let (command_tx, command_rx) = mpsc::channel(2);
 
-        let cluster_state_refresher =
-            ClusterStateRefresher::new(networking.clone(), router_builder);
-
         let processor_manager_client =
             PartitionProcessorManagerClient::new(networking.clone(), router_builder);
 
-        let options = configuration.live_load();
-        let heartbeat_interval = Self::create_heartbeat_interval(&options.admin);
-
         // Registering ClusterCtrlSvc grpc service to network server
         server_builder.register_grpc_service(
             TonicServiceFilter::new(
@@ -137,26 +130,15 @@ where
             health_status,
             networking,
             bifrost,
-            cluster_state_refresher,
+            cluster_state_watch,
             metadata_writer,
             metadata_store_client,
             processor_manager_client,
             command_tx,
            command_rx,
-            heartbeat_interval,
            observed_cluster_state: ObservedClusterState::default(),
        }
    }
-
-    fn create_heartbeat_interval(options: &AdminOptions) -> Interval {
-        let mut heartbeat_interval = time::interval_at(
-            Instant::now() + options.heartbeat_interval.into(),
-            options.heartbeat_interval.into(),
-        );
-        heartbeat_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
-
-        heartbeat_interval
-    }
 }
 
 #[derive(Debug)]
@@ -298,7 +280,6 @@ impl<T: TransportConnect> Service<T> {
         self.init_partition_table().await?;
 
         let mut config_watcher = Configuration::watcher();
-        let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher();
 
         TaskCenter::spawn_child(
             TaskKind::SystemService,
@@ -320,11 +301,8 @@ impl<T: TransportConnect> Service<T> {
 
         loop {
             tokio::select! {
-                _ = self.heartbeat_interval.tick() => {
-                    // Ignore error if system is shutting down
-                    let _ = self.cluster_state_refresher.schedule_refresh();
-                },
-                Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
+                Ok(_) = self.cluster_state_watch.changed() => {
+                    let cluster_state = Arc::clone(&self.cluster_state_watch.borrow());
                     self.observed_cluster_state.update(&cluster_state);
 
                     state.update(&self).await?;
@@ -337,7 +315,6 @@ impl<T: TransportConnect> Service<T> {
                 _ = config_watcher.changed() => {
                     debug!("Updating the cluster controller settings.");
                     let configuration = self.configuration.live_load();
-                    self.heartbeat_interval = Self::create_heartbeat_interval(&configuration.admin);
                     state.reconfigure(configuration);
                 }
                 result = state.run() => {
@@ -387,7 +364,7 @@ impl<T: TransportConnect> Service<T> {
         partition_id: PartitionId,
         response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
     ) {
-        let cluster_state = self.cluster_state_refresher.get_cluster_state();
+        let cluster_state = Arc::clone(&self.cluster_state_watch.borrow());
 
         // For now, we just pick the leader node since we know that every partition is likely to
         // have one. We'll want to update the algorithm to be smart about scheduling snapshot tasks
@@ -576,7 +553,7 @@ impl<T: TransportConnect> Service<T> {
     ) {
         match command {
             ClusterControllerCommand::GetClusterState(tx) => {
-                let _ = tx.send(self.cluster_state_refresher.get_cluster_state());
+                let _ = tx.send(Arc::clone(&self.cluster_state_watch.borrow()));
             }
             ClusterControllerCommand::TrimLog {
                 log_id,
@@ -841,7 +818,7 @@ mod tests {
     };
     use restate_core::test_env::NoOpMessageHandler;
     use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder};
-    use restate_types::cluster::cluster_state::PartitionProcessorStatus;
+    use restate_types::cluster::cluster_state::{ClusterState, PartitionProcessorStatus};
     use restate_types::config::{AdminOptions, Configuration};
     use restate_types::health::HealthStatus;
     use restate_types::identifiers::PartitionId;
@@ -852,6 +829,7 @@ mod tests {
     use restate_types::net::AdvertisedAddress;
     use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
     use restate_types::{GenerationalNodeId, Version};
+    use tokio::sync::watch;
 
     #[test(restate_core::test)]
     async fn manual_log_trim() -> anyhow::Result<()> {
@@ -860,6 +838,7 @@ mod tests {
         let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default());
         let bifrost = bifrost_svc.handle();
 
+        let (_, cluster_state_watch) = watch::channel(Arc::new(ClusterState::empty()));
         let svc = Service::new(
             Live::from_value(Configuration::default()),
             HealthStatus::default(),
@@ -869,6 +848,7 @@ mod tests {
             &mut NetworkServerBuilder::default(),
             builder.metadata_writer.clone(),
             builder.metadata_store_client.clone(),
+            cluster_state_watch,
         );
 
         let svc_handle = svc.handle();
@@ -1141,6 +1121,8 @@ mod tests {
 
         let mut server_builder = NetworkServerBuilder::default();
 
+        let (_, cluster_state_watch) = watch::channel(Arc::new(ClusterState::empty()));
+
         let svc = Service::new(
             Live::from_value(config),
             HealthStatus::default(),
@@ -1150,6 +1132,7 @@ mod tests {
             &mut server_builder,
             builder.metadata_writer.clone(),
             builder.metadata_store_client.clone(),
+            cluster_state_watch,
         );
 
         let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned());
diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs
index 8b55488393..0d90093320 100644
--- a/crates/admin/src/cluster_controller/service/state.rs
+++ b/crates/admin/src/cluster_controller/service/state.rs
@@ -9,6 +9,7 @@
 // by the Apache License, Version 2.0.
 
 use std::collections::BTreeMap;
+use std::sync::Arc;
 
 use futures::future::OptionFuture;
 use itertools::Itertools;
@@ -21,14 +22,13 @@ use restate_bifrost::{Bifrost, BifrostAdmin};
 use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::TransportConnect;
 use restate_core::{my_node_id, Metadata, MetadataWriter};
-use restate_types::cluster::cluster_state::{AliveNode, NodeState};
+use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
 use restate_types::config::{AdminOptions, Configuration};
 use restate_types::identifiers::PartitionId;
 use restate_types::logs::{LogId, Lsn, SequenceNumber};
 use restate_types::net::metadata::MetadataKind;
 use restate_types::{GenerationalNodeId, Version};
 
-use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher;
 use crate::cluster_controller::logs_controller::{
     LogsBasedPartitionProcessorPlacementHints, LogsController,
 };
@@ -143,7 +143,7 @@ pub struct Leader<T> {
     log_trim_interval: Option<Interval>,
     logs_controller: LogsController,
     scheduler: Scheduler<T>,
-    cluster_state_watcher: ClusterStateWatcher,
+    cluster_state_watch: watch::Receiver<Arc<ClusterState>>,
     log_trim_threshold: Lsn,
 }
 
@@ -183,7 +183,7 @@ where
             metadata_writer: service.metadata_writer.clone(),
             logs_watcher: metadata.watch(MetadataKind::Logs),
             partition_table_watcher: metadata.watch(MetadataKind::PartitionTable),
-            cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(),
+            cluster_state_watch: service.cluster_state_watch.clone(),
             find_logs_tail_interval,
             log_trim_interval,
             log_trim_threshold,
@@ -302,7 +302,7 @@ where
             &self.metadata_store_client,
         );
 
-        let cluster_state = self.cluster_state_watcher.current();
+        let cluster_state = Arc::clone(&self.cluster_state_watch.borrow());
 
         let mut persisted_lsns_per_partition: BTreeMap<
             PartitionId,
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index d85c9be8ff..d0b7e94527 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -282,6 +282,7 @@ impl Node {
                     worker_role
                         .as_ref()
                         .map(|worker_role| worker_role.storage_query_context().clone()),
+                    base_role.cluster_state_watch(),
                 )
                 .await?,
             )
diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs
index b06ed3eeca..45444cbc84 100644
--- a/crates/node/src/roles/admin.rs
+++ b/crates/node/src/roles/admin.rs
@@ -8,9 +8,12 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::sync::Arc;
 use std::time::Duration;
 
 use codederror::CodedError;
+use tokio::sync::watch;
+
 use restate_admin::cluster_controller;
 use restate_admin::service::AdminService;
 use restate_bifrost::Bifrost;
@@ -29,6 +32,7 @@ use restate_storage_query_datafusion::remote_query_scanner_client::create_remote
 use restate_storage_query_datafusion::remote_query_scanner_manager::{
     create_partition_locator, RemoteScannerManager,
 };
+use restate_types::cluster::cluster_state::ClusterState;
 use restate_types::config::Configuration;
 use restate_types::config::IngressOptions;
 use restate_types::health::HealthStatus;
@@ -72,6 +76,7 @@ impl<T: TransportConnect> AdminRole<T> {
         router_builder: &mut MessageRouterBuilder,
         metadata_store_client: MetadataStoreClient,
         local_query_context: Option<QueryContext>,
+        cluster_state_watch: watch::Receiver<Arc<ClusterState>>,
     ) -> Result<Self, AdminRoleBuildError> {
         health_status.update(AdminStatus::StartingUp);
         let config = updateable_config.pinned();
@@ -122,6 +127,7 @@ impl<T: TransportConnect> AdminRole<T> {
                 server_builder,
                 metadata_writer,
                 metadata_store_client,
+                cluster_state_watch,
             ))
         } else {
             None
diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs
index 055bf8aade..7d688ed09f 100644
--- a/crates/types/src/config/admin.rs
+++ b/crates/types/src/config/admin.rs
@@ -38,13 +38,6 @@ pub struct AdminOptions {
     concurrent_api_requests_limit: Option<NonZeroUsize>,
     pub query_engine: QueryEngineOptions,
 
-    /// # Controller heartbeats
-    ///
-    /// Controls the interval at which cluster controller polls nodes of the cluster.
-    #[serde_as(as = "serde_with::DisplayFromStr")]
-    #[cfg_attr(feature = "schemars", schemars(with = "String"))]
-    pub heartbeat_interval: humantime::Duration,
-
     /// # Log trim interval
     ///
     /// Controls the interval at which cluster controller tries to trim the logs. Log trimming
@@ -107,7 +100,6 @@ impl Default for AdminOptions {
             // max is limited by Tower's LoadShedLayer.
             concurrent_api_requests_limit: None,
             query_engine: Default::default(),
-            heartbeat_interval: Duration::from_millis(1500).into(),
             // try to trim the log every hour
             log_trim_interval: Some(Duration::from_secs(60 * 60).into()),
             log_trim_threshold: 1000,