Skip to content

Commit

Permalink
[WIP] RepairTail
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Oct 9, 2024
1 parent 56e9aca commit 83fb10e
Show file tree
Hide file tree
Showing 11 changed files with 784 additions and 22 deletions.
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
to,
known_global_tail,
cache,
false,
)
.await?;
let read_stream = ReplicatedLogletReadStream::new(from, rx_stream, reader_task);
Expand Down Expand Up @@ -197,6 +198,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
self.networking.clone(),
self.logservers_rpc.clone(),
self.known_global_tail.clone(),
self.record_cache.clone(),
);
let tail_status = task.run().await;
match tail_status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ pub struct ReadStreamTask {
tx: mpsc::Sender<Result<LogEntry<LogletOffset>, OperationError>>,
record_cache: RecordCache,
stats: Stats,
/// If set to true, we won't wait for the global tail to be updated before requesting the next
/// batch. This is used in tail repair tasks.
move_beyond_global_tail: bool,
}

impl ReadStreamTask {
Expand All @@ -84,6 +87,7 @@ impl ReadStreamTask {
read_to: Option<LogletOffset>,
known_global_tail: TailOffsetWatch,
record_cache: RecordCache,
move_beyond_global_tail: bool,
) -> Result<
(
mpsc::Receiver<Result<LogEntry<LogletOffset>, OperationError>>,
Expand All @@ -107,6 +111,7 @@ impl ReadStreamTask {
tx,
record_cache,
stats: Stats::default(),
move_beyond_global_tail,
};
let handle = task_center().spawn_unmanaged(
TaskKind::ReplicatedLogletReadStream,
Expand Down Expand Up @@ -140,12 +145,20 @@ impl ReadStreamTask {
debug_assert!(readahead_trigger >= 1 && readahead_trigger <= self.tx.max_capacity());

let mut tail_subscriber = self.global_tail_watch.subscribe();
// resolves immediately as it's pre-marked as changed.
tail_subscriber
.changed()
.await
.map_err(|_| OperationError::Shutdown(ShutdownError))?;
self.last_known_tail = tail_subscriber.borrow_and_update().offset();
if self.move_beyond_global_tail {
self.last_known_tail = self
.read_to
.expect("read_to must be set with move_beyond_global_tail=true")
.next();
} else {
// resolves immediately as it's pre-marked as changed.
tail_subscriber
.changed()
.await
.map_err(|_| OperationError::Shutdown(ShutdownError))?;
self.last_known_tail = tail_subscriber.borrow_and_update().offset();
}

// todo(asoli): [important] Need to fire up a FindTail task in the background? It depends on whether we
// are on the sequencer node or not. We might ask Bifrost's watchdog instead to dedupe
// FindTails and time-throttle them.
Expand Down Expand Up @@ -205,7 +218,7 @@ impl ReadStreamTask {

// Are we reading after last_known_tail offset?
// We are at tail. We need to wait until new records have been released.
if !self.can_advance() {
if !self.can_advance() && !self.move_beyond_global_tail {
// HODL.
// todo(asoli): Measure tail-change wait time in histogram
// todo(asoli): (who's going to change this? - background FindTail?)
Expand Down Expand Up @@ -440,7 +453,10 @@ impl ReadStreamTask {
timeout: Duration,
) -> Result<ServerReadResult, OperationError> {
let request = GetRecords {
header: LogServerRequestHeader::new(self.my_params.loglet_id, self.last_known_tail),
header: LogServerRequestHeader::new(
self.my_params.loglet_id,
self.global_tail_watch.latest_offset(),
),
total_limit_in_bytes: None,
filter: self.filter.clone(),
from_offset: self.read_pointer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,12 @@ impl<'a, Attribute> NodeSetChecker<'a, Attribute> {
}
}

pub fn set_attribute_on_each(&mut self, nodes: &[PlainNodeId], f: impl Fn() -> Attribute) {
for node in nodes {
pub fn set_attribute_on_each<'b>(
&mut self,
nodes: impl IntoIterator<Item = &'b PlainNodeId>,
f: impl Fn() -> Attribute,
) {
for node in nodes.into_iter() {
// ignore if the node is not in the original nodeset
if self.storage_states.contains_key(node) {
self.node_attribute.insert(*node, f());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,59 @@ impl SpreadSelector {

Ok(selected)
}

/// Amend an existing spread with enough extra nodes to _at least_ satisfy the
/// replication property, if possible. Fails with `InsufficientWriteableNodes`
/// when the nodeset cannot provide a write quorum.
///
/// Non-writeable nodes are skipped automatically; nodes listed in
/// `exclude_nodes` are never considered. The returned spread contains only the
/// *additional* nodes, i.e. `existing_copies` are subtracted from the result.
pub fn amend<R: Rng + ?Sized>(
    &self,
    existing_copies: &NodeSet,
    rng: &mut R,
    nodes_config: &NodesConfiguration,
    exclude_nodes: &NodeSet,
) -> Result<Spread, SpreadSelectorError> {
    // Candidate pool: non-empty members of the nodeset (per the current nodes
    // configuration) that are writeable and not explicitly excluded.
    let effective_nodeset = self.nodeset.to_effective(nodes_config);
    let mut candidates: Vec<_> = effective_nodeset
        .into_iter()
        .filter(|node_id| {
            !exclude_nodes.contains(node_id)
                && nodes_config
                    .get_log_server_storage_state(node_id)
                    .can_write_to()
        })
        .collect();
    if candidates.len() < self.replication_property.num_copies().into() {
        return Err(SpreadSelectorError::InsufficientWriteableNodes);
    }

    let selected: Spread = match &self.strategy {
        SelectorStrategy::Flood => {
            // Flood writes to every candidate; shuffle so ordering carries no bias.
            candidates.shuffle(rng);
            Spread::from(candidates)
        }
        #[cfg(any(test, feature = "test-util"))]
        SelectorStrategy::Fixed(selector) => selector.select()?,
    };

    // The chosen spread (including any already-existing copies) must still form
    // a write quorum against the full nodeset.
    let mut checker =
        NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property);
    checker.set_attribute_on_each(&selected, || true);
    if !checker.check_write_quorum(|attr| *attr) {
        return Err(SpreadSelectorError::InsufficientWriteableNodes);
    }

    // Report only the amendment: drop nodes that already hold a copy.
    let amendment: Vec<_> = selected
        .into_iter()
        .filter(|n| !existing_copies.contains(n))
        .collect();

    Ok(amendment.into())
}
}

static_assertions::assert_impl_all!(SpreadSelector: Send, Sync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::MessageRouterBuilder;
use restate_types::net::log_server::{
GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail,
GetDigest, GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail,
};
use restate_types::net::replicated_loglet::Append;

Expand All @@ -28,6 +28,7 @@ pub struct LogServersRpc {
pub seal: RpcRouter<Seal>,
pub get_loglet_info: RpcRouter<GetLogletInfo>,
pub get_records: RpcRouter<GetRecords>,
pub get_digest: RpcRouter<GetDigest>,
pub wait_for_tail: RpcRouter<WaitForTail>,
}

Expand All @@ -41,6 +42,7 @@ impl LogServersRpc {
let seal = RpcRouter::new(router_builder);
let get_loglet_info = RpcRouter::new(router_builder);
let get_records = RpcRouter::new(router_builder);
let get_digest = RpcRouter::new(router_builder);
let wait_for_tail = RpcRouter::new(router_builder);

Self {
Expand All @@ -50,6 +52,7 @@ impl LogServersRpc {
seal,
get_loglet_info,
get_records,
get_digest,
wait_for_tail,
}
}
Expand Down
Loading

0 comments on commit 83fb10e

Please sign in to comment.