diff --git a/crates/admin/build.rs b/crates/admin/build.rs index fa2d555b9..2a94396f0 100644 --- a/crates/admin/build.rs +++ b/crates/admin/build.rs @@ -22,6 +22,10 @@ fn main() -> Result<(), Box> { .protoc_arg("--experimental_allow_proto3_optional") .extern_path(".restate.common", "::restate_types::protobuf::common") .extern_path(".restate.cluster", "::restate_types::protobuf::cluster") + .extern_path( + ".restate.deprecated_cluster", + "::restate_types::protobuf::deprecated_cluster", + ) .compile_protos( &["./protobuf/cluster_ctrl_svc.proto"], &["protobuf", "../types/protobuf"], diff --git a/crates/admin/protobuf/cluster_ctrl_svc.proto b/crates/admin/protobuf/cluster_ctrl_svc.proto index a4342bd28..a38f680d0 100644 --- a/crates/admin/protobuf/cluster_ctrl_svc.proto +++ b/crates/admin/protobuf/cluster_ctrl_svc.proto @@ -11,6 +11,7 @@ syntax = "proto3"; import "restate/common.proto"; import "restate/cluster.proto"; +import "restate/deprecated_cluster.proto"; import "google/protobuf/empty.proto"; package restate.cluster_ctrl; @@ -53,7 +54,9 @@ message GetClusterConfigurationResponse { message ClusterStateRequest {} -message ClusterStateResponse { restate.cluster.ClusterState cluster_state = 1; } +message ClusterStateResponse { + restate.deprecated_cluster.ClusterState cluster_state = 1; +} message ListLogsRequest {} diff --git a/crates/admin/src/cluster_controller/cluster_state_refresher.rs b/crates/admin/src/cluster_controller/cluster_state_refresher.rs index 1732d68eb..a65b224f6 100644 --- a/crates/admin/src/cluster_controller/cluster_state_refresher.rs +++ b/crates/admin/src/cluster_controller/cluster_state_refresher.rs @@ -22,16 +22,16 @@ use restate_core::network::{ use restate_core::{ Metadata, ShutdownError, TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind, }; -use restate_types::cluster::cluster_state::{ +use restate_types::deprecated_cluster::cluster_state::{ AliveNode, ClusterState, DeadNode, NodeState, SuspectNode, }; -use restate_types::net::node::GetNodeState; +use restate_types::net::node::GetPartitionsProcessorsState; use restate_types::time::MillisSinceEpoch; use restate_types::Version; pub struct ClusterStateRefresher { network_sender: Networking, - get_state_router: RpcRouter, + get_state_router: RpcRouter, in_flight_refresh: Option>>, cluster_state_update_rx: watch::Receiver>, cluster_state_update_tx: Arc>>, @@ -99,7 +99,7 @@ impl ClusterStateRefresher { } fn start_refresh_task( - get_state_router: RpcRouter, + get_state_router: RpcRouter, network_sender: Networking, cluster_state_tx: Arc>>, ) -> Result>>, ShutdownError> { @@ -134,8 +134,11 @@ impl ClusterStateRefresher { async move { match network_sender.node_connection(node_id).await { Ok(connection) => { - let outgoing = Outgoing::new(node_id, GetNodeState::default()) - .assign_connection(connection); + let outgoing = Outgoing::new( + node_id, + GetPartitionsProcessorsState::default(), + ) + .assign_connection(connection); ( node_id, diff --git a/crates/admin/src/cluster_controller/observed_cluster_state.rs b/crates/admin/src/cluster_controller/observed_cluster_state.rs index 2ec51276d..8901c1ced 100644 --- a/crates/admin/src/cluster_controller/observed_cluster_state.rs +++ b/crates/admin/src/cluster_controller/observed_cluster_state.rs @@ -12,7 +12,8 @@ use std::collections::{HashMap, HashSet}; use xxhash_rust::xxh3::Xxh3Builder; -use restate_types::cluster::cluster_state::{ClusterState, NodeState, RunMode}; +use restate_types::cluster::cluster_state::RunMode; +use restate_types::deprecated_cluster::cluster_state::{ClusterState, NodeState}; use restate_types::identifiers::PartitionId; use restate_types::{GenerationalNodeId, NodeId, PlainNodeId}; @@ -135,8 +136,9 @@ mod tests { }; use googletest::prelude::{empty, eq}; use googletest::{assert_that, elements_are, unordered_elements_are}; - use restate_types::cluster::cluster_state::{ - AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode, + use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode}; + use restate_types::deprecated_cluster::cluster_state::{ + AliveNode, ClusterState, DeadNode, NodeState, }; use restate_types::identifiers::PartitionId; use restate_types::time::MillisSinceEpoch; diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs index e7ab524be..97c6ee5c9 100644 --- a/crates/admin/src/cluster_controller/scheduler.rs +++ b/crates/admin/src/cluster_controller/scheduler.rs @@ -600,13 +600,14 @@ mod tests { }; use restate_core::network::{ForwardingHandler, Incoming, MessageCollectorMockConnector}; use restate_core::{Metadata, TestCoreEnv, TestCoreEnvBuilder}; - use restate_types::cluster::cluster_state::{ - AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode, - }; + use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode}; use restate_types::cluster_controller::{ SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState, }; use restate_types::config::Configuration; + use restate_types::deprecated_cluster::cluster_state::{ + AliveNode, ClusterState, DeadNode, NodeState, + }; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY; use restate_types::net::codec::WireDecode; diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index a7e393196..fc7d883d3 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -46,8 +46,8 @@ use restate_core::{ cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter, TaskKind, }; -use restate_types::cluster::cluster_state::ClusterState; use restate_types::config::{AdminOptions, Configuration}; +use restate_types::deprecated_cluster::cluster_state::ClusterState; use restate_types::health::HealthStatus; use restate_types::identifiers::{PartitionId, SnapshotId}; use restate_types::live::Live; @@ -847,7 +847,9 @@ mod tests { use restate_types::identifiers::PartitionId; use restate_types::live::Live; use restate_types::logs::{LogId, Lsn, SequenceNumber}; - use restate_types::net::node::{GetNodeState, NodeStateResponse}; + use restate_types::net::node::{ + GetPartitionsProcessorsState, PartitionsProcessorsStateResponse, + }; use restate_types::net::partition_processor_manager::ControlProcessors; use restate_types::net::AdvertisedAddress; use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; @@ -901,7 +903,7 @@ mod tests { } impl MessageHandler for NodeStateHandler { - type MessageType = GetNodeState; + type MessageType = GetPartitionsProcessorsState; async fn on_message(&self, msg: Incoming) { if self.block_list.contains(&msg.peer()) { @@ -915,7 +917,7 @@ mod tests { }; let state = [(PartitionId::MIN, partition_processor_status)].into(); - let response = msg.to_rpc_response(NodeStateResponse { + let response = msg.to_rpc_response(PartitionsProcessorsStateResponse { partition_processor_state: Some(state), }); diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..6c75ba386 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -21,8 +21,8 @@ use restate_bifrost::{Bifrost, BifrostAdmin}; use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::TransportConnect; use restate_core::{my_node_id, Metadata, MetadataWriter}; -use restate_types::cluster::cluster_state::{AliveNode, NodeState}; use restate_types::config::{AdminOptions, Configuration}; +use restate_types::deprecated_cluster::cluster_state::{AliveNode, NodeState}; use restate_types::identifiers::PartitionId; use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_types::net::metadata::MetadataKind; diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 6ce8dc4ed..5f7dadbf5 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -766,7 +766,7 @@ mod tests { use restate_test_util::{assert_eq, let_assert}; use restate_types::net::codec::WireDecode; use restate_types::net::metadata::{GetMetadataRequest, MetadataMessage}; - use restate_types::net::node::GetNodeState; + use restate_types::net::node::GetPartitionsProcessorsState; use restate_types::net::{ AdvertisedAddress, ProtocolVersion, CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSION, @@ -1013,7 +1013,7 @@ mod tests { .await .into_test_result()?; - let request = GetNodeState {}; + let request = GetPartitionsProcessorsState {}; let partition_table_version = metadata.partition_table_version().next(); let header = Header::new( metadata.nodes_config_version(), diff --git a/crates/node/src/roles/base.rs b/crates/node/src/roles/base.rs index e6140dea4..2a40bc3d0 100644 --- a/crates/node/src/roles/base.rs +++ b/crates/node/src/roles/base.rs @@ -17,11 +17,11 @@ use restate_core::{ worker_api::ProcessorsManagerHandle, ShutdownError, TaskCenter, TaskKind, }; -use restate_types::net::node::{GetNodeState, NodeStateResponse}; +use restate_types::net::node::{GetPartitionsProcessorsState, PartitionsProcessorsStateResponse}; pub struct BaseRole { processor_manager_handle: Option, - incoming_node_state: MessageStream, + processors_state_request_stream: MessageStream, } impl BaseRole { @@ -29,11 +29,11 @@ impl BaseRole { router_builder: &mut MessageRouterBuilder, processor_manager_handle: Option, ) -> Self { - let incoming_node_state = router_builder.subscribe_to_stream(2); + let processors_state_request_stream = router_builder.subscribe_to_stream(2); Self { processor_manager_handle, - incoming_node_state, + processors_state_request_stream, } } @@ -56,17 +56,17 @@ impl BaseRole { } async fn run(mut self) -> anyhow::Result<()> { - while let Some(request) = self.incoming_node_state.next().await { + while let Some(request) = self.processors_state_request_stream.next().await { // handle request - self.handle_get_node_state(request).await?; + self.handle_get_partitions_processors_state(request).await?; } Ok(()) } - async fn handle_get_node_state( + async fn handle_get_partitions_processors_state( &self, - msg: Incoming, + msg: Incoming, ) -> Result<(), ShutdownError> { let partition_state = if let Some(ref handle) = self.processor_manager_handle { Some(handle.get_state().await?) @@ -76,7 +76,7 @@ impl BaseRole { // only return error if Shutdown if let Err(NetworkError::Shutdown(err)) = msg - .to_rpc_response(NodeStateResponse { + .to_rpc_response(PartitionsProcessorsStateResponse { partition_processor_state: partition_state, }) .try_send() diff --git a/crates/types/build.rs b/crates/types/build.rs index 3fc94b90e..6a0adb725 100644 --- a/crates/types/build.rs +++ b/crates/types/build.rs @@ -93,7 +93,8 @@ fn build_restate_proto(out_dir: &Path) -> std::io::Result<()> { .enum_attribute("Message.body", "#[derive(::derive_more::IsVariant)]") .btree_map([ ".restate.cluster.ClusterState", - ".restate.cluster.AliveNode", + ".restate.deprecated_cluster.ClusterState", + ".restate.deprecated_cluster.AliveNode", ]) .file_descriptor_set_path(out_dir.join("common_descriptor.bin")) // allow older protobuf compiler to be used @@ -102,6 +103,7 @@ fn build_restate_proto(out_dir: &Path) -> std::io::Result<()> { &[ "./protobuf/restate/common.proto", "./protobuf/restate/cluster.proto", + "./protobuf/restate/deprecated_cluster.proto", "./protobuf/restate/log_server_common.proto", "./protobuf/restate/node.proto", ], diff --git a/crates/types/protobuf/restate/cluster.proto b/crates/types/protobuf/restate/cluster.proto index d5dfb65e5..502573759 100644 --- a/crates/types/protobuf/restate/cluster.proto +++ b/crates/types/protobuf/restate/cluster.proto @@ -41,9 +41,6 @@ message SuspectNode { message AliveNode { restate.common.NodeId generational_node_id = 1; google.protobuf.Timestamp last_heartbeat_at = 2; - // partition id is u16 but protobuf doesn't support u16. This must be a value - // that's safe to convert to u16 - map partitions = 3; } message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; } diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index eb8488b52..9c86f6758 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -71,8 +71,9 @@ enum TargetName { PARTITION_PROCESSOR_RPC = 52; PARTITION_PROCESSOR_RPC_RESPONSE = 53; // Node - NODE_GET_NODE_STATE_REQUEST = 60; - NODE_GET_NODE_STATE_RESPONSE = 61; + NODE_GET_PARTITIONS_PROCESSORS_STATE_REQUEST = 60; + NODE_GET_PARTITIONS_PROCESSORS_STATE_RESPONSE = 61; + // Remote Scanner REMOTE_QUERY_SCANNER_OPEN = 80; REMOTE_QUERY_SCANNER_OPENED = 81; diff --git a/crates/types/protobuf/restate/deprecated_cluster.proto b/crates/types/protobuf/restate/deprecated_cluster.proto new file mode 100644 index 000000000..4ae179eaf --- /dev/null +++ b/crates/types/protobuf/restate/deprecated_cluster.proto @@ -0,0 +1,50 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +syntax = "proto3"; + +import "restate/common.proto"; +import "restate/cluster.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; + +package restate.deprecated_cluster; + +message ClusterState { + google.protobuf.Duration last_refreshed = 1; + restate.common.Version nodes_config_version = 2; + restate.common.Version partition_table_version = 3; + map nodes = 4; + restate.common.Version logs_metadata_version = 5; +} + +message NodeState { + oneof state { + AliveNode alive = 1; + DeadNode dead = 2; + SuspectNode suspect = 3; + } +} + +message SuspectNode { + restate.common.NodeId generational_node_id = 1; + google.protobuf.Timestamp last_attempt = 2; +} + +message AliveNode { + restate.common.NodeId generational_node_id = 1; + google.protobuf.Timestamp last_heartbeat_at = 2; + // partition id is u16 but protobuf doesn't support u16. This must be a value + // that's safe to convert to u16 + map partitions = 3; +} + +message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; } diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/cluster/cluster_state.rs index b6313ae98..ab1be61ca 100644 --- a/crates/types/src/cluster/cluster_state.rs +++ b/crates/types/src/cluster/cluster_state.rs @@ -14,7 +14,7 @@ use std::time::Instant; use prost_dto::IntoProto; use serde::{Deserialize, Serialize}; -use crate::identifiers::{LeaderEpoch, PartitionId}; +use crate::identifiers::LeaderEpoch; use crate::logs::Lsn; use crate::time::MillisSinceEpoch; use crate::{GenerationalNodeId, PlainNodeId, Version}; @@ -89,7 +89,6 @@ pub struct AliveNode { pub last_heartbeat_at: MillisSinceEpoch, #[proto(required)] pub generational_node_id: GenerationalNodeId, - pub partitions: BTreeMap, } #[derive(Debug, Clone, IntoProto)] diff --git a/crates/types/src/deprecated_cluster/cluster_state.rs b/crates/types/src/deprecated_cluster/cluster_state.rs new file mode 100644 index 000000000..1646782f5 --- /dev/null +++ b/crates/types/src/deprecated_cluster/cluster_state.rs @@ -0,0 +1,112 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; +use std::time::Instant; + +use prost_dto::IntoProto; + +use crate::cluster::cluster_state::PartitionProcessorStatus; +use crate::identifiers::PartitionId; +use crate::time::MillisSinceEpoch; +use crate::{GenerationalNodeId, PlainNodeId, Version}; + +/// A container for health information about every node and partition in the +/// cluster. +#[derive(Debug, Clone, IntoProto)] +#[proto(target = "crate::protobuf::deprecated_cluster::ClusterState")] +pub struct ClusterState { + #[into_proto(map = "instant_to_proto")] + pub last_refreshed: Option, + #[proto(required)] + pub nodes_config_version: Version, + #[proto(required)] + pub partition_table_version: Version, + #[proto(required)] + pub logs_metadata_version: Version, + pub nodes: BTreeMap, +} + +impl ClusterState { + pub fn is_reliable(&self) -> bool { + // todo: make this configurable + // If the cluster state is older than 10 seconds, then it is not reliable. + self.last_refreshed + .map(|last_refreshed| last_refreshed.elapsed().as_secs() < 10) + .unwrap_or(false) + } + + pub fn alive_nodes(&self) -> impl Iterator { + self.nodes.values().flat_map(|node| match node { + NodeState::Alive(alive_node) => Some(alive_node), + NodeState::Dead(_) | NodeState::Suspect(_) => None, + }) + } + + pub fn dead_nodes(&self) -> impl Iterator { + self.nodes.iter().flat_map(|(node_id, state)| match state { + NodeState::Alive(_) | NodeState::Suspect(_) => None, + NodeState::Dead(_) => Some(node_id), + }) + } + + #[cfg(any(test, feature = "test-util"))] + pub fn empty() -> Self { + ClusterState { + last_refreshed: None, + nodes_config_version: Version::INVALID, + partition_table_version: Version::INVALID, + logs_metadata_version: Version::INVALID, + nodes: BTreeMap::default(), + } + } +} + +fn instant_to_proto(t: Instant) -> prost_types::Duration { + t.elapsed().try_into().unwrap() +} + +#[derive(Debug, Clone, IntoProto)] +#[proto( + target = "crate::protobuf::deprecated_cluster::NodeState", + oneof = "state" +)] +pub enum NodeState { + Alive(AliveNode), + Dead(DeadNode), + Suspect(SuspectNode), +} + +#[derive(Debug, Clone, IntoProto)] +#[proto(target = "crate::protobuf::deprecated_cluster::AliveNode")] +pub struct AliveNode { + #[proto(required)] + pub last_heartbeat_at: MillisSinceEpoch, + #[proto(required)] + pub generational_node_id: GenerationalNodeId, + pub partitions: BTreeMap, +} + +#[derive(Debug, Clone, IntoProto)] +#[proto(target = "crate::protobuf::deprecated_cluster::DeadNode")] +pub struct DeadNode { + pub last_seen_alive: Option, +} + +#[derive(Debug, Clone, IntoProto)] +#[proto(target = "crate::protobuf::deprecated_cluster::SuspectNode")] +/// As the name implies, SuspectNode is both dead and alive +/// until we receive a heartbeat +pub struct SuspectNode { + #[proto(required)] + pub generational_node_id: GenerationalNodeId, + #[proto(required)] + pub last_attempt: MillisSinceEpoch, +} diff --git a/crates/types/src/deprecated_cluster/mod.rs b/crates/types/src/deprecated_cluster/mod.rs new file mode 100644 index 000000000..011338164 --- /dev/null +++ b/crates/types/src/deprecated_cluster/mod.rs @@ -0,0 +1,11 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod cluster_state; diff --git a/crates/types/src/lib.rs b/crates/types/src/lib.rs index ce3bfb1e4..37c734f60 100644 --- a/crates/types/src/lib.rs +++ b/crates/types/src/lib.rs @@ -18,6 +18,7 @@ mod version; pub mod art; pub mod cluster; +pub mod deprecated_cluster; pub mod health; pub mod cluster_controller; diff --git a/crates/types/src/net/node.rs b/crates/types/src/net/node.rs index fbf308f19..7054cf1c7 100644 --- a/crates/types/src/net/node.rs +++ b/crates/types/src/net/node.rs @@ -17,18 +17,18 @@ use super::TargetName; use crate::{cluster::cluster_state::PartitionProcessorStatus, identifiers::PartitionId}; super::define_rpc! { - @request=GetNodeState, - @response=NodeStateResponse, - @request_target=TargetName::NodeGetNodeStateRequest, - @response_target=TargetName::NodeGetNodeStateResponse, + @request=GetPartitionsProcessorsState, + @response=PartitionsProcessorsStateResponse, + @request_target=TargetName::NodeGetPartitionsProcessorsStateRequest, + @response_target=TargetName::NodeGetPartitionsProcessorsStateResponse, } #[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)] -pub struct GetNodeState {} +pub struct GetPartitionsProcessorsState {} #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NodeStateResponse { +pub struct PartitionsProcessorsStateResponse { /// State of paritions processor per parition. Is set to None if this node is not a `Worker` node #[serde_as(as = "Option>")] pub partition_processor_state: Option>, diff --git a/crates/types/src/protobuf.rs b/crates/types/src/protobuf.rs index 1be1d15a9..eb4b612ea 100644 --- a/crates/types/src/protobuf.rs +++ b/crates/types/src/protobuf.rs @@ -60,6 +60,10 @@ pub mod cluster { } } +pub mod deprecated_cluster { + include!(concat!(env!("OUT_DIR"), "/restate.deprecated_cluster.rs")); +} + pub mod node { use opentelemetry::global; use opentelemetry::propagation::{Extractor, Injector}; diff --git a/tools/restatectl/src/commands/partition/list.rs b/tools/restatectl/src/commands/partition/list.rs index 5fc3adb5e..f41ff08c3 100644 --- a/tools/restatectl/src/commands/partition/list.rs +++ b/tools/restatectl/src/commands/partition/list.rs @@ -24,9 +24,8 @@ use restate_cli_util::ui::console::StyledTable; use restate_cli_util::ui::Tense; use restate_types::logs::metadata::{Chain, Logs}; use restate_types::logs::{LogId, Lsn}; -use restate_types::protobuf::cluster::{ - node_state, DeadNode, PartitionProcessorStatus, ReplayStatus, RunMode, SuspectNode, -}; +use restate_types::protobuf::cluster::{PartitionProcessorStatus, ReplayStatus, RunMode}; +use restate_types::protobuf::deprecated_cluster::{node_state, DeadNode, SuspectNode}; use restate_types::storage::StorageCodec; use restate_types::{GenerationalNodeId, PlainNodeId, Version};