Expose a partition worker CreateSnapshot RPC #1998

Merged · 1 commit · Oct 11, 2024
10 changes: 10 additions & 0 deletions crates/admin/protobuf/cluster_ctrl_svc.proto
@@ -25,6 +25,8 @@ service ClusterCtrlSvc {
rpc ListNodes(ListNodesRequest) returns (ListNodesResponse);

rpc TrimLog(TrimLogRequest) returns (google.protobuf.Empty);

rpc CreatePartitionSnapshot(CreatePartitionSnapshotRequest) returns (CreatePartitionSnapshotResponse);
}

message ClusterStateRequest { }
@@ -71,3 +73,11 @@ message TrimLogRequest {
uint32 log_id = 1;
uint64 trim_point = 2;
}

message CreatePartitionSnapshotRequest {
uint32 partition_id = 1;
}

message CreatePartitionSnapshotResponse {
string snapshot_id = 1;
}
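
Not part of the diff: a minimal sketch of what a caller of the new RPC could look like via tonic-generated stubs, in the spirit of what `restatectl snapshots create` sends. The `cluster_ctrl_svc_client` module path follows tonic's usual code-generation conventions and the endpoint address is a placeholder; both are assumptions rather than something this PR defines.

```rust
// Sketch only: invoking the new CreatePartitionSnapshot RPC from a client.
// Assumes a tonic-generated client module (`cluster_ctrl_svc_client`) next to the
// server stubs used elsewhere in this PR, and a placeholder endpoint address.
use restate_admin::cluster_controller::protobuf::{
    cluster_ctrl_svc_client::ClusterCtrlSvcClient, CreatePartitionSnapshotRequest,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = ClusterCtrlSvcClient::connect("http://127.0.0.1:5122").await?;
    let response = client
        .create_partition_snapshot(CreatePartitionSnapshotRequest { partition_id: 0 })
        .await?
        .into_inner();
    println!("snapshot created: {}", response.snapshot_id);
    Ok(())
}
```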
145 changes: 142 additions & 3 deletions crates/admin/src/cluster_controller/service.rs
@@ -11,7 +11,9 @@
use std::collections::BTreeMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use codederror::CodedError;
use futures::future::OptionFuture;
use futures::{Stream, StreamExt};
@@ -22,18 +22,22 @@ use tracing::{debug, info, warn};

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::{Incoming, MessageRouterBuilder, Networking, TransportConnect};
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{
Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
};
use restate_core::{
cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
TaskKind,
};
use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state::{ClusterStateRefresher, ClusterStateWatcher};
@@ -52,6 +58,7 @@ pub struct Service<T> {
networking: Networking<T>,
incoming_messages: Pin<Box<dyn Stream<Item = Incoming<AttachRequest>> + Send + Sync + 'static>>,
cluster_state_refresher: ClusterStateRefresher<T>,
processor_manager_client: PartitionProcessorManagerClient<Networking<T>>,
command_tx: mpsc::Sender<ClusterControllerCommand>,
command_rx: mpsc::Receiver<ClusterControllerCommand>,

@@ -86,8 +93,10 @@
router_builder,
);

let options = configuration.live_load();
let processor_manager_client =
PartitionProcessorManagerClient::new(networking.clone(), router_builder);

let options = configuration.live_load();
let heartbeat_interval = Self::create_heartbeat_interval(&options.admin);
let (log_trim_interval, log_trim_threshold) =
Self::create_log_trim_interval(&options.admin);
@@ -101,6 +110,7 @@
cluster_state_refresher,
metadata_writer,
metadata_store_client,
processor_manager_client,
command_tx,
command_rx,
heartbeat_interval,
@@ -132,13 +142,18 @@
}
}

#[derive(Debug)]
enum ClusterControllerCommand {
GetClusterState(oneshot::Sender<Arc<ClusterState>>),
TrimLog {
log_id: LogId,
trim_point: Lsn,
response_tx: oneshot::Sender<anyhow::Result<()>>,
},
CreateSnapshot {
partition_id: PartitionId,
response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
},
}

pub struct ClusterControllerHandle {
@@ -174,6 +189,23 @@ impl ClusterControllerHandle {

rx.await.map_err(|_| ShutdownError)
}

pub async fn create_partition_snapshot(
&self,
partition_id: PartitionId,
) -> Result<Result<SnapshotId, anyhow::Error>, ShutdownError> {
let (tx, rx) = oneshot::channel();

let _ = self
.tx
.send(ClusterControllerCommand::CreateSnapshot {
partition_id,
response_tx: tx,
})
.await;

rx.await.map_err(|_| ShutdownError)
}
}

impl<T: TransportConnect> Service<T> {
@@ -318,6 +350,61 @@ impl<T: TransportConnect> Service<T> {
Ok(())
}

/// Triggers a snapshot creation for the given partition by issuing an RPC
/// to the node hosting the active leader.
async fn create_partition_snapshot(
&self,
partition_id: PartitionId,
response_tx: oneshot::Sender<anyhow::Result<SnapshotId>>,
) {
let cluster_state = self.cluster_state_refresher.get_cluster_state();

// For now, we just pick the leader node since we know that every partition is likely to
// have one. We'll want to update the algorithm to be smart about scheduling snapshot tasks
// in the future to avoid disrupting the leader when there are up-to-date followers.
let leader_node = cluster_state
.alive_nodes()
.filter_map(|node| {
node.partitions
.get(&partition_id)
.filter(|status| status.is_effective_leader())
.map(|_| node)
.cloned()
})
.next();

match leader_node {
Some(node) => {
debug!(
node_id = %node.generational_node_id,
?partition_id,
"Asking node to snapshot partition"
);

let mut node_rpc_client = self.processor_manager_client.clone();
let _ = self.task_center.spawn_child(
TaskKind::Disposable,
"create-snapshot-response",
Some(partition_id),
async move {
let _ = response_tx.send(
node_rpc_client
.create_snapshot(node.generational_node_id, partition_id)
.await,
);
Ok(())
},
);
}

None => {
let _ = response_tx.send(Err(anyhow::anyhow!(
"Can not find a suitable node to take snapshot of partition {partition_id}"
)));
}
};
}

async fn on_cluster_cmd(
&self,
command: ClusterControllerCommand,
@@ -339,6 +426,14 @@ impl<T: TransportConnect> Service<T> {
let result = bifrost_admin.trim(log_id, trim_point).await;
let _ = response_tx.send(result.map_err(Into::into));
}
ClusterControllerCommand::CreateSnapshot {
partition_id,
response_tx,
} => {
info!(?partition_id, "Create snapshot command received");
self.create_partition_snapshot(partition_id, response_tx)
.await;
}
}
}

@@ -408,6 +503,50 @@ async fn signal_all_partitions_started(
}
}

#[derive(Clone)]
struct PartitionProcessorManagerClient<N>
where
N: Clone,
{
network_sender: N,
create_snapshot_router: RpcRouter<CreateSnapshotRequest>,
}

impl<N> PartitionProcessorManagerClient<N>
where
N: NetworkSender + 'static,
{
pub fn new(network_sender: N, router_builder: &mut MessageRouterBuilder) -> Self {
let create_snapshot_router = RpcRouter::new(router_builder);

PartitionProcessorManagerClient {
network_sender,
create_snapshot_router,
}
}

pub async fn create_snapshot(
&mut self,
node_id: GenerationalNodeId,
partition_id: PartitionId,
) -> anyhow::Result<SnapshotId> {
// todo(pavel): make snapshot RPC timeout configurable, especially if this includes remote upload in the future
let response = tokio::time::timeout(
Duration::from_secs(30),
self.create_snapshot_router.call(
&self.network_sender,
node_id,
CreateSnapshotRequest { partition_id },
),
)
.await?;
let create_snapshot_response = response?.into_body();
create_snapshot_response
.result
.map_err(|e| anyhow!("Failed to create snapshot: {:?}", e))
}
}

#[cfg(test)]
mod tests {
use super::Service;
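
Usage note (not part of the diff): the handle's new `create_partition_snapshot` returns a nested result, where the outer `Err` signals controller shutdown and the inner result carries the snapshot outcome. A minimal sketch of unpacking it, assuming the caller already holds a handle:

```rust
// Sketch only: consuming ClusterControllerHandle::create_partition_snapshot.
// The handle and PartitionId types come from this PR; how `handle` is obtained
// is assumed to exist elsewhere in the caller.
use restate_admin::cluster_controller::ClusterControllerHandle;
use restate_types::identifiers::PartitionId;

async fn snapshot_partition(
    handle: &ClusterControllerHandle,
    partition_id: PartitionId,
) -> anyhow::Result<()> {
    match handle.create_partition_snapshot(partition_id).await {
        // Outer error: the cluster controller is shutting down.
        Err(_shutdown) => anyhow::bail!("node is shutting down"),
        // Inner error: no suitable leader node, RPC failure, or timeout.
        Ok(Err(err)) => anyhow::bail!("snapshot failed: {err}"),
        Ok(Ok(snapshot_id)) => {
            tracing::info!(%snapshot_id, "partition snapshot created");
            Ok(())
        }
    }
}
```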
34 changes: 32 additions & 2 deletions crates/node/src/network_server/handler/cluster_ctrl.rs
@@ -14,12 +14,14 @@ use tracing::{debug, info};

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvc;
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, ClusterStateResponse, DescribeLogRequest, DescribeLogResponse,
ListLogsRequest, ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
ClusterStateRequest, ClusterStateResponse, CreatePartitionSnapshotRequest,
CreatePartitionSnapshotResponse, DescribeLogRequest, DescribeLogResponse, ListLogsRequest,
ListLogsResponse, ListNodesRequest, ListNodesResponse, TrimLogRequest,
};
use restate_admin::cluster_controller::ClusterControllerHandle;
use restate_bifrost::{Bifrost, FindTailAttributes};
use restate_metadata_store::MetadataStoreClient;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::Logs;
use restate_types::logs::{LogId, Lsn};
use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, NODES_CONFIG_KEY};
@@ -171,6 +173,34 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
}
Ok(Response::new(()))
}

/// Handles ad-hoc snapshot requests, as sent by `restatectl snapshots create`. This is
/// implemented as an RPC call within the cluster to a worker node hosting the partition.
async fn create_partition_snapshot(
&self,
request: Request<CreatePartitionSnapshotRequest>,
) -> Result<Response<CreatePartitionSnapshotResponse>, Status> {
let request = request.into_inner();
let partition_id = PartitionId::from(
u16::try_from(request.partition_id)
.map_err(|id| Status::invalid_argument(format!("Invalid partition id: {id}")))?,
);

match self
.controller_handle
.create_partition_snapshot(partition_id)
.await
.map_err(|_| Status::aborted("Node is shutting down"))?
{
Err(err) => {
info!("Failed creating partition snapshot: {err}");
Err(Status::internal(err.to_string()))
}
Ok(snapshot_id) => Ok(Response::new(CreatePartitionSnapshotResponse {
snapshot_id: snapshot_id.to_string(),
})),
}
}
}

fn serialize_value<T: StorageEncode>(value: T) -> Bytes {
5 changes: 4 additions & 1 deletion crates/types/protobuf/restate/common.proto
@@ -21,7 +21,7 @@ message NodeId {
optional uint32 generation = 2;
}

// Partition Processor leadershop epoch number
// Partition Processor leadership epoch number
message LeaderEpoch { uint64 value = 1; }

// Log sequence number
@@ -66,6 +66,9 @@ enum TargetName {
REPLICATED_LOGLET_APPENDED = 41;
REPLICATED_LOGLET_GET_SEQUENCER_STATE = 42;
REPLICATED_LOGLET_SEQUENCER_STATE = 43;
// Partition Processor
PARTITION_CREATE_SNAPSHOT_REQUEST = 50;
PARTITION_CREATE_SNAPSHOT_RESPONSE = 51;
}

enum NodeStatus {
24 changes: 23 additions & 1 deletion crates/types/src/net/partition_processor_manager.rs
@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;

use crate::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
use crate::identifiers::PartitionId;
use crate::identifiers::{PartitionId, SnapshotId};
use crate::net::{define_message, TargetName};

use crate::net::define_rpc;
@@ -69,3 +69,25 @@ impl From<RunMode> for ProcessorCommand {
}
}
}

define_rpc! {
@request = CreateSnapshotRequest,
@response = CreateSnapshotResponse,
@request_target = TargetName::PartitionCreateSnapshotRequest,
@response_target = TargetName::PartitionCreateSnapshotResponse,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSnapshotRequest {
pub partition_id: PartitionId,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateSnapshotResponse {
pub result: Result<SnapshotId, SnapshotError>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SnapshotError {
SnapshotCreationFailed(String),
}
12 changes: 7 additions & 5 deletions crates/worker/src/lib.rs
@@ -119,11 +119,10 @@ impl<T: TransportConnect> Worker<T> {

// ingress_kafka
let ingress_kafka = IngressKafkaService::new(ingress_dispatcher.clone());
let subscription_controller_handle =
subscription_integration::SubscriptionControllerHandle::new(
config.ingress.clone(),
ingress_kafka.create_command_sender(),
);
let subscription_controller_handle = SubscriptionControllerHandle::new(
config.ingress.clone(),
ingress_kafka.create_command_sender(),
);

let partition_store_manager = PartitionStoreManager::create(
updateable_config.clone().map(|c| &c.worker.storage),
@@ -154,6 +153,9 @@ impl<T: TransportConnect> Worker<T> {
bifrost,
);

// handle RPCs
router_builder.add_message_handler(partition_processor_manager.message_handler());

let storage_query_context = QueryContext::create(
&config.admin.query_engine,
partition_processor_manager.handle(),