diff --git a/Cargo.lock b/Cargo.lock index 56382cff8..93e4b7610 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7585,6 +7585,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index b16bb3466..5ec13ecda 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -40,7 +40,7 @@ static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true, features = ["sync"] } -tokio-util = { workspace = true } +tokio-util = { workspace = true, features = ["rt"] } tracing = { workspace = true } xxhash-rust = { workspace = true, features = ["xxh3"] } diff --git a/crates/bifrost/src/loglet/loglet_tests.rs b/crates/bifrost/src/loglet/loglet_tests.rs index 8860e6ea0..ba8799a2d 100644 --- a/crates/bifrost/src/loglet/loglet_tests.rs +++ b/crates/bifrost/src/loglet/loglet_tests.rs @@ -537,10 +537,10 @@ pub async fn append_after_seal_concurrent(loglet: Arc) -> googletest let mut committed = handle??; assert!(!committed.is_empty()); let committed_len = committed.len(); - assert!(committed_len >= WARMUP_APPENDS); + assert_that!(committed_len, ge(WARMUP_APPENDS)); let tail_record = committed.pop().unwrap(); // tail must be beyond seal point - assert!(tail.offset() > tail_record); + assert_that!(tail.offset(), gt(tail_record)); println!( "Committed len={}, last appended={}", committed_len, tail_record diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index a6d810ead..285a2116d 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -172,6 +172,9 @@ impl Loglet for ReplicatedLoglet { } async fn enqueue_batch(&self, payloads: Arc<[Record]>) -> Result { + if self.known_global_tail().is_sealed() { + return Ok(LogletCommit::sealed()); + } metrics::counter!(BIFROST_RECORDS_ENQUEUED_TOTAL).increment(payloads.len() as u64); metrics::counter!(BIFROST_RECORDS_ENQUEUED_BYTES).increment( payloads @@ -205,10 +208,8 @@ impl Loglet for ReplicatedLoglet { .await?; if result == CheckSealOutcome::Sealing { // We are likely to be sealing... - // let's fire a seal to ensure this seal is complete - if self.seal().await.is_ok() { - self.known_global_tail.notify_seal(); - } + // let's fire a seal to ensure this seal is complete. + self.seal().await?; } return Ok(*self.known_global_tail.get()); } @@ -251,7 +252,6 @@ impl Loglet for ReplicatedLoglet { } async fn seal(&self) -> Result<(), OperationError> { - // todo(asoli): If we are the sequencer node, let the sequencer know. let _ = SealTask::new( task_center(), self.my_params.clone(), @@ -260,6 +260,15 @@ impl Loglet for ReplicatedLoglet { ) .run(self.networking.clone()) .await?; + // If we are the sequencer, we need to wait until the sequencer is drained. + if let SequencerAccess::Local { handle } = &self.sequencer { + handle.drain().await?; + self.known_global_tail.notify_seal(); + }; + // On remote sequencer, we only set our global tail to sealed when we call find_tail and it + // returns Sealed. We should NOT: + // - Use AppendError::Sealed to mark our sealed global_tail + // - Mark our global tail as sealed on successful seal() call. info!(loglet_id=%self.my_params.loglet_id, "Loglet has been sealed successfully"); Ok(()) } diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs index fabc664e7..b5bf52569 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer/mod.rs @@ -16,10 +16,12 @@ use std::sync::{ }; use tokio::sync::Semaphore; +use tokio_util::task::TaskTracker; +use tracing::{debug, trace}; use restate_core::{ network::{rpc_router::RpcRouter, Networking, TransportConnect}, - task_center, ShutdownError, TaskKind, + task_center, ShutdownError, }; use restate_types::{ config::Configuration, @@ -96,6 +98,7 @@ pub struct Sequencer { /// Semaphore for the number of records in-flight. /// This is an Arc<> to allow sending owned permits record_permits: Arc, + in_flight: TaskTracker, } impl Sequencer { @@ -145,6 +148,7 @@ impl Sequencer { networking, record_permits, max_inflight_records_in_config: AtomicUsize::new(max_in_flight_records_in_config), + in_flight: TaskTracker::default(), } } @@ -158,6 +162,46 @@ impl Sequencer { self.record_permits.available_permits() } + /// wait until all in-flight appends are drained. Note that this will cause the sequencer to + /// return AppendError::Sealed for new appends but it won't start the seal process itself. The + /// seal process must be started externally. _Only_ after the drain is complete, the caller + /// can set the seal on `known_global_tail` as it's guaranteed that no more work will be + /// done by the sequencer (No acknowledgements will be delivered for appends after the first + /// observed global_tail with is_sealed=true) + /// + /// This method is cancellation safe. + pub async fn drain(&self) -> Result<(), ShutdownError> { + // stop issuing new permits + self.record_permits.close(); + // required to allow in_flight.wait() to finish. + self.in_flight.close(); + // we are assuming here that seal has been already executed on majority of nodes. This is + // important since in_flight.close() doesn't prevent new tasks from being spawned. + + if self + .sequencer_shared_state + .global_committed_tail() + .is_sealed() + { + return Ok(()); + } + + // wait for in-flight tasks to complete before returning + debug!( + loglet_id = %self.sequencer_shared_state.my_params.loglet_id, + "Draining sequencer, waiting for {} inflight appends to complete", + self.in_flight.len(), + ); + self.in_flight.wait().await; + + trace!( + loglet_id = %self.sequencer_shared_state.my_params.loglet_id, + "Sequencer drained", + ); + + Ok(()) + } + pub fn ensure_enough_permits(&self, required: usize) { let mut available = self.max_inflight_records_in_config.load(Ordering::Relaxed); while available < required { @@ -193,16 +237,10 @@ impl Sequencer { self.ensure_enough_permits(payloads.len()); let len = u32::try_from(payloads.len()).expect("batch sizes fit in u32"); - let permit = self - .record_permits - .clone() - .acquire_many_owned(len) - .await - .unwrap(); - - // We are updating the next write offset and we want to make sure that after this call that - // we observe if task_center()'s shutdown signal was set or not consistently across - // threads. + let Ok(permit) = self.record_permits.clone().acquire_many_owned(len).await else { + return Ok(LogletCommit::sealed()); + }; + let offset = LogletOffset::new( self.sequencer_shared_state .next_write_offset @@ -222,17 +260,13 @@ impl Sequencer { commit_resolver, ); - // We are sure that if task-center is shutting down that all future appends will fail so we - // are not so worried about the offset that was updated already above. - task_center().spawn_child( - TaskKind::ReplicatedLogletAppender, - "sequencer-appender", - None, + self.in_flight.spawn({ + let tc = task_center(); async move { - appender.run().await; - Ok(()) - }, - )?; + tc.run_in_scope("sequencer-appender", None, appender.run()) + .await + } + }); Ok(loglet_commit) } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index 772c7033e..6c64a52c9 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -112,7 +112,7 @@ impl CheckSealTask { if tail_status.is_known_sealed() { // we only need to see a single node sealed to declare that we are probably sealing (or // sealed) - return Ok(CheckSealOutcome::ProbablyOpen); + return Ok(CheckSealOutcome::Sealing); } nodeset_checker.merge_attribute(next_node, tail_status); } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index 0206aa0ce..27c7ebc68 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. use tokio::sync::mpsc; -use tracing::trace; +use tracing::{debug, trace}; use restate_core::network::rpc_router::{RpcError, RpcRouter}; use restate_core::network::{Incoming, Networking, TransportConnect}; @@ -18,7 +18,7 @@ use restate_types::config::Configuration; use restate_types::logs::{LogletOffset, SequenceNumber}; use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status}; use restate_types::replicated_loglet::{ - EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams, + EffectiveNodeSet, NodeSet, ReplicatedLogletId, ReplicatedLogletParams, }; use restate_types::retries::RetryPolicy; use restate_types::{GenerationalNodeId, PlainNodeId}; @@ -33,8 +33,7 @@ use crate::providers::replicated_loglet::replication::NodeSetChecker; /// responses before acknowleding the seal. /// /// The seal operation is idempotent. It's safe to seal a loglet if it's already partially or fully -/// sealed. Note that the seal task ignores the "seal" state in the input known_global_tail watch, -/// but it will set it to `true` after the seal. +/// sealed. Note that the seal task ignores the "seal" state in the input known_global_tail watch. pub struct SealTask { task_center: TaskCenter, my_params: ReplicatedLogletParams, @@ -100,14 +99,23 @@ impl SealTask { drop(tx); // Max observed local-tail from sealed nodes - let mut max_tail = LogletOffset::INVALID; + let mut max_tail = LogletOffset::OLDEST; while let Some((node_id, local_tail)) = rx.recv().await { max_tail = std::cmp::max(max_tail, local_tail); nodeset_checker.set_attribute(node_id, true); // Do we have f-majority responses? if nodeset_checker.check_fmajority(|sealed| *sealed).passed() { - self.known_global_tail.notify_seal(); + let sealed_nodes: NodeSet = nodeset_checker + .filter(|sealed| *sealed) + .map(|(n, _)| *n) + .collect(); + + debug!(loglet_id = %self.my_params.loglet_id, + max_tail = %max_tail, + "Seal task completed on f-majority of nodes. Sealed log-servers '{}'", + sealed_nodes, + ); // note that the rest of seal requests will continue in the background return Ok(max_tail); } diff --git a/crates/core/src/task_center_types.rs b/crates/core/src/task_center_types.rs index e571c9df4..8daf7c957 100644 --- a/crates/core/src/task_center_types.rs +++ b/crates/core/src/task_center_types.rs @@ -105,7 +105,6 @@ pub enum TaskKind { Watchdog, NetworkMessageHandler, // Replicated loglet tasks - ReplicatedLogletAppender, ReplicatedLogletReadStream, #[strum(props(OnCancel = "abort"))] /// Log-server tasks