From b4182b4c8e73e36ecaa31abe404f1c0921fa04d2 Mon Sep 17 00:00:00 2001
From: Till Rohrmann
Date: Thu, 10 Oct 2024 19:05:57 +0200
Subject: [PATCH] Let cluster controller create initial partition table

This also adds the auto-provision option which tells the cluster controller
whether to create an initially empty partition table or a partition table with
as many partitions as the configured number of bootstrap partitions.

This fixes #2084.
---
 .../src/cluster_controller/logs_controller.rs |  17 +--
 .../admin/src/cluster_controller/scheduler.rs |  35 ++++--
 .../admin/src/cluster_controller/service.rs   | 101 +++++++++++++++---
 crates/core/src/metadata_store/mod.rs         |  31 +++++-
 crates/node/src/lib.rs                        |  89 +--------------
 .../types/src/config/cli_option_overrides.rs  |   7 ++
 crates/types/src/config/common.rs             |   7 ++
 7 files changed, 174 insertions(+), 113 deletions(-)

diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs
index 087e5c7bbc..b13a08fe0c 100644
--- a/crates/admin/src/cluster_controller/logs_controller.rs
+++ b/crates/admin/src/cluster_controller/logs_controller.rs
@@ -20,8 +20,11 @@ use tracing::debug;
 use xxhash_rust::xxh3::Xxh3Builder;

 use restate_bifrost::{Bifrost, BifrostAdmin};
-use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
+use restate_core::metadata_store::{
+    retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
+};
 use restate_core::{metadata, task_center, Metadata, MetadataWriter, ShutdownError};
+use restate_types::config::Configuration;
 use restate_types::errors::GenericError;
 use restate_types::live::Pinned;
 use restate_types::logs::builder::LogsBuilder;
@@ -746,23 +749,25 @@ pub struct LogsController {

 impl LogsController {
     pub async fn init(
+        configuration: &Configuration,
         metadata: Metadata,
         bifrost: Bifrost,
         metadata_store_client: MetadataStoreClient,
         metadata_writer: MetadataWriter,
-        default_provider: ProviderKind,
     ) -> Result {
         // obtain the latest logs or init it with an empty logs variant
-        let logs = metadata_store_client
-            .get_or_insert(BIFROST_CONFIG_KEY.clone(), Logs::default)
-            .await?;
+        let logs = retry_on_network_error(
+            configuration.common.network_error_retry_policy.clone(),
+            || metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), Logs::default),
+        )
+        .await?;
+
         metadata_writer.update(logs).await?;

         Ok(Self {
             effects: Some(Vec::new()),
             inner: LogsControllerInner::new(
                 metadata.logs_ref(),
                 metadata.partition_table_ref().as_ref(),
-                default_provider,
+                configuration.bifrost.default_provider,
             )?,
             metadata,
             bifrost,
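
Note on the retry policy used above: the wrapper is driven by `common.network_error_retry_policy`, whose default is an exponential backoff (its tail, `Some(15)` attempts and a `Some(Duration::from_secs(5))` cap, is visible in the common.rs hunk at the end of this patch). The sketch below only illustrates the shape of such a policy; the initial interval and growth factor are assumed values, not the shipped defaults.

    use std::time::Duration;

    use restate_types::retries::RetryPolicy;

    // Illustrative sketch of an exponential network-error retry policy. The last two
    // arguments (max attempts, max interval) mirror the caps visible in common.rs;
    // the initial interval and factor here are assumptions.
    fn example_network_error_retry_policy() -> RetryPolicy {
        RetryPolicy::exponential(
            Duration::from_millis(10),
            2.0,
            Some(15),
            Some(Duration::from_secs(5)),
        )
    }
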
diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs
index 0be2031518..e4f48e5629 100644
--- a/crates/admin/src/cluster_controller/scheduler.rs
+++ b/crates/admin/src/cluster_controller/scheduler.rs
@@ -14,13 +14,15 @@ use tracing::{debug, trace};

 use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
 use restate_core::metadata_store::{
-    MetadataStoreClient, Precondition, ReadError, ReadWriteError, WriteError,
+    retry_on_network_error, MetadataStoreClient, Precondition, ReadError, ReadWriteError,
+    WriteError,
 };
 use restate_core::network::{NetworkSender, Networking, Outgoing, TransportConnect};
 use restate_core::{metadata, ShutdownError, SyncError, TaskCenter, TaskKind};
 use restate_types::cluster_controller::{
     ReplicationStrategy, SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
 };
+use restate_types::config::Configuration;
 use restate_types::identifiers::PartitionId;
 use restate_types::logs::metadata::Logs;
 use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
@@ -75,13 +77,19 @@ pub struct Scheduler {
 /// scheduling plan).
 impl Scheduler {
     pub async fn init(
+        configuration: &Configuration,
         task_center: TaskCenter,
         metadata_store_client: MetadataStoreClient,
         networking: Networking,
     ) -> Result {
-        let scheduling_plan = metadata_store_client
-            .get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
-            .await?;
+        let scheduling_plan = retry_on_network_error(
+            configuration.common.network_error_retry_policy.clone(),
+            || {
+                metadata_store_client
+                    .get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
+            },
+        )
+        .await?;

         Ok(Self {
             scheduling_plan,
@@ -415,6 +423,7 @@ mod tests {
         AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
     };
     use restate_types::cluster_controller::{ReplicationStrategy, SchedulingPlan};
+    use restate_types::config::Configuration;
     use restate_types::identifiers::PartitionId;
     use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
     use restate_types::net::codec::WireDecode;
@@ -439,8 +448,13 @@ mod tests {
            .get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
            .await
            .expect("scheduling plan");
-        let mut scheduler =
-            Scheduler::init(tc, metadata_store_client.clone(), networking).await?;
+        let mut scheduler = Scheduler::init(
+            Configuration::pinned().as_ref(),
+            tc,
+            metadata_store_client.clone(),
+            networking,
+        )
+        .await?;
         let observed_cluster_state = ObservedClusterState::default();

         scheduler
@@ -546,8 +560,13 @@ mod tests {
         let tc = env.tc.clone();
         env.tc
             .run_in_scope("test", None, async move {
-                let mut scheduler =
-                    Scheduler::init(tc, metadata_store_client.clone(), networking).await?;
+                let mut scheduler = Scheduler::init(
+                    Configuration::pinned().as_ref(),
+                    tc,
+                    metadata_store_client.clone(),
+                    networking,
+                )
+                .await?;
                 let mut observed_cluster_state = ObservedClusterState::default();

                 for _ in 0..num_scheduling_rounds {
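
For reviewers wondering where the new `&Configuration` argument comes from: the cluster controller service obtains it from its live-updatable configuration handle via `live_load()`, while the tests above simply use the process-global snapshot returned by `Configuration::pinned()`. Condensed from the two call sites in this patch (surrounding setup elided, so this is not a standalone program):

    // In the cluster controller service (production path):
    let configuration = self.configuration.live_load();
    let mut scheduler = Scheduler::init(
        configuration,
        self.task_center.clone(),
        self.metadata_store_client.clone(),
        self.networking.clone(),
    )
    .await?;

    // In the scheduler tests, the pinned global configuration is sufficient:
    let mut scheduler = Scheduler::init(
        Configuration::pinned().as_ref(),
        tc,
        metadata_store_client.clone(),
        networking,
    )
    .await?;
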
diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs
index 783d466cce..f03b48f66d 100644
--- a/crates/admin/src/cluster_controller/service.rs
+++ b/crates/admin/src/cluster_controller/service.rs
@@ -22,12 +22,8 @@ use tokio::time;
 use tokio::time::{Instant, Interval, MissedTickBehavior};
 use tracing::{debug, info, warn};

-use super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher};
-use crate::cluster_controller::logs_controller::LogsController;
-use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
-use crate::cluster_controller::scheduler::Scheduler;
 use restate_bifrost::{Bifrost, BifrostAdmin};
-use restate_core::metadata_store::MetadataStoreClient;
+use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
 use restate_core::network::rpc_router::RpcRouter;
 use restate_core::network::{
     Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
@@ -41,11 +37,18 @@ use restate_types::config::{AdminOptions, Configuration};
 use restate_types::identifiers::{PartitionId, SnapshotId};
 use restate_types::live::Live;
 use restate_types::logs::{LogId, Lsn, SequenceNumber};
+use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
 use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
 use restate_types::net::metadata::MetadataKind;
 use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
+use restate_types::partition_table::PartitionTable;
 use restate_types::{GenerationalNodeId, Version};

+use super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher};
+use crate::cluster_controller::logs_controller::LogsController;
+use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
+use crate::cluster_controller::scheduler::Scheduler;
+
 #[derive(Debug, thiserror::Error, CodedError)]
 pub enum Error {
     #[error("error")]
@@ -221,11 +224,8 @@ impl Service {
         bifrost: Bifrost,
         all_partitions_started_tx: Option<oneshot::Sender<()>>,
     ) -> anyhow::Result<()> {
-        // Make sure we have partition table before starting
-        let _ = self
-            .metadata
-            .wait_for_version(MetadataKind::PartitionTable, Version::MIN)
-            .await?;
+        self.init_partition_table().await?;
+
         let bifrost_admin =
             BifrostAdmin::new(&bifrost, &self.metadata_writer, &self.metadata_store_client);

@@ -248,7 +248,17 @@ impl Service {
             )?;
         }

+        self.task_center.spawn_child(
+            TaskKind::SystemService,
+            "cluster-controller-metadata-sync",
+            None,
+            sync_cluster_controller_metadata(self.metadata.clone()),
+        )?;
+
+        let configuration = self.configuration.live_load();
+
         let mut scheduler = Scheduler::init(
+            configuration,
             self.task_center.clone(),
             self.metadata_store_client.clone(),
             self.networking.clone(),
         )
         .await?;

         let mut logs_controller = LogsController::init(
+            configuration,
             self.metadata.clone(),
             bifrost.clone(),
             self.metadata_store_client.clone(),
             self.metadata_writer.clone(),
-            self.configuration.live_load().bifrost.default_provider,
         )
         .await?;

         let mut observed_cluster_state = ObservedClusterState::default();

         let mut logs_watcher = self.metadata.watch(MetadataKind::Logs);
-        let mut partition_watcher = self.metadata.watch(MetadataKind::PartitionTable);
+        let mut partition_table_watcher = self.metadata.watch(MetadataKind::PartitionTable);
         let mut logs = self.metadata.updateable_logs_metadata();
         let mut partition_table = self.metadata.updateable_partition_table();
@@ -297,8 +307,12 @@ impl Service {
                     // tell the scheduler about potentially newly provisioned logs
                     scheduler.on_logs_update(logs.live_load(), partition_table.live_load()).await?
                 }
-                Ok(_) = partition_watcher.changed() => {
-                    logs_controller.on_partition_table_update(partition_table.live_load())
+                Ok(_) = partition_table_watcher.changed() => {
+                    let partition_table = partition_table.live_load();
+                    let logs = logs.live_load();
+
+                    logs_controller.on_partition_table_update(partition_table);
+                    scheduler.on_logs_update(logs, partition_table).await?;
                 }
                 Some(cmd) = self.command_rx.recv() => {
                     self.on_cluster_cmd(cmd, bifrost_admin).await;
                 }
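
The run loop above combines two metadata primitives: `metadata.watch(MetadataKind::...)` yields a watcher whose `changed()` future resolves once a newer version has been observed, and `updateable_partition_table()` yields a handle whose `live_load()` returns the freshest value without an extra metadata-store round trip. A minimal sketch of that pattern in isolation (assumes a `restate_core::Metadata` handle inside a running task; treating a `changed()` error as shutdown is an assumption):

    use restate_core::Metadata;
    use restate_types::net::metadata::MetadataKind;

    async fn log_partition_table_updates(metadata: Metadata) {
        let mut partition_table_watcher = metadata.watch(MetadataKind::PartitionTable);
        let mut partition_table = metadata.updateable_partition_table();

        // changed() resolves with Ok(..) whenever a newer partition table version lands
        while partition_table_watcher.changed().await.is_ok() {
            let table = partition_table.live_load();
            tracing::debug!("observed a new partition table: {table:?}");
        }
    }
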
@@ -320,6 +334,36 @@ impl Service {
         }
     }

+    async fn init_partition_table(&mut self) -> anyhow::Result<()> {
+        let configuration = self.configuration.live_load();
+
+        let partition_table = retry_on_network_error(
+            configuration.common.network_error_retry_policy.clone(),
+            || {
+                self.metadata_store_client
+                    .get_or_insert(PARTITION_TABLE_KEY.clone(), || {
+                        let partition_table = if configuration.common.auto_provision_partitions {
+                            PartitionTable::with_equally_sized_partitions(
+                                Version::MIN,
+                                configuration.common.bootstrap_num_partitions(),
+                            )
+                        } else {
+                            PartitionTable::with_equally_sized_partitions(Version::MIN, 0)
+                        };
+
+                        debug!("Initializing the partition table with '{partition_table:?}'");
+
+                        partition_table
+                    })
+            },
+        )
+        .await?;
+
+        self.metadata_writer.update(partition_table).await?;
+
+        Ok(())
+    }
+
     async fn trim_logs(
         &self,
         bifrost_admin: BifrostAdmin<'_>,
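
To make the new bootstrap behaviour easy to see at a glance: the closure handed to `get_or_insert` above only runs when the metadata store does not contain a partition table yet. Boiled down to a free function (a restating sketch, not code from this patch):

    use restate_types::config::Configuration;
    use restate_types::partition_table::PartitionTable;
    use restate_types::Version;

    fn initial_partition_table(configuration: &Configuration) -> PartitionTable {
        if configuration.common.auto_provision_partitions {
            // bootstrap with the configured number of equally sized partitions
            PartitionTable::with_equally_sized_partitions(
                Version::MIN,
                configuration.common.bootstrap_num_partitions(),
            )
        } else {
            // start with an initially empty table; a real partition table has to be
            // written to the metadata store manually (see the new config option below)
            PartitionTable::with_equally_sized_partitions(Version::MIN, 0)
        }
    }
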
@@ -488,6 +532,35 @@ impl Service {
     }
 }

+async fn sync_cluster_controller_metadata(metadata: Metadata) -> anyhow::Result<()> {
+    // todo make this configurable
+    let mut interval = time::interval(Duration::from_secs(10));
+    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+
+    let mut cancel = std::pin::pin!(cancellation_watcher());
+
+    loop {
+        tokio::select! {
+            _ = &mut cancel => {
+                break;
+            },
+            _ = interval.tick() => {
+                tokio::select! {
+                    _ = &mut cancel => {
+                        break;
+                    },
+                    _ = futures::future::join3(
+                        metadata.sync(MetadataKind::NodesConfiguration, TargetVersion::Latest),
+                        metadata.sync(MetadataKind::PartitionTable, TargetVersion::Latest),
+                        metadata.sync(MetadataKind::Logs, TargetVersion::Latest)) => {}
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
 async fn signal_all_partitions_started(
     mut cluster_state_watcher: ClusterStateWatcher,
     metadata: Metadata,
diff --git a/crates/core/src/metadata_store/mod.rs b/crates/core/src/metadata_store/mod.rs
index 576d9b2a72..6e72f44d6c 100644
--- a/crates/core/src/metadata_store/mod.rs
+++ b/crates/core/src/metadata_store/mod.rs
@@ -20,8 +20,11 @@ use restate_types::errors::GenericError;
 use restate_types::retries::RetryPolicy;
 use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode};
 use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
+use std::future::Future;
 use std::sync::Arc;
-use tracing::debug;
+use std::time::{Duration, Instant};
+use tracing::log::trace;
+use tracing::{debug, info};

 #[derive(Debug, thiserror::Error)]
 pub enum ReadError {
@@ -384,3 +387,29 @@ impl MetadataStoreClientError for WriteError {
 }

 static_assertions::assert_impl_all!(MetadataStoreClient: Send, Sync, Clone);
+
+pub async fn retry_on_network_error<P, Fn, Fut, T, E>(retry_policy: P, action: Fn) -> Result<T, E>
+where
+    P: Into<RetryPolicy>,
+    Fn: FnMut() -> Fut,
+    Fut: Future<Output = Result<T, E>>,
+    E: MetadataStoreClientError + std::fmt::Display,
+{
+    let upsert_start = Instant::now();
+
+    retry_policy
+        .into()
+        .retry_if(action, |err: &E| {
+            if err.is_network_error() {
+                if upsert_start.elapsed() < Duration::from_secs(5) {
+                    trace!("could not connect to metadata store: {err}; retrying");
+                } else {
+                    info!("could not connect to metadata store: {err}; retrying");
+                }
+                true
+            } else {
+                false
+            }
+        })
+        .await
+}
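
Since `retry_on_network_error` is now public in `restate_core::metadata_store`, the fetch-or-create-with-retry pattern used by the scheduler, the logs controller and the cluster controller service can be reused by any component holding a `MetadataStoreClient`. Only errors classified as network errors are retried (logged at trace level for the first five seconds, then at info level); everything else is returned immediately. A usage sketch restricted to calls that appear in this patch (callers normally pass `configuration.common.network_error_retry_policy.clone()` as the policy):

    use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
    use restate_types::cluster_controller::SchedulingPlan;
    use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
    use restate_types::retries::RetryPolicy;

    async fn fetch_or_create_scheduling_plan(
        metadata_store_client: &MetadataStoreClient,
        retry_policy: RetryPolicy,
    ) -> anyhow::Result<SchedulingPlan> {
        // retries get_or_insert while the metadata store is unreachable, then gives up
        // according to the passed retry policy
        let scheduling_plan = retry_on_network_error(retry_policy, || {
            metadata_store_client.get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
        })
        .await?;

        Ok(scheduling_plan)
    }
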
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index d2d30d9af3..ac1670690a 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -14,13 +14,11 @@ mod roles;

 use restate_types::errors::GenericError;
 use restate_types::logs::RecordCache;
-use std::future::Future;
-use std::time::Duration;
 use tokio::sync::oneshot;

 use codederror::CodedError;
 use restate_bifrost::BifrostService;
-use restate_core::metadata_store::{MetadataStoreClientError, ReadWriteError};
+use restate_core::metadata_store::{retry_on_network_error, ReadWriteError};
 use restate_core::network::Networking;
 use restate_core::network::{GrpcConnector, MessageRouterBuilder};
 use restate_core::{
@@ -33,13 +31,10 @@ use restate_metadata_store::local::LocalMetadataStoreService;
 use restate_metadata_store::MetadataStoreClient;
 use restate_types::config::{CommonOptions, Configuration};
 use restate_types::live::Live;
-use restate_types::metadata_store::keys::{NODES_CONFIG_KEY, PARTITION_TABLE_KEY};
+use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
 use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
-use restate_types::partition_table::PartitionTable;
-use restate_types::retries::RetryPolicy;
 use restate_types::Version;
-use tokio::time::Instant;
-use tracing::{debug, error, info, trace};
+use tracing::{debug, error, info};

 use crate::cluster_marker::ClusterValidationError;
 use crate::network_server::{AdminDependencies, NetworkServer, WorkerDependencies};
@@ -303,29 +298,7 @@ impl Node {
         metadata_writer.update(nodes_config).await?;

         if config.common.allow_bootstrap {
-            // only try to insert static configuration if in bootstrap mode
-            let partition_table =
-                Self::fetch_or_insert_initial_configuration(&self.metadata_store_client, &config)
-                    .await?;
-
-            metadata_writer.update(partition_table).await?;
-        } else {
-            // otherwise, just sync the required metadata
-            metadata
-                .sync(MetadataKind::PartitionTable, TargetVersion::Latest)
-                .await?;
-            metadata
-                .sync(MetadataKind::Logs, TargetVersion::Latest)
-                .await?;
-
-            // safety check until we can tolerate missing partition table and logs configuration
-            if metadata.partition_table_version() == Version::INVALID {
-                return Err(Error::SafetyCheck(
-                    format!(
-                        "Missing partition table for cluster '{}'. This indicates that the cluster bootstrap is incomplete. Please re-run with '--allow-bootstrap true'.",
-                        config.common.cluster_name(),
-                    )))?;
-            }
+            // todo write bootstrap state
         }

         // fetch the latest schema information
@@ -454,37 +427,11 @@ impl Node {
         Ok(())
     }

-    async fn fetch_or_insert_initial_configuration(
-        metadata_store_client: &MetadataStoreClient,
-        options: &Configuration,
-    ) -> Result {
-        let partition_table =
-            Self::fetch_or_insert_partition_table(metadata_store_client, options).await?;
-
-        Ok(partition_table)
-    }
-
-    async fn fetch_or_insert_partition_table(
-        metadata_store_client: &MetadataStoreClient,
-        config: &Configuration,
-    ) -> Result {
-        Self::retry_on_network_error(config.common.network_error_retry_policy.clone(), || {
-            metadata_store_client.get_or_insert(PARTITION_TABLE_KEY.clone(), || {
-                PartitionTable::with_equally_sized_partitions(
-                    Version::MIN,
-                    config.common.bootstrap_num_partitions(),
-                )
-            })
-        })
-        .await
-        .map_err(Into::into)
-    }
-
     async fn upsert_node_config(
         metadata_store_client: &MetadataStoreClient,
         common_opts: &CommonOptions,
     ) -> Result {
-        Self::retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
+        retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
             let mut previous_node_generation = None;

             metadata_store_client.read_modify_write(NODES_CONFIG_KEY.clone(), move |nodes_config| {
                 let mut nodes_config = if common_opts.allow_bootstrap {
@@ -566,32 +513,6 @@ impl Node {
             .map_err(|err| err.transpose())
     }

-    async fn retry_on_network_error<P, Fn, Fut, T, E>(retry_policy: P, action: Fn) -> Result<T, E>
-    where
-        P: Into<RetryPolicy>,
-        Fn: FnMut() -> Fut,
-        Fut: Future<Output = Result<T, E>>,
-        E: MetadataStoreClientError + std::fmt::Display,
-    {
-        let upsert_start = Instant::now();
-
-        retry_policy
-            .into()
-            .retry_if(action, |err: &E| {
-                if err.is_network_error() {
-                    if upsert_start.elapsed() < Duration::from_secs(5) {
-                        trace!("could not connect to metadata store: {err}; retrying");
-                    } else {
-                        info!("could not connect to metadata store: {err}; retrying");
-                    }
-                    true
-                } else {
-                    false
-                }
-            })
-            .await
-    }
-
     pub fn bifrost(&self) -> restate_bifrost::Bifrost {
         self.bifrost.handle()
     }
diff --git a/crates/types/src/config/cli_option_overrides.rs b/crates/types/src/config/cli_option_overrides.rs
index 793c849786..6a1209b99b 100644
--- a/crates/types/src/config/cli_option_overrides.rs
+++ b/crates/types/src/config/cli_option_overrides.rs
@@ -82,6 +82,13 @@ pub struct CommonOptionCliOverride {
     #[clap(long, global = true)]
     pub bootstrap_num_partitions: Option,

+    /// # Automatically provision the configured number of partitions
+    ///
+    /// If this option is set to `false`, then one needs to manually write a partition table to
+    /// the metadata store. Without a partition table, the cluster will not start.
+    #[clap(long, global = true)]
+    pub auto_provision_partitions: Option<bool>,
+
     /// This timeout is used when shutting down the various Restate components to drain all the internal queues.
     #[serde_as(as = "Option")]
     #[clap(long, global = true)]
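
When `--auto-provision-partitions=false` is used, the cluster controller only writes an initially empty partition table, so a real one has to be provided out of band, as the doc comment above says. One way to seed it programmatically before the controller's first start, using only APIs that appear in this patch (the partition count of 4 and the way the client is obtained are up to the operator or tooling; `get_or_insert` will not overwrite a table that already exists):

    use restate_core::metadata_store::MetadataStoreClient;
    use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
    use restate_types::partition_table::PartitionTable;
    use restate_types::Version;

    async fn seed_partition_table(client: &MetadataStoreClient) -> anyhow::Result<()> {
        // writes a 4-partition table if none exists yet; otherwise returns the existing one
        let table = client
            .get_or_insert(PARTITION_TABLE_KEY.clone(), || {
                PartitionTable::with_equally_sized_partitions(Version::MIN, 4)
            })
            .await?;
        tracing::info!("partition table in the metadata store: {table:?}");
        Ok(())
    }
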
diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs
index db57c3cb5f..db481abbf9 100644
--- a/crates/types/src/config/common.rs
+++ b/crates/types/src/config/common.rs
@@ -215,6 +215,12 @@ pub struct CommonOptions {
     ///
     /// The retry policy for node network error
     pub network_error_retry_policy: RetryPolicy,
+
+    /// # Automatically provision the configured number of partitions
+    ///
+    /// If this option is set to `false`, then one needs to manually write a partition table to
+    /// the metadata store. Without a partition table, the cluster will not start.
+    pub auto_provision_partitions: bool,
 }

 static HOSTNAME: Lazy<String> = Lazy::new(|| {
@@ -359,6 +365,7 @@ impl Default for CommonOptions {
                 Some(15),
                 Some(Duration::from_secs(5)),
             ),
+            auto_provision_partitions: true,
         }
     }
 }
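
The new flag defaults to `true`, so existing deployments that rely on self-bootstrapping keep their behaviour; only operators who want to manage the partition table themselves need to disable it, either in the config file or via the new `--auto-provision-partitions` CLI override above. A trivial check of the default added by this patch:

    use restate_types::config::CommonOptions;

    fn main() {
        let common = CommonOptions::default();
        // auto_provision_partitions defaults to true in this patch
        assert!(common.auto_provision_partitions);
    }
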