Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: register regions during procedure recovery #3859

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,13 @@ impl CreateTableProcedure {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

let mut creator = TableCreator {
data,
opening_regions: vec![],
};

// Only registers regions if the table route is allocated.
if let Some(x) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}

Ok(CreateTableProcedure { context, creator })
Ok(CreateTableProcedure {
context,
creator: TableCreator {
data,
opening_regions: vec![],
},
})
}

fn table_info(&self) -> &RawTableInfo {
Expand Down Expand Up @@ -295,6 +288,19 @@ impl Procedure for CreateTableProcedure {
Self::TYPE_NAME
}

fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the table route is allocated.
if let Some(x) = &self.creator.data.table_route {
self.creator.opening_regions = self
.creator
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}

Ok(())
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;

Expand Down
20 changes: 19 additions & 1 deletion src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ pub mod start;
use std::any::Any;
use std::fmt::Debug;

use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu};
use common_error::ext::BoxedError;
use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
Expand Down Expand Up @@ -68,6 +69,11 @@ pub(crate) trait State: Send + Debug {
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)>;

/// The hook is called during the recovery.
fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
Ok(())
}

/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}
Expand Down Expand Up @@ -107,6 +113,11 @@ impl DropDatabaseProcedure {
state,
})
}

#[cfg(test)]
pub(crate) fn state(&self) -> &dyn State {
self.state.as_ref()
}
}

#[async_trait]
Expand All @@ -115,6 +126,13 @@ impl Procedure for DropDatabaseProcedure {
Self::TYPE_NAME
}

fn recover(&mut self) -> ProcedureResult<()> {
self.state
.recover(&self.runtime_context)
.map_err(BoxedError::new)
.context(ExternalSnafu)
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

Expand Down
44 changes: 43 additions & 1 deletion src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ impl DropDatabaseExecutor {
}

impl DropDatabaseExecutor {
fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
/// Registers the operating regions.
pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
if !self.dropping_regions.is_empty() {
return Ok(());
}
let dropping_regions = operating_leader_regions(&self.physical_region_routes);
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
for (region_id, datanode_id) in dropping_regions {
Expand All @@ -85,6 +89,10 @@ impl DropDatabaseExecutor {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DropDatabaseExecutor {
fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
self.register_dropping_regions(ddl_ctx)
}

async fn next(
&mut self,
ddl_ctx: &DdlContext,
Expand Down Expand Up @@ -338,4 +346,38 @@ mod tests {
let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
assert!(err.is_retry_later());
}

#[tokio::test]
async fn test_on_recovery() {
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(node_manager);
let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await;
let (_, table_route) = ddl_context
.table_metadata_manager
.table_route_manager()
.get_physical_table_route(physical_table_id)
.await
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes.clone(),
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
tables: None,
};
state.recover(&ddl_context).unwrap();
assert_eq!(state.dropping_regions.len(), 1);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Physical);
}
}
}
21 changes: 20 additions & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub(crate) mod executor;
mod metadata;

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_error::ext::BoxedError;
use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
Expand Down Expand Up @@ -68,6 +69,7 @@ impl DropTableProcedure {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let executor = data.build_executor();

Ok(Self {
context,
data,
Expand Down Expand Up @@ -175,6 +177,23 @@ impl Procedure for DropTableProcedure {
Self::TYPE_NAME
}

fn recover(&mut self) -> ProcedureResult<()> {
// Only registers regions if the metadata is deleted.
let register_operating_regions = matches!(
self.data.state,
DropTableState::DeleteMetadata
| DropTableState::InvalidateTableCache
| DropTableState::DatanodeDropRegions
);
if register_operating_regions {
self.register_dropping_regions()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}

Ok(())
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
Expand Down
79 changes: 78 additions & 1 deletion src/common/meta/src/ddl/tests/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use common_procedure_test::{
execute_procedure_until, execute_procedure_until_done, MockContextProvider,
};
use futures::TryStreamExt;

use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::test_util::datanode_handler::{NaiveDatanodeHandler, RetryErrorDatanodeHandler};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
Expand Down Expand Up @@ -121,3 +124,77 @@ async fn test_drop_database_retryable_error() {
}
}
}

#[tokio::test]
async fn test_drop_database_recover() {
common_telemetry::init_default_ut_logging();
let cluster_id = 1;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(node_manager);
ddl_context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
None,
false,
)
.await
.unwrap();
// Creates a physical table
let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
// Creates a logical tables
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
let mut procedure = DropDatabaseProcedure::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
false,
ddl_context.clone(),
);
let num_operating_regions = 1;
// Before dropping the logical table
execute_procedure_until(&mut procedure, |p| {
p.state()
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.is_some()
})
.await;
// Dump data
let data = procedure.dump().unwrap();
assert_eq!(ddl_context.memory_region_keeper.len(), 0);
let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
ddl_context.memory_region_keeper.clear();
// Before dropping the physical table
execute_procedure_until(&mut procedure, |p| {
p.state()
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.is_some()
})
.await;
// Dump data
let data = procedure.dump().unwrap();
assert_eq!(ddl_context.memory_region_keeper.len(), 0);
let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
ddl_context.memory_region_keeper.clear();
execute_procedure_until_done(&mut procedure).await;
let tables = ddl_context
.table_metadata_manager
.table_name_manager()
.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert!(tables.is_empty());
}
66 changes: 66 additions & 0 deletions src/common/meta/src/ddl/tests/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,69 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
inner_test(new_drop_table_task("s", logical_table_id, false)).await;
inner_test(new_drop_table_task("t", physical_table_id, false)).await;
}

#[tokio::test]
async fn test_from_json() {
for (state, num_operating_regions, num_operating_regions_after_recovery) in [
(DropTableState::DeleteMetadata, 0, 1),
(DropTableState::InvalidateTableCache, 1, 1),
(DropTableState::DatanodeDropRegions, 1, 1),
(DropTableState::DeleteTombstone, 1, 0),
] {
let cluster_id = 1;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let kv_backend = Arc::new(MemoryKvBackend::new());
let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);

let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await;
let task = new_drop_table_task("t", physical_table_id, false);
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
execute_procedure_until(&mut procedure, |p| p.data.state == state).await;
let data = procedure.dump().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
// Cleans up the keeper.
ddl_context.memory_region_keeper.clear();
let mut procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions_after_recovery
);
assert_eq!(
procedure.dropping_regions.len(),
num_operating_regions_after_recovery
);
}

let num_operating_regions = 0;
let num_operating_regions_after_recovery = 0;
let cluster_id = 1;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let kv_backend = Arc::new(MemoryKvBackend::new());
let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);

let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await;
let task = new_drop_table_task("t", physical_table_id, false);
let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
execute_procedure_until_done(&mut procedure).await;
let data = procedure.dump().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
// Cleans up the keeper.
ddl_context.memory_region_keeper.clear();
let mut procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions_after_recovery
);
assert_eq!(
procedure.dropping_regions.len(),
num_operating_regions_after_recovery
);
}
7 changes: 7 additions & 0 deletions src/common/meta/src/region_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ impl MemoryRegionKeeper {
inner.len()
}

/// Returns true if it's empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

#[cfg(test)]
pub fn clear(&self) {
let mut inner = self.inner.write().unwrap();
inner.clear();
}
}

#[cfg(test)]
Expand Down
Loading
Loading