[Bifrost] RepairTail task for replicated loglet
This puts together the design and implementation of the tail repair procedure that is required when FindTail cannot establish a consistent durable tail from the log-servers. The details are described in code comments.
AhmedSoliman committed Oct 10, 2024
1 parent a7debda commit e11e1f9
Showing 11 changed files with 820 additions and 22 deletions.
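
As a rough sketch of the flow the commit message describes — with `Offset`, `FindTailOutcome`, `find_tail`, and `repair_tail` as hypothetical stand-ins rather than the types this commit actually adds — the repair path only kicks in when a consistent durable tail cannot be established:

```rust
// Illustrative sketch only: these names are placeholders, not the types
// introduced by this commit.

#[derive(Clone, Copy)]
struct Offset(u64);

/// Outcome of asking the loglet's log-servers for their local tails.
enum FindTailOutcome {
    /// An f-majority of log-servers agree on a durable tail.
    Consistent(Offset),
    /// Log-servers disagree; records in `[known_global_tail, max_local_tail)`
    /// may be under-replicated and need repair before the tail is durable.
    Inconsistent {
        known_global_tail: Offset,
        max_local_tail: Offset,
    },
}

fn find_tail() -> FindTailOutcome {
    // Placeholder: the real task queries log-servers and checks quorums.
    FindTailOutcome::Inconsistent {
        known_global_tail: Offset(10),
        max_local_tail: Offset(13),
    }
}

fn repair_tail(from: Offset, to: Offset) -> Offset {
    // Placeholder for the repair procedure: read digests from an f-majority,
    // re-replicate records that do not yet satisfy write quorum, then release
    // the global tail at `to`.
    let _ = from;
    to
}

fn main() {
    let durable_tail = match find_tail() {
        FindTailOutcome::Consistent(tail) => tail,
        FindTailOutcome::Inconsistent {
            known_global_tail,
            max_local_tail,
        } => repair_tail(known_global_tail, max_local_tail),
    };
    println!("durable tail established at offset {}", durable_tail.0);
}
```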
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -170,6 +170,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
to,
known_global_tail,
cache,
false,
)
.await?;
let read_stream = ReplicatedLogletReadStream::new(from, rx_stream, reader_task);
@@ -209,6 +210,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
self.networking.clone(),
self.logservers_rpc.clone(),
self.known_global_tail.clone(),
self.record_cache.clone(),
);
let tail_status = task.run().await;
match tail_status {
@@ -67,10 +67,14 @@ pub struct ReadStreamTask {
read_pointer: LogletOffset,
/// Last offset to read before terminating the stream. None means "tailing" reader.
/// *Inclusive*
/// This must be set if `move_beyond_global_tail` is true.
read_to: Option<LogletOffset>,
tx: mpsc::Sender<Result<LogEntry<LogletOffset>, OperationError>>,
record_cache: RecordCache,
stats: Stats,
/// If set to true, we won't wait for the global tail to be updated before requesting the next
/// batch. This is used in tail repair tasks.
move_beyond_global_tail: bool,
}

impl ReadStreamTask {
@@ -84,13 +88,17 @@ impl ReadStreamTask {
read_to: Option<LogletOffset>,
known_global_tail: TailOffsetWatch,
record_cache: RecordCache,
move_beyond_global_tail: bool,
) -> Result<
(
mpsc::Receiver<Result<LogEntry<LogletOffset>, OperationError>>,
TaskHandle<Result<(), OperationError>>,
),
OperationError,
> {
if move_beyond_global_tail && read_to.is_none() {
panic!("read_to must be set with move_beyond_global_tail=true");
}
// todo(asoli): configuration
let (tx, rx) = mpsc::channel(100);
// Reading from INVALID resets to OLDEST.
@@ -107,6 +115,7 @@ impl ReadStreamTask {
tx,
record_cache,
stats: Stats::default(),
move_beyond_global_tail,
};
let handle = task_center().spawn_unmanaged(
TaskKind::ReplicatedLogletReadStream,
@@ -140,12 +149,20 @@ impl ReadStreamTask {
debug_assert!(readahead_trigger >= 1 && readahead_trigger <= self.tx.max_capacity());

let mut tail_subscriber = self.global_tail_watch.subscribe();
// resolves immediately as it's pre-marked as changed.
tail_subscriber
.changed()
.await
.map_err(|_| OperationError::Shutdown(ShutdownError))?;
self.last_known_tail = tail_subscriber.borrow_and_update().offset();
if self.move_beyond_global_tail {
self.last_known_tail = self
.read_to
.expect("read_to must be set with move_beyond_global_tail=true")
.next();
} else {
// resolves immediately as it's pre-marked as changed.
tail_subscriber
.changed()
.await
.map_err(|_| OperationError::Shutdown(ShutdownError))?;
self.last_known_tail = tail_subscriber.borrow_and_update().offset();
}

// todo(asoli): [important] Need to fire up a FindTail task in the background? It depends on whether we
// are on the sequencer node or not. We might ask Bifrost's watchdog instead to dedupe
// FindTails and time-throttle them.
@@ -205,7 +222,7 @@ impl ReadStreamTask {

// Are we reading after last_known_tail offset?
// We are at tail. We need to wait until new records have been released.
if !self.can_advance() {
if !self.can_advance() && !self.move_beyond_global_tail {
// HODL.
// todo(asoli): Measure tail-change wait time in histogram
// todo(asoli): (who's going to change this? - background FindTail?)
@@ -440,7 +457,10 @@ impl ReadStreamTask {
timeout: Duration,
) -> Result<ServerReadResult, OperationError> {
let request = GetRecords {
header: LogServerRequestHeader::new(self.my_params.loglet_id, self.last_known_tail),
header: LogServerRequestHeader::new(
self.my_params.loglet_id,
self.global_tail_watch.latest_offset(),
),
total_limit_in_bytes: None,
filter: self.filter.clone(),
from_offset: self.read_pointer,
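
As a stand-alone illustration (not the actual task code) of how the `move_beyond_global_tail` flag changes the way the reader seeds `last_known_tail`, using plain `u32` offsets in place of `LogletOffset`:

```rust
/// Simplified illustration of how the reader seeds `last_known_tail`.
fn initial_last_known_tail(
    move_beyond_global_tail: bool,
    read_to: Option<u32>,
    released_global_tail: u32,
) -> u32 {
    if move_beyond_global_tail {
        // A repair reader works over a fixed, already-known range, so it pins
        // the tail to one past `read_to` and never waits on the tail watch.
        read_to.expect("read_to must be set with move_beyond_global_tail=true") + 1
    } else {
        // A normal reader only reads up to the released global tail and waits
        // for the watch to move before reading further.
        released_global_tail
    }
}

fn main() {
    assert_eq!(initial_last_known_tail(true, Some(12), 10), 13);
    assert_eq!(initial_last_known_tail(false, None, 10), 10);
}
```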
@@ -110,6 +110,26 @@ impl<'a, Attribute> NodeSetChecker<'a, Attribute> {
self.node_attribute.is_empty()
}

/// resets all attributes for all nodes to this value
pub fn reset_with(&mut self, attribute: Attribute)
where
Attribute: Clone,
{
for (_, v) in self.node_attribute.iter_mut() {
*v = attribute.clone();
}
}

/// resets all attributes for all nodes to the default value of `Attribute`
pub fn reset_with_default(&mut self)
where
Attribute: Default,
{
for (_, v) in self.node_attribute.iter_mut() {
*v = Default::default();
}
}

/// Set the attribute value of a node. Note that a node can only be
/// associated with one attribute value at a time, so if the node has an
/// existing attribute value, the value will be cleared.
@@ -128,8 +148,12 @@ impl<'a, Attribute> NodeSetChecker<'a, Attribute> {
}
}

pub fn set_attribute_on_each(&mut self, nodes: &[PlainNodeId], f: impl Fn() -> Attribute) {
for node in nodes {
pub fn set_attribute_on_each<'b>(
&mut self,
nodes: impl IntoIterator<Item = &'b PlainNodeId>,
f: impl Fn() -> Attribute,
) {
for node in nodes.into_iter() {
// ignore if the node is not in the original nodeset
if self.storage_states.contains_key(node) {
self.node_attribute.insert(*node, f());
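
The new `reset_with`/`reset_with_default` methods allow a single checker to be reused across many records rather than rebuilt each time (presumably what a repair loop over a range of offsets wants). A minimal stand-in for that reset-and-reuse pattern, using a plain `HashMap` instead of `NodeSetChecker`:

```rust
use std::collections::HashMap;

/// Stand-in for the reset-and-reuse pattern; not the checker's real API.
fn mark_copies(attrs: &mut HashMap<u32, bool>, copies: &[u32]) {
    // reset_with_default(): clear every node's attribute back to `false`.
    for v in attrs.values_mut() {
        *v = false;
    }
    // set_attribute_on_each(): mark only the nodes that hold a copy,
    // ignoring ids that are not part of the node set.
    for node in copies {
        if let Some(v) = attrs.get_mut(node) {
            *v = true;
        }
    }
}

fn main() {
    // One map (the "checker") reused across two records.
    let mut attrs: HashMap<u32, bool> = HashMap::from([(1, false), (2, false), (3, false)]);
    mark_copies(&mut attrs, &[1, 3]);
    assert!(attrs[&1] && !attrs[&2] && attrs[&3]);
    mark_copies(&mut attrs, &[2]);
    assert!(!attrs[&1] && attrs[&2] && !attrs[&3]);
}
```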
@@ -111,6 +111,62 @@ impl SpreadSelector {

Ok(selected)
}

/// Starting from an existing set of nodes that already have copies of a record, this
/// returns additional nodes that we can replicate to, in order to satisfy the replication
/// property. If this is not possible, it fails with `InsufficientWriteableNodes`.
///
/// Note that this can return _more_ nodes than needed, depending on the selector strategy.
///
/// The selector automatically avoids non-writeable nodes.
pub fn select_fixups<R: Rng + ?Sized>(
&self,
existing_copies: &NodeSet,
rng: &mut R,
nodes_config: &NodesConfiguration,
exclude_nodes: &NodeSet,
) -> Result<Spread, SpreadSelectorError> {
// Get the list of non-empty nodes from the nodeset given the nodes configuration
let effective_nodeset = self.nodeset.to_effective(nodes_config);
let mut writeable_nodes: Vec<_> = effective_nodeset
.into_iter()
.filter(|node_id| !exclude_nodes.contains(node_id))
.filter(|node_id| {
nodes_config
.get_log_server_storage_state(node_id)
.can_write_to()
})
.collect();
if writeable_nodes.len() < self.replication_property.num_copies().into() {
return Err(SpreadSelectorError::InsufficientWriteableNodes);
}

let selected: Spread = match &self.strategy {
SelectorStrategy::Flood => {
writeable_nodes.shuffle(rng);
Spread::from(writeable_nodes)
}
#[cfg(any(test, feature = "test-util"))]
SelectorStrategy::Fixed(selector) => selector.select()?,
};

// validate that we can have write quorum with this spread
let mut checker =
NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property);
checker.set_attribute_on_each(&selected, || true);
if !checker.check_write_quorum(|attr| *attr) {
return Err(SpreadSelectorError::InsufficientWriteableNodes);
}

// Remove existing nodes from selected spread to return the fixups only.
let selected: Vec<_> = selected
.into_iter()
// keep nodes that are not in existing_copies.
.filter(|n| !existing_copies.contains(n))
.collect();

Ok(selected.into())
}
}

static_assertions::assert_impl_all!(SpreadSelector: Send, Sync);
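
A stripped-down illustration of the fixups idea in `select_fixups` — the nodes still missing a copy are simply the chosen spread minus the existing copies — using `BTreeSet<u32>` in place of `Spread`/`NodeSet` and omitting the quorum checking; this is not the selector's actual code:

```rust
use std::collections::BTreeSet;

/// Stand-in for the fixups computation: nodes in the chosen spread that do
/// not already hold a copy are the only ones the repair task still needs to
/// store the record on.
fn fixups(spread: &BTreeSet<u32>, existing_copies: &BTreeSet<u32>) -> BTreeSet<u32> {
    spread.difference(existing_copies).copied().collect()
}

fn main() {
    let spread = BTreeSet::from([1, 2, 3]);
    let existing = BTreeSet::from([2]);
    assert_eq!(fixups(&spread, &existing), BTreeSet::from([1, 3]));
}
```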
@@ -14,7 +14,7 @@
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::MessageRouterBuilder;
use restate_types::net::log_server::{
GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail,
GetDigest, GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail,
};
use restate_types::net::replicated_loglet::Append;

@@ -28,6 +28,7 @@ pub struct LogServersRpc {
pub seal: RpcRouter<Seal>,
pub get_loglet_info: RpcRouter<GetLogletInfo>,
pub get_records: RpcRouter<GetRecords>,
pub get_digest: RpcRouter<GetDigest>,
pub wait_for_tail: RpcRouter<WaitForTail>,
}

@@ -41,6 +42,7 @@ impl LogServersRpc {
let seal = RpcRouter::new(router_builder);
let get_loglet_info = RpcRouter::new(router_builder);
let get_records = RpcRouter::new(router_builder);
let get_digest = RpcRouter::new(router_builder);
let wait_for_tail = RpcRouter::new(router_builder);

Self {
Expand All @@ -50,6 +52,7 @@ impl LogServersRpc {
seal,
get_loglet_info,
get_records,
get_digest,
wait_for_tail,
}
}
