Let cluster controller create initial partition table
This also adds the auto-provision option, which tells the cluster controller
whether to create an initially empty partition table or a partition table with
as many partitions as the configured number of bootstrap partitions.

This fixes #2084.
tillrohrmann committed Oct 15, 2024
1 parent 2b7acbb commit 03ef822
Showing 7 changed files with 170 additions and 113 deletions.
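
The gist of the change, condensed into a short sketch (not the committed code verbatim; see the init_partition_table hunk in crates/admin/src/cluster_controller/service.rs below for the real implementation, from which this is condensed):

// Sketch of the new startup step: the cluster controller writes the initial
// partition table itself instead of waiting for one to appear. `config` stands
// in for the live-loaded Configuration.
let partition_table = retry_on_network_error(
    config.common.network_error_retry_policy.clone(),
    || {
        metadata_store_client.get_or_insert(PARTITION_TABLE_KEY.clone(), || {
            // auto-provision on: create the configured number of bootstrap
            // partitions; off: start from an initially empty partition table.
            let num_partitions = if config.common.auto_provision_partitions {
                config.common.bootstrap_num_partitions()
            } else {
                0
            };
            PartitionTable::with_equally_sized_partitions(Version::MIN, num_partitions)
        })
    },
)
.await?;
metadata_writer.update(partition_table).await?;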
17 changes: 11 additions & 6 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -12,8 +12,11 @@ use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use rand::prelude::IteratorRandom;
use rand::{thread_rng, RngCore};
use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
use restate_core::metadata_store::{
retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
};
use restate_core::{metadata, task_center, Metadata, MetadataWriter, ShutdownError};
use restate_types::config::Configuration;
use restate_types::errors::GenericError;
use restate_types::live::Pinned;
use restate_types::logs::builder::LogsBuilder;
@@ -742,23 +745,25 @@ pub struct LogsController {

impl LogsController {
pub async fn init(
configuration: &Configuration,
metadata: Metadata,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
default_provider: ProviderKind,
) -> Result<Self> {
// obtain the latest logs or init it with an empty logs variant
let logs = metadata_store_client
.get_or_insert(BIFROST_CONFIG_KEY.clone(), Logs::default)
.await?;
let logs = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), Logs::default),
)
.await?;
metadata_writer.update(logs).await?;
Ok(Self {
effects: Some(Vec::new()),
inner: LogsControllerInner::new(
metadata.logs_ref(),
metadata.partition_table_ref().as_ref(),
default_provider,
configuration.bifrost.default_provider,
)?,
metadata,
bifrost,
35 changes: 27 additions & 8 deletions crates/admin/src/cluster_controller/scheduler.rs
@@ -14,13 +14,15 @@ use tracing::{debug, trace};

use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use restate_core::metadata_store::{
MetadataStoreClient, Precondition, ReadError, ReadWriteError, WriteError,
retry_on_network_error, MetadataStoreClient, Precondition, ReadError, ReadWriteError,
WriteError,
};
use restate_core::network::{NetworkSender, Networking, Outgoing, TransportConnect};
use restate_core::{metadata, ShutdownError, SyncError, TaskCenter, TaskKind};
use restate_types::cluster_controller::{
ReplicationStrategy, SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
};
use restate_types::config::Configuration;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::Logs;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
@@ -75,13 +77,19 @@ pub struct Scheduler<T> {
/// scheduling plan).
impl<T: TransportConnect> Scheduler<T> {
pub async fn init(
configuration: &Configuration,
task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
) -> Result<Self, BuildError> {
let scheduling_plan = metadata_store_client
.get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
.await?;
let scheduling_plan = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| {
metadata_store_client
.get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
},
)
.await?;

Ok(Self {
scheduling_plan,
@@ -415,6 +423,7 @@ mod tests {
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
};
use restate_types::cluster_controller::{ReplicationStrategy, SchedulingPlan};
use restate_types::config::Configuration;
use restate_types::identifiers::PartitionId;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::net::codec::WireDecode;
@@ -439,8 +448,13 @@
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await
.expect("scheduling plan");
let mut scheduler =
Scheduler::init(tc, metadata_store_client.clone(), networking).await?;
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
tc,
metadata_store_client.clone(),
networking,
)
.await?;
let observed_cluster_state = ObservedClusterState::default();

scheduler
@@ -546,8 +560,13 @@
let tc = env.tc.clone();
env.tc
.run_in_scope("test", None, async move {
let mut scheduler =
Scheduler::init(tc, metadata_store_client.clone(), networking).await?;
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
tc,
metadata_store_client.clone(),
networking,
)
.await?;
let mut observed_cluster_state = ObservedClusterState::default();

for _ in 0..num_scheduling_rounds {
97 changes: 83 additions & 14 deletions crates/admin/src/cluster_controller/service.rs
@@ -22,12 +22,8 @@ use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tracing::{debug, info, warn};

use super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher};
use crate::cluster_controller::logs_controller::LogsController;
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::Scheduler;
use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{
Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
@@ -41,11 +37,18 @@ use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use restate_types::partition_table::PartitionTable;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher};
use crate::cluster_controller::logs_controller::LogsController;
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::Scheduler;

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
#[error("error")]
@@ -221,11 +224,8 @@ impl<T: TransportConnect> Service<T> {
bifrost: Bifrost,
all_partitions_started_tx: Option<oneshot::Sender<()>>,
) -> anyhow::Result<()> {
// Make sure we have partition table before starting
let _ = self
.metadata
.wait_for_version(MetadataKind::PartitionTable, Version::MIN)
.await?;
self.init_partition_table().await?;

let bifrost_admin =
BifrostAdmin::new(&bifrost, &self.metadata_writer, &self.metadata_store_client);

@@ -248,26 +248,36 @@ impl<T: TransportConnect> Service<T> {
)?;
}

self.task_center.spawn_child(
TaskKind::SystemService,
"cluster-controller-metadata-sync",
None,
sync_cluster_controller_metadata(self.metadata.clone()),
)?;

let configuration = self.configuration.live_load();

let mut scheduler = Scheduler::init(
configuration,
self.task_center.clone(),
self.metadata_store_client.clone(),
self.networking.clone(),
)
.await?;

let mut logs_controller = LogsController::init(
configuration,
self.metadata.clone(),
bifrost.clone(),
self.metadata_store_client.clone(),
self.metadata_writer.clone(),
self.configuration.live_load().bifrost.default_provider,
)
.await?;

let mut observed_cluster_state = ObservedClusterState::default();

let mut logs_watcher = self.metadata.watch(MetadataKind::Logs);
let mut partition_watcher = self.metadata.watch(MetadataKind::PartitionTable);
let mut partition_table_watcher = self.metadata.watch(MetadataKind::PartitionTable);
let mut logs = self.metadata.updateable_logs_metadata();
let mut partition_table = self.metadata.updateable_partition_table();

@@ -297,8 +307,12 @@
// tell the scheduler about potentially newly provisioned logs
scheduler.on_logs_update(logs.live_load(), partition_table.live_load()).await?
}
Ok(_) = partition_watcher.changed() => {
logs_controller.on_partition_table_update(partition_table.live_load())
Ok(_) = partition_table_watcher.changed() => {
let partition_table = partition_table.live_load();
let logs = logs.live_load();

logs_controller.on_partition_table_update(partition_table);
scheduler.on_logs_update(logs, partition_table).await?;
}
Some(cmd) = self.command_rx.recv() => {
self.on_cluster_cmd(cmd, bifrost_admin).await;
@@ -320,6 +334,32 @@
}
}

async fn init_partition_table(&mut self) -> anyhow::Result<()> {
let configuration = self.configuration.live_load();

let partition_table = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| {
self.metadata_store_client
.get_or_insert(PARTITION_TABLE_KEY.clone(), || {
if configuration.common.auto_provision_partitions {
PartitionTable::with_equally_sized_partitions(
Version::MIN,
configuration.common.bootstrap_num_partitions(),
)
} else {
PartitionTable::with_equally_sized_partitions(Version::MIN, 0)
}
})
},
)
.await?;

self.metadata_writer.update(partition_table).await?;

Ok(())
}

async fn trim_logs(
&self,
bifrost_admin: BifrostAdmin<'_>,
@@ -488,6 +528,35 @@ impl<T: TransportConnect> Service<T> {
}
}

async fn sync_cluster_controller_metadata(metadata: Metadata) -> anyhow::Result<()> {
// todo make this configurable
let mut interval = time::interval(Duration::from_secs(10));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut cancel = std::pin::pin!(cancellation_watcher());

loop {
tokio::select! {
_ = &mut cancel => {
break;
},
_ = interval.tick() => {
tokio::select! {
_ = &mut cancel => {
break;
},
_ = futures::future::join3(
metadata.sync(MetadataKind::NodesConfiguration, TargetVersion::Latest),
metadata.sync(MetadataKind::PartitionTable, TargetVersion::Latest),
metadata.sync(MetadataKind::Logs, TargetVersion::Latest)) => {}
}
}
}
}

Ok(())
}

async fn signal_all_partitions_started(
mut cluster_state_watcher: ClusterStateWatcher,
metadata: Metadata,
31 changes: 30 additions & 1 deletion crates/core/src/metadata_store/mod.rs
@@ -20,8 +20,11 @@ use restate_types::errors::GenericError;
use restate_types::retries::RetryPolicy;
use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode};
use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
use std::future::Future;
use std::sync::Arc;
use tracing::debug;
use std::time::{Duration, Instant};
use tracing::log::trace;
use tracing::{debug, info};

#[derive(Debug, thiserror::Error)]
pub enum ReadError {
@@ -384,3 +387,29 @@ impl MetadataStoreClientError for WriteError {
}

static_assertions::assert_impl_all!(MetadataStoreClient: Send, Sync, Clone);

pub async fn retry_on_network_error<Fn, Fut, T, E, P>(retry_policy: P, action: Fn) -> Result<T, E>
where
P: Into<RetryPolicy>,
Fn: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: MetadataStoreClientError + std::fmt::Display,
{
let upsert_start = Instant::now();

retry_policy
.into()
.retry_if(action, |err: &E| {
if err.is_network_error() {
if upsert_start.elapsed() < Duration::from_secs(5) {
trace!("could not connect to metadata store: {err}; retrying");
} else {
info!("could not connect to metadata store: {err}; retrying");
}
true
} else {
false
}
})
.await
}
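
For reference, a hedged usage sketch of the new helper, mirroring the call sites introduced above (SOME_KEY and SomeValue are placeholders, not identifiers from the codebase):

// Retry the metadata-store upsert with the configured network-error retry policy;
// non-network errors are returned immediately, and the first success ends the loop.
let value = retry_on_network_error(
    configuration.common.network_error_retry_policy.clone(),
    || metadata_store_client.get_or_insert(SOME_KEY.clone(), SomeValue::default),
)
.await?;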