From 0884ee4366da3f440ee66e170b4f598491bca8ab Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Fri, 11 Oct 2024 12:48:06 +0100 Subject: [PATCH] [Bifrost] Periodic tail checker --- .../providers/replicated_loglet/provider.rs | 19 +++- .../providers/replicated_loglet/tasks/mod.rs | 2 + .../tasks/periodic_tail_checker.rs | 93 +++++++++++++++++++ 3 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index cfce3727a..89c9e870c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use dashmap::DashMap; @@ -28,6 +29,7 @@ use super::network::RequestPump; use super::rpc_routers::{LogServersRpc, SequencersRpc}; use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError}; use crate::providers::replicated_loglet::error::ReplicatedLogletError; +use crate::providers::replicated_loglet::tasks::PeriodicTailChecker; use crate::Error; pub struct Factory { @@ -79,6 +81,7 @@ impl LogletProviderFactory for Factory { async fn create(self: Box) -> Result, OperationError> { metric_definitions::describe_metrics(); let provider = Arc::new(ReplicatedLogletProvider::new( + self.task_center.clone(), self.metadata_store_client, self.networking, self.logserver_rpc_routers, @@ -103,6 +106,7 @@ impl LogletProviderFactory for Factory { } pub(super) struct ReplicatedLogletProvider { + task_center: TaskCenter, active_loglets: DashMap<(LogId, SegmentIndex), Arc>>, _metadata_store_client: MetadataStoreClient, networking: Networking, @@ -113,15 +117,15 @@ pub(super) struct ReplicatedLogletProvider { impl ReplicatedLogletProvider { fn new( + task_center: TaskCenter, metadata_store_client: MetadataStoreClient, networking: Networking, logserver_rpc_routers: LogServersRpc, sequencer_rpc_routers: SequencersRpc, record_cache: RecordCache, ) -> Self { - // todo(asoli): create all global state here that'll be shared across loglet instances - // - NodeState map. Self { + task_center, active_loglets: Default::default(), _metadata_store_client: metadata_store_client, networking, @@ -166,6 +170,7 @@ impl ReplicatedLogletProvider { "Creating a replicated loglet client" ); + let loglet_id = params.loglet_id; // Create loglet let loglet = ReplicatedLoglet::new( log_id, @@ -177,6 +182,16 @@ impl ReplicatedLogletProvider { self.record_cache.clone(), ); let key_value = entry.insert(Arc::new(loglet)); + let loglet = Arc::downgrade(key_value.value()); + let _ = self.task_center.spawn( + TaskKind::Watchdog, + "periodic-tail-checker", + None, + async move { + // todo: configuration + PeriodicTailChecker::run(loglet_id, loglet, Duration::from_secs(2)).await + }, + ); Arc::clone(key_value.value()) } dashmap::Entry::Occupied(entry) => entry.get().clone(), diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs index ae7f142d0..09ac40ee6 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/mod.rs @@ -10,9 +10,11 @@ mod digests; mod find_tail; +mod periodic_tail_checker; mod repair_tail; mod seal; pub use find_tail::*; +pub use periodic_tail_checker::*; pub use repair_tail::*; pub use seal::*; diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs new file mode 100644 index 000000000..52c221388 --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs @@ -0,0 +1,93 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Weak; +use std::time::Duration; + +use tokio::time::Instant; +use tracing::{debug, trace}; + +use restate_core::network::TransportConnect; +use restate_types::replicated_loglet::ReplicatedLogletId; + +use crate::loglet::{Loglet, OperationError}; +use crate::providers::replicated_loglet::loglet::ReplicatedLoglet; + +pub struct PeriodicTailChecker {} + +impl PeriodicTailChecker { + pub async fn run( + loglet_id: ReplicatedLogletId, + loglet: Weak>, + duration: Duration, + ) -> anyhow::Result<()> { + debug!( + %loglet_id, + "Starting a background periodic tail checker for this loglet", + ); + // Optimization. Don't run the check if the tail/seal has been updated recently. + // Unfortunately this requires a litte bit more setup in the TailOffsetWatch so we don't do + // it. + loop { + let Some(loglet) = loglet.upgrade() else { + trace!( + %loglet_id, + "Loglet has been dropped, stopping periodic tail checker", + ); + return Ok(()); + }; + trace!( + %loglet_id, + is_sequencer = ?loglet.is_sequencer_local(), + "Checking tail status for loglet", + ); + if loglet.known_global_tail().is_sealed() { + // stop the task. we are sealed already. + trace!( + %loglet_id, + is_sequencer = ?loglet.is_sequencer_local(), + "Loglet has been sealed, stopping the periodic tail checker", + ); + return Ok(()); + } + tokio::time::sleep(duration).await; + let start = Instant::now(); + match loglet.find_tail().await { + Ok(tail) => { + // todo: maybe remove this. + trace!( + %loglet_id, + known_global_tail = %tail.offset(), + is_sequencer = ?loglet.is_sequencer_local(), + is_sealed = ?tail.is_sealed(), + "Successfully determined the tail status of the loglet, took={:?}", + start.elapsed(), + ); + } + Err(OperationError::Shutdown(_)) => { + trace!( + %loglet_id, + is_sequencer = ?loglet.is_sequencer_local(), + "System is shutting down, stopping period tail checker", + ); + return Ok(()); + } + Err(OperationError::Other(e)) => { + trace!( + ?e, + is_sequencer = ?loglet.is_sequencer_local(), + %loglet_id, + "Couldn't determine the tail status of the loglet. Will retry in the next period", + ); + } + } + } + } +}