From d349d16df5afee0115c5c20766904166c4eb50c8 Mon Sep 17 00:00:00 2001
From: Pavel Tcholakov
Date: Wed, 25 Sep 2024 23:17:55 +0200
Subject: [PATCH] Expose a partition worker CreateSnapshot RPC

stack-info: PR: https://github.com/restatedev/restate/pull/1998, branch: pcholakov/stack/1
---
 crates/admin/protobuf/cluster_ctrl_svc.proto  |  10 ++
 .../admin/src/cluster_controller/service.rs   | 145 +++++++++++++++++-
 .../network_server/handler/cluster_ctrl.rs    |  34 +++-
 crates/types/protobuf/restate/common.proto    |   5 +-
 .../src/net/partition_processor_manager.rs    |  24 ++-
 crates/worker/src/lib.rs                      |  12 +-
 .../worker/src/partition_processor_manager.rs |  75 ++++++++-
 tools/restatectl/src/app.rs                   |   4 +
 tools/restatectl/src/commands/mod.rs          |   1 +
 .../src/commands/snapshot/create_snapshot.rs  |  59 +++++++
 tools/restatectl/src/commands/snapshot/mod.rs |  19 +++
 11 files changed, 372 insertions(+), 16 deletions(-)
 create mode 100644 tools/restatectl/src/commands/snapshot/create_snapshot.rs
 create mode 100644 tools/restatectl/src/commands/snapshot/mod.rs

diff --git a/crates/admin/protobuf/cluster_ctrl_svc.proto b/crates/admin/protobuf/cluster_ctrl_svc.proto
index 8ca57a2d9..dd19ed77d 100644
--- a/crates/admin/protobuf/cluster_ctrl_svc.proto
+++ b/crates/admin/protobuf/cluster_ctrl_svc.proto
@@ -25,6 +25,8 @@ service ClusterCtrlSvc {
   rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);
 
   rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);
+
+  rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest) returns (CreatePartitionSnapshotResponse);
 }
 
 message ClusterStateRequest { }
@@ -71,3 +73,11 @@ message TrimLogRequest {
   uint32 log_id = 1;
   uint64 trim_point = 2;
 }
+
+message CreatePartitionSnapshotRequest {
+  uint32 partition_id = 1;
+}
+
+message CreatePartitionSnapshotResponse {
+  string snapshot_id = 1;
+}
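For orientation, here is a rough sketch of the Rust types that tonic/prost generate for the two new messages. The field tags follow the proto above; the exact derive list and module path depend on the build setup, so treat this as an approximation rather than the actual generated file:

```rust
// Approximation of the prost-generated message types (not the real generated
// code). proto3 has no 16-bit scalar, so partition_id travels as uint32 on
// the wire even though Restate partition ids fit in u16.
#[derive(Clone, PartialEq, prost::Message)]
pub struct CreatePartitionSnapshotRequest {
    #[prost(uint32, tag = "1")]
    pub partition_id: u32,
}

#[derive(Clone, PartialEq, prost::Message)]
pub struct CreatePartitionSnapshotResponse {
    #[prost(string, tag = "1")]
    pub snapshot_id: String,
}

fn main() {
    // prost's derive also generates a Debug impl for message types.
    let request = CreatePartitionSnapshotRequest { partition_id: 0 };
    println!("{request:?}");
}
```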
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 0e4783138..44b5058cc 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -11,7 +11,9 @@
 use std::collections::BTreeMap;
 use std::pin::Pin;
 use std::sync::Arc;
+use std::time::Duration;
 
+use anyhow::anyhow;
 use codederror::CodedError;
 use futures::future::OptionFuture;
 use futures::{Stream, StreamExt};
@@ -22,18 +24,22 @@ use tracing::{debug, info, warn};
 
 use restate_bifrost::{Bifrost, BifrostAdmin};
 use restate_core::metadata_store::MetadataStoreClient;
-use restate_core::network::{Incoming, MessageRouterBuilder, Networking, TransportConnect};
+use restate_core::network::rpc_router::RpcRouter;
+use restate_core::network::{
+    Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
+};
 use restate_core::{
     cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
     TaskKind,
 };
 use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
 use restate_types::config::{AdminOptions, Configuration};
-use restate_types::identifiers::PartitionId;
+use restate_types::identifiers::{PartitionId, SnapshotId};
 use restate_types::live::Live;
 use restate_types::logs::{LogId, Lsn, SequenceNumber};
 use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
 use restate_types::net::metadata::MetadataKind;
+use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
 use restate_types::{GenerationalNodeId, Version};
 
 use super::cluster_state::{ClusterStateRefresher, ClusterStateWatcher};
@@ -52,6 +58,7 @@ pub struct Service<T> {
     networking: Networking<T>,
     incoming_messages: Pin<Box<dyn Stream<Item = Incoming<AttachRequest>> + Send + Sync + 'static>>,
     cluster_state_refresher: ClusterStateRefresher<T>,
+    processor_manager_client: PartitionProcessorManagerClient<Networking<T>>,
     command_tx: mpsc::Sender<ClusterControllerCommand>,
     command_rx: mpsc::Receiver<ClusterControllerCommand>,
@@ -86,8 +93,10 @@ where
             router_builder,
         );
 
-        let options = configuration.live_load();
+        let processor_manager_client =
+            PartitionProcessorManagerClient::new(networking.clone(), router_builder);
 
+        let options = configuration.live_load();
         let heartbeat_interval = Self::create_heartbeat_interval(&options.admin);
         let (log_trim_interval, log_trim_threshold) =
             Self::create_log_trim_interval(&options.admin);
@@ -101,6 +110,7 @@
             cluster_state_refresher,
             metadata_writer,
             metadata_store_client,
+            processor_manager_client,
             command_tx,
             command_rx,
             heartbeat_interval,
@@ -132,6 +142,7 @@
     }
 }
 
+#[derive(Debug)]
 enum ClusterControllerCommand {
     GetClusterState(oneshot::Sender<Arc<ClusterState>>),
     TrimLog {
@@ -139,6 +150,10 @@ enum ClusterControllerCommand {
         trim_point: Lsn,
        response_tx: oneshot::Sender<anyhow::Result<()>>,
     },
+    CreateSnapshot {
+        partition_id: PartitionId,
+        response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
+    },
 }
 
 pub struct ClusterControllerHandle {
@@ -174,6 +189,23 @@ impl ClusterControllerHandle {
 
         rx.await.map_err(|_| ShutdownError)
     }
+
+    pub async fn create_partition_snapshot(
+        &self,
+        partition_id: PartitionId,
+    ) -> Result<anyhow::Result<SnapshotId>, ShutdownError> {
+        let (tx, rx) = oneshot::channel();
+
+        let _ = self
+            .tx
+            .send(ClusterControllerCommand::CreateSnapshot {
+                partition_id,
+                response_tx: tx,
+            })
+            .await;
+
+        rx.await.map_err(|_| ShutdownError)
+    }
 }
 
 impl<T> Service<T>
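The handle method above is an instance of the command-with-reply pattern used throughout this service: the caller enqueues a command that carries a oneshot sender, then awaits the receiver, so a dropped sender surfaces as ShutdownError. A self-contained sketch of the shape, with illustrative stand-in types rather than Restate's:

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in for ClusterControllerCommand.
enum Command {
    CreateSnapshot {
        partition_id: u16,
        response_tx: oneshot::Sender<Result<String, String>>,
    },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Command>(8);

    // Service loop: owns the state and answers each command via its oneshot.
    tokio::spawn(async move {
        while let Some(Command::CreateSnapshot { partition_id, response_tx }) = rx.recv().await {
            let _ = response_tx.send(Ok(format!("snap-{partition_id}")));
        }
    });

    // Handle side: enqueue the command, then await the reply.
    let (response_tx, response_rx) = oneshot::channel();
    tx.send(Command::CreateSnapshot { partition_id: 0, response_tx })
        .await
        .expect("service loop is running");
    let result = response_rx.await.expect("service loop replied");
    println!("{result:?}");
}
```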
@@ -318,6 +350,61 @@ impl<T> Service<T>
         Ok(())
     }
 
+    /// Triggers a snapshot creation for the given partition by issuing an RPC
+    /// to the node hosting the active leader.
+    async fn create_partition_snapshot(
+        &self,
+        partition_id: PartitionId,
+        response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
+    ) {
+        let cluster_state = self.cluster_state_refresher.get_cluster_state();
+
+        // For now, we just pick the leader node since we know that every partition is likely to
+        // have one. We'll want to update the algorithm to be smart about scheduling snapshot tasks
+        // in the future to avoid disrupting the leader when there are up-to-date followers.
+        let leader_node = cluster_state
+            .alive_nodes()
+            .filter_map(|node| {
+                node.partitions
+                    .get(&partition_id)
+                    .filter(|status| status.is_effective_leader())
+                    .map(|_| node)
+                    .cloned()
+            })
+            .next();
+
+        match leader_node {
+            Some(node) => {
+                debug!(
+                    node_id = %node.generational_node_id,
+                    ?partition_id,
+                    "Asking node to snapshot partition"
+                );
+
+                let mut node_rpc_client = self.processor_manager_client.clone();
+                let _ = self.task_center.spawn_child(
+                    TaskKind::Disposable,
+                    "create-snapshot-response",
+                    Some(partition_id),
+                    async move {
+                        let _ = response_tx.send(
+                            node_rpc_client
+                                .create_snapshot(node.generational_node_id, partition_id)
+                                .await,
+                        );
+                        Ok(())
+                    },
+                );
+            }
+
+            None => {
+                let _ = response_tx.send(Err(anyhow::anyhow!(
+                    "Cannot find a suitable node to take a snapshot of partition {partition_id}"
+                )));
+            }
+        };
+    }
+
     async fn on_cluster_cmd(
         &self,
         command: ClusterControllerCommand,
@@ -339,6 +426,14 @@
                 let result = bifrost_admin.trim(log_id, trim_point).await;
                 let _ = response_tx.send(result.map_err(Into::into));
             }
+            ClusterControllerCommand::CreateSnapshot {
+                partition_id,
+                response_tx,
+            } => {
+                info!(?partition_id, "Create snapshot command received");
+                self.create_partition_snapshot(partition_id, response_tx)
+                    .await;
+            }
         }
     }
@@ -408,6 +503,50 @@ async fn signal_all_partitions_started(
     }
 }
 
+#[derive(Clone)]
+struct PartitionProcessorManagerClient<N>
+where
+    N: Clone,
+{
+    network_sender: N,
+    create_snapshot_router: RpcRouter<CreateSnapshotRequest>,
+}
+
+impl<N> PartitionProcessorManagerClient<N>
+where
+    N: NetworkSender + 'static,
+{
+    pub fn new(network_sender: N, router_builder: &mut MessageRouterBuilder) -> Self {
+        let create_snapshot_router = RpcRouter::new(router_builder);
+
+        PartitionProcessorManagerClient {
+            network_sender,
+            create_snapshot_router,
+        }
+    }
+
+    pub async fn create_snapshot(
+        &mut self,
+        node_id: GenerationalNodeId,
+        partition_id: PartitionId,
+    ) -> anyhow::Result<SnapshotId> {
+        // todo(pavel): make snapshot RPC timeout configurable, especially if this includes remote upload in the future
+        let response = tokio::time::timeout(
+            Duration::from_secs(30),
+            self.create_snapshot_router.call(
+                &self.network_sender,
+                node_id,
+                CreateSnapshotRequest { partition_id },
+            ),
+        )
+        .await?;
+        let create_snapshot_response = response?.into_body();
+        create_snapshot_response
+            .result
+            .map_err(|e| anyhow!("Failed to create snapshot: {:?}", e))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::Service;
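`PartitionProcessorManagerClient::create_snapshot` above stacks two fallible layers: `tokio::time::timeout` adds an outer Result for the deadline, and the RPC itself returns an inner one. A minimal sketch of how the two unwrapping steps compose, with `rpc_call` as a hypothetical stand-in for the RpcRouter call:

```rust
use std::time::Duration;

use anyhow::anyhow;

// Hypothetical stand-in for create_snapshot_router.call(...).
async fn rpc_call() -> anyhow::Result<String> {
    Ok("snap-1".to_owned())
}

async fn create_snapshot_with_timeout() -> anyhow::Result<String> {
    // First `?`: the deadline elapsed. The map_err below: the RPC itself failed.
    let response = tokio::time::timeout(Duration::from_secs(30), rpc_call()).await?;
    response.map_err(|e| anyhow!("Failed to create snapshot: {e:?}"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    println!("{}", create_snapshot_with_timeout().await?);
    Ok(())
}
```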
diff --git a/crates/node/src/network_server/handler/cluster_ctrl.rs b/crates/node/src/network_server/handler/cluster_ctrl.rs
index f3534c029..c774d3a46 100644
--- a/crates/node/src/network_server/handler/cluster_ctrl.rs
+++ b/crates/node/src/network_server/handler/cluster_ctrl.rs
@@ -14,12 +14,14 @@ use tracing::{debug, info};
 
 use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc;
 use restate_admin::cluster_controller::protobuf::{
-    ClusterStateRequest, ClusterStateResponse, DescribeLogRequest, DescribeLogResponse,
-    ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
+    ClusterStateRequest, ClusterStateResponse, CreatePartitionSnapshotRequest,
+    CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, ListLogsRequest,
+    ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
 };
 use restate_admin::cluster_controller::ClusterControllerHandle;
 use restate_bifrost::{Bifrost, FindTailAttributes};
 use restate_metadata_store::MetadataStoreClient;
+use restate_types::identifiers::PartitionId;
 use restate_types::logs::metadata::Logs;
 use restate_types::logs::{LogId, Lsn};
 use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
@@ -171,6 +173,34 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
         }
         Ok(Response::new(()))
     }
+
+    /// Handles ad-hoc snapshot requests, as sent by `restatectl snapshots create`. This is
+    /// implemented as an RPC call within the cluster to a worker node hosting the partition.
+    async fn create_partition_snapshot(
+        &self,
+        request: Request<CreatePartitionSnapshotRequest>,
+    ) -> Result<Response<CreatePartitionSnapshotResponse>, Status> {
+        let request = request.into_inner();
+        let partition_id = PartitionId::from(
+            u16::try_from(request.partition_id)
+                .map_err(|_| Status::invalid_argument(format!("Invalid partition id: {}", request.partition_id)))?,
+        );
+
+        match self
+            .controller_handle
+            .create_partition_snapshot(partition_id)
+            .await
+            .map_err(|_| Status::aborted("Node is shutting down"))?
+        {
+            Err(err) => {
+                info!("Failed creating partition snapshot: {err}");
+                Err(Status::internal(err.to_string()))
+            }
+            Ok(snapshot_id) => Ok(Response::new(CreatePartitionSnapshotResponse {
+                snapshot_id: snapshot_id.to_string(),
+            })),
+        }
+    }
 }
 
 fn serialize_value<T: Serialize>(value: T) -> Bytes {
diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto
index 9d0dd1e5f..fd94ef05a 100644
--- a/crates/types/protobuf/restate/common.proto
+++ b/crates/types/protobuf/restate/common.proto
@@ -21,7 +21,7 @@ message NodeId {
   optional uint32 generation = 2;
 }
 
-// Partition Processor leadershop epoch number
+// Partition Processor leadership epoch number
 message LeaderEpoch { uint64 value = 1; }
 
 // Log sequence number
@@ -66,6 +66,9 @@ enum TargetName {
   REPLICATED_LOGLET_APPENDED = 41;
   REPLICATED_LOGLET_GET_SEQUENCER_STATE = 42;
   REPLICATED_LOGLET_SEQUENCER_STATE = 43;
+  // Partition Processor
+  PARTITION_CREATE_SNAPSHOT_REQUEST = 50;
+  PARTITION_CREATE_SNAPSHOT_RESPONSE = 51;
 }
 
 enum NodeStatus {
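The new TargetName pair keys the request/response payloads defined in the next hunk. Those payloads are plain serde types, and CreateSnapshotResponse embeds a std Result directly; serde round-trips Result out of the box. A self-contained illustration, with String standing in for SnapshotId to keep the sketch dependency-free:

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
enum SnapshotError {
    SnapshotCreationFailed(String),
}

// String stands in for SnapshotId in this sketch.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct CreateSnapshotResponse {
    result: Result<String, SnapshotError>,
}

fn main() -> serde_json::Result<()> {
    let response = CreateSnapshotResponse {
        result: Err(SnapshotError::SnapshotCreationFailed("disk full".into())),
    };
    let encoded = serde_json::to_string(&response)?;
    let decoded: CreateSnapshotResponse = serde_json::from_str(&encoded)?;
    assert_eq!(response, decoded);
    Ok(())
}
```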
diff --git a/crates/types/src/net/partition_processor_manager.rs b/crates/types/src/net/partition_processor_manager.rs
index ab1bdcc35..6b721ad7a 100644
--- a/crates/types/src/net/partition_processor_manager.rs
+++ b/crates/types/src/net/partition_processor_manager.rs
@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 
 use crate::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
-use crate::identifiers::PartitionId;
+use crate::identifiers::{PartitionId, SnapshotId};
 use crate::net::{define_message, TargetName};
 
 use crate::net::define_rpc;
@@ -69,3 +69,25 @@ impl From<RunMode> for ProcessorCommand {
         }
     }
 }
+
+define_rpc! {
+    @request = CreateSnapshotRequest,
+    @response = CreateSnapshotResponse,
+    @request_target = TargetName::PartitionCreateSnapshotRequest,
+    @response_target = TargetName::PartitionCreateSnapshotResponse,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateSnapshotRequest {
+    pub partition_id: PartitionId,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateSnapshotResponse {
+    pub result: Result<SnapshotId, SnapshotError>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum SnapshotError {
+    SnapshotCreationFailed(String),
+}
diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs
index 864076fad..78ebc2a27 100644
--- a/crates/worker/src/lib.rs
+++ b/crates/worker/src/lib.rs
@@ -119,11 +119,10 @@ impl Worker {
         // ingress_kafka
         let ingress_kafka = IngressKafkaService::new(ingress_dispatcher.clone());
 
-        let subscription_controller_handle =
-            subscription_integration::SubscriptionControllerHandle::new(
-                config.ingress.clone(),
-                ingress_kafka.create_command_sender(),
-            );
+        let subscription_controller_handle = SubscriptionControllerHandle::new(
+            config.ingress.clone(),
+            ingress_kafka.create_command_sender(),
+        );
 
         let partition_store_manager = PartitionStoreManager::create(
             updateable_config.clone().map(|c| &c.worker.storage),
@@ -154,6 +153,9 @@ impl Worker {
             bifrost,
         );
 
+        // handle RPCs
+        router_builder.add_message_handler(partition_processor_manager.message_handler());
+
         let storage_query_context = QueryContext::create(
             &config.admin.query_engine,
             partition_processor_manager.handle(),
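The one-line wiring above is all the worker needs: handlers self-register against the router builder by message type, and the network layer dispatches incoming messages to them. A self-contained sketch of that registration/dispatch pattern, using assumed, simplified names rather than the restate_core API:

```rust
use std::collections::HashMap;

// Stand-in for TargetName: identifies a message type on the wire.
type Target = &'static str;

trait Handler {
    fn on_message(&self, payload: &str);
}

#[derive(Default)]
struct RouterBuilder {
    handlers: HashMap<Target, Box<dyn Handler>>,
}

impl RouterBuilder {
    // Mirrors add_message_handler: associate a handler with its target.
    fn add_message_handler(&mut self, target: Target, handler: Box<dyn Handler>) {
        self.handlers.insert(target, handler);
    }

    fn dispatch(&self, target: Target, payload: &str) {
        if let Some(handler) = self.handlers.get(target) {
            handler.on_message(payload);
        }
    }
}

struct CreateSnapshotHandler;

impl Handler for CreateSnapshotHandler {
    fn on_message(&self, payload: &str) {
        println!("create-snapshot request: {payload}");
    }
}

fn main() {
    let mut router = RouterBuilder::default();
    router.add_message_handler(
        "PARTITION_CREATE_SNAPSHOT_REQUEST",
        Box::new(CreateSnapshotHandler),
    );
    router.dispatch("PARTITION_CREATE_SNAPSHOT_REQUEST", "partition 0");
}
```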
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 637e89f8b..f95fb50c5 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -28,9 +28,9 @@ use tracing::{debug, info, instrument, trace, warn};
 use restate_bifrost::Bifrost;
 use restate_core::network::rpc_router::{RpcError, RpcRouter};
 use restate_core::network::{Incoming, MessageRouterBuilder};
-use restate_core::network::{Networking, TransportConnect};
+use restate_core::network::{MessageHandler, Networking, TransportConnect};
 use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle};
-use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskId, TaskKind};
+use restate_core::{cancellation_watcher, task_center, Metadata, ShutdownError, TaskId, TaskKind};
 use restate_core::{RuntimeError, TaskCenter};
 use restate_invoker_api::StatusHandle;
 use restate_invoker_impl::Service as InvokerService;
@@ -52,9 +52,12 @@ use restate_types::logs::SequenceNumber;
 use restate_types::metadata_store::keys::partition_processor_epoch_key;
 use restate_types::net::cluster_controller::AttachRequest;
 use restate_types::net::cluster_controller::{Action, AttachResponse};
-use restate_types::net::partition_processor_manager::ProcessorsStateResponse;
 use restate_types::net::partition_processor_manager::{
-    ControlProcessor, ControlProcessors, GetProcessorsState, ProcessorCommand,
+    ControlProcessor, ControlProcessors, CreateSnapshotResponse, GetProcessorsState,
+    ProcessorCommand, SnapshotError,
+};
+use restate_types::net::partition_processor_manager::{
+    CreateSnapshotRequest, ProcessorsStateResponse,
 };
 use restate_types::partition_table::PartitionTable;
 use restate_types::schema::Schema;
@@ -251,6 +254,66 @@ impl PartitionProcessorHandle {
     }
 }
 
+/// RPC message handler for Partition Processor management operations.
+pub struct PartitionProcessorManagerMessageHandler {
+    processors_manager_handle: ProcessorsManagerHandle,
+}
+
+impl PartitionProcessorManagerMessageHandler {
+    fn new(
+        processors_manager_handle: ProcessorsManagerHandle,
+    ) -> PartitionProcessorManagerMessageHandler {
+        Self {
+            processors_manager_handle,
+        }
+    }
+}
+
+impl MessageHandler for PartitionProcessorManagerMessageHandler {
+    type MessageType = CreateSnapshotRequest;
+
+    async fn on_message(&self, msg: Incoming<Self::MessageType>) {
+        debug!("Received '{:?}' from {}", msg.body(), msg.peer());
+
+        let processors_manager_handle = self.processors_manager_handle.clone();
+        task_center()
+            .spawn_child(
+                TaskKind::Disposable,
+                "create-snapshot-request-rpc",
+                None,
+                async move {
+                    let create_snapshot_result = processors_manager_handle
+                        .create_snapshot(msg.body().partition_id)
+                        .await;
+                    debug!(
+                        partition_id = ?msg.body().partition_id,
+                        result = ?create_snapshot_result,
+                        "Create snapshot completed",
+                    );
+
+                    match create_snapshot_result.as_ref() {
+                        Ok(snapshot_id) => msg.to_rpc_response(CreateSnapshotResponse {
+                            result: Ok(*snapshot_id),
+                        }),
+                        Err(error) => msg.to_rpc_response(CreateSnapshotResponse {
+                            result: Err(SnapshotError::SnapshotCreationFailed(error.to_string())),
+                        }),
+                    }
+                    .send()
+                    .await
+                    .map_err(|e| {
+                        warn!(result = ?create_snapshot_result, "Failed to send response: {}", e);
+                        anyhow::anyhow!("Failed to send response to create snapshot request: {}", e)
+                    })
+                },
+            )
+            .map_err(|e| {
+                warn!("Failed to spawn request handler: {}", e);
+            })
+            .ok();
+    }
+}
+
 type ChannelStatusReaderList = Vec<(RangeInclusive<PartitionId>, ChannelStatusReader)>;
 
 #[derive(Debug, Clone, Default)]
@@ -336,6 +399,10 @@ impl<T: TransportConnect> PartitionProcessorManager<T> {
         ProcessorsManagerHandle::new(self.tx.clone())
     }
 
+    pub(crate) fn message_handler(&self) -> PartitionProcessorManagerMessageHandler {
+        PartitionProcessorManagerMessageHandler::new(self.handle())
+    }
+
     async fn attach(&mut self) -> Result<Incoming<AttachResponse>, AttachError> {
         loop {
             // We try to get the admin node on every retry since it might change between retries.
diff --git a/tools/restatectl/src/app.rs b/tools/restatectl/src/app.rs
index 1a0cb97ce..87a2ae04c 100644
--- a/tools/restatectl/src/app.rs
+++ b/tools/restatectl/src/app.rs
@@ -21,6 +21,7 @@ use crate::commands::log::Log;
 use crate::commands::metadata::Metadata;
 use crate::commands::node::Node;
 use crate::commands::partitions::Partitions;
+use crate::commands::snapshot::Snapshot;
 
 #[derive(Run, Parser, Clone)]
 #[command(author, version = crate::build_info::version(), about, infer_subcommands = true)]
@@ -58,6 +59,9 @@ pub enum Command {
     /// Cluster metadata
     #[clap(subcommand)]
     Metadata(Metadata),
+    /// Partition processor snapshots
+    #[clap(subcommand)]
+    Snapshots(Snapshot),
 }
 
 fn init(common_opts: &CommonOpts) {
diff --git a/tools/restatectl/src/commands/mod.rs b/tools/restatectl/src/commands/mod.rs
index de7ac7633..c1860ee20 100644
--- a/tools/restatectl/src/commands/mod.rs
+++ b/tools/restatectl/src/commands/mod.rs
@@ -14,3 +14,4 @@ pub mod log;
 pub mod metadata;
 pub mod node;
 pub mod partitions;
+pub mod snapshot;
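The restatectl additions above and the new files below use cling, which layers its Run plumbing on top of clap. A plain-clap approximation of the resulting `restatectl snapshots create-snapshot` (alias `create`) command shape, so this is a sketch of the CLI surface rather than the actual restatectl code:

```rust
use clap::{Parser, Subcommand};

#[derive(Parser)]
#[command(name = "restatectl")]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

#[derive(Subcommand)]
enum Command {
    /// Partition processor snapshots
    #[command(subcommand)]
    Snapshots(Snapshot),
}

#[derive(Subcommand)]
enum Snapshot {
    /// Create a partition snapshot
    #[command(visible_alias = "create")]
    CreateSnapshot {
        /// The partition to snapshot
        #[arg(short, long)]
        partition_id: u16,
    },
}

fn main() {
    let cli = Cli::parse_from(["restatectl", "snapshots", "create", "--partition-id", "0"]);
    let Command::Snapshots(Snapshot::CreateSnapshot { partition_id }) = cli.command;
    println!("creating snapshot for partition {partition_id}");
}
```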
diff --git a/tools/restatectl/src/commands/snapshot/create_snapshot.rs b/tools/restatectl/src/commands/snapshot/create_snapshot.rs
new file mode 100644
index 000000000..0f3f1ae8d
--- /dev/null
+++ b/tools/restatectl/src/commands/snapshot/create_snapshot.rs
@@ -0,0 +1,59 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use anyhow::Context;
+use cling::prelude::*;
+use tonic::codec::CompressionEncoding;
+
+use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
+use restate_admin::cluster_controller::protobuf::CreatePartitionSnapshotRequest;
+use restate_cli_util::c_println;
+
+use crate::app::ConnectionInfo;
+use crate::util::grpc_connect;
+
+#[derive(Run, Parser, Collect, Clone, Debug)]
+#[clap(visible_alias = "create")]
+#[cling(run = "create_snapshot")]
+pub struct CreateSnapshotOpts {
+    /// The partition to snapshot
+    #[arg(short, long)]
+    partition_id: u16,
+}
+
+async fn create_snapshot(
+    connection: &ConnectionInfo,
+    opts: &CreateSnapshotOpts,
+) -> anyhow::Result<()> {
+    let channel = grpc_connect(connection.cluster_controller.clone())
+        .await
+        .with_context(|| {
+            format!(
+                "cannot connect to cluster controller at {}",
+                connection.cluster_controller
+            )
+        })?;
+    let mut client =
+        ClusterCtrlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip);
+
+    let request = CreatePartitionSnapshotRequest {
+        partition_id: opts.partition_id as u32,
+    };
+
+    let response = client
+        .create_partition_snapshot(request)
+        .await
+        .map_err(|e| anyhow::anyhow!("failed to request snapshot: {:?}", e))?
+        .into_inner();
+
+    c_println!("Snapshot created: {}", response.snapshot_id);
+
+    Ok(())
+}
diff --git a/tools/restatectl/src/commands/snapshot/mod.rs b/tools/restatectl/src/commands/snapshot/mod.rs
new file mode 100644
index 000000000..2ef179e6a
--- /dev/null
+++ b/tools/restatectl/src/commands/snapshot/mod.rs
@@ -0,0 +1,19 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+mod create_snapshot;
+
+use cling::prelude::*;
+
+#[derive(Run, Subcommand, Clone)]
+pub enum Snapshot {
+    /// Create a partition snapshot
+    CreateSnapshot(create_snapshot::CreateSnapshotOpts),
+}
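One detail worth noting across the CLI and the server handler: the restatectl flag is a u16 while the protobuf field is a uint32, so the server narrows the value back down and rejects anything out of range. The check in miniature, as a hypothetical helper mirroring the u16::try_from guard in cluster_ctrl.rs:

```rust
// proto3 carries the partition id as u32; Restate partition ids fit in u16.
fn parse_partition_id(raw: u32) -> Result<u16, String> {
    u16::try_from(raw).map_err(|_| format!("Invalid partition id: {raw}"))
}

fn main() {
    assert_eq!(parse_partition_id(7), Ok(7));
    assert!(parse_partition_id(70_000).is_err());
}
```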