Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: register & deregister region failure detectors actively #4223

Merged
merged 17 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@
| `failure_detector` | -- | -- | -- |
| `failure_detector.threshold` | Float | `8.0` | -- |
| `failure_detector.min_std_deviation` | String | `100ms` | -- |
| `failure_detector.acceptable_heartbeat_pause` | String | `3000ms` | -- |
| `failure_detector.acceptable_heartbeat_pause` | String | `10000ms` | -- |
| `failure_detector.first_heartbeat_estimate` | String | `1000ms` | -- |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
Expand Down
2 changes: 1 addition & 1 deletion config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ max_metadata_value_size = "1500KiB"
[failure_detector]
threshold = 8.0
min_std_deviation = "100ms"
acceptable_heartbeat_pause = "3000ms"
acceptable_heartbeat_pause = "10000ms"
first_heartbeat_estimate = "1000ms"

## Datanode options.
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
Expand Down Expand Up @@ -559,6 +559,7 @@ impl StartCommand {
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved
},
procedure_manager,
true,
Expand Down
55 changes: 53 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};
use store_api::storage::{RegionId, RegionNumber, TableId};

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
Expand All @@ -30,7 +30,7 @@ use crate::peer::PeerLookupServiceRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::ClusterId;
use crate::{ClusterId, DatanodeId};
WenyXu marked this conversation as resolved.
Show resolved Hide resolved

pub mod alter_logical_tables;
pub mod alter_table;
Expand Down Expand Up @@ -102,6 +102,33 @@ pub struct TableMetadata {
pub region_wal_options: HashMap<RegionNumber, String>,
}

pub type RegionFailureDetectorControllerRef = Arc<dyn RegionFailureDetectorController>;

WenyXu marked this conversation as resolved.
Show resolved Hide resolved
pub type DetectingRegion = (ClusterId, DatanodeId, RegionId);

/// Used for actively registering Region failure detectors.
///
/// Ensuring the Region Supervisor can detect Region failures without relying on the first heartbeat from the datanode.
#[async_trait::async_trait]
pub trait RegionFailureDetectorController: Send + Sync {
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
/// Registers failure detectors for the given identifiers.
async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);

/// Deregisters failure detectors for the given identifiers.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
}

/// A noop implementation of [`RegionFailureDetectorController`].
#[derive(Debug, Clone)]
pub struct NoopRegionFailureDetectorControl;

#[async_trait::async_trait]
impl RegionFailureDetectorController for NoopRegionFailureDetectorControl {
async fn register_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}

async fn deregister_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
}

/// The context of ddl.
#[derive(Clone)]
pub struct DdlContext {
Expand All @@ -121,4 +148,28 @@ pub struct DdlContext {
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// look up peer by id.
pub peer_lookup_service: PeerLookupServiceRef,
/// controller of region failure detector.
pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
}

impl DdlContext {
/// Notifies the RegionSupervisor to register failure detector of new created regions.
///
/// The datanode may crash without sending a heartbeat that contains information about newly created regions,
/// which may prevent the RegionSupervisor from detecting failures in these newly created regions.
pub async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.register_failure_detectors(detecting_regions)
.await;
}

/// Notifies the RegionSupervisor to remove failure detectors.
///
/// Once the regions were dropped, subsequent heartbeats no longer include these regions.
/// Therefore, we should remove the failure detectors for these dropped regions.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.deregister_failure_detectors(detecting_regions)
.await;
}
}
16 changes: 14 additions & 2 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;

use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
region_storage_path,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
Expand Down Expand Up @@ -265,16 +268,25 @@ impl CreateTableProcedure {
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self) -> Result<Status> {
let table_id = self.table_id();
let cluster_id = self.creator.data.cluster_id;
let manager = &self.context.table_metadata_manager;

let raw_table_info = self.table_info().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let table_route = TableRouteValue::Physical(self.table_route()?.clone());
let physical_table_route = self.table_route()?.clone();
let detecting_regions = convert_region_routes_to_detecting_regions(
cluster_id,
&physical_table_route.region_routes,
);
let table_route = TableRouteValue::Physical(physical_table_route);
manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
self.context
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
.register_failure_detectors(detecting_regions)
.await;
info!("Created table metadata for table {table_id}");

self.creator.opening_regions.clear();
Expand Down
4 changes: 4 additions & 0 deletions src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::table_name::TableNameValue;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::ClusterId;

pub struct DropDatabaseProcedure {
/// The context of procedure runtime.
Expand All @@ -53,6 +54,7 @@ pub(crate) enum DropTableTarget {

/// Context of [DropDatabaseProcedure] execution.
pub(crate) struct DropDatabaseContext {
cluster_id: ClusterId,
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved
catalog: String,
schema: String,
drop_if_exists: bool,
Expand Down Expand Up @@ -85,6 +87,7 @@ impl DropDatabaseProcedure {
Self {
runtime_context: context,
context: DropDatabaseContext {
cluster_id: 0,
catalog,
schema,
drop_if_exists,
Expand All @@ -105,6 +108,7 @@ impl DropDatabaseProcedure {
Ok(Self {
runtime_context,
context: DropDatabaseContext {
cluster_id: 0,
catalog,
schema,
drop_if_exists,
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ mod tests {
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
coderabbitai[bot] marked this conversation as resolved.
Show resolved Hide resolved
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -256,6 +257,7 @@ mod tests {
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -284,6 +286,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut state = DropDatabaseCursor::new(DropTableTarget::Physical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down
11 changes: 9 additions & 2 deletions src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ impl State for DropDatabaseExecutor {
async fn next(
&mut self,
ddl_ctx: &DdlContext,
_ctx: &mut DropDatabaseContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
self.register_dropping_regions(ddl_ctx)?;
let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
let executor =
DropTableExecutor::new(ctx.cluster_id, self.table_name.clone(), self.table_id, true);
// Deletes metadata for table permanently.
let table_route_value = TableRouteValue::new(
self.table_id,
Expand Down Expand Up @@ -186,6 +187,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand All @@ -198,6 +200,7 @@ mod tests {
}
// Execute again
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -238,6 +241,7 @@ mod tests {
DropTableTarget::Logical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand All @@ -250,6 +254,7 @@ mod tests {
}
// Execute again
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -339,6 +344,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down Expand Up @@ -368,6 +374,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/ddl/drop_database/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ mod tests {
.unwrap();
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
Expand All @@ -144,6 +145,7 @@ mod tests {
// Schema not exists
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
Expand Down
3 changes: 3 additions & 0 deletions src/common/meta/src/ddl/drop_database/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut step = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,
Expand All @@ -104,6 +105,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
Expand All @@ -126,6 +128,7 @@ mod tests {
.unwrap();
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ impl DropTableData {

fn build_executor(&self) -> DropTableExecutor {
DropTableExecutor::new(
self.cluster_id,
self.task.table_name(),
self.task.table_id,
self.task.drop_if_exists,
Expand Down
Loading