Skip to content

Commit

Permalink
[ReplicatedLoglet] Sync to metadata version on append
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Oct 11, 2024
1 parent c2a90bc commit eb03bd2
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 41 deletions.
149 changes: 108 additions & 41 deletions crates/bifrost/src/providers/replicated_loglet/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@

use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use futures::{Stream, StreamExt};
use restate_types::errors::MaybeRetryableError;
use tracing::trace;

use restate_core::network::{Incoming, MessageRouterBuilder, Reciprocal, TransportConnect};
use restate_core::{cancellation_watcher, task_center, Metadata, TaskKind};
use restate_core::network::{
Incoming, MessageRouterBuilder, PeerMetadataVersion, Reciprocal, TransportConnect,
};
use restate_core::{
cancellation_watcher, task_center, Metadata, MetadataKind, SyncError, TargetVersion, TaskKind,
};
use restate_types::config::ReplicatedLogletOptions;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::net::replicated_loglet::{
Expand All @@ -36,7 +41,7 @@ use crate::loglet::{AppendError, Loglet, LogletCommit, OperationError};
/// Pinned, boxed stream yielding `Incoming<T>` messages; the stream itself is
/// required to be `Send + Sync + 'static`.
type MessageStream<T> = Pin<Box<dyn Stream<Item = Incoming<T>> + Send + Sync + 'static>>;

macro_rules! return_error_status {
($reciprocal:ident, $status:expr, $tail:expr) => {{
($reciprocal:expr, $status:expr, $tail:expr) => {{
let msg = Appended {
first_offset: LogletOffset::INVALID,
header: CommonResponseHeader {
Expand All @@ -46,15 +51,19 @@ macro_rules! return_error_status {
},
};

let _ =
task_center().spawn_child(TaskKind::Disposable, "append-return-error", None, async {
let _ = task_center().spawn_child(
TaskKind::Disposable,
"append-return-error",
None,
async move {
$reciprocal.prepare(msg).send().await?;
Ok(())
});
},
);

return;
}};
($reciprocal:ident, $status:expr) => {{
($reciprocal:expr, $status:expr) => {{
let msg = Appended {
first_offset: LogletOffset::INVALID,
header: CommonResponseHeader {
Expand All @@ -64,11 +73,15 @@ macro_rules! return_error_status {
},
};

let _ =
task_center().spawn_child(TaskKind::Disposable, "append-return-error", None, async {
let _ = task_center().spawn_child(
TaskKind::Disposable,
"append-return-error",
None,
async move {
$reciprocal.prepare(msg).send().await?;
Ok(())
});
},
);

return;
}};
Expand Down Expand Up @@ -126,15 +139,22 @@ impl RequestPump {
provider: &ReplicatedLogletProvider<T>,
incoming: Incoming<GetSequencerState>,
) {
let (reciprocal, body) = incoming.split();

let loglet = match self.get_loglet(provider, &body.header).await {
let loglet = match self
.get_loglet(
provider,
incoming.metadata_version(),
&incoming.body().header,
)
.await
{
Ok(loglet) => loglet,
Err(err) => {
return_error_status!(reciprocal, err);
return_error_status!(incoming.create_reciprocal(), err);
}
};

let (reciprocal, _) = incoming.split();

if !loglet.is_sequencer_local() {
return_error_status!(reciprocal, SequencerStatus::NotSequencer);
}
Expand All @@ -161,15 +181,21 @@ impl RequestPump {
provider: &ReplicatedLogletProvider<T>,
incoming: Incoming<Append>,
) {
let (reciprocal, append) = incoming.split();

let loglet = match self.get_loglet(provider, &append.header).await {
let loglet = match self
.get_loglet(
provider,
incoming.metadata_version(),
&incoming.body().header,
)
.await
{
Ok(loglet) => loglet,
Err(err) => {
return_error_status!(reciprocal, err);
return_error_status!(incoming.create_reciprocal(), err);
}
};

let (reciprocal, append) = incoming.split();
if !loglet.is_sequencer_local() {
return_error_status!(reciprocal, SequencerStatus::NotSequencer);
}
Expand All @@ -195,34 +221,75 @@ impl RequestPump {
async fn get_loglet<T: TransportConnect>(
&self,
provider: &ReplicatedLogletProvider<T>,
peer_version: &PeerMetadataVersion,
header: &CommonRequestHeader,
) -> Result<Arc<ReplicatedLoglet<T>>, SequencerStatus> {
let loglet = match provider.get_active_loglet(header.log_id, header.segment_index) {
Some(loglet) if loglet.params().loglet_id == header.loglet_id => loglet,
Some(_) => return Err(SequencerStatus::LogletIdMismatch),
None => {
let logs = self.metadata.logs();
let chain = logs
.chain(&header.log_id)
.ok_or(SequencerStatus::UnknownLogId)?;

let segment = chain
.iter()
.rev()
.find(|segment| segment.index() == header.segment_index)
.ok_or(SequencerStatus::UnknownSegmentIndex)?;

provider
.get_or_create_loglet(
header.log_id,
header.segment_index,
&segment.config.params,
let mut current_logs_version = provider.networking().metadata().logs_version();
let request_logs_version = peer_version.logs.unwrap_or(current_logs_version);

loop {
if let Some(loglet) = provider.get_active_loglet(header.log_id, header.segment_index) {
if loglet.params().loglet_id == header.loglet_id {
return Ok(loglet);
}

return Err(SequencerStatus::LogletIdMismatch);
}

match self.create_loglet(provider, header).await {
Ok(loglet) => return Ok(loglet),
Err(SequencerStatus::UnknownLogId | SequencerStatus::UnknownSegmentIndex) => {
// possible outdated metadata
}
Err(status) => return Err(status),
}

if request_logs_version < current_logs_version {
match provider
.networking()
.metadata()
.sync(
MetadataKind::Logs,
TargetVersion::Version(request_logs_version),
)
.map_err(SequencerStatus::from)?
.await
{
Ok(_) => continue,
Err(SyncError::Shutdown(_)) => return Err(SequencerStatus::Shutdown),
Err(SyncError::MetadataStore(err)) => {
tracing::trace!(error=%err, target_version=%request_logs_version, "Failed to sync metadata");
//todo(azmy): make sleep configurable
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

current_logs_version = provider.networking().metadata().logs_version();
} else {
return Err(SequencerStatus::UnknownLogId);
}
};
}
}

Ok(loglet)
/// Looks up the segment addressed by `header` in the locally known logs
/// metadata and asks the provider to get or create the matching loglet.
///
/// # Errors
/// * `SequencerStatus::UnknownLogId` if the logs metadata has no chain for
///   `header.log_id`.
/// * `SequencerStatus::UnknownSegmentIndex` if no segment in that chain has
///   index `header.segment_index`.
/// * Any error from `get_or_create_loglet`, converted via
///   `SequencerStatus::from`.
async fn create_loglet<T: TransportConnect>(
    &self,
    provider: &ReplicatedLogletProvider<T>,
    header: &CommonRequestHeader,
) -> Result<Arc<ReplicatedLoglet<T>>, SequencerStatus> {
    let logs = self.metadata.logs();

    let Some(chain) = logs.chain(&header.log_id) else {
        return Err(SequencerStatus::UnknownLogId);
    };

    // The chain is scanned in reverse iteration order for the requested index.
    let wanted_index = header.segment_index;
    let Some(segment) = chain
        .iter()
        .rev()
        .find(|candidate| candidate.index() == wanted_index)
    else {
        return Err(SequencerStatus::UnknownSegmentIndex);
    };

    let created =
        provider.get_or_create_loglet(header.log_id, wanted_index, &segment.config.params);
    created.map_err(SequencerStatus::from)
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
}
}

/// Returns a shared reference to this provider's `networking` field.
pub(crate) fn networking(&self) -> &Networking<T> {
&self.networking
}
/// Gets a loglet if it has already been activated
pub(crate) fn get_active_loglet(
&self,
Expand Down

0 comments on commit eb03bd2

Please sign in to comment.