[Bifrost] Periodic tail checker #2064

Merged · 3 commits · Oct 11, 2024
8 changes: 6 additions & 2 deletions crates/bifrost/src/appender.rs
@@ -106,6 +106,7 @@ impl Appender {
Err(AppendError::Sealed) => {
info!(
attempt = attempt,
segment_index = %loglet.segment_index(),
"Append batch will be retried (loglet being sealed), waiting for tail to be determined"
);
let new_loglet = Self::wait_next_unsealed_loglet(
@@ -124,7 +125,7 @@ impl Appender {
}
}

#[instrument(level = "debug" err, skip(retry_iter, bifrost_inner))]
#[instrument(level = "error" err, skip(retry_iter, bifrost_inner))]
async fn wait_next_unsealed_loglet(
log_id: LogId,
bifrost_inner: &Arc<BifrostInner>,
@@ -146,7 +147,10 @@
);
return Ok(loglet);
} else {
debug!("Still waiting for sealing to complete");
debug!(
"Still waiting for sealing to complete. Elapsed={:?}",
start.elapsed(),
);
}
}

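The hunk above only touches logging, but the surrounding wait loop follows a common wait-and-retry shape. A minimal sketch of that shape, assuming a tokio runtime (the closure name, retry delay, and `wait_until` helper are illustrative, not Bifrost APIs):

```rust
use std::time::{Duration, Instant};

// Poll a condition until it yields a value, reporting elapsed time on every attempt,
// similar in spirit to the retry loop in `wait_next_unsealed_loglet`.
async fn wait_until<T>(mut check: impl FnMut() -> Option<T>) -> T {
    let start = Instant::now();
    let mut attempt = 0u32;
    loop {
        attempt += 1;
        if let Some(value) = check() {
            return value;
        }
        // Mirrors the new debug line: report how long we have been waiting.
        println!(
            "attempt={attempt}: still waiting for sealing to complete, elapsed={:?}",
            start.elapsed()
        );
        tokio::time::sleep(Duration::from_millis(250)).await; // assumed retry delay
    }
}

#[tokio::main]
async fn main() {
    let start = Instant::now();
    // Pretend sealing completes after ~1 second.
    let sealed_at = start + Duration::from_secs(1);
    let tail = wait_until(|| (Instant::now() >= sealed_at).then_some(42u64)).await;
    println!("tail determined: {tail}");
}
```
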
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
@@ -170,6 +170,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
to,
known_global_tail,
cache,
false,
)
.await?;
let read_stream = ReplicatedLogletReadStream::new(from, rx_stream, reader_task);
@@ -209,6 +210,7 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {
self.networking.clone(),
self.logservers_rpc.clone(),
self.known_global_tail.clone(),
self.record_cache.clone(),
);
let tail_status = task.run().await;
match tail_status {
19 changes: 17 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
@@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use dashmap::DashMap;
@@ -28,6 +29,7 @@ use super::network::RequestPump;
use super::rpc_routers::{LogServersRpc, SequencersRpc};
use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError};
use crate::providers::replicated_loglet::error::ReplicatedLogletError;
use crate::providers::replicated_loglet::tasks::PeriodicTailChecker;
use crate::Error;

pub struct Factory<T> {
@@ -79,6 +81,7 @@ impl<T: TransportConnect> LogletProviderFactory for Factory<T> {
async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError> {
metric_definitions::describe_metrics();
let provider = Arc::new(ReplicatedLogletProvider::new(
self.task_center.clone(),
self.metadata_store_client,
self.networking,
self.logserver_rpc_routers,
@@ -103,6 +106,7 @@ impl<T: TransportConnect> LogletProviderFactory for Factory<T> {
}

pub(super) struct ReplicatedLogletProvider<T> {
task_center: TaskCenter,
active_loglets: DashMap<(LogId, SegmentIndex), Arc<ReplicatedLoglet<T>>>,
_metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
@@ -113,15 +117,15 @@ pub(super) struct ReplicatedLogletProvider<T> {

impl<T: TransportConnect> ReplicatedLogletProvider<T> {
fn new(
task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
logserver_rpc_routers: LogServersRpc,
sequencer_rpc_routers: SequencersRpc,
record_cache: RecordCache,
) -> Self {
// todo(asoli): create all global state here that'll be shared across loglet instances
// - NodeState map.
Self {
task_center,
active_loglets: Default::default(),
_metadata_store_client: metadata_store_client,
networking,
@@ -166,6 +170,7 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
"Creating a replicated loglet client"
);

let loglet_id = params.loglet_id;
// Create loglet
let loglet = ReplicatedLoglet::new(
log_id,
@@ -177,6 +182,16 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
self.record_cache.clone(),
);
let key_value = entry.insert(Arc::new(loglet));
let loglet = Arc::downgrade(key_value.value());
let _ = self.task_center.spawn(
TaskKind::Watchdog,
"periodic-tail-checker",
None,
async move {
// todo: configuration
PeriodicTailChecker::run(loglet_id, loglet, Duration::from_secs(2)).await
},
);
Arc::clone(key_value.value())
}
dashmap::Entry::Occupied(entry) => entry.get().clone(),
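For context, a minimal sketch of the pattern spawned above, assuming a tokio runtime: the checker holds only a `Weak` handle (hence the `Arc::downgrade`), so the background task never keeps the loglet alive and exits once the loglet is dropped. `DummyLoglet` and `check_tail` are stand-ins, not the real `PeriodicTailChecker` API; the 2-second period mirrors the hardcoded value pending configuration.

```rust
use std::sync::{Arc, Weak};
use std::time::Duration;

struct DummyLoglet;

impl DummyLoglet {
    async fn check_tail(&self) {
        // In the real task this is where a find-tail / tail-update probe would run.
    }
}

// The checker only upgrades its Weak handle when it needs the loglet, and stops
// quietly once the loglet has been dropped by the provider.
async fn periodic_tail_checker(loglet: Weak<DummyLoglet>, period: Duration) {
    let mut ticker = tokio::time::interval(period);
    loop {
        ticker.tick().await;
        let Some(strong) = loglet.upgrade() else { return };
        strong.check_tail().await;
    }
}

#[tokio::main]
async fn main() {
    let loglet = Arc::new(DummyLoglet);
    // Same shape as the spawn above: downgrade first, then hand the Weak to the task.
    tokio::spawn(periodic_tail_checker(
        Arc::downgrade(&loglet),
        Duration::from_secs(2),
    ));
    drop(loglet); // the checker notices this on a subsequent tick and exits
    tokio::time::sleep(Duration::from_secs(3)).await;
}
```
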
@@ -67,10 +67,14 @@ pub struct ReadStreamTask {
read_pointer: LogletOffset,
/// Last offset to read before terminating the stream. None means "tailing" reader.
/// *Inclusive*
/// This must be set if `move_beyond_global_tail` is true.
read_to: Option<LogletOffset>,
tx: mpsc::Sender<Result<LogEntry<LogletOffset>, OperationError>>,
record_cache: RecordCache,
stats: Stats,
/// If set to true, we won't wait for the global tail to be updated before requesting the next
/// batch. This is used in tail repair tasks.
move_beyond_global_tail: bool,
}

impl ReadStreamTask {
@@ -84,13 +88,17 @@ impl ReadStreamTask {
read_to: Option<LogletOffset>,
known_global_tail: TailOffsetWatch,
record_cache: RecordCache,
move_beyond_global_tail: bool,
) -> Result<
(
mpsc::Receiver<Result<LogEntry<LogletOffset>, OperationError>>,
TaskHandle<Result<(), OperationError>>,
),
OperationError,
> {
if move_beyond_global_tail && read_to.is_none() {
panic!("read_to must be set if move_beyond_global_tail=true");
}
// todo(asoli): configuration
let (tx, rx) = mpsc::channel(100);
// Reading from INVALID resets to OLDEST.
@@ -107,6 +115,7 @@
tx,
record_cache,
stats: Stats::default(),
move_beyond_global_tail,
};
let handle = task_center().spawn_unmanaged(
TaskKind::ReplicatedLogletReadStream,
@@ -140,12 +149,20 @@ impl ReadStreamTask {
debug_assert!(readahead_trigger >= 1 && readahead_trigger <= self.tx.max_capacity());

let mut tail_subscriber = self.global_tail_watch.subscribe();
// resolves immediately as it's pre-marked as changed.
tail_subscriber
.changed()
.await
.map_err(|_| OperationError::Shutdown(ShutdownError))?;
self.last_known_tail = tail_subscriber.borrow_and_update().offset();
if self.move_beyond_global_tail {
self.last_known_tail = self
.read_to
.expect("read_to must be set with move_beyond_global_tail=true")
.next();
} else {
// resolves immediately as it's pre-marked as changed.
tail_subscriber
.changed()
.await
.map_err(|_| OperationError::Shutdown(ShutdownError))?;
self.last_known_tail = tail_subscriber.borrow_and_update().offset();
}

// todo(asoli): [important] Need to fire up a FindTail task in the background? It depends on whether we
// are on the sequencer node or not. We might ask Bifrost's watchdog instead to dedupe
// FindTails and time-throttle them.
@@ -205,7 +222,7 @@ impl ReadStreamTask {

// Are we reading after last_known_tail offset?
// We are at tail. We need to wait until new records have been released.
if !self.can_advance() {
if !self.can_advance() && !self.move_beyond_global_tail {
// HODL.
// todo(asoli): Measure tail-change wait time in histogram
// todo(asoli): (who's going to change this? - background FindTail?)
@@ -440,7 +457,10 @@ impl ReadStreamTask {
timeout: Duration,
) -> Result<ServerReadResult, OperationError> {
let request = GetRecords {
header: LogServerRequestHeader::new(self.my_params.loglet_id, self.last_known_tail),
header: LogServerRequestHeader::new(
self.my_params.loglet_id,
self.global_tail_watch.latest_offset(),
),
total_limit_in_bytes: None,
filter: self.filter.clone(),
from_offset: self.read_pointer,
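For context, a self-contained sketch of the branch introduced above, with plain `u64` offsets and a `tokio::sync::watch` channel standing in for `TailOffsetWatch`: a normal tailing reader blocks on the tail watch, while a repair-style reader with `move_beyond_global_tail = true` skips the wait and pins its initial tail to `read_to + 1`. The function name and types are illustrative, not the real `ReadStreamTask` API.

```rust
use tokio::sync::watch;

// Initial tail for a reader: either wait on the shared tail watch (normal tailing read),
// or pin it just past `read_to` when moving beyond the global tail (repair reads).
async fn initial_tail(
    tail_rx: &mut watch::Receiver<u64>,
    move_beyond_global_tail: bool,
    read_to: Option<u64>,
) -> u64 {
    if move_beyond_global_tail {
        // Exclusive tail = last offset to read + 1; read_to must be set in this mode.
        read_to.expect("read_to must be set with move_beyond_global_tail=true") + 1
    } else {
        // Resolves as soon as the watch reports a change (or immediately if pre-marked).
        tail_rx.changed().await.expect("tail watch closed");
        *tail_rx.borrow_and_update()
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(0u64);
    tx.send(10).unwrap(); // the global tail moves to 10

    // Normal tailing reader: picks up the watched global tail.
    assert_eq!(initial_tail(&mut rx, false, None).await, 10);

    // Repair-style reader: ignores the watch and derives its tail from read_to.
    assert_eq!(initial_tail(&mut rx, true, Some(5)).await, 6);
}
```
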
@@ -110,6 +110,26 @@ impl<'a, Attribute> NodeSetChecker<'a, Attribute> {
self.node_attribute.is_empty()
}

/// Resets all attributes for all nodes to this value.
pub fn reset_with(&mut self, attribute: Attribute)
where
Attribute: Clone,
{
for (_, v) in self.node_attribute.iter_mut() {
*v = attribute.clone();
}
}

/// Resets all attributes for all nodes to the default value of `Attribute`.
pub fn reset_with_default(&mut self)
where
Attribute: Default,
{
for (_, v) in self.node_attribute.iter_mut() {
*v = Default::default();
}
}

/// Set the attribute value of a node. Note that a node can only be
/// associated with one attribute value at a time, so if the node has an
/// existing attribute value, the value will be cleared.
@@ -128,8 +148,12 @@ impl<'a, Attribute> NodeSetChecker<'a, Attribute> {
}
}

pub fn set_attribute_on_each(&mut self, nodes: &[PlainNodeId], f: impl Fn() -> Attribute) {
for node in nodes {
pub fn set_attribute_on_each<'b>(
&mut self,
nodes: impl IntoIterator<Item = &'b PlainNodeId>,
f: impl Fn() -> Attribute,
) {
for node in nodes.into_iter() {
// ignore if the node is not in the original nodeset
if self.storage_states.contains_key(node) {
self.node_attribute.insert(*node, f());
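For context, a simplified stand-in (not the real `NodeSetChecker`) showing what the signature change buys: `set_attribute_on_each` now accepts any `IntoIterator` of node ids — slices, `Vec`s, `NodeSet` or `Spread` iterators — and the new reset helpers let attributes be cleared in place between checks.

```rust
use std::collections::HashMap;

type NodeId = u32;

// Simplified mock of a nodeset checker: one attribute per node in the nodeset.
struct Checker<A> {
    members: Vec<NodeId>,
    node_attribute: HashMap<NodeId, A>,
}

impl<A: Clone> Checker<A> {
    /// Resets all attributes for all nodes to this value.
    fn reset_with(&mut self, attribute: A) {
        for v in self.node_attribute.values_mut() {
            *v = attribute.clone();
        }
    }

    /// Accepts any iterator of node ids: slices, Vecs, NodeSet/Spread iterators, ...
    fn set_attribute_on_each<'a>(
        &mut self,
        nodes: impl IntoIterator<Item = &'a NodeId>,
        f: impl Fn() -> A,
    ) {
        for node in nodes {
            // Ignore nodes that are not part of the original nodeset.
            if self.members.contains(node) {
                self.node_attribute.insert(*node, f());
            }
        }
    }
}

fn main() {
    let mut checker: Checker<bool> = Checker {
        members: vec![1, 2, 3],
        node_attribute: HashMap::new(),
    };
    checker.set_attribute_on_each(&[1, 3], || true); // array/slice works
    checker.set_attribute_on_each(vec![2].iter(), || true); // any iterator works
    checker.reset_with(false); // clear everything between checks
    assert!(checker.node_attribute.values().all(|v| !*v));
}
```
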
@@ -111,6 +111,62 @@ impl SpreadSelector {

Ok(selected)
}

/// Starting from an existing set of nodes that already have copies of a record, this
/// returns additional nodes that we can replicate to, in order to satisfy the replication
/// property. If not possible, it fails with `InsufficientWriteableNodes`.
///
/// Note that this can return _more_ nodes than needed, depending on the selector strategy.
///
/// The selector automatically avoids non-writeable nodes.
pub fn select_fixups<R: Rng + ?Sized>(
&self,
existing_copies: &NodeSet,
rng: &mut R,
nodes_config: &NodesConfiguration,
exclude_nodes: &NodeSet,
) -> Result<Spread, SpreadSelectorError> {
// Get the list of non-empty nodes from the nodeset given the nodes configuration
let effective_nodeset = self.nodeset.to_effective(nodes_config);
let mut writeable_nodes: Vec<_> = effective_nodeset
.into_iter()
.filter(|node_id| !exclude_nodes.contains(node_id))
.filter(|node_id| {
nodes_config
.get_log_server_storage_state(node_id)
.can_write_to()
})
.collect();
if writeable_nodes.len() < self.replication_property.num_copies().into() {
return Err(SpreadSelectorError::InsufficientWriteableNodes);
}

let selected: Spread = match &self.strategy {
SelectorStrategy::Flood => {
writeable_nodes.shuffle(rng);
Spread::from(writeable_nodes)
}
#[cfg(any(test, feature = "test-util"))]
SelectorStrategy::Fixed(selector) => selector.select()?,
};

// validate that we can have write quorum with this spread
let mut checker =
NodeSetChecker::new(&self.nodeset, nodes_config, &self.replication_property);
checker.set_attribute_on_each(&selected, || true);
if !checker.check_write_quorum(|attr| *attr) {
return Err(SpreadSelectorError::InsufficientWriteableNodes);
}

// Remove existing nodes from selected spread to return the fixups only.
let selected: Vec<_> = selected
.into_iter()
// keep nodes that are not in existing_copies.
.filter(|n| !existing_copies.contains(n))
.collect();

Ok(selected.into())
}
}

static_assertions::assert_impl_all!(SpreadSelector: Send, Sync);
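For context, a stripped-down illustration of the `select_fixups` flow, reduced to integer node ids and a plain copy count (the real code works with `NodeSet`/`Spread`, a replication property, and a write-quorum check): pick a candidate spread from writeable, non-excluded nodes, bail out if it cannot satisfy the replication factor, and return only the nodes that do not already hold a copy.

```rust
use std::collections::HashSet;

type NodeId = u32;

/// Pick extra replication targets for a record that already has copies on some nodes.
/// Returns None when the writeable candidates cannot satisfy the required copy count
/// (the real code returns SpreadSelectorError::InsufficientWriteableNodes).
fn select_fixups(
    existing_copies: &HashSet<NodeId>,
    writeable_nodes: &[NodeId],
    exclude_nodes: &HashSet<NodeId>,
    num_copies: usize,
) -> Option<Vec<NodeId>> {
    // Candidate spread: writeable nodes that are not explicitly excluded.
    let spread: Vec<NodeId> = writeable_nodes
        .iter()
        .copied()
        .filter(|n| !exclude_nodes.contains(n))
        .collect();
    if spread.len() < num_copies {
        return None;
    }
    // Keep only the nodes that do not already hold a copy: these are the fixups.
    Some(
        spread
            .into_iter()
            .filter(|n| !existing_copies.contains(n))
            .collect(),
    )
}

fn main() {
    let existing: HashSet<NodeId> = [1].into();
    let fixups = select_fixups(&existing, &[1, 2, 3], &HashSet::new(), 2);
    // With a copy on node 1 and two copies required, nodes 2 and 3 are returned; like
    // the Flood strategy, this may be more nodes than strictly needed.
    assert_eq!(fixups, Some(vec![2, 3]));
}
```
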
@@ -14,7 +14,7 @@
use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::MessageRouterBuilder;
use restate_types::net::log_server::{
GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail,
GetDigest, GetLogletInfo, GetRecords, Release, Seal, Store, Trim, WaitForTail,
};
use restate_types::net::replicated_loglet::Append;

@@ -28,6 +28,7 @@ pub struct LogServersRpc {
pub seal: RpcRouter<Seal>,
pub get_loglet_info: RpcRouter<GetLogletInfo>,
pub get_records: RpcRouter<GetRecords>,
pub get_digest: RpcRouter<GetDigest>,
pub wait_for_tail: RpcRouter<WaitForTail>,
}

@@ -41,6 +42,7 @@ impl LogServersRpc {
let seal = RpcRouter::new(router_builder);
let get_loglet_info = RpcRouter::new(router_builder);
let get_records = RpcRouter::new(router_builder);
let get_digest = RpcRouter::new(router_builder);
let wait_for_tail = RpcRouter::new(router_builder);

Self {
@@ -50,6 +52,7 @@
seal,
get_loglet_info,
get_records,
get_digest,
wait_for_tail,
}
}