From 83fb10eb10d047fc450902c314f22f252f97ae2c Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 9 Oct 2024 15:37:58 +0100 Subject: [PATCH] [WIP] RepairTail --- .../src/providers/replicated_loglet/loglet.rs | 2 + .../read_path/read_stream_task.rs | 32 +- .../replicated_loglet/replication/checker.rs | 8 +- .../replication/spread_selector.rs | 53 +++ .../replicated_loglet/rpc_routers.rs | 5 +- .../replicated_loglet/tasks/digests.rs | 339 ++++++++++++++++++ .../replicated_loglet/tasks/find_tail.rs | 54 ++- .../providers/replicated_loglet/tasks/mod.rs | 3 + .../replicated_loglet/tasks/repair_tail.rs | 284 +++++++++++++++ crates/types/src/replicated_loglet/params.rs | 16 +- crates/types/src/replicated_loglet/spread.rs | 10 + 11 files changed, 784 insertions(+), 22 deletions(-) create mode 100644 crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs create mode 100644 crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index a02bf0b29..4610a2782 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -158,6 +158,7 @@ impl Loglet for ReplicatedLoglet { to, known_global_tail, cache, + false, ) .await?; let read_stream = ReplicatedLogletReadStream::new(from, rx_stream, reader_task); @@ -197,6 +198,7 @@ impl Loglet for ReplicatedLoglet { self.networking.clone(), self.logservers_rpc.clone(), self.known_global_tail.clone(), + self.record_cache.clone(), ); let tail_status = task.run().await; match tail_status { diff --git a/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs b/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs index a1c87dcb1..749a8eb4e 100644 --- a/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs +++ b/crates/bifrost/src/providers/replicated_loglet/read_path/read_stream_task.rs @@ -71,6 +71,9 @@ pub struct ReadStreamTask { tx: mpsc::Sender, OperationError>>, record_cache: RecordCache, stats: Stats, + /// If set to true, we won't wait for the global tail to be updated before requesting the next + /// batch. This is used in tail repair tasks. + move_beyond_global_tail: bool, } impl ReadStreamTask { @@ -84,6 +87,7 @@ impl ReadStreamTask { read_to: Option, known_global_tail: TailOffsetWatch, record_cache: RecordCache, + move_beyond_global_tail: bool, ) -> Result< ( mpsc::Receiver, OperationError>>, @@ -107,6 +111,7 @@ impl ReadStreamTask { tx, record_cache, stats: Stats::default(), + move_beyond_global_tail, }; let handle = task_center().spawn_unmanaged( TaskKind::ReplicatedLogletReadStream, @@ -140,12 +145,20 @@ impl ReadStreamTask { debug_assert!(readahead_trigger >= 1 && readahead_trigger <= self.tx.max_capacity()); let mut tail_subscriber = self.global_tail_watch.subscribe(); - // resolves immediately as it's pre-marked as changed. - tail_subscriber - .changed() - .await - .map_err(|_| OperationError::Shutdown(ShutdownError))?; - self.last_known_tail = tail_subscriber.borrow_and_update().offset(); + if self.move_beyond_global_tail { + self.last_known_tail = self + .read_to + .expect("read_to must be set with move_beyond_global_tail=true") + .next(); + } else { + // resolves immediately as it's pre-marked as changed. 
+ tail_subscriber + .changed() + .await + .map_err(|_| OperationError::Shutdown(ShutdownError))?; + self.last_known_tail = tail_subscriber.borrow_and_update().offset(); + } + // todo(asoli): [important] Need to fire up a FindTail task in the background? It depends on whether we // are on the sequencer node or not. We might ask Bifrost's watchdog instead to dedupe // FindTails and time-throttle them. @@ -205,7 +218,7 @@ impl ReadStreamTask { // Are we reading after last_known_tail offset? // We are at tail. We need to wait until new records have been released. - if !self.can_advance() { + if !self.can_advance() && !self.move_beyond_global_tail { // HODL. // todo(asoli): Measure tail-change wait time in histogram // todo(asoli): (who's going to change this? - background FindTail?) @@ -440,7 +453,10 @@ impl ReadStreamTask { timeout: Duration, ) -> Result { let request = GetRecords { - header: LogServerRequestHeader::new(self.my_params.loglet_id, self.last_known_tail), + header: LogServerRequestHeader::new( + self.my_params.loglet_id, + self.global_tail_watch.latest_offset(), + ), total_limit_in_bytes: None, filter: self.filter.clone(), from_offset: self.read_pointer, diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs index 601afd603..8b59e2dbe 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/checker.rs @@ -128,8 +128,12 @@ impl<'a, Attribute> NodeSetChecker<'a, Attribute> { } } - pub fn set_attribute_on_each(&mut self, nodes: &[PlainNodeId], f: impl Fn() -> Attribute) { - for node in nodes { + pub fn set_attribute_on_each<'b>( + &mut self, + nodes: impl IntoIterator, + f: impl Fn() -> Attribute, + ) { + for node in nodes.into_iter() { // ignore if the node is not in the original nodeset if self.storage_states.contains_key(node) { self.node_attribute.insert(*node, f()); diff --git a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs index c3f32b1f2..e99dab470 100644 --- a/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs +++ b/crates/bifrost/src/providers/replicated_loglet/replication/spread_selector.rs @@ -111,6 +111,59 @@ impl SpreadSelector { Ok(selected) } + + /// Amend an existing spread with sufficient nodes to _at least_ satisfy the replication + /// property if possible. 
If not possible, it fails with `InsufficientWriteableNodes' + /// + /// The selector automatically avoids nodes non-writeable nodes + pub fn amend( + &self, + existing_copies: &NodeSet, + rng: &mut R, + nodes_config: &NodesConfiguration, + exclude_nodes: &NodeSet, + ) -> Result { + // Get the list of non-empty nodes from the nodeset given the nodes configuration + let effective_nodeset = self.nodeset.to_effective(nodes_config); + let mut writeable_nodes: Vec<_> = effective_nodeset + .into_iter() + .filter(|node_id| !exclude_nodes.contains(node_id)) + .filter(|node_id| { + nodes_config + .get_log_server_storage_state(node_id) + .can_write_to() + }) + .collect(); + if writeable_nodes.len() < self.replication_property.num_copies().into() { + return Err(SpreadSelectorError::InsufficientWriteableNodes); + } + + let selected: Spread = match &self.strategy { + SelectorStrategy::Flood => { + writeable_nodes.shuffle(rng); + Spread::from(writeable_nodes) + } + #[cfg(any(test, feature = "test-util"))] + SelectorStrategy::Fixed(selector) => selector.select()?, + }; + + // validate that we can have write quorum with this spread + let mut checker = + NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property); + checker.set_attribute_on_each(&selected, || true); + if !checker.check_write_quorum(|attr| *attr) { + return Err(SpreadSelectorError::InsufficientWriteableNodes); + } + + // remove existing nodes from selected spread + let selected: Vec<_> = selected + .into_iter() + // keep nodes that are not in existing_copies. + .filter(|n| !existing_copies.contains(n)) + .collect(); + + Ok(selected.into()) + } } static_assertions::assert_impl_all!(SpreadSelector: Send, Sync); diff --git a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs index a29c70a8b..85a6dc795 100644 --- a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs +++ b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs @@ -14,7 +14,7 @@ use restate_core::network::rpc_router::RpcRouter; use restate_core::network::MessageRouterBuilder; use restate_types::net::log_server::{ - GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail, + GetDigest, GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail, }; use restate_types::net::replicated_loglet::Append; @@ -28,6 +28,7 @@ pub struct LogServersRpc { pub seal: RpcRouter, pub get_loglet_info: RpcRouter, pub get_records: RpcRouter, + pub get_digest: RpcRouter, pub wait_for_tail: RpcRouter, } @@ -41,6 +42,7 @@ impl LogServersRpc { let seal = RpcRouter::new(router_builder); let get_loglet_info = RpcRouter::new(router_builder); let get_records = RpcRouter::new(router_builder); + let get_digest = RpcRouter::new(router_builder); let wait_for_tail = RpcRouter::new(router_builder); Self { @@ -50,6 +52,7 @@ impl LogServersRpc { seal, get_loglet_info, get_records, + get_digest, wait_for_tail, } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs new file mode 100644 index 000000000..07cefd362 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs @@ -0,0 +1,339 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; + +use rand::thread_rng; +use tokio::task::JoinSet; +use tracing::{debug, trace, warn}; + +use restate_core::network::rpc_router::{RpcError, RpcRouter}; +use restate_core::network::{Networking, TransportConnect}; +use restate_core::{cancellation_watcher, task_center, ShutdownError}; +use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::net::log_server::{ + Digest, LogServerRequestHeader, RecordStatus, Status, Store, StoreFlags, +}; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicatedLogletParams}; +use restate_types::{GenerationalNodeId, PlainNodeId}; + +use crate::loglet::util::TailOffsetWatch; +use crate::loglet::OperationError; +use crate::providers::replicated_loglet::replication::spread_selector::{ + SelectorStrategy, SpreadSelector, +}; +use crate::providers::replicated_loglet::replication::NodeSetChecker; +use crate::LogEntry; + +#[derive(Debug, thiserror::Error)] +#[error("could not replicate record, exhausted all store attempts")] +struct ReplicationFailed; + +pub struct Digests { + loglet_id: ReplicatedLogletId, + // inclusive + start_offset: LogletOffset, + // exclusive. + target_tail: LogletOffset, + // all offsets `[from_offset..target_tail)` + offsets_under_repair: BTreeMap, + known_nodes: NodeSet, + spread_selector: SpreadSelector, +} + +impl Digests { + pub fn new( + my_params: &ReplicatedLogletParams, + from_offset: LogletOffset, + target_tail: LogletOffset, + ) -> Self { + let offsets = if from_offset >= target_tail { + // already finished + Default::default() + } else { + BTreeMap::from_iter((*from_offset..*target_tail).map(|o| (o, Default::default()))) + }; + + let spread_selector = SpreadSelector::new( + my_params.nodeset.clone(), + // todo: should be from the same configuration source as what the sequencer uses. + SelectorStrategy::Flood, + my_params.replication.clone(), + ); + + Digests { + loglet_id: my_params.loglet_id, + start_offset: from_offset, + target_tail, + known_nodes: Default::default(), + offsets_under_repair: offsets, + spread_selector, + } + } + + pub fn start_offset(&self) -> LogletOffset { + self.start_offset + } + + pub fn target_tail(&self) -> LogletOffset { + self.target_tail + } + + pub fn is_finished(&self) -> bool { + self.start_offset >= self.target_tail + } + + pub fn on_digest_message( + &mut self, + peer_node: PlainNodeId, + msg: Digest, + known_global_tail: &TailOffsetWatch, + ) { + known_global_tail.notify_offset_update(msg.header.known_global_tail); + self.update_from_offset(known_global_tail.latest_offset()); + + if self.is_finished() { + return; + } + + if msg.header.status != Status::Ok { + return; + } + + if !self.known_nodes.insert(peer_node) { + warn!( + loglet_id = %self.loglet_id, + node_id = %peer_node, + "We have received a successful digest from this node already!" 
+ ); + return; + } + + for entry in msg.entries { + for o in *entry.from_offset..=*entry.to_offset { + if o >= *self.start_offset && entry.status == RecordStatus::Exists { + self.offsets_under_repair + .get_mut(&o) + .expect("offset in original range") + .insert(peer_node); + } + } + } + } + + pub async fn replicate_record_and_advance( + &mut self, + entry: LogEntry, + sequencer: GenerationalNodeId, + networking: &Networking, + store_rpc: &RpcRouter, + ) -> Result<(), OperationError> { + let offset = entry.sequence_number(); + if offset < self.start_offset { + // Ignore this record. We have already moved past this offset. + return Ok(()); + } + // If we see a trim gap, we always assume that the trim-gap is up to a previously known + // global tail value so we fast forward until the end of the gap. + if entry.is_trim_gap() { + debug!( + loglet_id = %self.loglet_id, + "Observed a trim gap from offset {} to {} while repairing the tail of the loglet, \ + those offsets will be considered repaired", + offset, + entry.trim_gap_to_sequence_number().unwrap(), + ); + self.update_from_offset(entry.next_sequence_number()); + return Ok(()); + } + + // get a spread. See how many copies do we need to do to achieve write quorum. + let existing_spread = self + .offsets_under_repair + .get(&offset) + .expect("repairing an offset that we have not truncated"); + + let nodes_to_amend = self + .spread_selector + .amend( + existing_spread, + &mut thread_rng(), + &networking.metadata().nodes_config_ref(), + &NodeSet::empty(), + ) + // what do we do if we can't generate a spread? nodes are in data-loss or readonly, or + // whatever state. + // We definitely cannot proceed but should we retry? let's only do that when we need to, + // for now, we bail. + .map_err(OperationError::retryable)?; + + trace!( + loglet_id = %self.loglet_id, + %offset, + "Repairing record, existing copies on {} and nodes to add are {}", + existing_spread, + nodes_to_amend, + ); + + let record = entry.into_record().expect("must be a data record"); + + let mut replication_checker = NodeSetChecker::new( + self.spread_selector.nodeset(), + &networking.metadata().nodes_config_ref(), + self.spread_selector.replication_property(), + ); + // record is already replicated on those nodes + replication_checker.set_attribute_on_each(existing_spread, || true); + + let msg = Store { + // As we send store messages, we consider the from_offset a reliable source of + // global_known_loglet. This allows log-servers to accept those writes if they were still + // behind. 
+ header: LogServerRequestHeader::new(self.loglet_id, self.start_offset), + timeout_at: None, + // Must be set to bypass the seal + flags: StoreFlags::IgnoreSeal, + first_offset: offset, + sequencer, + known_archived: LogletOffset::INVALID, + payloads: vec![record], + }; + // We run stores as tasks because we'll wait only for the necessary write-quorum but the + // rest of the stores can continue in the background as best-effort replication (if the + // spread selector strategy picked extra nodes) + let mut inflight_stores = JoinSet::new(); + for node in nodes_to_amend { + inflight_stores.spawn({ + let networking = networking.clone(); + let msg = msg.clone(); + let store_rpc = store_rpc.clone(); + let tc = task_center(); + async move { + tc.run_in_scope("repair-store", None, async move { + (node, store_rpc.call(&networking, node, msg).await) + }) + .await + } + }); + } + let mut cancel = std::pin::pin!(cancellation_watcher()); + + loop { + if replication_checker.check_write_quorum(|attr| *attr) { + trace!( + loglet_id = %self.loglet_id, + %offset, + "Record has been repaired" + ); + // record has been fully replicated. + self.update_from_offset(offset); + return Ok(()); + } + + if inflight_stores.is_empty() { + // No more store attempts left. We couldn't replicate this record. + return Err(OperationError::retryable(ReplicationFailed)); + } + + let stored_on_peer = tokio::select! { + _ = &mut cancel => { + return Err(OperationError::Shutdown(ShutdownError)); + } + Some(Ok((peer, maybe_stored))) = inflight_stores.join_next() => { + // maybe_stored is err if we can't send the store (or shutdown) + match maybe_stored { + Ok(stored) if stored.body().header.status == Status::Ok => { + Some(peer) + } + Ok(stored) => { + // Store failed with some non-ok status + debug!( + loglet_id = %self.loglet_id, + peer = %stored.peer(), + %offset, + "Could not store record on node as part of the tail repair procedure. Log server responded with status={:?}", + stored.body().header.status + ); + None + } + Err(RpcError::Shutdown(e)) => return Err(OperationError::Shutdown(e)), + // give up on this store. + Err(e) => { + debug!( + loglet_id = %self.loglet_id, + %peer, + %offset, + %e, + "Could not store record on node as part of the tail repair procedure. Network error", + ); + None + } + } + } + }; + + if let Some(stored_on_peer) = stored_on_peer { + replication_checker.set_attribute(stored_on_peer, true); + } + } + } + + pub fn can_repair(&self, nodes_config: &NodesConfiguration) -> bool { + // only do that if known_nodes can satisfy write-quorum. + let mut checker = NodeSetChecker::new( + self.spread_selector.nodeset(), + nodes_config, + self.spread_selector.replication_property(), + ); + checker.set_attribute_on_each(&self.known_nodes, || true); + checker.check_write_quorum(|known| *known) + } + + // returns true if we can advance to repair + pub fn advance(&mut self, nodes_config: &NodesConfiguration) -> bool { + if !self.can_repair(nodes_config) { + // we don't have write-quorum of nodes yet, we can't advance from_offset. 
+ return false; + } + let mut range = self.offsets_under_repair.range(..); + + // walk backwards + while let Some((offset, nodes)) = range.next_back() { + let mut checker = NodeSetChecker::new( + self.spread_selector.nodeset(), + nodes_config, + self.spread_selector.replication_property(), + ); + checker.set_attribute_on_each(nodes, || true); + if checker.check_write_quorum(|known| *known) { + self.update_from_offset(LogletOffset::new(offset + 1)); + return true; + } + } + false + } + + fn update_from_offset(&mut self, new_from_offset: LogletOffset) { + let original = self.start_offset; + self.start_offset = self.start_offset.max(new_from_offset); + if self.start_offset != original { + self.truncate_range(); + } + } + + fn truncate_range(&mut self) { + if self.is_finished() { + self.offsets_under_repair.clear(); + } else { + self.offsets_under_repair = self.offsets_under_repair.split_off(&self.start_offset); + } + } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs index c8b976e38..e68d43cb9 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs @@ -8,21 +8,23 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::time::Duration; + use tokio::task::JoinSet; -use tracing::{debug, error, trace}; +use tracing::{debug, error, info, trace}; use restate_core::network::rpc_router::{RpcError, RpcRouter}; use restate_core::network::{Networking, TransportConnect}; use restate_core::TaskCenter; use restate_types::config::Configuration; -use restate_types::logs::LogletOffset; +use restate_types::logs::{LogletOffset, RecordCache}; use restate_types::net::log_server::{GetLogletInfo, LogServerRequestHeader, Status, WaitForTail}; use restate_types::replicated_loglet::{ EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams, }; use restate_types::PlainNodeId; -use super::SealTask; +use super::{RepairTail, RepairTailResult, SealTask}; use crate::loglet::util::TailOffsetWatch; use crate::providers::replicated_loglet::replication::{Merge, NodeSetChecker}; use crate::providers::replicated_loglet::rpc_routers::LogServersRpc; @@ -35,7 +37,7 @@ use crate::providers::replicated_loglet::rpc_routers::LogServersRpc; /// it to get the tail but this can be used as a side channel optimization. /// /// If the loglet is being sealed partially, this will create a new seal task to assist (in case -/// the previous seal process crashed). Additionally, we will start a TailRepair task to ensure consistent +/// the previous seal process crashed). Additionally, we will start a RepairTail task to ensure consistent /// state of the records between known_global_tail and the max(local_tail) observed from f-majority /// of sealed log-servers. /// @@ -47,6 +49,7 @@ pub struct FindTailTask { networking: Networking, logservers_rpc: LogServersRpc, known_global_tail: TailOffsetWatch, + record_cache: RecordCache, } pub enum FindTailResult { @@ -124,6 +127,7 @@ impl FindTailTask { networking: Networking, logservers_rpc: LogServersRpc, known_global_tail: TailOffsetWatch, + record_cache: RecordCache, ) -> Self { Self { task_center, @@ -131,6 +135,7 @@ impl FindTailTask { my_params, logservers_rpc, known_global_tail, + record_cache, } } @@ -255,7 +260,7 @@ impl FindTailTask { // Great. All nodes sealed and we have a stable value to return. 
// // todo: If some nodes have lower global-tail than max-local-tail, then - // broadcast a a release to max-tail to avoid unnecessary repair if + // broadcast a release to max-tail to avoid unnecessary repair if // underreplication happened after this point. inflight_info_requests.abort_all(); // Note: We don't set the known_global_tail watch to this value nor the @@ -266,9 +271,9 @@ impl FindTailTask { }; } // F-majority sealed, but tail needs repair in range - // [current_known_tail..max_local_tail] + // [current_known_global..max_local_tail] // - // todo: Although we have f-majority, it's not always guaranteed that we are + // Although we have f-majority, it's not always guaranteed that we are // able to form a write-quorum within this set of nodes. For instance, if // replication-factor is 4 in a nodeset of 5 nodes, F-majority is 2 nodes which // isn't enough to form write-quorum. In this case, we need to wait for more @@ -276,8 +281,39 @@ impl FindTailTask { // current_known_global. Once we have enough sealed node that match // write-quorum **and** f-majority, then we can repair the tail. if nodeset_checker.check_write_quorum(NodeTailStatus::is_known) { - // We can repair. - todo!("Tail repair is not implemented yet") + // We can start repair. + match RepairTail::new( + self.my_params.clone(), + self.task_center.clone(), + self.networking.clone(), + self.logservers_rpc.clone(), + self.record_cache.clone(), + self.known_global_tail.clone(), + current_known_global, + max_local_tail, + ) + .run() + .await + { + RepairTailResult::Completed => { + return FindTailResult::Sealed { + global_tail: max_local_tail, + } + } + RepairTailResult::DigestFailed + | RepairTailResult::ReplicationFailed { .. } => { + // retry the whole find-tail procedure. + info!( + "Tail repair failed. Restarting FindTail task for loglet_id={}", + self.my_params.loglet_id + ); + tokio::time::sleep(Duration::from_secs(2)).await; + continue 'find_tail; + } + RepairTailResult::Shutdown(e) => { + return FindTailResult::Error(e.to_string()); + } + } } else { // wait for more nodes break 'check_nodeset; diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs index 0569416b3..ae7f142d0 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs @@ -8,8 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +mod digests; mod find_tail; +mod repair_tail; mod seal; pub use find_tail::*; +pub use repair_tail::*; pub use seal::*; diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs new file mode 100644 index 000000000..1e1799dbf --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/repair_tail.rs @@ -0,0 +1,284 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+
+use std::time::Duration;
+
+use restate_core::network::{Networking, TransportConnect};
+use restate_core::{ShutdownError, TaskCenter};
+use restate_types::logs::{KeyFilter, LogletOffset, RecordCache, SequenceNumber};
+use restate_types::net::log_server::{GetDigest, LogServerRequestHeader};
+use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams};
+use tokio::task::JoinSet;
+use tracing::{trace, warn};
+
+use crate::loglet::util::TailOffsetWatch;
+use crate::providers::replicated_loglet::read_path::ReadStreamTask;
+use crate::providers::replicated_loglet::rpc_routers::LogServersRpc;
+
+use super::digests::Digests;
+
+/// # Overview
+///
+/// The primary reason the tail needs repair is that the sequencer node will mark a record as
+/// released _before_ it sends out a Release message to log-servers. We also don't
+/// require a quorum of nodes to acknowledge the Release message. This is an important optimization
+/// for write latency as the sequencer needs 1-RTT (store wave) to acknowledge a write instead of 2-RTT
+/// (store+release).
+///
+/// The tradeoff is the speed at which we can determine the last global committed tail if the
+/// sequencer is not available. There are a number of optimizations that can be added to improve the
+/// efficiency of this operation; those include, but are not limited to:
+/// - Log-servers persisting the last known_global_tail periodically and resetting their local-tail
+///   to this value on startup.
+/// - Sequencer-driven seal. If the sequencer is alive, it can send a special value with the seal
+///   message to indicate what is the actual known-global-tail that nodes should try and repair to
+///   instead of relying on the max-tail.
+/// - Limiting the `start_offset` we repair from to max(min(local_tails), max(known_global_tails), known_archived, trim_point)
+/// - Archiving records to an external object-store instead of re-replicating them.
+///
+/// This task repairs the tail of a loglet so that we can have confidence in the max-tail value.
+/// The goal of repair is to ensure that every record below that tail is replicated on a write-quorum of nodes.
+///
+/// The repair process must be reentrant and fault-tolerant, and it must yield a correct, immutable state of the
+/// tail even with concurrent runs.
+///
+///
+/// # Details
+///
+/// ## Digest Phase (where are my records?)
+/// Given the input range [max(known_global_tails)..max-local-tail], determine the biggest offset
+/// at which we can find evidence of a write-quorum of a record, including unauthoritative nodes
+/// (`StorageState::DataLoss`). Call this `max_known_durable_offset`. Update the repair range to
+/// [max-known-durable-offset..max-local-tail]. If this range is empty, there is no work to do: max-local-tail is fully replicated.
+///
+/// We ask nodes to send a digest first, before reading any records, to learn who has what:
+/// - For a digest request over [start_offset, max-tail-requested]
+/// - A node sends back a list that looks like this:
+///   (inclusive_from, inclusive_to)
+///   [start_offset..t1] -> A. Good to know.
+///   [t2..t10] -> X (Spread1) - len = t10-t1 = 9 records. How much does the spread info make
+///   a difference here? It only tells us which nodes to read from, but we don't care since we don't
+///   need single-copy delivery.
+///   [t11..t11] -> X - 1 record
+///   this implies [t12..max-tail-requested-1) has no records.
+/// +/// +/// ## Replication Phase (restore replication) +/// +/// Starting from the beginning of the range, send a request to read record from nodes according to digest results, +/// This depends on the repair strategy: +/// A) Replicate while ignoring the seal flag to achieve write-quorum. Use original spread information if available and add new nodes +/// if insufficient nodes are writeable. +/// B) Batch the set of records and upload to object-store. Send an update to write-quorum to at-least a write-quorum of nodes to move +/// the known-archived pointer. The write-quorum is not strictly required in this case iff the +/// object-store format allows a cheap query to ask if an offset is archived or not. +/// or every repaired record, update the known_global_tail to allow nodes to move their +/// local-tail during repair. This also allows them to persist this value. +/// +/// ## todo: Completion Phase (try to avoid same-range repair in future runs) +/// Once max-tail is reached. Send a Release message with special flag (repair) to update +/// log-servers with the newly agreed-upon known_global_tail. This is a best-effort phase +/// and it should not block the completion of the repair task. +pub struct RepairTail { + my_params: ReplicatedLogletParams, + task_center: TaskCenter, + networking: Networking, + logservers_rpc: LogServersRpc, + record_cache: RecordCache, + known_global_tail: TailOffsetWatch, + digests: Digests, +} + +pub enum RepairTailResult { + Completed, + DigestFailed, + ReplicationFailed, + // Due to system shutdown + Shutdown(ShutdownError), +} + +impl RepairTail { + #[allow(clippy::too_many_arguments)] + pub fn new( + my_params: ReplicatedLogletParams, + task_center: TaskCenter, + networking: Networking, + logservers_rpc: LogServersRpc, + record_cache: RecordCache, + known_global_tail: TailOffsetWatch, + start_offset: LogletOffset, + target_tail: LogletOffset, + ) -> Self { + let digests = Digests::new(&my_params, start_offset, target_tail); + RepairTail { + my_params, + task_center, + networking, + logservers_rpc, + record_cache, + known_global_tail, + digests, + } + } + + pub async fn run(mut self) -> RepairTailResult { + if self.digests.is_finished() { + return RepairTailResult::Completed; + } + let mut get_digest_requests = JoinSet::new(); + let effective_nodeset = EffectiveNodeSet::new( + &self.my_params.nodeset, + &self.networking.metadata().nodes_config_ref(), + ); + + // Dispatch GetDigest to all readable nodes + for node in effective_nodeset.iter() { + let msg = GetDigest { + header: LogServerRequestHeader::new( + self.my_params.loglet_id, + self.known_global_tail.latest_offset(), + ), + from_offset: self.digests.start_offset(), + to_offset: self.digests.target_tail().prev(), + }; + get_digest_requests.spawn({ + let tc = self.task_center.clone(); + let networking = self.networking.clone(); + let logservers_rpc = self.logservers_rpc.clone(); + let peer = *node; + async move { + tc.run_in_scope("get-digest-from-node", None, async move { + loop { + // todo: handle retries with exponential backoff... 
+ let Ok(incoming) = logservers_rpc + .get_digest + .call(&networking, peer, msg.clone()) + .await + else { + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + return incoming; + } + }) + .await + } + }); + } + + // # Digest Phase + while let Some(Ok(digest_message)) = get_digest_requests.join_next().await { + let peer_node = digest_message.peer().as_plain(); + self.digests.on_digest_message( + peer_node, + digest_message.into_body(), + &self.known_global_tail, + ); + if self + .digests + .advance(&self.networking.metadata().nodes_config_ref()) + { + trace!( + loglet_id = %self.my_params.loglet_id, + node_id = %peer_node, + "Digest phase completed." + ); + break; + } + // can we start repair, but continue to accept digest responses as repair is on-going. + } + + if self.digests.is_finished() { + return RepairTailResult::Completed; + } + + // No enough nodes responded to be able to repair + if !self + .digests + .can_repair(&self.networking.metadata().nodes_config_ref()) + { + return RepairTailResult::DigestFailed; + } + + // Keep reading responses from digest requests since it can assist moving the start_offset + // during repair. In this case, we'll fast-forward and ignore replication for the updated + // range, this might lead to some over-replication and that's fine. + // For every record between start_offset->target_tail.prev() we need to replicate enough + // copies to satisfy the write-quorum. + // + // Replication goes in the direction of "start_offset" towards the tail, one record at a + // a time. + let Ok((mut rx, read_stream_task)) = ReadStreamTask::start( + self.my_params.clone(), + self.networking.clone(), + self.logservers_rpc.clone(), + KeyFilter::Any, + self.digests.start_offset(), + Some(self.digests.target_tail().prev()), + self.known_global_tail.clone(), + self.record_cache.clone(), + /* move-beyond-global-tail = */ true, + ) + .await + else { + return RepairTailResult::Shutdown(ShutdownError); + }; + + 'replication_phase: loop { + if self.digests.is_finished() { + break; + } + tokio::select! { + // we fail the readstream during shutdown only, in that case, there is not much + // we can do but to stop. + Some(Ok(entry)) = rx.recv() => { + // we received a record. Should we replicate it? + if let Err(e) = self.digests.replicate_record_and_advance( + entry, + self.my_params.sequencer, + &self.networking, + &self.logservers_rpc.store, + ).await { + warn!(error=%e, "Failed to replicate record while repairing the tail"); + break 'replication_phase; + } + } + Some(Ok(digest_message)) = get_digest_requests.join_next() => { + let peer_node = digest_message.peer().as_plain(); + self.digests.on_digest_message( + peer_node, + digest_message.into_body(), + &self.known_global_tail, + ); + self.digests.advance(&self.networking.metadata().nodes_config_ref()); + + } + // we have no more work to do. We'll likely fail. + else => {} + } + } + + read_stream_task.abort(); + get_digest_requests.abort_all(); + + // Are we complete? + if self.digests.is_finished() { + return RepairTailResult::Completed; + } + + warn!( + loglet_id = %self.my_params.loglet_id, + "Failed to repair the tail. 
The unrepaired region is from {} to {}", + self.digests.start_offset(), + self.digests.target_tail().prev() + ); + RepairTailResult::ReplicationFailed + } +} diff --git a/crates/types/src/replicated_loglet/params.rs b/crates/types/src/replicated_loglet/params.rs index 351aaebc3..65459b547 100644 --- a/crates/types/src/replicated_loglet/params.rs +++ b/crates/types/src/replicated_loglet/params.rs @@ -80,6 +80,7 @@ impl ReplicatedLogletId { serde::Deserialize, Debug, Clone, + Default, Eq, PartialEq, derive_more::IntoIterator, @@ -119,8 +120,9 @@ impl NodeSet { self.0.contains(node) } - pub fn insert(&mut self, node: PlainNodeId) { - self.0.insert(node); + /// returns true if this node didn't already exist in the nodeset + pub fn insert(&mut self, node: PlainNodeId) -> bool { + self.0.insert(node) } /// Returns true if all nodes in the nodeset are disabled @@ -151,6 +153,16 @@ impl NodeSet { } } +impl<'a> IntoIterator for &'a NodeSet { + type Item = &'a PlainNodeId; + + type IntoIter = <&'a HashSet as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.iter() + } +} + impl From<[PlainNodeId; N]> for NodeSet { fn from(value: [PlainNodeId; N]) -> Self { Self(From::from(value)) diff --git a/crates/types/src/replicated_loglet/spread.rs b/crates/types/src/replicated_loglet/spread.rs index 0bc3eeb15..4118ee82b 100644 --- a/crates/types/src/replicated_loglet/spread.rs +++ b/crates/types/src/replicated_loglet/spread.rs @@ -62,3 +62,13 @@ impl From<[u32; N]> for Spread { Self(value.into_iter().map(PlainNodeId::from).collect()) } } + +impl<'a> IntoIterator for &'a Spread { + type Item = &'a PlainNodeId; + + type IntoIter = <&'a Box<[PlainNodeId]> as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.iter() + } +}