Skip to content

Commit

Permalink
[Core] MetadataWriter access to MetadataStore and Bifrost access to M…
Browse files Browse the repository at this point in the history
…etadataWriter

This PR introduces two key changes:

1. Bifrost now can provide `bifrost.admin()` to get easy access to bifrost's admin interface withouto passing metadata writer and metadata store client explicitly.
2. MetadataWriter now owns MetadataStoreClient.


This means that you can always access metadata store client directly if you have metadata_writer which used to be a pair of types we _always_ pass together. This also opens a path for a future where MetadataWriter wraps the metadata store client and automatically update metadata manager on successful writes but that's beyond the scope of this PR. What this PR provides is a direct access to the underlying metadata_store_client via an accessor function.
  • Loading branch information
AhmedSoliman committed Dec 24, 2024
1 parent 658da80 commit e2a6647
Show file tree
Hide file tree
Showing 26 changed files with 201 additions and 278 deletions.
25 changes: 10 additions & 15 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration;
use tonic::{async_trait, Request, Response, Status};
use tracing::info;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError};
use restate_bifrost::{Bifrost, Error as BiforstError};
use restate_core::{Metadata, MetadataWriter};
use restate_metadata_store::MetadataStoreClient;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::{Logs, SegmentIndex};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
Expand All @@ -44,7 +43,6 @@ use super::service::ChainExtension;
use super::ClusterControllerHandle;

pub(crate) struct ClusterCtrlSvcHandler {
metadata_store_client: MetadataStoreClient,
controller_handle: ClusterControllerHandle,
bifrost: Bifrost,
metadata_writer: MetadataWriter,
Expand All @@ -53,20 +51,19 @@ pub(crate) struct ClusterCtrlSvcHandler {
impl ClusterCtrlSvcHandler {
pub fn new(
controller_handle: ClusterControllerHandle,
metadata_store_client: MetadataStoreClient,
bifrost: Bifrost,
metadata_writer: MetadataWriter,
) -> Self {
Self {
controller_handle,
metadata_store_client,
bifrost,
metadata_writer,
}
}

async fn get_logs(&self) -> Result<Logs, Status> {
self.metadata_store_client
self.metadata_writer
.metadata_store_client()
.get::<Logs>(BIFROST_CONFIG_KEY.clone())
.await
.map_err(|error| Status::unknown(format!("Failed to get log metadata: {error:?}")))?
Expand Down Expand Up @@ -120,7 +117,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {

let (trim_point, nodes_config) = tokio::join!(
self.bifrost.get_trim_point(log_id),
self.metadata_store_client
self.metadata_writer
.metadata_store_client()
.get::<NodesConfiguration>(NODES_CONFIG_KEY.clone()),
);

Expand Down Expand Up @@ -151,7 +149,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
_request: Request<ListNodesRequest>,
) -> Result<Response<ListNodesResponse>, Status> {
let nodes_config = self
.metadata_store_client
.metadata_writer
.metadata_store_client()
.get::<NodesConfiguration>(NODES_CONFIG_KEY.clone())
.await
.map_err(|error| {
Expand Down Expand Up @@ -261,13 +260,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
let request = request.into_inner();
let log_id: LogId = request.log_id.into();

let admin = BifrostAdmin::new(
&self.bifrost,
&self.metadata_writer,
&self.metadata_store_client,
);

let writable_loglet = admin
let writable_loglet = self
.bifrost
.admin()
.writeable_loglet(log_id)
.await
.map_err(|err| match err {
Expand Down
34 changes: 9 additions & 25 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use tokio::task::JoinSet;
use tracing::{debug, error, trace, trace_span, Instrument};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{Precondition, ReadWriteError, WriteError};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -922,20 +922,16 @@ pub struct LogsController {
effects: Option<Vec<Effect>>,
inner: LogsControllerInner,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
async_operations: JoinSet<Event>,
find_logs_tail_semaphore: Arc<Semaphore>,
}

impl LogsController {
pub async fn init(
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
metadata_writer: MetadataWriter,
) -> Result<Self> {
pub async fn init(bifrost: Bifrost, metadata_writer: MetadataWriter) -> Result<Self> {
// fetches latest logs or init it with an empty logs variant
BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client)
bifrost
.admin()
.init_metadata()
.await
.map_err(|e| LogsControllerError::Other(e.into()))?;
Expand All @@ -955,7 +951,6 @@ impl LogsController {
retry_policy,
),
bifrost,
metadata_store_client,
metadata_writer,
async_operations: JoinSet::default(),
find_logs_tail_semaphore: Arc::new(Semaphore::new(1)),
Expand All @@ -974,17 +969,12 @@ impl LogsController {

let logs = Arc::clone(&self.inner.current_logs);
let bifrost = self.bifrost.clone();
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = self.metadata_writer.clone();
let find_tail = async move {
let bifrost_admin =
BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client);

let mut updates = LogsTailUpdates::default();
for (log_id, chain) in logs.iter() {
let tail_segment = chain.tail();

let writable_loglet = match bifrost_admin.writeable_loglet(*log_id).await {
let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await {
Ok(loglet) => loglet,
Err(BifrostError::Shutdown(_)) => break,
Err(err) => {
Expand Down Expand Up @@ -1088,7 +1078,6 @@ impl LogsController {
logs: Arc<Logs>,
mut debounce: Option<RetryIter<'static>>,
) {
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = self.metadata_writer.clone();

self.async_operations.spawn(async move {
Expand All @@ -1098,7 +1087,7 @@ impl LogsController {
tokio::time::sleep(delay).await;
}

if let Err(err) = metadata_store_client
if let Err(err) = metadata_writer.metadata_store_client()
.put(
BIFROST_CONFIG_KEY.clone(),
logs.deref(),
Expand All @@ -1110,7 +1099,7 @@ impl LogsController {
WriteError::FailedPrecondition(_) => {
debug!("Detected a concurrent modification of logs. Fetching the latest logs now.");
// There was a concurrent modification of the logs. Fetch the latest version.
match metadata_store_client
match metadata_writer.metadata_store_client()
.get::<Logs>(BIFROST_CONFIG_KEY.clone())
.await
{
Expand Down Expand Up @@ -1156,8 +1145,6 @@ impl LogsController {
mut debounce: Option<RetryIter<'static>>,
) {
let bifrost = self.bifrost.clone();
let metadata_store_client = self.metadata_store_client.clone();
let metadata_writer = self.metadata_writer.clone();

self.async_operations.spawn(
async move {
Expand All @@ -1167,10 +1154,7 @@ impl LogsController {
tokio::time::sleep(delay).await;
}

let bifrost_admin =
BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client);

match bifrost_admin.seal(log_id, segment_index).await {
match bifrost.admin().seal(log_id, segment_index).await {
Ok(sealed_segment) => {
if sealed_segment.tail.is_sealed() {
Event::SealSucceeded {
Expand Down
Loading

0 comments on commit e2a6647

Please sign in to comment.