Skip to content

Commit

Permalink
[Bifrost] Let providers propose their own parameters
Browse files Browse the repository at this point in the history
- BifrostAdmin can now init empty logs configuration
- BifrostAdmin can now auto extend the chain
- Providers now have control over suggesting a new segment configuration
- Introduces a temporary copy of NodeSetSelector into bifrost until log-controller is removed
  • Loading branch information
AhmedSoliman committed Dec 24, 2024
1 parent 1ac1f70 commit 658da80
Show file tree
Hide file tree
Showing 22 changed files with 903 additions and 124 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 35 additions & 37 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,30 @@

mod nodeset_selection;

use futures::never::Never;
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use std::collections::HashMap;
use std::iter;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use futures::never::Never;
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, error, trace, trace_span, Instrument};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
use restate_core::metadata_store::{
retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError,
};
use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
use restate_types::config::Configuration;
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
use restate_types::live::Pinned;
use restate_types::logs::builder::LogsBuilder;
use restate_types::logs::metadata::{
Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration,
NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex,
Chain, LogletConfig, LogletParams, Logs, LogsConfiguration, NodeSetSelectionStrategy,
ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, SegmentIndex,
};
use restate_types::logs::{LogId, LogletId, Lsn, TailState};
use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
Expand Down Expand Up @@ -67,6 +65,8 @@ pub enum LogsControllerError {
LogletParamsToConfiguration(GenericError),
#[error(transparent)]
Shutdown(#[from] ShutdownError),
#[error(transparent)]
Other(GenericError),
}

/// Node set selector hints for the [`LogsController`].
Expand Down Expand Up @@ -320,17 +320,17 @@ fn try_provisioning(
node_set_selector_hints: impl NodeSetSelectorHints,
) -> Option<LogletConfiguration> {
match logs_configuration.default_provider {
DefaultProvider::Local => {
ProviderConfiguration::Local => {
let log_id = LogletId::new(log_id, SegmentIndex::OLDEST);
Some(LogletConfiguration::Local(log_id.into()))
}
#[cfg(any(test, feature = "memory-loglet"))]
DefaultProvider::InMemory => {
ProviderConfiguration::InMemory => {
let log_id = LogletId::new(log_id, SegmentIndex::OLDEST);
Some(LogletConfiguration::Memory(log_id.into()))
}
#[cfg(feature = "replicated-loglet")]
DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration(
ProviderConfiguration::Replicated(ref config) => build_new_replicated_loglet_configuration(
config,
LogletId::new(log_id, SegmentIndex::OLDEST),
&Metadata::with_current(|m| m.nodes_config_ref()),
Expand Down Expand Up @@ -436,10 +436,10 @@ impl LogletConfiguration {
) -> bool {
match (self, &logs_configuration.default_provider) {
#[cfg(any(test, feature = "memory-loglet"))]
(Self::Memory(_), DefaultProvider::InMemory) => false,
(Self::Local(_), DefaultProvider::Local) => false,
(Self::Memory(_), ProviderConfiguration::InMemory) => false,
(Self::Local(_), ProviderConfiguration::Local) => false,
#[cfg(feature = "replicated-loglet")]
(Self::Replicated(params), DefaultProvider::Replicated(config)) => {
(Self::Replicated(params), ProviderConfiguration::Replicated(config)) => {
let sequencer_change_required = !observed_cluster_state
.is_node_alive(params.sequencer)
&& !observed_cluster_state.alive_nodes.is_empty();
Expand Down Expand Up @@ -501,10 +501,14 @@ impl LogletConfiguration {

match logs_configuration.default_provider {
#[cfg(any(test, feature = "memory-loglet"))]
DefaultProvider::InMemory => Some(LogletConfiguration::Memory(loglet_id.next().into())),
DefaultProvider::Local => Some(LogletConfiguration::Local(loglet_id.next().into())),
ProviderConfiguration::InMemory => {
Some(LogletConfiguration::Memory(loglet_id.next().into()))
}
ProviderConfiguration::Local => {
Some(LogletConfiguration::Local(loglet_id.next().into()))
}
#[cfg(feature = "replicated-loglet")]
DefaultProvider::Replicated(ref config) => {
ProviderConfiguration::Replicated(ref config) => {
let previous_params = match self {
Self::Replicated(previous_params) => Some(previous_params),
_ => None,
Expand Down Expand Up @@ -926,24 +930,15 @@ pub struct LogsController {

impl LogsController {
pub async fn init(
configuration: &Configuration,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
) -> Result<Self> {
// obtain the latest logs or init it with an empty logs variant
let logs = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| {
metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
Logs::from_configuration(configuration)
})
},
)
.await?;

let logs_configuration = logs.configuration().clone();
metadata_writer.update(Arc::new(logs)).await?;
// fetches latest logs or init it with an empty logs variant
BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client)
.init_metadata()
.await
.map_err(|e| LogsControllerError::Other(e.into()))?;

//todo(azmy): make configurable
let retry_policy = RetryPolicy::exponential(
Expand All @@ -955,7 +950,10 @@ impl LogsController {

let mut this = Self {
effects: Some(Vec::new()),
inner: LogsControllerInner::new(logs_configuration, retry_policy),
inner: LogsControllerInner::new(
Metadata::with_current(|m| m.logs_ref().configuration().clone()),
retry_policy,
),
bifrost,
metadata_store_client,
metadata_writer,
Expand Down Expand Up @@ -1279,7 +1277,7 @@ pub mod tests {

use enumset::{enum_set, EnumSet};
use restate_types::logs::metadata::{
DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig,
LogsConfiguration, NodeSetSelectionStrategy, ProviderConfiguration, ReplicatedLogletConfig,
};
use restate_types::logs::LogletId;
use restate_types::nodes_config::{
Expand Down Expand Up @@ -1452,7 +1450,7 @@ pub mod tests {

fn logs_configuration(replication_factor: u8) -> LogsConfiguration {
LogsConfiguration {
default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig {
default_provider: ProviderConfiguration::Replicated(ReplicatedLogletConfig {
replication_property: ReplicationProperty::new(
NonZeroU8::new(replication_factor).expect("must be non zero"),
),
Expand Down Expand Up @@ -1537,7 +1535,7 @@ pub mod tests {
&nodes.observed_state
));

let DefaultProvider::Replicated(ref replicated_loglet_config) =
let ProviderConfiguration::Replicated(ref replicated_loglet_config) =
logs_config.default_provider
else {
unreachable!()
Expand Down Expand Up @@ -1571,7 +1569,7 @@ pub mod tests {

let logs_config = logs_configuration(2);

let DefaultProvider::Replicated(ref replicated_loglet_config) =
let ProviderConfiguration::Replicated(ref replicated_loglet_config) =
logs_config.default_provider
else {
unreachable!()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use std::cmp::{max, Ordering};
use itertools::Itertools;
use rand::prelude::IteratorRandom;
use rand::Rng;
use restate_types::logs::metadata::NodeSetSelectionStrategy;
use tracing::trace;

use restate_types::logs::metadata::NodeSetSelectionStrategy;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty};

Expand Down
17 changes: 8 additions & 9 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::{debug, info};
use restate_metadata_store::ReadModifyWriteError;
use restate_types::cluster_controller::SchedulingPlan;
use restate_types::logs::metadata::{
DefaultProvider, LogletParams, Logs, LogsConfiguration, ProviderKind, SegmentIndex,
LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex,
};
use restate_types::metadata_store::keys::{
BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY,
Expand Down Expand Up @@ -183,7 +183,7 @@ enum ClusterControllerCommand {
UpdateClusterConfiguration {
num_partitions: NonZeroU16,
replication_strategy: ReplicationStrategy,
default_provider: DefaultProvider,
default_provider: ProviderConfiguration,
response_tx: oneshot::Sender<anyhow::Result<()>>,
},
SealAndExtendChain {
Expand Down Expand Up @@ -249,7 +249,7 @@ impl ClusterControllerHandle {
&self,
num_partitions: NonZeroU16,
replication_strategy: ReplicationStrategy,
default_provider: DefaultProvider,
default_provider: ProviderConfiguration,
) -> Result<anyhow::Result<()>, ShutdownError> {
let (response_tx, response_rx) = oneshot::channel();

Expand Down Expand Up @@ -439,7 +439,7 @@ impl<T: TransportConnect> Service<T> {
&self,
num_partitions: u16,
replication_strategy: ReplicationStrategy,
default_provider: DefaultProvider,
default_provider: ProviderConfiguration,
) -> anyhow::Result<()> {
let logs = self
.metadata_store_client
Expand All @@ -457,8 +457,7 @@ impl<T: TransportConnect> Service<T> {

// we can only change the default provider
if logs.version() != Version::INVALID
&& logs.configuration().default_provider.as_provider_kind()
!= default_provider.as_provider_kind()
&& logs.configuration().default_provider.kind() != default_provider.kind()
{
{
return Err(
Expand Down Expand Up @@ -786,16 +785,16 @@ impl SealAndExtendTask {

let (provider, params) = match &logs.configuration().default_provider {
#[cfg(any(test, feature = "memory-loglet"))]
DefaultProvider::InMemory => (
ProviderConfiguration::InMemory => (
ProviderKind::InMemory,
u64::from(loglet_id.next()).to_string().into(),
),
DefaultProvider::Local => (
ProviderConfiguration::Local => (
ProviderKind::Local,
u64::from(loglet_id.next()).to_string().into(),
),
#[cfg(feature = "replicated-loglet")]
DefaultProvider::Replicated(config) => {
ProviderConfiguration::Replicated(config) => {
let schedule_plan = self
.metadata_store_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
Expand Down
1 change: 0 additions & 1 deletion crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ where
.await?;

let logs_controller = LogsController::init(
&configuration,
service.bifrost.clone(),
service.metadata_store_client.clone(),
service.metadata_writer.clone(),
Expand Down
1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ derive_more = { workspace = true }
enum-map = { workspace = true, features = ["serde"] }
futures = { workspace = true }
googletest = { workspace = true, features = ["anyhow"], optional = true }
itertools = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
pin-project = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Appender {
info!(
attempt = attempt,
segment_index = %loglet.segment_index(),
"Append batch will be retried (loglet being sealed), waiting for tail to be determined"
"Append batch will be retried (loglet is being sealed), waiting for tail to be determined"
);
let new_loglet = Self::wait_next_unsealed_loglet(
self.log_id,
Expand All @@ -131,7 +131,7 @@ impl Appender {
Err(AppendError::Other(err)) if err.retryable() => {
if let Some(retry_dur) = retry_iter.next() {
info!(
?err,
%err,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch. Since underlying error is retryable, will retry in {:?}",
Expand All @@ -140,7 +140,7 @@ impl Appender {
tokio::time::sleep(retry_dur).await;
} else {
warn!(
?err,
%err,
attempt = attempt,
segment_index = %loglet.segment_index(),
"Failed to append this batch and exhausted all attempts to retry",
Expand Down
Loading

0 comments on commit 658da80

Please sign in to comment.