Skip to content

Commit

Permalink
[Bifrost] CheckSeal task for replicated loglet
Browse files Browse the repository at this point in the history
This hunts for signals to tell us if there is an ongoing seal or not. It's not used as a source of truth for whether a loglet is sealed or not, but it's important to detect an externally started seal operation that the sequencer was not aware of.
  • Loading branch information
AhmedSoliman committed Oct 15, 2024
1 parent 7dd8fd0 commit d87eed8
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 123 deletions.
17 changes: 10 additions & 7 deletions crates/bifrost/src/loglet/loglet_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,14 +492,17 @@ pub async fn append_after_seal_concurrent(loglet: Arc<dyn Loglet>) -> googletest
let first_observed_seal = tokio::task::spawn({
let loglet = loglet.clone();
async move {
loop {
let res = loglet.find_tail().await.expect("find_tail succeeds");
if res.is_sealed() {
return res.offset();
tc.run_in_scope("find-tail", None, async move {
loop {
let res = loglet.find_tail().await.expect("find_tail succeeds");
if res.is_sealed() {
return res.offset();
}
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
// give a chance to other tasks to work
tokio::task::yield_now().await;
}
})
.await
}
});

Expand Down
41 changes: 25 additions & 16 deletions crates/bifrost/src/providers/replicated_loglet/loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// todo(asoli): remove once this is fleshed out
#![allow(dead_code)]

use std::sync::Arc;

use async_trait::async_trait;
Expand All @@ -37,16 +34,10 @@ use super::metric_definitions::{BIFROST_RECORDS_ENQUEUED_BYTES, BIFROST_RECORDS_
use super::read_path::{ReadStreamTask, ReplicatedLogletReadStream};
use super::remote_sequencer::RemoteSequencer;
use super::rpc_routers::{LogServersRpc, SequencersRpc};
use super::tasks::FindTailResult;
use super::tasks::{CheckSealOutcome, CheckSealTask, FindTailResult};

#[derive(derive_more::Debug)]
pub(super) struct ReplicatedLoglet<T> {
/// This is used only to populate header of outgoing request to a remotely owned sequencer.
/// Otherwise, it's unused.
log_id: LogId,
/// This is used only to populate header of outgoing request to a remotely owned sequencer.
/// Otherwise, it's unused.
segment_index: SegmentIndex,
my_params: ReplicatedLogletParams,
#[debug(skip)]
networking: Networking<T>,
Expand All @@ -61,8 +52,6 @@ pub(super) struct ReplicatedLoglet<T> {
/// should run a proper tail search.
known_global_tail: TailOffsetWatch,
sequencer: SequencerAccess<T>,
#[debug(skip)]
log_server_manager: RemoteLogServerManager,
}

impl<T: TransportConnect> ReplicatedLoglet<T> {
Expand Down Expand Up @@ -112,15 +101,12 @@ impl<T: TransportConnect> ReplicatedLoglet<T> {
}
};
Self {
log_id,
segment_index,
my_params,
networking,
logservers_rpc,
record_cache,
known_global_tail,
sequencer,
log_server_manager,
}
}

Expand Down Expand Up @@ -202,7 +188,30 @@ impl<T: TransportConnect> Loglet for ReplicatedLoglet<T> {

async fn find_tail(&self) -> Result<TailState<LogletOffset>, OperationError> {
match self.sequencer {
SequencerAccess::Local { .. } => Ok(*self.known_global_tail.get()),
SequencerAccess::Local { .. } => {
let latest_tail = *self.known_global_tail.get();
if latest_tail.is_sealed() {
return Ok(latest_tail);
}
// We might have been sealed by external node and the sequencer is unaware. In this
// case, we run the a check seal task to determine if we suspect that sealing is
// happening.
let result = CheckSealTask::run(
&self.my_params,
&self.logservers_rpc.get_loglet_info,
&self.known_global_tail,
&self.networking,
)
.await?;
if result == CheckSealOutcome::Sealing {
// We are likely to be sealing...
// let's fire a seal to ensure this seal is complete
if self.seal().await.is_ok() {
self.known_global_tail.notify_seal();
}
}
return Ok(*self.known_global_tail.get());
}
SequencerAccess::Remote { .. } => {
let task = FindTailTask::new(
task_center(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
use std::time::Duration;

use metrics::{counter, Counter};
use rand::seq::SliceRandom;
use restate_types::config::Configuration;
use tokio::sync::mpsc;
use tracing::{info, trace};

use restate_core::network::{NetworkError, Networking, TransportConnect};
use restate_core::{task_center, ShutdownError, TaskHandle, TaskKind};
use restate_types::config::Configuration;
use restate_types::logs::{KeyFilter, LogletOffset, MatchKeyQuery, RecordCache, SequenceNumber};
use restate_types::net::log_server::{GetRecords, LogServerRequestHeader, MaybeRecord};
use restate_types::replicated_loglet::{EffectiveNodeSet, NodeSet, ReplicatedLogletParams};
Expand Down Expand Up @@ -253,9 +252,8 @@ impl ReadStreamTask {
// Read from logservers
let effective_nodeset =
EffectiveNodeSet::new(&self.my_params.nodeset, nodes_config.live_load());
// order the nodeset such that our node is the first one to attempt
let mut mutable_effective_nodeset =
shuffle_nodeset_for_reads(&effective_nodeset, my_node_id.as_plain());
// Order the nodeset such that our node is the first one to attempt
let mut mutable_effective_nodeset = effective_nodeset.shuffle_for_reads(my_node_id);

if mutable_effective_nodeset.is_empty() {
// if nodeset is all disabled, no readable nodes. impossible situation to resolve,
Expand Down Expand Up @@ -518,19 +516,3 @@ enum ServerReadResult {
/// Unreachable or failing node, skip and try another
Skip,
}

fn shuffle_nodeset_for_reads(nodeset: &NodeSet, my_node_id: PlainNodeId) -> Vec<PlainNodeId> {
let mut new_nodeset: Vec<_> = nodeset.iter().cloned().collect();
// Shuffle nodes
new_nodeset.shuffle(&mut rand::thread_rng());

let has_my_node_idx = nodeset.iter().position(|&x| x == my_node_id);

// put my node at the end if it's there
if let Some(idx) = has_my_node_idx {
let len = new_nodeset.len();
new_nodeset.swap(idx, len - 1);
}

new_nodeset
}
120 changes: 120 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use tracing::{info, trace};

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{Networking, TransportConnect};
use restate_core::{cancellation_watcher, ShutdownError};
use restate_types::net::log_server::GetLogletInfo;
use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams};

use super::{FindTailOnNode, NodeTailStatus};
use crate::loglet::util::TailOffsetWatch;
use crate::providers::replicated_loglet::replication::NodeSetChecker;

/// Attempts to detect if the loglet has been sealed or if there is a seal in progress by
/// consulting nodes until it reaches f-majority, and it stops at the first sealed response
/// from any log-server since this is a sufficient signal that a seal is on-going.
///
/// the goal of this operation to get a signal on sequencer node that a seal has happened (or
/// ongoing) if we have not been receiving appends for some time.
///
/// This allows PeriodicFindTail to detect seal that was triggered externally to unblock read
/// streams running locally that rely on the sequencer's view of known_global_tail.
///
///
/// Note that this task can return Open if it cannot reach out to any node, so we should not use it
/// for operations that rely on absolute correctness of the tail. For those, use FindTailTask
/// instead.
pub struct CheckSealTask {}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CheckSealOutcome {
Sealing,
ProbablyOpen,
}

impl CheckSealTask {
pub async fn run<T: TransportConnect>(
my_params: &ReplicatedLogletParams,
get_loglet_info_rpc: &RpcRouter<GetLogletInfo>,
known_global_tail: &TailOffsetWatch,
networking: &Networking<T>,
) -> Result<CheckSealOutcome, ShutdownError> {
// If all nodes in the nodeset is in "provisioning", we can confidently short-circuit
// the result to LogletOffset::Oldest and the loglet is definitely unsealed.
if my_params
.nodeset
.all_provisioning(&networking.metadata().nodes_config_ref())
{
return Ok(CheckSealOutcome::ProbablyOpen);
}
// todo: If effective nodeset is empty, should we consider that the loglet is implicitly
// sealed?

let effective_nodeset = EffectiveNodeSet::new(
&my_params.nodeset,
&networking.metadata().nodes_config_ref(),
);

let mut nodeset_checker = NodeSetChecker::<'_, NodeTailStatus>::new(
&effective_nodeset,
&networking.metadata().nodes_config_ref(),
&my_params.replication,
);

let mut nodes = effective_nodeset.shuffle_for_reads(networking.metadata().my_node_id());

let mut cancel = std::pin::pin!(cancellation_watcher());
trace!(
loglet_id = %my_params.loglet_id,
effective_nodeset = %effective_nodeset,
"Checking seal status for loglet",
);
loop {
if nodeset_checker
.check_fmajority(NodeTailStatus::is_known_unsealed)
.passed()
{
// once we reach f-majority of unsealed, we stop.
return Ok(CheckSealOutcome::ProbablyOpen);
}

let Some(next_node) = nodes.pop() else {
info!(
loglet_id = %my_params.loglet_id,
effective_nodeset = %effective_nodeset,
"Insufficient nodes responded to GetLogletInfo requests, we cannot determine seal status, we'll assume it's unsealed for now",
);
return Ok(CheckSealOutcome::ProbablyOpen);
};

let task = FindTailOnNode {
node_id: next_node,
loglet_id: my_params.loglet_id,
get_loglet_info_rpc,
known_global_tail,
};
let tail_status = tokio::select! {
_ = &mut cancel => {
return Err(ShutdownError);
}
(_, tail_status) = task.run(networking) => { tail_status },
};
if tail_status.is_known_sealed() {
// we only need to see a single node sealed to declare that we are probably sealing (or
// sealed)
return Ok(CheckSealOutcome::ProbablyOpen);
}
nodeset_checker.merge_attribute(next_node, tail_status);
}
}
}
Loading

0 comments on commit d87eed8

Please sign in to comment.