diff --git a/config/config.md b/config/config.md
index 7ff0364ed535..a594e7368074 100644
--- a/config/config.md
+++ b/config/config.md
@@ -259,7 +259,7 @@
 | `failure_detector` | -- | -- | -- |
 | `failure_detector.threshold` | Float | `8.0` | -- |
 | `failure_detector.min_std_deviation` | String | `100ms` | -- |
-| `failure_detector.acceptable_heartbeat_pause` | String | `3000ms` | -- |
+| `failure_detector.acceptable_heartbeat_pause` | String | `10000ms` | -- |
 | `failure_detector.first_heartbeat_estimate` | String | `1000ms` | -- |
 | `datanode` | -- | -- | Datanode options. |
 | `datanode.client` | -- | -- | Datanode client options. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 71dc48077c9c..1128d274cef2 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -54,7 +54,7 @@ max_metadata_value_size = "1500KiB"
 [failure_detector]
 threshold = 8.0
 min_std_deviation = "100ms"
-acceptable_heartbeat_pause = "3000ms"
+acceptable_heartbeat_pause = "10000ms"
 first_heartbeat_estimate = "1000ms"
 
 ## Datanode options.
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index b48951062fc2..316aa6db7f25 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -25,7 +25,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
 use common_meta::cache_invalidator::CacheInvalidatorRef;
 use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
 use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
-use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
+use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
 use common_meta::ddl_manager::DdlManager;
 use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -559,6 +559,7 @@ impl StartCommand {
                 flow_metadata_manager,
                 flow_metadata_allocator,
                 peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
+                region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
             },
             procedure_manager,
             true,
diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs
index 3e9443d88d4b..008153a94284 100644
--- a/src/common/meta/src/ddl.rs
+++ b/src/common/meta/src/ddl.rs
@@ -16,7 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_telemetry::tracing_context::W3cTrace;
-use store_api::storage::{RegionNumber, TableId};
+use store_api::storage::{RegionId, RegionNumber, TableId};
 
 use crate::cache_invalidator::CacheInvalidatorRef;
 use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
@@ -30,7 +30,7 @@ use crate::peer::PeerLookupServiceRef;
 use crate::region_keeper::MemoryRegionKeeperRef;
 use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
 use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
-use crate::ClusterId;
+use crate::{ClusterId, DatanodeId};
 
 pub mod alter_logical_tables;
 pub mod alter_table;
@@ -102,6 +102,33 @@ pub struct TableMetadata {
     pub region_wal_options: HashMap<RegionNumber, String>,
 }
 
+pub type RegionFailureDetectorControllerRef = Arc<dyn RegionFailureDetectorController>;
+
+pub type DetectingRegion = (ClusterId, DatanodeId, RegionId);
+
+/// Used for actively registering Region failure detectors.
+///
+/// Ensures the Region Supervisor can detect Region failures without relying on the first heartbeat from the datanode.
+#[async_trait::async_trait]
+pub trait RegionFailureDetectorController: Send + Sync {
+    /// Registers failure detectors for the given identifiers.
+    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
+
+    /// Deregisters failure detectors for the given identifiers.
+    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
+}
+
+/// A noop implementation of [`RegionFailureDetectorController`].
+#[derive(Debug, Clone)]
+pub struct NoopRegionFailureDetectorControl;
+
+#[async_trait::async_trait]
+impl RegionFailureDetectorController for NoopRegionFailureDetectorControl {
+    async fn register_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
+
+    async fn deregister_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
+}
+
 /// The context of ddl.
 #[derive(Clone)]
 pub struct DdlContext {
@@ -121,4 +148,28 @@ pub struct DdlContext {
     pub flow_metadata_allocator: FlowMetadataAllocatorRef,
     /// look up peer by id.
     pub peer_lookup_service: PeerLookupServiceRef,
+    /// controller of region failure detector.
+    pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
+}
+
+impl DdlContext {
+    /// Notifies the RegionSupervisor to register failure detectors for newly created regions.
+    ///
+    /// The datanode may crash without sending a heartbeat that contains information about newly created regions,
+    /// which may prevent the RegionSupervisor from detecting failures in these newly created regions.
+    pub async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
+        self.region_failure_detector_controller
+            .register_failure_detectors(detecting_regions)
+            .await;
+    }
+
+    /// Notifies the RegionSupervisor to remove failure detectors.
+    ///
+    /// Once the regions are dropped, subsequent heartbeats no longer include these regions.
+    /// Therefore, we should remove the failure detectors for these dropped regions.
+    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
+        self.region_failure_detector_controller
+            .deregister_failure_detectors(detecting_regions)
+            .await;
+    }
 }
diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs
index d0b889609a84..1d171f595e44 100644
--- a/src/common/meta/src/ddl/create_table.rs
+++ b/src/common/meta/src/ddl/create_table.rs
@@ -33,7 +33,10 @@ use table::metadata::{RawTableInfo, TableId};
 use table::table_reference::TableReference;
 
 use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
-use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
+use crate::ddl::utils::{
+    add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
+    region_storage_path,
+};
 use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
 use crate::error::{self, Result};
 use crate::key::table_name::TableNameKey;
@@ -265,16 +268,25 @@ impl CreateTableProcedure {
     /// - Failed to create table metadata.
     async fn on_create_metadata(&mut self) -> Result<Status> {
         let table_id = self.table_id();
+        let cluster_id = self.creator.data.cluster_id;
         let manager = &self.context.table_metadata_manager;
 
         let raw_table_info = self.table_info().clone();
         // Safety: the region_wal_options must be allocated.
         let region_wal_options = self.region_wal_options()?.clone();
         // Safety: the table_route must be allocated.
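// ---------------------------------------------------------------------------
// Editor's aside (not part of the patch): a minimal sketch of how the new
// `DetectingRegion` tuples are derived from region routes and handed to a
// `RegionFailureDetectorController`, mirroring the
// `convert_region_routes_to_detecting_regions` helper introduced later in this
// diff. `ClusterId`, `RegionRoute`, and the controller types are assumed to be
// in scope from `common_meta::ddl` and `common_meta::rpc::router`.
async fn track_new_regions(
    cluster_id: ClusterId,
    routes: &[RegionRoute],
    controller: &RegionFailureDetectorControllerRef,
) {
    // One entry per region that currently has a leader peer.
    let detecting_regions: Vec<DetectingRegion> = routes
        .iter()
        .filter_map(|route| {
            route
                .leader_peer
                .as_ref()
                .map(|peer| (cluster_id, peer.id, route.region.id))
        })
        .collect();
    controller.register_failure_detectors(detecting_regions).await;
}
// ---------------------------------------------------------------------------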
- let table_route = TableRouteValue::Physical(self.table_route()?.clone()); + let physical_table_route = self.table_route()?.clone(); + let detecting_regions = convert_region_routes_to_detecting_regions( + cluster_id, + &physical_table_route.region_routes, + ); + let table_route = TableRouteValue::Physical(physical_table_route); manager .create_table_metadata(raw_table_info, table_route, region_wal_options) .await?; + self.context + .register_failure_detectors(detecting_regions) + .await; info!("Created table metadata for table {table_id}"); self.creator.opening_regions.clear(); diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index ce62b7d0c316..578e7744f1a6 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -35,6 +35,7 @@ use crate::ddl::DdlContext; use crate::error::Result; use crate::key::table_name::TableNameValue; use crate::lock_key::{CatalogLock, SchemaLock}; +use crate::ClusterId; pub struct DropDatabaseProcedure { /// The context of procedure runtime. @@ -53,6 +54,7 @@ pub(crate) enum DropTableTarget { /// Context of [DropDatabaseProcedure] execution. pub(crate) struct DropDatabaseContext { + cluster_id: ClusterId, catalog: String, schema: String, drop_if_exists: bool, @@ -85,6 +87,7 @@ impl DropDatabaseProcedure { Self { runtime_context: context, context: DropDatabaseContext { + cluster_id: 0, catalog, schema, drop_if_exists, @@ -105,6 +108,7 @@ impl DropDatabaseProcedure { Ok(Self { runtime_context, context: DropDatabaseContext { + cluster_id: 0, catalog, schema, drop_if_exists, diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index c3dd8a582684..3b25b4202539 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -221,6 +221,7 @@ mod tests { // It always starts from Logical let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -256,6 +257,7 @@ mod tests { // It always starts from Logical let mut state = DropDatabaseCursor::new(DropTableTarget::Logical); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -284,6 +286,7 @@ mod tests { let ddl_context = new_ddl_context(node_manager); let mut state = DropDatabaseCursor::new(DropTableTarget::Physical); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 433e7dc96b20..a8d8ed9d1ffc 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -96,10 +96,11 @@ impl State for DropDatabaseExecutor { async fn next( &mut self, ddl_ctx: &DdlContext, - _ctx: &mut DropDatabaseContext, + ctx: &mut DropDatabaseContext, ) -> Result<(Box, Status)> { self.register_dropping_regions(ddl_ctx)?; - let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true); + let executor = + DropTableExecutor::new(ctx.cluster_id, self.table_name.clone(), self.table_id, true); // Deletes metadata for table permanently. 
let table_route_value = TableRouteValue::new( self.table_id, @@ -186,6 +187,7 @@ mod tests { DropTableTarget::Physical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -198,6 +200,7 @@ mod tests { } // Execute again let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -238,6 +241,7 @@ mod tests { DropTableTarget::Logical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -250,6 +254,7 @@ mod tests { } // Execute again let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -339,6 +344,7 @@ mod tests { DropTableTarget::Physical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, @@ -368,6 +374,7 @@ mod tests { DropTableTarget::Physical, ); let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: DEFAULT_CATALOG_NAME.to_string(), schema: DEFAULT_SCHEMA_NAME.to_string(), drop_if_exists: false, diff --git a/src/common/meta/src/ddl/drop_database/metadata.rs b/src/common/meta/src/ddl/drop_database/metadata.rs index 005806146013..8d338df07c5f 100644 --- a/src/common/meta/src/ddl/drop_database/metadata.rs +++ b/src/common/meta/src/ddl/drop_database/metadata.rs @@ -118,6 +118,7 @@ mod tests { .unwrap(); let mut state = DropDatabaseRemoveMetadata; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: true, @@ -144,6 +145,7 @@ mod tests { // Schema not exists let mut state = DropDatabaseRemoveMetadata; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: true, diff --git a/src/common/meta/src/ddl/drop_database/start.rs b/src/common/meta/src/ddl/drop_database/start.rs index 792eeac8dda1..deeb8ed215ed 100644 --- a/src/common/meta/src/ddl/drop_database/start.rs +++ b/src/common/meta/src/ddl/drop_database/start.rs @@ -89,6 +89,7 @@ mod tests { let ddl_context = new_ddl_context(node_manager); let mut step = DropDatabaseStart; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: false, @@ -104,6 +105,7 @@ mod tests { let ddl_context = new_ddl_context(node_manager); let mut state = DropDatabaseStart; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: true, @@ -126,6 +128,7 @@ mod tests { .unwrap(); let mut state = DropDatabaseStart; let mut ctx = DropDatabaseContext { + cluster_id: 0, catalog: "foo".to_string(), schema: "bar".to_string(), drop_if_exists: false, diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index f3840a7d6774..e2a7adf3cc0e 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -279,6 +279,7 @@ impl DropTableData { fn build_executor(&self) -> DropTableExecutor { DropTableExecutor::new( + self.cluster_id, self.task.table_name(), self.task.table_id, self.task.drop_if_exists, diff --git a/src/common/meta/src/ddl/drop_table/executor.rs 
b/src/common/meta/src/ddl/drop_table/executor.rs index 0783ce86ccaf..3848eeb4fc54 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -26,13 +26,14 @@ use table::metadata::TableId; use table::table_name::TableName; use crate::cache_invalidator::Context; -use crate::ddl::utils::add_peer_context_if_needed; +use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions}; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::instruction::CacheIdent; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; +use crate::ClusterId; /// [Control] indicated to the caller whether to go to the next step. #[derive(Debug)] @@ -50,8 +51,14 @@ impl Control { impl DropTableExecutor { /// Returns the [DropTableExecutor]. - pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self { + pub fn new( + cluster_id: ClusterId, + table: TableName, + table_id: TableId, + drop_if_exists: bool, + ) -> Self { Self { + cluster_id, table, table_id, drop_if_exists, @@ -64,6 +71,7 @@ impl DropTableExecutor { /// - Invalidates the cache on the Frontend nodes. /// - Drops the regions on the Datanode nodes. pub struct DropTableExecutor { + cluster_id: ClusterId, table: TableName, table_id: TableId, drop_if_exists: bool, @@ -130,7 +138,17 @@ impl DropTableExecutor { ) -> Result<()> { ctx.table_metadata_manager .destroy_table_metadata(self.table_id, &self.table, table_route_value) - .await + .await?; + + let detecting_regions = if table_route_value.is_physical() { + // Safety: checked. + let regions = table_route_value.region_routes().unwrap(); + convert_region_routes_to_detecting_regions(self.cluster_id, regions) + } else { + vec![] + }; + ctx.deregister_failure_detectors(detecting_regions).await; + Ok(()) } /// Restores the table metadata. @@ -274,6 +292,7 @@ mod tests { let node_manager = Arc::new(MockDatanodeManager::new(())); let ctx = new_ddl_context(node_manager); let executor = DropTableExecutor::new( + 0, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"), 1024, true, @@ -283,6 +302,7 @@ mod tests { // Drops a non-exists table let executor = DropTableExecutor::new( + 0, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"), 1024, false, @@ -292,6 +312,7 @@ mod tests { // Drops a exists table let executor = DropTableExecutor::new( + 0, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"), 1024, false, diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index de6171d4efcf..36cb97338655 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -19,11 +19,14 @@ use snafu::{ensure, location, Location, OptionExt}; use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; use table::metadata::TableId; +use crate::ddl::DetectingRegion; use crate::error::{Error, Result, TableNotFoundSnafu, UnsupportedSnafu}; use crate::key::table_name::TableNameKey; use crate::key::TableMetadataManagerRef; use crate::peer::Peer; use crate::rpc::ddl::CreateTableTask; +use crate::rpc::router::RegionRoute; +use crate::ClusterId; /// Adds [Peer] context if the error is unretryable. 
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error { @@ -126,3 +129,19 @@ pub async fn get_physical_table_id( .get_physical_table_id(logical_table_id) .await } + +/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`]. +pub fn convert_region_routes_to_detecting_regions( + cluster_id: ClusterId, + region_routes: &[RegionRoute], +) -> Vec { + region_routes + .iter() + .flat_map(|route| { + route + .leader_peer + .as_ref() + .map(|peer| (cluster_id, peer.id, route.region.id)) + }) + .collect::>() +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 1228de66dab7..567498a38dbc 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -805,7 +805,7 @@ mod tests { use crate::ddl::flow_meta::FlowMetadataAllocator; use crate::ddl::table_meta::TableMetadataAllocator; use crate::ddl::truncate_table::TruncateTableProcedure; - use crate::ddl::DdlContext; + use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use crate::key::flow::FlowMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; @@ -856,6 +856,7 @@ mod tests { flow_metadata_allocator, memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), true, diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index a68b93597737..44c534dc32d8 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -24,7 +24,7 @@ use common_recordbatch::SendableRecordBatchStream; use crate::cache_invalidator::DummyCacheInvalidator; use crate::ddl::flow_meta::FlowMetadataAllocator; use crate::ddl::table_meta::TableMetadataAllocator; -use crate::ddl::DdlContext; +use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use crate::error::Result; use crate::key::flow::FlowMetadataManager; use crate::key::TableMetadataManager; @@ -182,6 +182,7 @@ pub fn new_ddl_context_with_kv_backend( flow_metadata_allocator, flow_metadata_manager, peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index bc7f4972fd00..ac0df6c2dc13 100644 --- a/src/meta-srv/src/failure_detector.rs +++ b/src/meta-srv/src/failure_detector.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::time::Duration; +use common_meta::distributed_time_constants; use serde::{Deserialize, Serialize}; /// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)" @@ -37,7 +38,7 @@ use serde::{Deserialize, Serialize}; /// /// where F is the cumulative distribution function of a normal distribution with mean /// and standard deviation estimated from historical heartbeat inter-arrival times. -#[cfg_attr(test, derive(Clone))] +#[cfg_attr(test, derive(Debug, Clone, PartialEq))] pub(crate) struct PhiAccrualFailureDetector { /// A low threshold is prone to generate many wrong suspicions but ensures a quick detection /// in the event of a real crash. 
Conversely, a high threshold generates fewer mistakes but @@ -82,7 +83,9 @@ impl Default for PhiAccrualFailureDetectorOptions { Self { threshold: 8_f32, min_std_deviation: Duration::from_millis(100), - acceptable_heartbeat_pause: Duration::from_millis(3000), + acceptable_heartbeat_pause: Duration::from_secs( + distributed_time_constants::DATANODE_LEASE_SECS, + ), first_heartbeat_estimate: Duration::from_millis(1000), } } @@ -195,7 +198,7 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 { /// It is capped by the number of samples specified in `max_sample_size`. /// /// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] struct HeartbeatHistory { /// Number of samples to use for calculation of mean and standard deviation of inter-arrival /// times. diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index efef360a204a..4024a77af8ff 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -26,8 +26,10 @@ pub struct RegionFailureHandler { } impl RegionFailureHandler { - pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self { - let heartbeat_acceptor = region_supervisor.heartbeat_acceptor(); + pub(crate) fn new( + mut region_supervisor: RegionSupervisor, + heartbeat_acceptor: HeartbeatAcceptor, + ) -> Self { info!("Starting region supervisor"); common_runtime::spawn_bg(async move { region_supervisor.run().await }); Self { heartbeat_acceptor } @@ -71,13 +73,13 @@ mod tests { use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::builder::MetasrvBuilder; use crate::region::supervisor::tests::new_test_supervisor; - use crate::region::supervisor::Event; + use crate::region::supervisor::{Event, HeartbeatAcceptor}; #[tokio::test] async fn test_handle_heartbeat() { - let supervisor = new_test_supervisor(); - let sender = supervisor.sender(); - let handler = RegionFailureHandler::new(supervisor); + let (supervisor, sender) = new_test_supervisor(); + let heartbeat_acceptor = HeartbeatAcceptor::new(sender.clone()); + let handler = RegionFailureHandler::new(supervisor, heartbeat_acceptor); let req = &HeartbeatRequest::default(); let builder = MetasrvBuilder::new(); let metasrv = builder.build().await.unwrap(); diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 3f3a86b2e8bf..3c039fc136b9 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -22,7 +22,9 @@ use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; -use common_meta::ddl::DdlContext; +use common_meta::ddl::{ + DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, +}; use common_meta::ddl_manager::DdlManager; use common_meta::distributed_time_constants; use common_meta::key::flow::FlowMetadataManager; @@ -68,7 +70,10 @@ use crate::metasrv::{ use crate::procedure::region_migration::manager::RegionMigrationManager; use crate::procedure::region_migration::DefaultContextFactory; use crate::pubsub::PublisherRef; -use crate::region::supervisor::{RegionSupervisor, DEFAULT_TICK_INTERVAL}; +use crate::region::supervisor::{ + HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, 
RegionSupervisorTicker, + DEFAULT_TICK_INTERVAL, +}; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::round_robin::RoundRobinSelector; use crate::service::mailbox::MailboxRef; @@ -282,6 +287,60 @@ impl MetasrvBuilder { }, )); let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone())); + if !is_remote_wal && options.enable_region_failover { + return error::UnexpectedSnafu { + violated: "Region failover is not supported in the local WAL implementation!", + } + .fail(); + } + + let (tx, rx) = RegionSupervisor::channel(); + let (region_failure_detector_controller, region_supervisor_ticker): ( + RegionFailureDetectorControllerRef, + Option>, + ) = if options.enable_region_failover && is_remote_wal { + ( + Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _, + Some(Arc::new(RegionSupervisorTicker::new( + DEFAULT_TICK_INTERVAL, + tx.clone(), + ))), + ) + } else { + (Arc::new(NoopRegionFailureDetectorControl) as _, None as _) + }; + + let region_migration_manager = Arc::new(RegionMigrationManager::new( + procedure_manager.clone(), + DefaultContextFactory::new( + table_metadata_manager.clone(), + memory_region_keeper.clone(), + region_failure_detector_controller.clone(), + mailbox.clone(), + options.server_addr.clone(), + ), + )); + region_migration_manager.try_start()?; + + let region_failover_handler = if options.enable_region_failover && is_remote_wal { + let region_supervisor = RegionSupervisor::new( + rx, + options.failure_detector, + selector_ctx.clone(), + selector.clone(), + region_migration_manager.clone(), + leader_cached_kv_backend.clone() as _, + peer_lookup_service.clone(), + ); + + Some(RegionFailureHandler::new( + region_supervisor, + HeartbeatAcceptor::new(tx), + )) + } else { + None + }; + let ddl_manager = Arc::new( DdlManager::try_new( DdlContext { @@ -292,7 +351,8 @@ impl MetasrvBuilder { table_metadata_allocator: table_metadata_allocator.clone(), flow_metadata_manager: flow_metadata_manager.clone(), flow_metadata_allocator: flow_metadata_allocator.clone(), - peer_lookup_service: peer_lookup_service.clone(), + peer_lookup_service, + region_failure_detector_controller, }, procedure_manager.clone(), true, @@ -300,44 +360,6 @@ impl MetasrvBuilder { .context(error::InitDdlManagerSnafu)?, ); - let region_migration_manager = Arc::new(RegionMigrationManager::new( - procedure_manager.clone(), - DefaultContextFactory::new( - table_metadata_manager.clone(), - memory_region_keeper.clone(), - mailbox.clone(), - options.server_addr.clone(), - ), - )); - region_migration_manager.try_start()?; - - if !is_remote_wal && options.enable_region_failover { - return error::UnexpectedSnafu { - violated: "Region failover is not supported in the local WAL implementation!", - } - .fail(); - } - - let (region_failover_handler, region_supervisor_ticker) = - if options.enable_region_failover && is_remote_wal { - let region_supervisor = RegionSupervisor::new( - options.failure_detector, - DEFAULT_TICK_INTERVAL, - selector_ctx.clone(), - selector.clone(), - region_migration_manager.clone(), - leader_cached_kv_backend.clone() as _, - peer_lookup_service, - ); - let region_supervisor_ticker = region_supervisor.ticker(); - ( - Some(RegionFailureHandler::new(region_supervisor)), - Some(region_supervisor_ticker), - ) - } else { - (None, None) - }; - let handler_group = match handler_group { Some(handler_group) => handler_group, None => { diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 
da9a5641a5ac..3edea35c0707 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -29,6 +29,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; +use common_meta::ddl::RegionFailureDetectorControllerRef; use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; @@ -154,15 +155,17 @@ pub struct DefaultContextFactory { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, + region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, } impl DefaultContextFactory { - /// Returns an [ContextFactoryImpl]. + /// Returns an [`DefaultContextFactory`]. pub fn new( table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, + region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, ) -> Self { @@ -170,6 +173,7 @@ impl DefaultContextFactory { volatile_ctx: VolatileContext::default(), table_metadata_manager, opening_region_keeper, + region_failure_detector_controller, mailbox, server_addr, } @@ -183,6 +187,7 @@ impl ContextFactory for DefaultContextFactory { volatile_ctx: self.volatile_ctx, table_metadata_manager: self.table_metadata_manager, opening_region_keeper: self.opening_region_keeper, + region_failure_detector_controller: self.region_failure_detector_controller, mailbox: self.mailbox, server_addr: self.server_addr, } @@ -195,6 +200,7 @@ pub struct Context { volatile_ctx: VolatileContext, table_metadata_manager: TableMetadataManagerRef, opening_region_keeper: MemoryRegionKeeperRef, + region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, } @@ -236,6 +242,20 @@ impl Context { Ok(table_route_value.as_ref().unwrap()) } + /// Notifies the RegionSupervisor to register failure detectors of failed region. + /// + /// The original failure detector was removed once the procedure was triggered. + /// Now, we need to register the failure detector for the failed region. + pub async fn register_failure_detectors(&self) { + let cluster_id = self.persistent_ctx.cluster_id; + let datanode_id = self.persistent_ctx.from_peer.id; + let region_id = self.persistent_ctx.region_id; + + self.region_failure_detector_controller + .register_failure_detectors(vec![(cluster_id, datanode_id, region_id)]) + .await; + } + /// Removes the `table_route` of [VolatileContext], returns true if any. 
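// ---------------------------------------------------------------------------
// Editor's aside (not part of the patch): the controller injected into this
// migration context is expected to forward registrations to the
// RegionSupervisor as events over an mpsc channel. A condensed sketch of that
// pattern (the shape `RegionFailureDetectorControl` later in this diff
// follows) looks roughly like this; `Event` and `DetectingRegion` are assumed
// to be in scope, and `ChannelBackedController` is a hypothetical name.
struct ChannelBackedController {
    sender: tokio::sync::mpsc::Sender<Event>,
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for ChannelBackedController {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        // A send error only happens when the supervisor has shut down.
        let _ = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await;
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let _ = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await;
    }
}
// ---------------------------------------------------------------------------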
pub fn remove_table_route_value(&mut self) -> bool { let value = self.volatile_ctx.table_route.take(); diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 2f3638ef9323..6cc8ea12a507 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -20,6 +20,7 @@ use std::time::Duration; use api::v1::meta::mailbox_message::Payload; use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader}; +use common_meta::ddl::NoopRegionFailureDetectorControl; use common_meta::instruction::{ DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply, }; @@ -150,6 +151,7 @@ impl TestingEnv { volatile_ctx: Default::default(), mailbox: self.mailbox_ctx.mailbox().clone(), server_addr: self.server_addr.to_string(), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index c9253be2d597..8da1bbb0dbc9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -51,6 +51,7 @@ impl UpdateMetadata { }); } + ctx.register_failure_detectors().await; ctx.remove_table_route_value(); Ok(()) @@ -60,6 +61,7 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -71,6 +73,7 @@ mod tests { use crate::procedure::region_migration::test_util::{self, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; + use crate::region::supervisor::RegionFailureDetectorControl; fn new_persistent_context() -> PersistentContext { test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)) @@ -98,6 +101,8 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); + let (tx, mut rx) = tokio::sync::mpsc::channel(8); + ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx)); let table_id = ctx.region_id().table_id(); let table_info = new_test_table_info(1024, vec![1, 2, 3]).into(); @@ -161,8 +166,18 @@ mod tests { assert!(ctx.volatile_ctx.table_route.is_none()); assert!(err.is_retryable()); assert!(format!("{err:?}").contains("Failed to update the table route")); - + assert_eq!(rx.len(), 0); state.rollback_downgraded_region(&mut ctx).await.unwrap(); + let event = rx.try_recv().unwrap(); + let detecting_regions = event.into_region_failure_detectors(); + assert_eq!( + detecting_regions, + vec![( + ctx.persistent_ctx.cluster_id, + from_peer.id, + ctx.persistent_ctx.region_id + )] + ); let table_route = table_metadata_manager .table_route_manager() diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 2812294781a6..eda1ae7cdf5b 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -114,7 +114,7 @@ pub mod test_data { use common_catalog::consts::MITO2_ENGINE; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; - use 
common_meta::ddl::DdlContext;
+    use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
     use common_meta::key::flow::FlowMetadataManager;
     use common_meta::key::TableMetadataManager;
     use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -226,6 +226,7 @@ pub mod test_data {
             flow_metadata_allocator,
             memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
             peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
+            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
         }
     }
 }
diff --git a/src/meta-srv/src/region/failure_detector.rs b/src/meta-srv/src/region/failure_detector.rs
index e9c574cadd3e..8533d27f30ac 100644
--- a/src/meta-srv/src/region/failure_detector.rs
+++ b/src/meta-srv/src/region/failure_detector.rs
@@ -14,27 +14,24 @@
 
 use std::ops::DerefMut;
 
-use common_meta::{ClusterId, DatanodeId};
+use common_meta::ddl::DetectingRegion;
 use dashmap::mapref::multiple::RefMulti;
 use dashmap::DashMap;
-use store_api::storage::RegionId;
 
 use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
 
-pub(crate) type Ident = (ClusterId, DatanodeId, RegionId);
-
 /// Detects the region failures.
 pub(crate) struct RegionFailureDetector {
     options: PhiAccrualFailureDetectorOptions,
-    detectors: DashMap<Ident, PhiAccrualFailureDetector>,
+    detectors: DashMap<DetectingRegion, PhiAccrualFailureDetector>,
 }
 
 pub(crate) struct FailureDetectorEntry<'a> {
-    e: RefMulti<'a, Ident, PhiAccrualFailureDetector>,
+    e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>,
 }
 
 impl FailureDetectorEntry<'_> {
-    pub(crate) fn region_ident(&self) -> &Ident {
+    pub(crate) fn region_ident(&self) -> &DetectingRegion {
         self.e.key()
     }
@@ -51,16 +48,31 @@ impl RegionFailureDetector {
         }
     }
 
-    /// Returns [PhiAccrualFailureDetector] of the specific ([DatanodeId],[RegionId]).
+    /// Returns [`PhiAccrualFailureDetector`] of the specific [`DetectingRegion`].
     pub(crate) fn region_failure_detector(
         &self,
-        ident: Ident,
+        detecting_region: DetectingRegion,
     ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
         self.detectors
-            .entry(ident)
+            .entry(detecting_region)
             .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
     }
 
+    /// Returns a mutable reference to the [`PhiAccrualFailureDetector`] for the specified [`DetectingRegion`].
+    /// If a detector already exists for the region, it is returned. Otherwise, a new
+    /// detector is created and initialized with the provided timestamp.
+    pub(crate) fn maybe_init_region_failure_detector(
+        &self,
+        detecting_region: DetectingRegion,
+        ts_millis: i64,
+    ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
+        self.detectors.entry(detecting_region).or_insert_with(|| {
+            let mut detector = PhiAccrualFailureDetector::from_options(self.options);
+            detector.heartbeat(ts_millis);
+            detector
+        })
+    }
+
     /// Returns a [FailureDetectorEntry] iterator.
     pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry> + '_ {
         self.detectors
@@ -69,8 +81,8 @@ impl RegionFailureDetector {
     }
 
     /// Removes the specific [PhiAccrualFailureDetector] if exists.
-    pub(crate) fn remove(&self, ident: &Ident) {
-        self.detectors.remove(ident);
+    pub(crate) fn remove(&self, region: &DetectingRegion) {
+        self.detectors.remove(region);
     }
 
     /// Removes all [PhiAccrualFailureDetector]s.
@@ -78,10 +90,10 @@ impl RegionFailureDetector {
         self.detectors.clear()
     }
 
-    /// Returns true if the specific `ident` exists.
+    /// Returns true if the specific [`DetectingRegion`] exists.
     #[cfg(test)]
-    pub(crate) fn contains(&self, ident: &Ident) -> bool {
-        self.detectors.contains_key(ident)
+    pub(crate) fn contains(&self, region: &DetectingRegion) -> bool {
+        self.detectors.contains_key(region)
     }
 
     /// Returns the length
@@ -110,14 +122,16 @@
 #[cfg(test)]
 mod tests {
+    use store_api::storage::RegionId;
+
     use super::*;
 
     #[test]
     fn test_default_failure_detector_container() {
         let container = RegionFailureDetector::new(Default::default());
-        let ident = (0, 2, RegionId::new(1, 1));
-        let _ = container.region_failure_detector(ident);
-        assert!(container.contains(&ident));
+        let detecting_region = (0, 2, RegionId::new(1, 1));
+        let _ = container.region_failure_detector(detecting_region);
+        assert!(container.contains(&detecting_region));
 
         {
             let mut iter = container.iter();
diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs
index 83b264eaa4ba..8c7dff9b3df8 100644
--- a/src/meta-srv/src/region/supervisor.rs
+++ b/src/meta-srv/src/region/supervisor.rs
@@ -16,6 +16,7 @@ use std::fmt::Debug;
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
+use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
 use common_meta::key::MAINTENANCE_KEY;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::peer::PeerLookupServiceRef;
@@ -29,13 +30,13 @@ use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::time::{interval, MissedTickBehavior};
 
-use super::failure_detector::RegionFailureDetector;
 use crate::error::{self, Result};
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
 use crate::handler::node_stat::Stat;
 use crate::metasrv::{SelectorContext, SelectorRef};
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
 use crate::procedure::region_migration::RegionMigrationProcedureTask;
+use crate::region::failure_detector::RegionFailureDetector;
 use crate::selector::SelectorOptions;
 
 /// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
@@ -75,18 +76,38 @@ impl From<&Stat> for DatanodeHeartbeat {
 /// of the supervisor during tests.
 pub(crate) enum Event {
     Tick,
+    RegisterFailureDetectors(Vec<DetectingRegion>),
+    DeregisterFailureDetectors(Vec<DetectingRegion>),
     HeartbeatArrived(DatanodeHeartbeat),
     Clear,
     #[cfg(test)]
     Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
 }
 
+#[cfg(test)]
+impl Event {
+    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
+        match self {
+            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
+            _ => unreachable!(),
+        }
+    }
+}
+
 impl Debug for Event {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::Tick => write!(f, "Tick"),
             Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
             Self::Clear => write!(f, "Clear"),
+            Self::RegisterFailureDetectors(arg0) => f
+                .debug_tuple("RegisterFailureDetectors")
+                .field(arg0)
+                .finish(),
+            Self::DeregisterFailureDetectors(arg0) => f
+                .debug_tuple("DeregisterFailureDetectors")
+                .field(arg0)
+                .finish(),
             #[cfg(test)]
             Self::Dump(_) => f.debug_struct("Dump").finish(),
         }
     }
@@ -109,6 +130,14 @@ pub struct RegionSupervisorTicker {
 }
 
 impl RegionSupervisorTicker {
+    pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
+        Self {
+            tick_handle: Mutex::new(None),
+            tick_interval,
+            sender,
+        }
+    }
+
     /// Starts the ticker.
     pub fn start(&self) {
         let mut handle = self.tick_handle.lock().unwrap();
@@ -160,12 +189,8 @@ pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
 pub struct RegionSupervisor {
     /// Used to detect the failure of regions.
     failure_detector: RegionFailureDetector,
-    /// The interval of tick
-    tick_interval: Duration,
     /// Receives [Event]s.
     receiver: Receiver<Event>,
-    /// [Event] Sender.
-    sender: Sender<Event>,
     /// The context of [`SelectorRef`]
     selector_context: SelectorContext,
     /// Candidate node selector.
@@ -178,44 +203,77 @@ pub struct RegionSupervisor {
     peer_lookup: PeerLookupServiceRef,
 }
 
+/// Controller for managing failure detectors for regions.
+#[derive(Debug, Clone)]
+pub struct RegionFailureDetectorControl {
+    sender: Sender<Event>,
+}
+
+impl RegionFailureDetectorControl {
+    pub(crate) fn new(sender: Sender<Event>) -> Self {
+        Self { sender }
+    }
+}
+
+#[async_trait::async_trait]
+impl RegionFailureDetectorController for RegionFailureDetectorControl {
+    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
+        if let Err(err) = self
+            .sender
+            .send(Event::RegisterFailureDetectors(detecting_regions))
+            .await
+        {
+            error!(err; "RegionSupervisor is no longer receiving events");
+        }
+    }
+
+    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
+        if let Err(err) = self
+            .sender
+            .send(Event::DeregisterFailureDetectors(detecting_regions))
+            .await
+        {
+            error!(err; "RegionSupervisor is no longer receiving events");
+        }
+    }
+}
+
 /// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
 pub(crate) struct HeartbeatAcceptor {
     sender: Sender<Event>,
 }
 
 impl HeartbeatAcceptor {
+    pub(crate) fn new(sender: Sender<Event>) -> Self {
+        Self { sender }
+    }
+
     /// Accepts heartbeats from datanodes.
     pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
-        if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
-            error!(e; "RegionSupervisor is stop receiving heartbeat");
+        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
+            error!(err; "RegionSupervisor is no longer receiving heartbeats");
         }
     }
 }
 
-#[cfg(test)]
 impl RegionSupervisor {
-    /// Returns the [Event] sender.
-    pub(crate) fn sender(&self) -> Sender<Event> {
-        self.sender.clone()
+    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
+    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
+        tokio::sync::mpsc::channel(1024)
     }
-}
 
-impl RegionSupervisor {
     pub(crate) fn new(
+        event_receiver: Receiver<Event>,
         options: PhiAccrualFailureDetectorOptions,
-        tick_interval: Duration,
         selector_context: SelectorContext,
         selector: SelectorRef,
        region_migration_manager: RegionMigrationManagerRef,
         kv_backend: KvBackendRef,
         peer_lookup: PeerLookupServiceRef,
     ) -> Self {
-        let (tx, rx) = tokio::sync::mpsc::channel(1024);
         Self {
             failure_detector: RegionFailureDetector::new(options),
-            tick_interval,
-            receiver: rx,
-            sender: tx,
+            receiver: event_receiver,
             selector_context,
             selector,
             region_migration_manager,
@@ -224,22 +282,6 @@ impl RegionSupervisor {
         }
     }
 
-    /// Returns the [`HeartbeatAcceptor`].
-    pub(crate) fn heartbeat_acceptor(&self) -> HeartbeatAcceptor {
-        HeartbeatAcceptor {
-            sender: self.sender.clone(),
-        }
-    }
-
-    /// Returns the [`RegionSupervisorTicker`].
-    pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef {
-        Arc::new(RegionSupervisorTicker {
-            tick_interval: self.tick_interval,
-            sender: self.sender.clone(),
-            tick_handle: Mutex::new(None),
-        })
-    }
-
     /// Runs the main loop.
     pub(crate) async fn run(&mut self) {
         while let Some(event) = self.receiver.recv().await {
@@ -248,6 +290,12 @@ impl RegionSupervisor {
                     let regions = self.detect_region_failure();
                     self.handle_region_failures(regions).await;
                 }
+                Event::RegisterFailureDetectors(detecting_regions) => {
+                    self.register_failure_detectors(detecting_regions).await
+                }
+                Event::DeregisterFailureDetectors(detecting_regions) => {
+                    self.deregister_failure_detectors(detecting_regions).await
+                }
                 Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                 Event::Clear => self.clear(),
                 #[cfg(test)]
@@ -259,6 +307,21 @@ impl RegionSupervisor {
         info!("RegionSupervisor is stopped!");
     }
 
+    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
+        let ts_millis = current_time_millis();
+        for region in detecting_regions {
+            // The corresponding datanode has `acceptable_heartbeat_pause_millis` to send a heartbeat for this region.
+            self.failure_detector
+                .maybe_init_region_failure_detector(region, ts_millis);
+        }
+    }
+
+    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
+        for region in detecting_regions {
+            self.failure_detector.remove(&region)
+        }
+    }
+
     async fn handle_region_failures(&self, mut regions: Vec<(ClusterId, DatanodeId, RegionId)>) {
         if regions.is_empty() {
             return;
@@ -376,8 +439,10 @@ impl RegionSupervisor {
     /// Updates the state of corresponding failure detectors.
     fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
         for region_id in heartbeat.regions {
-            let ident = (heartbeat.cluster_id, heartbeat.datanode_id, region_id);
-            let mut detector = self.failure_detector.region_failure_detector(ident);
+            let detecting_region = (heartbeat.cluster_id, heartbeat.datanode_id, region_id);
+            let mut detector = self
+                .failure_detector
+                .region_failure_detector(detecting_region);
             detector.heartbeat(heartbeat.timestamp);
         }
     }
@@ -393,22 +458,25 @@ pub(crate) mod tests {
     use std::sync::{Arc, Mutex};
     use std::time::Duration;
 
+    use common_meta::ddl::RegionFailureDetectorController;
     use common_meta::peer::Peer;
     use common_meta::test_util::NoopPeerLookupService;
     use common_time::util::current_time_millis;
     use rand::Rng;
     use store_api::storage::RegionId;
+    use tokio::sync::mpsc::Sender;
     use tokio::sync::oneshot;
     use tokio::time::sleep;
 
     use crate::procedure::region_migration::manager::RegionMigrationManager;
     use crate::procedure::region_migration::test_util::TestingEnv;
     use crate::region::supervisor::{
-        DatanodeHeartbeat, Event, RegionSupervisor, RegionSupervisorTicker,
+        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
+        RegionSupervisorTicker,
     };
     use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};
 
-    pub(crate) fn new_test_supervisor() -> RegionSupervisor {
+    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
         let env = TestingEnv::new();
         let selector_context = new_test_selector_context();
         let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
@@ -419,22 +487,25 @@ pub(crate) mod tests {
         ));
         let kv_backend = env.kv_backend();
         let peer_lookup = Arc::new(NoopPeerLookupService);
-
-        RegionSupervisor::new(
-            Default::default(),
-            Duration::from_secs(1),
-            selector_context,
-            selector,
-            region_migration_manager,
-            kv_backend,
-            peer_lookup,
+        let (tx, rx) = RegionSupervisor::channel();
+
+        (
+            RegionSupervisor::new(
+                rx,
+                Default::default(),
+                selector_context,
+                selector,
+                region_migration_manager,
+                kv_backend,
+                peer_lookup,
+            ),
+            tx,
         )
     }
 
     #[tokio::test]
     async fn test_heartbeat() {
-        let mut
supervisor = new_test_supervisor(); - let sender = supervisor.sender(); + let (mut supervisor, sender) = new_test_supervisor(); tokio::spawn(async move { supervisor.run().await }); sender @@ -526,4 +597,37 @@ pub(crate) mod tests { } } } + + #[tokio::test] + async fn test_region_failure_detector_controller() { + let (mut supervisor, sender) = new_test_supervisor(); + let controller = RegionFailureDetectorControl::new(sender.clone()); + tokio::spawn(async move { supervisor.run().await }); + let detecting_region = (0, 1, RegionId::new(1, 1)); + controller + .register_failure_detectors(vec![detecting_region]) + .await; + + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + let region_detector = detector.region_failure_detector(detecting_region).clone(); + + // Registers failure detector again + controller + .register_failure_detectors(vec![detecting_region]) + .await; + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + let detector = rx.await.unwrap(); + let got = detector.region_failure_detector(detecting_region).clone(); + assert_eq!(region_detector, got); + + controller + .deregister_failure_detectors(vec![detecting_region]) + .await; + let (tx, rx) = oneshot::channel(); + sender.send(Event::Dump(tx)).await.unwrap(); + assert!(rx.await.unwrap().is_empty()); + } } diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs index 65b600ea4184..57332d5a0621 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs @@ -84,9 +84,9 @@ impl Arbitrary<'_> for FuzzInput { let seed = u.int_in_range(u64::MIN..=u64::MAX)?; let mut rng = ChaChaRng::seed_from_u64(seed); let columns = rng.gen_range(2..64); - let rows = rng.gen_range(2..4096); + let rows = rng.gen_range(2..2048); let tables = rng.gen_range(1..64); - let inserts = rng.gen_range(2..16); + let inserts = rng.gen_range(2..8); Ok(FuzzInput { columns, rows, @@ -264,7 +264,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { let mut rng = ChaCha20Rng::seed_from_u64(input.seed); info!("Generates {} tables", input.tables); let exprs = generate_create_exprs(input.tables, input.columns, &mut rng)?; - let parallelism = 8; + let parallelism = 4; let table_ctxs = exprs .iter() .map(|expr| Arc::new(TableContext::from(expr))) diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 2e761d52a280..458f51b0948a 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -23,7 +23,7 @@ use common_config::KvBackendConfig; use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; -use common_meta::ddl::DdlContext; +use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; use common_meta::ddl_manager::DdlManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::TableMetadataManager; @@ -199,6 +199,7 @@ impl GreptimeDbStandaloneBuilder { flow_metadata_manager, flow_metadata_allocator, peer_lookup_service: Arc::new(StandalonePeerLookupService::new()), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), }, procedure_manager.clone(), register_procedure_loaders,
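// ---------------------------------------------------------------------------
// Editor's aside (not part of the patch): a self-contained sketch of the phi
// calculation that `failure_detector.rs` above documents, i.e.
// phi = -log10(1 - F(time_since_last_heartbeat)) with F the CDF of a normal
// distribution estimated from heartbeat inter-arrival times. The constants in
// `main` (heartbeat interval, acceptable pause, std deviation) are
// illustrative only; they are not taken from the codebase.
fn phi(time_diff_millis: f64, mean_millis: f64, std_deviation_millis: f64) -> f64 {
    // Logistic approximation of the normal CDF, as used by the Akka detector
    // this module is ported from.
    let y = (time_diff_millis - mean_millis) / std_deviation_millis;
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
    if time_diff_millis > mean_millis {
        -(e / (1.0 + e)).log10()
    } else {
        -(1.0 - 1.0 / (1.0 + e)).log10()
    }
}

fn main() {
    // With a ~5s heartbeat interval and a 10s acceptable pause folded into the
    // mean (as the detector does), phi stays low shortly after the last
    // heartbeat and only crosses the default threshold (8.0) after a long silence.
    let mean = 5_000.0 + 10_000.0; // estimated mean + acceptable_heartbeat_pause
    let std_dev = 1_000.0;
    for elapsed in [5_000.0, 15_000.0, 20_000.0, 30_000.0] {
        println!("elapsed={elapsed}ms phi={:.2}", phi(elapsed, mean, std_dev));
    }
}
// ---------------------------------------------------------------------------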