Skip to content

Commit

Permalink
[Bifrost] Periodic tail checker
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Oct 11, 2024
1 parent a58c18f commit 5090df5
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 2 deletions.
19 changes: 17 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use dashmap::DashMap;
Expand All @@ -28,6 +29,7 @@ use super::network::RequestPump;
use super::rpc_routers::{LogServersRpc, SequencersRpc};
use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError};
use crate::providers::replicated_loglet::error::ReplicatedLogletError;
use crate::providers::replicated_loglet::tasks::PeriodicTailChecker;
use crate::Error;

pub struct Factory<T> {
Expand Down Expand Up @@ -79,6 +81,7 @@ impl<T: TransportConnect> LogletProviderFactory for Factory<T> {
async fn create(self: Box<Self>) -> Result<Arc<dyn LogletProvider>, OperationError> {
metric_definitions::describe_metrics();
let provider = Arc::new(ReplicatedLogletProvider::new(
self.task_center.clone(),
self.metadata_store_client,
self.networking,
self.logserver_rpc_routers,
Expand All @@ -103,6 +106,7 @@ impl<T: TransportConnect> LogletProviderFactory for Factory<T> {
}

pub(super) struct ReplicatedLogletProvider<T> {
task_center: TaskCenter,
active_loglets: DashMap<(LogId, SegmentIndex), Arc<ReplicatedLoglet<T>>>,
_metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
Expand All @@ -113,15 +117,15 @@ pub(super) struct ReplicatedLogletProvider<T> {

impl<T: TransportConnect> ReplicatedLogletProvider<T> {
fn new(
task_center: TaskCenter,
metadata_store_client: MetadataStoreClient,
networking: Networking<T>,
logserver_rpc_routers: LogServersRpc,
sequencer_rpc_routers: SequencersRpc,
record_cache: RecordCache,
) -> Self {
// todo(asoli): create all global state here that'll be shared across loglet instances
// - NodeState map.
Self {
task_center,
active_loglets: Default::default(),
_metadata_store_client: metadata_store_client,
networking,
Expand Down Expand Up @@ -166,6 +170,7 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
"Creating a replicated loglet client"
);

let loglet_id = params.loglet_id;
// Create loglet
let loglet = ReplicatedLoglet::new(
log_id,
Expand All @@ -177,6 +182,16 @@ impl<T: TransportConnect> ReplicatedLogletProvider<T> {
self.record_cache.clone(),
);
let key_value = entry.insert(Arc::new(loglet));
let loglet = Arc::downgrade(key_value.value());
let _ = self.task_center.spawn(
TaskKind::Watchdog,
"periodic-tail-checker",
None,
async move {
// todo: configuration
PeriodicTailChecker::run(loglet_id, loglet, Duration::from_secs(2)).await
},
);
Arc::clone(key_value.value())
}
dashmap::Entry::Occupied(entry) => entry.get().clone(),
Expand Down
2 changes: 2 additions & 0 deletions crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

mod digests;
mod find_tail;
mod periodic_tail_checker;
mod repair_tail;
mod seal;

pub use find_tail::*;
pub use periodic_tail_checker::*;
pub use repair_tail::*;
pub use seal::*;
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Weak;
use std::time::Duration;

use tokio::time::Instant;
use tracing::{debug, trace};

use restate_core::network::TransportConnect;
use restate_types::replicated_loglet::ReplicatedLogletId;

use crate::loglet::{Loglet, OperationError};
use crate::providers::replicated_loglet::loglet::ReplicatedLoglet;

/// Marker type for the background task that periodically checks the tail of a replicated loglet.
///
/// The struct has no fields; the task logic lives in the associated async function `run`.
pub struct PeriodicTailChecker {}

impl PeriodicTailChecker {
    /// Runs the periodic tail-check loop for a single loglet.
    ///
    /// Every round does the following:
    /// 1. Upgrades the weak handle and stops if the loglet is gone or its known global tail is
    ///    already sealed.
    /// 2. Sleeps for `duration` *without* holding a strong reference to the loglet.
    /// 3. Upgrades the weak handle again and calls `find_tail()` on the loglet.
    ///
    /// # Parameters
    /// - `loglet_id`: identifier used only to tag log lines.
    /// - `loglet`: weak handle to the loglet being checked; the task never keeps the loglet alive
    ///   while it is idle.
    /// - `duration`: pause between consecutive checks.
    ///
    /// # Returns
    /// `Ok(())` once the loop stops, which happens when the loglet has been dropped, when it is
    /// sealed, or when `find_tail()` reports `OperationError::Shutdown`. `OperationError::Other`
    /// is logged and the check is retried in the next round. No code path in this function
    /// currently produces an `Err`.
    pub async fn run<T: TransportConnect>(
        loglet_id: ReplicatedLogletId,
        loglet: Weak<ReplicatedLoglet<T>>,
        duration: Duration,
    ) -> anyhow::Result<()> {
        debug!(
            %loglet_id,
            "Starting a background periodic tail checker for this loglet",
        );
        // Optimization. Don't run the check if the tail/seal has been updated recently.
        // Unfortunately this requires a little bit more setup in the TailOffsetWatch so we don't do
        // it.
        loop {
            // Keep the strong reference confined to this block so that it is released before we
            // go to sleep. Holding it across the sleep would keep the loglet alive for (almost)
            // the whole period even after every other owner has dropped it.
            {
                let Some(strong) = loglet.upgrade() else {
                    trace!(
                        %loglet_id,
                        "Loglet has been dropped, stopping periodic tail checker",
                    );
                    return Ok(());
                };
                trace!(
                    %loglet_id,
                    is_sequencer = ?strong.is_sequencer_local(),
                    "Checking tail status for loglet",
                );
                if strong.known_global_tail().is_sealed() {
                    // stop the task. we are sealed already.
                    trace!(
                        %loglet_id,
                        is_sequencer = ?strong.is_sequencer_local(),
                        "Loglet has been sealed, stopping the periodic tail checker",
                    );
                    return Ok(());
                }
            }

            tokio::time::sleep(duration).await;

            // The loglet may have been dropped while we were sleeping; re-acquire it only for the
            // duration of the actual tail check.
            let Some(strong) = loglet.upgrade() else {
                trace!(
                    %loglet_id,
                    "Loglet has been dropped, stopping periodic tail checker",
                );
                return Ok(());
            };
            let start = Instant::now();
            match strong.find_tail().await {
                Ok(tail) => {
                    // todo: maybe remove this.
                    trace!(
                        %loglet_id,
                        known_global_tail = %tail.offset(),
                        is_sequencer = ?strong.is_sequencer_local(),
                        is_sealed = ?tail.is_sealed(),
                        "Successfully determined the tail status of the loglet, took={:?}",
                        start.elapsed(),
                    );
                }
                Err(OperationError::Shutdown(_)) => {
                    trace!(
                        %loglet_id,
                        is_sequencer = ?strong.is_sequencer_local(),
                        "System is shutting down, stopping period tail checker",
                    );
                    return Ok(());
                }
                Err(OperationError::Other(e)) => {
                    // Transient failure: log it and let the next round try again.
                    trace!(
                        ?e,
                        is_sequencer = ?strong.is_sequencer_local(),
                        %loglet_id,
                        "Couldn't determine the tail status of the loglet. Will retry in the next period",
                    );
                }
            }
        }
    }
}

0 comments on commit 5090df5

Please sign in to comment.