[Bifrost] Fix seal correctness on sequencers
AhmedSoliman committed Oct 15, 2024
1 parent d87eed8 commit e233ae8
Showing 7 changed files with 86 additions and 32 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

2 changes: 1 addition & 1 deletion crates/bifrost/Cargo.toml
@@ -40,7 +40,7 @@ static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
-tokio-util = { workspace = true }
+tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
xxhash-rust = { workspace = true, features = ["xxh3"] }

4 changes: 2 additions & 2 deletions crates/bifrost/src/loglet/loglet_tests.rs
@@ -537,10 +537,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
let mut committed = handle??;
assert!(!committed.is_empty());
let committed_len = committed.len();
-        assert!(committed_len >= WARMUP_APPENDS);
+        assert_that!(committed_len, ge(WARMUP_APPENDS));
let tail_record = committed.pop().unwrap();
// tail must be beyond seal point
-        assert!(tail.offset() > tail_record);
+        assert_that!(tail.offset(), gt(tail_record));
println!(
"Committed len={}, last appended={}",
committed_len, tail_record
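The assert! → assert_that! changes swap Rust's bare boolean assertions for googletest matchers, which report both operands on failure instead of just the stringified expression. A minimal sketch of the pattern (assuming the googletest crate's prelude; the function and values are illustrative):

    use googletest::prelude::*;

    fn check_tail(committed_len: usize, warmup_appends: usize, tail: u32, last_record: u32) {
        // On failure, the matcher prints the actual value and the expectation
        // ("is greater than or equal to ..."), not just "assertion failed".
        assert_that!(committed_len, ge(warmup_appends));
        // tail must be beyond the seal point
        assert_that!(tail, gt(last_record));
    }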
19 changes: 14 additions & 5 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -172,6 +172,9 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
}

async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
+        if self.known_global_tail().is_sealed() {
+            return Ok(LogletCommit::sealed());
+        }
metrics::counter!(BIFROST_RECORDS_ENQUEUED_TOTAL).increment(payloads.len() as u64);
metrics::counter!(BIFROST_RECORDS_ENQUEUED_BYTES).increment(
payloads
@@ -205,10 +208,8 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
.await?;
if result == CheckSealOutcome::Sealing {
// We are likely to be sealing...
-            // let's fire a seal to ensure this seal is complete
-            if self.seal().await.is_ok() {
-                self.known_global_tail.notify_seal();
-            }
+            // let's fire a seal to ensure this seal is complete.
+            self.seal().await?;
}
return Ok(*self.known_global_tail.get());
}
@@ -251,7 +252,6 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
}

async fn seal(&self) -> Result<(), OperationError> {
-        // todo(asoli): If we are the sequencer node, let the sequencer know.
let _ = SealTask::new(
task_center(),
self.my_params.clone(),
@@ -260,6 +260,15 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
)
.run(self.networking.clone())
.await?;
+        // If we are the sequencer, we need to wait until the sequencer is drained.
+        if let SequencerAccess::Local { handle } = &self.sequencer {
+            handle.drain().await?;
+            self.known_global_tail.notify_seal();
+        };
+        // On a remote sequencer, we only set our global tail to sealed when find_tail()
+        // returns Sealed. We should NOT:
+        // - use AppendError::Sealed to mark our global_tail as sealed, or
+        // - mark our global tail as sealed on a successful seal() call.
info!(loglet_id=%self.my_params.loglet_id, "Loglet has been sealed successfully");
Ok(())
}
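Taken together, the hunks above are the heart of the fix: seal() no longer publishes the seal the moment log-servers acknowledge it; on the node that hosts the sequencer it first drains in-flight appends. A condensed sketch of the enforced ordering (the free functions are stand-ins for SealTask::run and Sequencer::drain, not the crate's actual API):

    struct SealError;

    // Hypothetical stand-ins for the commit's moving parts.
    async fn seal_log_servers() -> Result<(), SealError> { /* SealTask::run */ Ok(()) }
    async fn drain_local_sequencer() -> Result<(), SealError> { /* Sequencer::drain */ Ok(()) }

    async fn seal(hosts_sequencer: bool, publish_seal: impl FnOnce()) -> Result<(), SealError> {
        // 1) Seal an f-majority of log-servers; they stop accepting new stores.
        seal_log_servers().await?;
        if hosts_sequencer {
            // 2) Drain: fail new appends with Sealed and wait out in-flight ones.
            drain_local_sequencer().await?;
            // 3) Only now publish the seal; nothing can commit after this point.
            publish_seal();
        }
        // A remote sequencer learns of the seal from find_tail() returning Sealed,
        // never from AppendError::Sealed or a locally successful seal() call.
        Ok(())
    }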
70 changes: 53 additions & 17 deletions crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
@@ -16,10 +16,12 @@ use std::sync::{
};

use tokio::sync::Semaphore;
+use tokio_util::task::TaskTracker;
use tracing::{debug, trace};

use restate_core::{
network::{rpc_router::RpcRouter, Networking, TransportConnect},
-    task_center, ShutdownError, TaskKind,
+    task_center, ShutdownError,
};
use restate_types::{
config::Configuration,
@@ -96,6 +98,7 @@ pub struct Sequencer<T> {
/// Semaphore for the number of records in-flight.
/// This is an Arc<> to allow sending owned permits
record_permits: Arc<Semaphore>,
+    in_flight: TaskTracker,
}

impl<T: TransportConnect> Sequencer<T> {
Expand Down Expand Up @@ -145,6 +148,7 @@ impl<T: TransportConnect> Sequencer<T> {
networking,
record_permits,
max_inflight_records_in_config: AtomicUsize::new(max_in_flight_records_in_config),
+            in_flight: TaskTracker::default(),
}
}

@@ -158,6 +162,45 @@ impl<T: TransportConnect> Sequencer<T> {
self.record_permits.available_permits()
}

+    /// Wait until all in-flight appends are drained. Note that this will cause the sequencer to
+    /// return AppendError::Sealed for new appends, but it won't start the seal process itself.
+    /// The seal process must be started externally, and _only_ after the drain is complete may
+    /// the caller set the seal bit on `known_global_tail`, since at that point it's guaranteed
+    /// that no more work will be done by the sequencer.
+    ///
+    /// This method is cancellation safe.
+    pub async fn drain(&self) -> Result<(), ShutdownError> {
+        // stop issuing new permits
+        self.record_permits.close();
+        // required to allow in_flight.wait() to finish.
+        self.in_flight.close();
+        // we are assuming here that seal has already been executed on a majority of nodes. This
+        // is important since in_flight.close() doesn't prevent new tasks from being spawned.
+
+        if self
+            .sequencer_shared_state
+            .global_committed_tail()
+            .is_sealed()
+        {
+            return Ok(());
+        }
+
+        // wait for in-flight tasks to complete before returning
+        debug!(
+            loglet_id = %self.sequencer_shared_state.my_params.loglet_id,
+            "Draining sequencer, waiting for {} inflight appends to complete",
+            self.in_flight.len(),
+        );
+        self.in_flight.wait().await;
+
+        trace!(
+            loglet_id = %self.sequencer_shared_state.my_params.loglet_id,
+            "Sequencer drained",
+        );
+
+        Ok(())
+    }

pub fn ensure_enough_permits(&self, required: usize) {
let mut available = self.max_inflight_records_in_config.load(Ordering::Relaxed);
while available < required {
@@ -193,12 +236,9 @@ impl<T: TransportConnect> Sequencer<T> {
self.ensure_enough_permits(payloads.len());

let len = u32::try_from(payloads.len()).expect("batch sizes fit in u32");
-        let permit = self
-            .record_permits
-            .clone()
-            .acquire_many_owned(len)
-            .await
-            .unwrap();
+        let Ok(permit) = self.record_permits.clone().acquire_many_owned(len).await else {
+            return Ok(LogletCommit::sealed());
+        };

// We are updating the next write offset and we want to make sure that after this call that
// we observe if task_center()'s shutdown signal was set or not consistently across
@@ -222,17 +262,13 @@ impl<T: TransportConnect> Sequencer<T> {
commit_resolver,
);

-        // We are sure that if task-center is shutting down that all future appends will fail so we
-        // are not so worried about the offset that was updated already above.
-        task_center().spawn_child(
-            TaskKind::ReplicatedLogletAppender,
-            "sequencer-appender",
-            None,
-            async move {
-                appender.run().await;
-                Ok(())
-            },
-        )?;
+        self.in_flight.spawn({
+            let tc = task_center();
+            async move {
+                tc.run_in_scope("sequencer-appender", None, appender.run())
+                    .await
+            }
+        });

Ok(loglet_commit)
}
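Two primitives carry the drain mechanism: a closed tokio Semaphore makes every later acquire return Err (which enqueue_batch maps to LogletCommit::sealed()), and a tokio-util TaskTracker (gated behind the "rt" feature added in Cargo.toml above) lets drain() wait for in-flight appenders. A standalone sketch of the same pattern, with illustrative names rather than the crate's actual types:

    use std::sync::Arc;
    use tokio::sync::Semaphore;
    use tokio_util::task::TaskTracker;

    struct MiniSequencer {
        permits: Arc<Semaphore>,
        in_flight: TaskTracker,
    }

    impl MiniSequencer {
        /// Returns false if the sequencer is already draining/sealed.
        async fn append(&self, work: impl std::future::Future<Output = ()> + Send + 'static) -> bool {
            // Once permits.close() has run, acquire_owned() returns Err.
            let Ok(permit) = self.permits.clone().acquire_owned().await else {
                return false; // observed as "sealed" by callers
            };
            self.in_flight.spawn(async move {
                work.await;
                drop(permit); // release only when the append fully resolves
            });
            true
        }

        async fn drain(&self) {
            self.permits.close();        // reject new appends from now on
            self.in_flight.close();      // allow wait() to complete once tasks finish
            self.in_flight.wait().await; // all in-flight appends have resolved
        }
    }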
20 changes: 14 additions & 6 deletions crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
@@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use tokio::sync::mpsc;
-use tracing::trace;
+use tracing::{debug, trace};

use restate_core::network::rpc_router::{RpcError, RpcRouter};
use restate_core::network::{Incoming, Networking, TransportConnect};
@@ -18,7 +18,7 @@ use restate_types::config::Configuration;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status};
use restate_types::replicated_loglet::{
-    EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams,
+    EffectiveNodeSet, NodeSet, ReplicatedLogletId, ReplicatedLogletParams,
};
use restate_types::retries::RetryPolicy;
use restate_types::{GenerationalNodeId, PlainNodeId};
@@ -33,8 +33,7 @@ use crate::providers::replicated_loglet::replication::NodeSetChecker;
/// responses before acknowledging the seal.
///
/// The seal operation is idempotent. It's safe to seal a loglet if it's already partially or fully
-/// sealed. Note that the seal task ignores the "seal" state in the input known_global_tail watch,
-/// but it will set it to `true` after the seal.
+/// sealed. Note that the seal task ignores the "seal" state in the input known_global_tail watch.
pub struct SealTask {
task_center: TaskCenter,
my_params: ReplicatedLogletParams,
@@ -100,14 +99,23 @@ impl SealTask {
drop(tx);

// Max observed local-tail from sealed nodes
-        let mut max_tail = LogletOffset::INVALID;
+        let mut max_tail = LogletOffset::OLDEST;
while let Some((node_id, local_tail)) = rx.recv().await {
max_tail = std::cmp::max(max_tail, local_tail);
nodeset_checker.set_attribute(node_id, true);

// Do we have f-majority responses?
if nodeset_checker.check_fmajority(|sealed| *sealed).passed() {
-                self.known_global_tail.notify_seal();
+                let sealed_nodes: NodeSet = nodeset_checker
+                    .filter(|sealed| *sealed)
+                    .map(|(n, _)| *n)
+                    .collect();
+
+                debug!(loglet_id = %self.my_params.loglet_id,
+                    max_tail = %max_tail,
+                    "Seal task completed on f-majority of nodes. Sealed log-servers '{}'",
+                    sealed_nodes,
+                );
// note that the rest of seal requests will continue in the background
return Ok(max_tail);
}
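A subtle detail above is seeding max_tail with OLDEST instead of INVALID. Assuming the usual SequenceNumber encoding (INVALID = 0, OLDEST = 1, the first real offset), the fold over sealed-node tails then always yields a valid offset; a tiny sketch with u32 standing in for LogletOffset:

    // Assumes INVALID = 0 and OLDEST = 1; the seed is the identity for max over
    // valid offsets, so an empty response set yields a valid "empty loglet" tail
    // rather than the INVALID sentinel.
    fn max_observed_tail(local_tails: impl IntoIterator<Item = u32>) -> u32 {
        const OLDEST: u32 = 1;
        local_tails.into_iter().fold(OLDEST, u32::max)
    }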
1 change: 0 additions & 1 deletion crates/core/src/task_center_types.rs
@@ -105,7 +105,6 @@ pub enum TaskKind {
Watchdog,
NetworkMessageHandler,
// Replicated loglet tasks
-    ReplicatedLogletAppender,
ReplicatedLogletReadStream,
#[strum(props(OnCancel = "abort"))]
/// Log-server tasks
