[Bifrost] Fix seal correctness on sequencers #2094

Merged · 1 commit · Oct 15, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion crates/bifrost/Cargo.toml
@@ -40,7 +40,7 @@ static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true, features = ["sync"] }
-tokio-util = { workspace = true }
+tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
xxhash-rust = { workspace = true, features = ["xxh3"] }

4 changes: 2 additions & 2 deletions crates/bifrost/src/loglet/loglet_tests.rs
@@ -537,10 +537,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
let mut committed = handle??;
assert!(!committed.is_empty());
let committed_len = committed.len();
-assert!(committed_len >= WARMUP_APPENDS);
+assert_that!(committed_len, ge(WARMUP_APPENDS));
let tail_record = committed.pop().unwrap();
// tail must be beyond seal point
-assert!(tail.offset() > tail_record);
+assert_that!(tail.offset(), gt(tail_record));
println!(
"Committed len={}, last appended={}",
committed_len, tail_record
19 changes: 14 additions & 5 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -172,6 +172,9 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
}

async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result<LogletCommit, OperationError> {
+if self.known_global_tail().is_sealed() {
+return Ok(LogletCommit::sealed());
+}
metrics::counter!(BIFROST_RECORDS_ENQUEUED_TOTAL).increment(payloads.len() as u64);
metrics::counter!(BIFROST_RECORDS_ENQUEUED_BYTES).increment(
payloads
@@ -205,10 +208,8 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
.await?;
if result == CheckSealOutcome::Sealing {
// We are likely to be sealing...
-// let's fire a seal to ensure this seal is complete
-if self.seal().await.is_ok() {
-self.known_global_tail.notify_seal();
-}
+// let's fire a seal to ensure this seal is complete.
+self.seal().await?;
}
return Ok(*self.known_global_tail.get());
}
@@ -251,7 +252,6 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
}

async fn seal(&self) -> Result<(), OperationError> {
-// todo(asoli): If we are the sequencer node, let the sequencer know.
let _ = SealTask::new(
task_center(),
self.my_params.clone(),
@@ -260,6 +260,15 @@
)
.run(self.networking.clone())
.await?;
+// If we are the sequencer, we need to wait until the sequencer is drained.
+if let SequencerAccess::Local { handle } = &self.sequencer {
+handle.drain().await?;
+self.known_global_tail.notify_seal();
Comment on lines +265 to +266

Contributor:
Why is it important to drain before sending the seal notification?

Contributor Author:
What we don't want is:

• Append goes in flight.
• Append is successful, but known_global_tail is not updated yet.
• Seal is done on the log-servers.
• We choose the current known_global_tail as the sealed tail.
• Append is acknowledged; known_global_tail is updated.

This leads to a find_tail that conveys the picture that no appends will be acknowledged after this offset, even though one still is. That is the critical guarantee we must maintain.
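To make the ordering concrete, here is a condensed sketch of the sealing path as this PR arranges it, with numbering that mirrors the scenario above. It is reassembled from the diff (SealTask arguments elided), not a verbatim excerpt:

```rust
// Condensed sketch of the sealing path this PR implements.
async fn seal(&self) -> Result<(), OperationError> {
    // 1. Seal an f-majority of log-servers. In-flight appends may still be
    //    acknowledged, but nothing new can commit past the seal.
    let _ = SealTask::new(/* task_center, my_params, ... */)
        .run(self.networking.clone())
        .await?;

    // 2. Drain the local sequencer: wait until every in-flight append has
    //    resolved, so known_global_tail can no longer move.
    if let SequencerAccess::Local { handle } = &self.sequencer {
        handle.drain().await?;

        // 3. Only now is the tail stable; exposing the seal here upholds the
        //    guarantee that nothing is acknowledged past the sealed tail.
        self.known_global_tail.notify_seal();
    }
    Ok(())
}
```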

+};
+// On a remote sequencer, we only set our global tail to sealed when we call find_tail and it
+// returns Sealed. We should NOT:
+// - Use AppendError::Sealed to mark our sealed global_tail
+// - Mark our global tail as sealed on a successful seal() call.
Comment on lines +270 to +271

Contributor:
Why would these two cases be problematic? Is it because the sequencer returns an AppendError::Sealed before the loglet is fully sealed (e.g. one log-server is currently being sealed and the sequencer happens to hit that one)?

Contributor Author:
Yes. Rejecting appends with Sealed doesn't automatically mean that the current value of known_global_tail is safe to consider sealed.

Contributor Author:
The sequencer will return its last known global commit offset, but this doesn't mean that the sequencer node itself has set its own seal bit, so the offset can still drift forward.

Contributor Author:
Note that the sequencer might send AppendError::Sealed during drain; that doesn't mean its knowledge of the global tail should be used as the reliable "seal" tail.
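A sketch of the rule this thread settles on, from the perspective of a node that does not host the sequencer. The find_tail()/TailState/TailOffsetWatch shapes are assumed from the surrounding Bifrost code rather than shown in this diff:

```rust
// Hedged sketch: only find_tail() returning Sealed is authoritative.
async fn observe_seal(
    loglet: &dyn Loglet,
    known_global_tail: &TailOffsetWatch,
) -> Result<(), OperationError> {
    match loglet.find_tail().await? {
        TailState::Sealed(_tail) => {
            // An f-majority of log-servers confirmed the seal; the tail is
            // final and no append will ever be acknowledged past it.
            known_global_tail.notify_seal();
            Ok(())
        }
        TailState::Open(_) => {
            // Still open or mid-seal. An AppendError::Sealed observed here
            // only means one append was rejected; the sequencer's view of
            // the global tail may still drift forward.
            Ok(())
        }
    }
}
```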

info!(loglet_id=%self.my_params.loglet_id, "Loglet has been sealed successfully");
Ok(())
}
76 changes: 55 additions & 21 deletions crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs
@@ -16,10 +16,12 @@ use std::sync::{
};

use tokio::sync::Semaphore;
+use tokio_util::task::TaskTracker;
use tracing::{debug, trace};

use restate_core::{
network::{rpc_router::RpcRouter, Networking, TransportConnect},
-task_center, ShutdownError, TaskKind,
+task_center, ShutdownError,
};
use restate_types::{
config::Configuration,
@@ -96,6 +98,7 @@ pub struct Sequencer<T> {
/// Semaphore for the number of records in-flight.
/// This is an Arc<> to allow sending owned permits
record_permits: Arc<Semaphore>,
+in_flight: TaskTracker,
}

impl<T: TransportConnect> Sequencer<T> {
@@ -145,6 +148,7 @@ impl<T: TransportConnect> Sequencer<T> {
networking,
record_permits,
max_inflight_records_in_config: AtomicUsize::new(max_in_flight_records_in_config),
+in_flight: TaskTracker::default(),
}
}

@@ -158,6 +162,46 @@ impl<T: TransportConnect> Sequencer<T> {
self.record_permits.available_permits()
}

+/// Wait until all in-flight appends are drained. Note that this will cause the sequencer to
+/// return AppendError::Sealed for new appends, but it won't start the seal process itself. The
+/// seal process must be started externally. _Only_ after the drain is complete can the caller
+/// set the seal on `known_global_tail`, as it's guaranteed that no more work will be
+/// done by the sequencer (no acknowledgements will be delivered for appends after the first
+/// observed global_tail with is_sealed=true).
+///
+/// This method is cancellation safe.
+pub async fn drain(&self) -> Result<(), ShutdownError> {
+// stop issuing new permits
+self.record_permits.close();
+// required to allow in_flight.wait() to finish.
+self.in_flight.close();
+// we are assuming here that seal has already been executed on a majority of nodes. This is
+// important since in_flight.close() doesn't prevent new tasks from being spawned.
+
+if self
+.sequencer_shared_state
+.global_committed_tail()
+.is_sealed()
+{
+return Ok(());
+}
+
+// wait for in-flight tasks to complete before returning
+debug!(
+loglet_id = %self.sequencer_shared_state.my_params.loglet_id,
+"Draining sequencer, waiting for {} inflight appends to complete",
+self.in_flight.len(),
+);
+self.in_flight.wait().await;
+
+trace!(
+loglet_id = %self.sequencer_shared_state.my_params.loglet_id,
+"Sequencer drained",
+);
+
+Ok(())
+}

pub fn ensure_enough_permits(&self, required: usize) {
let mut available = self.max_inflight_records_in_config.load(Ordering::Relaxed);
while available < required {
@@ -193,16 +237,10 @@ impl<T: TransportConnect> Sequencer<T> {
self.ensure_enough_permits(payloads.len());

let len = u32::try_from(payloads.len()).expect("batch sizes fit in u32");
-let permit = self
-.record_permits
-.clone()
-.acquire_many_owned(len)
-.await
-.unwrap();

-// We are updating the next write offset and we want to make sure that after this call that
-// we observe if task_center()'s shutdown signal was set or not consistently across
-// threads.
+let Ok(permit) = self.record_permits.clone().acquire_many_owned(len).await else {
+return Ok(LogletCommit::sealed());
+};

let offset = LogletOffset::new(
self.sequencer_shared_state
.next_write_offset
@@ -222,17 +260,13 @@ impl<T: TransportConnect> Sequencer<T> {
commit_resolver,
);

-// We are sure that if task-center is shutting down that all future appends will fail so we
-// are not so worried about the offset that was updated already above.
-task_center().spawn_child(
-TaskKind::ReplicatedLogletAppender,
-"sequencer-appender",
-None,
+self.in_flight.spawn({
+let tc = task_center();
async move {
-appender.run().await;
-Ok(())
-},
-)?;
+tc.run_in_scope("sequencer-appender", None, appender.run())
+.await
+}
+});

Ok(loglet_commit)
}
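The drain above builds on tokio_util's TaskTracker close/wait semantics, which is also why this PR enables the crate's "rt" feature in Cargo.toml. A minimal standalone sketch of the pattern (illustrative, not from the PR):

```rust
use std::time::Duration;
use tokio_util::task::TaskTracker; // requires the "rt" feature

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    // Track every in-flight task, as the sequencer tracks its appenders.
    for i in 0..3u64 {
        tracker.spawn(async move {
            tokio::time::sleep(Duration::from_millis(10 * i)).await;
            println!("append {i} acknowledged");
        });
    }

    // close() lets wait() complete once tracked tasks finish; note it does
    // NOT stop new spawns, which is why the sequencer also closes its
    // permit semaphore before draining.
    tracker.close();

    // Resolves only when the tracker is closed and every task has finished.
    tracker.wait().await;
    assert_eq!(tracker.len(), 0);
}
```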
@@ -112,7 +112,7 @@ impl CheckSealTask {
if tail_status.is_known_sealed() {
// we only need to see a single node sealed to declare that we are probably sealing (or
// sealed)
-return Ok(CheckSealOutcome::ProbablyOpen);
+return Ok(CheckSealOutcome::Sealing);
}
nodeset_checker.merge_attribute(next_node, tail_status);
}
20 changes: 14 additions & 6 deletions crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs
@@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use tokio::sync::mpsc;
-use tracing::trace;
+use tracing::{debug, trace};

use restate_core::network::rpc_router::{RpcError, RpcRouter};
use restate_core::network::{Incoming, Networking, TransportConnect};
@@ -18,7 +18,7 @@ use restate_types::config::Configuration;
use restate_types::logs::{LogletOffset, SequenceNumber};
use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status};
use restate_types::replicated_loglet::{
-EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams,
+EffectiveNodeSet, NodeSet, ReplicatedLogletId, ReplicatedLogletParams,
};
use restate_types::retries::RetryPolicy;
use restate_types::{GenerationalNodeId, PlainNodeId};
@@ -33,8 +33,7 @@ use crate::providers::replicated_loglet::replication::NodeSetChecker;
/// responses before acknowledging the seal.
///
/// The seal operation is idempotent. It's safe to seal a loglet if it's already partially or fully
-/// sealed. Note that the seal task ignores the "seal" state in the input known_global_tail watch,
-/// but it will set it to `true` after the seal.
+/// sealed. Note that the seal task ignores the "seal" state in the input known_global_tail watch.
pub struct SealTask {
task_center: TaskCenter,
my_params: ReplicatedLogletParams,
@@ -100,14 +99,23 @@ impl SealTask {
drop(tx);

// Max observed local-tail from sealed nodes
-let mut max_tail = LogletOffset::INVALID;
+let mut max_tail = LogletOffset::OLDEST;
while let Some((node_id, local_tail)) = rx.recv().await {
max_tail = std::cmp::max(max_tail, local_tail);
nodeset_checker.set_attribute(node_id, true);

// Do we have f-majority responses?
if nodeset_checker.check_fmajority(|sealed| *sealed).passed() {
-self.known_global_tail.notify_seal();
+let sealed_nodes: NodeSet = nodeset_checker
+.filter(|sealed| *sealed)
+.map(|(n, _)| *n)
+.collect();
+
+debug!(loglet_id = %self.my_params.loglet_id,
+max_tail = %max_tail,
+"Seal task completed on f-majority of nodes. Sealed log-servers '{}'",
+sealed_nodes,
+);
// note that the rest of seal requests will continue in the background
return Ok(max_tail);
}
1 change: 0 additions & 1 deletion crates/core/src/task_center_types.rs
@@ -105,7 +105,6 @@ pub enum TaskKind {
Watchdog,
NetworkMessageHandler,
// Replicated loglet tasks
-ReplicatedLogletAppender,
ReplicatedLogletReadStream,
#[strum(props(OnCancel = "abort"))]
/// Log-server tasks