Sync logs metadata before rejecting requests that reference an unknown log or segment.

Review notes:
  * get_loglet syncs only when the peer reports a NEWER logs version than ours
    (`request_logs_version > current_logs_version`); the comparison was inverted.
  * A successful sync falls through so `current_logs_version` is refreshed before the next
    iteration; `continue` skipped the refresh and could loop forever on an unknown log.
  * NOTE(review): generic parameter lists (e.g. on `MessageStream`, `Incoming`, `Result`,
    `Networking`) appear to have been lost when this patch was extracted; those lines are
    reproduced as found and must be restored before the patch can apply.

diff --git a/crates/bifrost/src/providers/replicated_loglet/network.rs b/crates/bifrost/src/providers/replicated_loglet/network.rs
index 3c5fbc7ab..0cfbef5ce 100644
--- a/crates/bifrost/src/providers/replicated_loglet/network.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/network.rs
@@ -13,13 +13,18 @@
 
 use std::pin::Pin;
 use std::sync::Arc;
+use std::time::Duration;
 
 use futures::{Stream, StreamExt};
 use restate_types::errors::MaybeRetryableError;
 use tracing::trace;
 
-use restate_core::network::{Incoming, MessageRouterBuilder, Reciprocal, TransportConnect};
-use restate_core::{cancellation_watcher, task_center, Metadata, TaskKind};
+use restate_core::network::{
+    Incoming, MessageRouterBuilder, PeerMetadataVersion, Reciprocal, TransportConnect,
+};
+use restate_core::{
+    cancellation_watcher, task_center, Metadata, MetadataKind, SyncError, TargetVersion, TaskKind,
+};
 use restate_types::config::ReplicatedLogletOptions;
 use restate_types::logs::{LogletOffset, SequenceNumber};
 use restate_types::net::replicated_loglet::{
@@ -36,7 +41,7 @@ use crate::loglet::{AppendError, Loglet, LogletCommit, OperationError};
 type MessageStream = Pin> + Send + Sync + 'static>>;
 
 macro_rules! return_error_status {
-    ($reciprocal:ident, $status:expr, $tail:expr) => {{
+    ($reciprocal:expr, $status:expr, $tail:expr) => {{
         let msg = Appended {
             first_offset: LogletOffset::INVALID,
             header: CommonResponseHeader {
@@ -46,15 +51,19 @@ macro_rules! return_error_status {
                 },
             },
         };
-        let _ =
-            task_center().spawn_child(TaskKind::Disposable, "append-return-error", None, async {
+        let _ = task_center().spawn_child(
+            TaskKind::Disposable,
+            "append-return-error",
+            None,
+            async move {
                 $reciprocal.prepare(msg).send().await?;
                 Ok(())
-            });
+            },
+        );
 
         return;
     }};
-    ($reciprocal:ident, $status:expr) => {{
+    ($reciprocal:expr, $status:expr) => {{
         let msg = Appended {
             first_offset: LogletOffset::INVALID,
             header: CommonResponseHeader {
@@ -64,11 +73,15 @@ macro_rules! return_error_status {
             },
         };
 
-        let _ =
-            task_center().spawn_child(TaskKind::Disposable, "append-return-error", None, async {
+        let _ = task_center().spawn_child(
+            TaskKind::Disposable,
+            "append-return-error",
+            None,
+            async move {
                 $reciprocal.prepare(msg).send().await?;
                 Ok(())
-            });
+            },
+        );
 
         return;
     }};
@@ -126,15 +139,22 @@ impl RequestPump {
         provider: &ReplicatedLogletProvider,
         incoming: Incoming,
     ) {
-        let (reciprocal, body) = incoming.split();
-
-        let loglet = match self.get_loglet(provider, &body.header).await {
+        let loglet = match self
+            .get_loglet(
+                provider,
+                incoming.metadata_version(),
+                &incoming.body().header,
+            )
+            .await
+        {
             Ok(loglet) => loglet,
             Err(err) => {
-                return_error_status!(reciprocal, err);
+                return_error_status!(incoming.create_reciprocal(), err);
             }
         };
 
+        let (reciprocal, _) = incoming.split();
+
         if !loglet.is_sequencer_local() {
             return_error_status!(reciprocal, SequencerStatus::NotSequencer);
         }
@@ -161,15 +181,21 @@ impl RequestPump {
         provider: &ReplicatedLogletProvider,
         incoming: Incoming,
     ) {
-        let (reciprocal, append) = incoming.split();
-
-        let loglet = match self.get_loglet(provider, &append.header).await {
+        let loglet = match self
+            .get_loglet(
+                provider,
+                incoming.metadata_version(),
+                &incoming.body().header,
+            )
+            .await
+        {
             Ok(loglet) => loglet,
             Err(err) => {
-                return_error_status!(reciprocal, err);
+                return_error_status!(incoming.create_reciprocal(), err);
             }
         };
 
+        let (reciprocal, append) = incoming.split();
         if !loglet.is_sequencer_local() {
             return_error_status!(reciprocal, SequencerStatus::NotSequencer);
         }
@@ -195,34 +221,81 @@ impl RequestPump {
     async fn get_loglet(
         &self,
         provider: &ReplicatedLogletProvider,
+        peer_version: &PeerMetadataVersion,
         header: &CommonRequestHeader,
     ) -> Result>, SequencerStatus> {
-        let loglet = match provider.get_active_loglet(header.log_id, header.segment_index) {
-            Some(loglet) if loglet.params().loglet_id == header.loglet_id => loglet,
-            Some(_) => return Err(SequencerStatus::LogletIdMismatch),
-            None => {
-                let logs = self.metadata.logs();
-                let chain = logs
-                    .chain(&header.log_id)
-                    .ok_or(SequencerStatus::UnknownLogId)?;
-
-                let segment = chain
-                    .iter()
-                    .rev()
-                    .find(|segment| segment.index() == header.segment_index)
-                    .ok_or(SequencerStatus::UnknownSegmentIndex)?;
-
-                provider
-                    .get_or_create_loglet(
-                        header.log_id,
-                        header.segment_index,
-                        &segment.config.params,
+        let mut current_logs_version = provider.networking().metadata().logs_version();
+        let request_logs_version = peer_version.logs.unwrap_or(current_logs_version);
+
+        loop {
+            if let Some(loglet) = provider.get_active_loglet(header.log_id, header.segment_index) {
+                if loglet.params().loglet_id == header.loglet_id {
+                    return Ok(loglet);
+                }
+
+                return Err(SequencerStatus::LogletIdMismatch);
+            }
+
+            match self.create_loglet(provider, header).await {
+                Ok(loglet) => return Ok(loglet),
+                Err(SequencerStatus::UnknownLogId | SequencerStatus::UnknownSegmentIndex) => {
+                    // possible outdated metadata
+                }
+                Err(status) => return Err(status),
+            }
+
+            // The lookup failed against our current logs metadata. If the peer reports a newer
+            // logs version, sync up to it and retry; otherwise report the log as unknown.
+            if request_logs_version > current_logs_version {
+                match provider
+                    .networking()
+                    .metadata()
+                    .sync(
+                        MetadataKind::Logs,
+                        TargetVersion::Version(request_logs_version),
                     )
-                    .map_err(SequencerStatus::from)?
+                    .await
+                {
+                    // Fall through (no `continue`) so `current_logs_version` is refreshed below;
+                    // otherwise the loop would retry forever with a stale version.
+                    Ok(_) => {}
+                    Err(SyncError::Shutdown(_)) => return Err(SequencerStatus::Shutdown),
+                    Err(SyncError::MetadataStore(err)) => {
+                        tracing::trace!(error=%err, target_version=%request_logs_version, "Failed to sync metadata");
+                        //todo(azmy): make sleep configurable
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    }
+                }
+
+                current_logs_version = provider.networking().metadata().logs_version();
+            } else {
+                return Err(SequencerStatus::UnknownLogId);
             }
-        };
+        }
+    }
 
-        Ok(loglet)
+    /// Finds the segment identified by `header` in the logs metadata chain and gets or creates
+    /// its loglet through `provider`.
+    async fn create_loglet(
+        &self,
+        provider: &ReplicatedLogletProvider,
+        header: &CommonRequestHeader,
+    ) -> Result>, SequencerStatus> {
+        // search the chain
+        let logs = self.metadata.logs();
+        let chain = logs
+            .chain(&header.log_id)
+            .ok_or(SequencerStatus::UnknownLogId)?;
+
+        let segment = chain
+            .iter()
+            .rev()
+            .find(|segment| segment.index() == header.segment_index)
+            .ok_or(SequencerStatus::UnknownSegmentIndex)?;
+
+        provider
+            .get_or_create_loglet(header.log_id, header.segment_index, &segment.config.params)
+            .map_err(SequencerStatus::from)
     }
 }
 
diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs
index cfce3727a..5da06eba6 100644
--- a/crates/bifrost/src/providers/replicated_loglet/provider.rs
+++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs
@@ -131,6 +131,11 @@ impl ReplicatedLogletProvider {
         }
     }
 
+    /// Returns the networking handle held by this provider.
+    pub(crate) fn networking(&self) -> &Networking {
+        &self.networking
+    }
+
     /// Gets a loglet if it's already have been activated
     pub(crate) fn get_active_loglet(
         &self,