Auto provision #2058

Merged: 1 commit, merged on Oct 15, 2024
17 changes: 11 additions & 6 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -20,8 +20,11 @@ use tracing::debug;
use xxhash_rust::xxh3::Xxh3Builder;

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
use restate_core::metadata_store::{
retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
};
use restate_core::{metadata, task_center, Metadata, MetadataWriter, ShutdownError};
use restate_types::config::Configuration;
use restate_types::errors::GenericError;
use restate_types::live::Pinned;
use restate_types::logs::builder::LogsBuilder;
@@ -746,23 +749,25 @@ pub struct LogsController {

impl LogsController {
pub async fn init(
configuration: &Configuration,
metadata: Metadata,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
default_provider: ProviderKind,
) -> Result<Self> {
// obtain the latest logs or init it with an empty logs variant
let logs = metadata_store_client
.get_or_insert(BIFROST_CONFIG_KEY.clone(), Logs::default)
.await?;
let logs = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), Logs::default),
)
.await?;
metadata_writer.update(logs).await?;
Ok(Self {
effects: Some(Vec::new()),
inner: LogsControllerInner::new(
metadata.logs_ref(),
metadata.partition_table_ref().as_ref(),
default_provider,
configuration.bifrost.default_provider,
)?,
metadata,
bifrost,
35 changes: 27 additions & 8 deletions crates/admin/src/cluster_controller/scheduler.rs
@@ -14,13 +14,15 @@ use tracing::{debug, trace};

use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use restate_core::metadata_store::{
MetadataStoreClient, Precondition, ReadError, ReadWriteError, WriteError,
retry_on_network_error, MetadataStoreClient, Precondition, ReadError, ReadWriteError,
WriteError,
};
use restate_core::network::{NetworkSender, Networking, Outgoing, TransportConnect};
use restate_core::{metadata, ShutdownError, SyncError, TaskCenter, TaskKind};
use restate_types::cluster_controller::{
ReplicationStrategy, SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
};
use restate_types::config::Configuration;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::Logs;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
@@ -75,13 +77,19 @@ pub struct Scheduler<T> {
/// scheduling plan).
impl<T: TransportConnect> Scheduler<T> {
pub async fn init(
configuration: &Configuration,
task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
) -> Result<Self, BuildError> {
let scheduling_plan = metadata_store_client
.get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
.await?;
let scheduling_plan = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| {
metadata_store_client
.get_or_insert(SCHEDULING_PLAN_KEY.clone(), SchedulingPlan::default)
},
)
.await?;

Ok(Self {
scheduling_plan,
@@ -415,6 +423,7 @@ mod tests {
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
};
use restate_types::cluster_controller::{ReplicationStrategy, SchedulingPlan};
use restate_types::config::Configuration;
use restate_types::identifiers::PartitionId;
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::net::codec::WireDecode;
@@ -439,8 +448,13 @@
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await
.expect("scheduling plan");
let mut scheduler =
Scheduler::init(tc, metadata_store_client.clone(), networking).await?;
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
tc,
metadata_store_client.clone(),
networking,
)
.await?;
let observed_cluster_state = ObservedClusterState::default();

scheduler
@@ -546,8 +560,13 @@
let tc = env.tc.clone();
env.tc
.run_in_scope("test", None, async move {
let mut scheduler =
Scheduler::init(tc, metadata_store_client.clone(), networking).await?;
let mut scheduler = Scheduler::init(
Configuration::pinned().as_ref(),
tc,
metadata_store_client.clone(),
networking,
)
.await?;
let mut observed_cluster_state = ObservedClusterState::default();

for _ in 0..num_scheduling_rounds {
101 changes: 87 additions & 14 deletions crates/admin/src/cluster_controller/service.rs
@@ -22,12 +22,8 @@ use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
use tracing::{debug, info, warn};

use super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher};
use crate::cluster_controller::logs_controller::LogsController;
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::Scheduler;
use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient};
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{
Incoming, MessageRouterBuilder, NetworkSender, Networking, TransportConnect,
@@ -41,11 +37,18 @@ use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::metadata_store::keys::PARTITION_TABLE_KEY;
use restate_types::net::cluster_controller::{AttachRequest, AttachResponse};
use restate_types::net::metadata::MetadataKind;
use restate_types::net::partition_processor_manager::CreateSnapshotRequest;
use restate_types::partition_table::PartitionTable;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state_refresher::{ClusterStateRefresher, ClusterStateWatcher};
use crate::cluster_controller::logs_controller::LogsController;
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::Scheduler;

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
#[error("error")]
@@ -221,11 +224,8 @@ impl<T: TransportConnect> Service<T> {
bifrost: Bifrost,
all_partitions_started_tx: Option<oneshot::Sender<()>>,
) -> anyhow::Result<()> {
// Make sure we have partition table before starting
let _ = self
.metadata
.wait_for_version(MetadataKind::PartitionTable, Version::MIN)
.await?;
self.init_partition_table().await?;

let bifrost_admin =
BifrostAdmin::new(&bifrost, &self.metadata_writer, &self.metadata_store_client);

@@ -248,26 +248,36 @@
)?;
}

self.task_center.spawn_child(
TaskKind::SystemService,
"cluster-controller-metadata-sync",
None,
sync_cluster_controller_metadata(self.metadata.clone()),
)?;

let configuration = self.configuration.live_load();

let mut scheduler = Scheduler::init(
configuration,
self.task_center.clone(),
self.metadata_store_client.clone(),
self.networking.clone(),
)
.await?;

let mut logs_controller = LogsController::init(
configuration,
Review comment (Contributor):
Not a blocker, but I'd have loved if we rely more on Live configuration for a more reactive experience.

@tillrohrmann (Contributor, Author) replied on Oct 15, 2024:
Once the LogsController depends on values in the Configuration apart from the init phase, then this sounds like a good idea.

(A short illustrative sketch of the "live configuration" idea follows this file's diff.)

self.metadata.clone(),
bifrost.clone(),
self.metadata_store_client.clone(),
self.metadata_writer.clone(),
self.configuration.live_load().bifrost.default_provider,
)
.await?;

let mut observed_cluster_state = ObservedClusterState::default();

let mut logs_watcher = self.metadata.watch(MetadataKind::Logs);
let mut partition_watcher = self.metadata.watch(MetadataKind::PartitionTable);
let mut partition_table_watcher = self.metadata.watch(MetadataKind::PartitionTable);
let mut logs = self.metadata.updateable_logs_metadata();
let mut partition_table = self.metadata.updateable_partition_table();

Expand Down Expand Up @@ -297,8 +307,12 @@ impl<T: TransportConnect> Service<T> {
// tell the scheduler about potentially newly provisioned logs
scheduler.on_logs_update(logs.live_load(), partition_table.live_load()).await?
}
Ok(_) = partition_watcher.changed() => {
logs_controller.on_partition_table_update(partition_table.live_load())
Ok(_) = partition_table_watcher.changed() => {
let partition_table = partition_table.live_load();
let logs = logs.live_load();

logs_controller.on_partition_table_update(partition_table);
scheduler.on_logs_update(logs, partition_table).await?;
}
Some(cmd) = self.command_rx.recv() => {
self.on_cluster_cmd(cmd, bifrost_admin).await;
@@ -320,6 +334,36 @@
}
}

async fn init_partition_table(&mut self) -> anyhow::Result<()> {
let configuration = self.configuration.live_load();

let partition_table = retry_on_network_error(
Review comment on lines +337 to +340 (Contributor):
nit: Might be worth logging that we are initializing

Author's reply:
Yes, will add it.

configuration.common.network_error_retry_policy.clone(),
|| {
self.metadata_store_client
.get_or_insert(PARTITION_TABLE_KEY.clone(), || {
let partition_table = if configuration.common.auto_provision_partitions {
PartitionTable::with_equally_sized_partitions(
Version::MIN,
configuration.common.bootstrap_num_partitions(),
)
} else {
PartitionTable::with_equally_sized_partitions(Version::MIN, 0)
};

debug!("Initializing the partition table with '{partition_table:?}'");

partition_table
})
},
)
.await?;

self.metadata_writer.update(partition_table).await?;

Ok(())
}

async fn trim_logs(
&self,
bifrost_admin: BifrostAdmin<'_>,
@@ -488,6 +532,35 @@
}
}

async fn sync_cluster_controller_metadata(metadata: Metadata) -> anyhow::Result<()> {
// todo make this configurable
let mut interval = time::interval(Duration::from_secs(10));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

let mut cancel = std::pin::pin!(cancellation_watcher());

loop {
tokio::select! {
_ = &mut cancel => {
break;
},
_ = interval.tick() => {
tokio::select! {
_ = &mut cancel => {
break;
},
_ = futures::future::join3(
metadata.sync(MetadataKind::NodesConfiguration, TargetVersion::Latest),
metadata.sync(MetadataKind::PartitionTable, TargetVersion::Latest),
metadata.sync(MetadataKind::Logs, TargetVersion::Latest)) => {}
}
}
}
}

Ok(())
}

async fn signal_all_partitions_started(
mut cluster_state_watcher: ClusterStateWatcher,
metadata: Metadata,
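Regarding the review thread above about relying more on Live configuration: the idea is that a component re-reads configuration through a shared, hot-swappable handle on each control-loop iteration instead of capturing values once at init. The sketch below illustrates that pattern with plain std types only; it is not restate's Live<T>/live_load() machinery, and all names in it are illustrative.

use std::sync::{Arc, RwLock};

// Stand-in for the subset of configuration a controller might care about
// (illustrative; not restate's Configuration type).
#[derive(Clone, Debug)]
struct ControllerConfig {
    default_provider: String,
}

// A controller that holds a shared, updatable handle instead of a snapshot.
struct Controller {
    config: Arc<RwLock<ControllerConfig>>,
}

impl Controller {
    // Each iteration observes the latest configuration, so a runtime
    // config reload takes effect without re-initializing the controller.
    fn on_tick(&self) {
        let provider = self.config.read().unwrap().default_provider.clone();
        println!("tick: using default provider '{provider}'");
    }
}

fn main() {
    let config = Arc::new(RwLock::new(ControllerConfig {
        default_provider: "local".to_owned(),
    }));
    let controller = Controller { config: Arc::clone(&config) };

    controller.on_tick(); // observes "local"

    // Simulate a configuration reload while the controller keeps running.
    config.write().unwrap().default_provider = "replicated".to_owned();
    controller.on_tick(); // now observes "replicated"
}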
31 changes: 30 additions & 1 deletion crates/core/src/metadata_store/mod.rs
@@ -20,8 +20,11 @@ use restate_types::errors::GenericError;
use restate_types::retries::RetryPolicy;
use restate_types::storage::{StorageCodec, StorageDecode, StorageEncode};
use restate_types::{flexbuffers_storage_encode_decode, Version, Versioned};
use std::future::Future;
use std::sync::Arc;
use tracing::debug;
use std::time::{Duration, Instant};
use tracing::log::trace;
use tracing::{debug, info};

#[derive(Debug, thiserror::Error)]
pub enum ReadError {
@@ -384,3 +387,29 @@ impl MetadataStoreClientError for WriteError {
}

static_assertions::assert_impl_all!(MetadataStoreClient: Send, Sync, Clone);

pub async fn retry_on_network_error<Fn, Fut, T, E, P>(retry_policy: P, action: Fn) -> Result<T, E>
where
P: Into<RetryPolicy>,
Fn: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: MetadataStoreClientError + std::fmt::Display,
{
let upsert_start = Instant::now();

retry_policy
.into()
.retry_if(action, |err: &E| {
if err.is_network_error() {
if upsert_start.elapsed() < Duration::from_secs(5) {
trace!("could not connect to metadata store: {err}; retrying");
} else {
info!("could not connect to metadata store: {err}; retrying");
}
true
} else {
false
}
})
.await
}
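The helper above retries only errors that the metadata store client classifies as network errors, and it escalates from trace-level to info-level logging once retries have been going on for a few seconds. As a rough, self-contained illustration of that shape, here is a simplified synchronous sketch using only the standard library; it is not restate's RetryPolicy, and its names and backoff values are illustrative.

use std::thread::sleep;
use std::time::{Duration, Instant};

// Illustrative error type: only the Network variant is considered transient.
#[derive(Debug)]
enum StoreError {
    Network(String),
    Codec(String),
}

impl StoreError {
    fn is_network_error(&self) -> bool {
        matches!(self, StoreError::Network(_))
    }
}

// Retry `action` with capped exponential backoff while it fails with a
// network error; return the first success or the first non-network error.
fn retry_on_network_error<T>(
    mut action: impl FnMut() -> Result<T, StoreError>,
) -> Result<T, StoreError> {
    let start = Instant::now();
    let mut backoff = Duration::from_millis(100);
    loop {
        match action() {
            Ok(value) => return Ok(value),
            Err(err) if err.is_network_error() => {
                // Stay quiet at first, then get louder, mirroring the
                // trace!/info! split in the function above.
                if start.elapsed() < Duration::from_secs(5) {
                    eprintln!("trace: could not reach metadata store: {err:?}; retrying");
                } else {
                    eprintln!("info: could not reach metadata store: {err:?}; retrying");
                }
                sleep(backoff);
                backoff = (backoff * 2).min(Duration::from_secs(1));
            }
            // Non-network errors (e.g. codec errors) are permanent: surface them.
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    let mut attempts = 0;
    let result = retry_on_network_error(|| {
        attempts += 1;
        if attempts < 3 {
            Err(StoreError::Network("connection refused".to_owned()))
        } else {
            Ok("initialized value")
        }
    });
    println!("finished after {attempts} attempts: {result:?}");
}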