Skip to content

Commit

Permalink
[Bifrost] Auto provision logs metadata
Browse files Browse the repository at this point in the history
BifrostService will not auto provision its own metadata entry in metadata store on startup.
  • Loading branch information
AhmedSoliman committed Dec 24, 2024
1 parent 91f69a9 commit a8c0143
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 36 deletions.
26 changes: 2 additions & 24 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ use std::time::Duration;
use futures::never::Never;
use rand::prelude::IteratorRandom;
use rand::thread_rng;
use restate_types::config::Configuration;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, error, trace, trace_span, Instrument};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{
retry_on_network_error, Precondition, ReadWriteError, WriteError,
};
use restate_core::metadata_store::{Precondition, ReadWriteError, WriteError};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -930,26 +927,7 @@ pub struct LogsController {
}

impl LogsController {
pub async fn init(
configuration: &Configuration,
bifrost: Bifrost,
metadata_writer: MetadataWriter,
) -> Result<Self> {
// obtain the latest logs or init it with an empty logs variant
let logs = retry_on_network_error(
configuration.common.network_error_retry_policy.clone(),
|| {
metadata_writer
.metadata_store_client()
.get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
Logs::from_configuration(configuration)
})
},
)
.await?;

metadata_writer.update(Arc::new(logs)).await?;

pub async fn init(bifrost: Bifrost, metadata_writer: MetadataWriter) -> Result<Self> {
//todo(azmy): make configurable
let retry_policy = RetryPolicy::exponential(
Duration::from_millis(10),
Expand Down
8 changes: 2 additions & 6 deletions crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,8 @@ where
)
.await?;

let logs_controller = LogsController::init(
&configuration,
service.bifrost.clone(),
service.metadata_writer.clone(),
)
.await?;
let logs_controller =
LogsController::init(service.bifrost.clone(), service.metadata_writer.clone()).await?;

let (log_trim_interval, log_trim_threshold) =
create_log_trim_interval(&configuration.admin);
Expand Down
1 change: 1 addition & 0 deletions crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl<'a> BifrostAdmin<'a> {
.metadata_writer
.metadata_store_client()
.get_or_insert(BIFROST_CONFIG_KEY.clone(), || {
debug!("Attempting to initialize logs metadata in metadata store");
Logs::from_configuration(&Configuration::pinned())
})
})
Expand Down
10 changes: 4 additions & 6 deletions crates/bifrost/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use enum_map::EnumMap;
use tracing::{debug, error, trace};

Expand Down Expand Up @@ -81,11 +80,10 @@ impl BifrostService {
///
/// This requires to run within a task_center context.
pub async fn start(self) -> anyhow::Result<()> {
// Perform an initial metadata sync.
self.inner
.sync_metadata()
.await
.context("Initial bifrost metadata sync has failed!")?;
// Make sure we have v1 metadata written to metadata store with the default
// configuration. If metadata is already initialized, this will make sure we have the
// latest version set in metadata manager.
self.bifrost.admin().init_metadata().await?;

// initialize all enabled providers.
if self.factories.is_empty() {
Expand Down

0 comments on commit a8c0143

Please sign in to comment.