From b9b485e2fbd42d8d075ffbac57d913cb560b8253 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 6 May 2024 04:24:59 +0000 Subject: [PATCH 1/3] fix: register regions during procedure recovery --- src/common/meta/src/ddl/drop_database.rs | 23 ++++++- .../meta/src/ddl/drop_database/executor.rs | 44 ++++++++++++- src/common/meta/src/ddl/drop_table.rs | 21 +++++- src/common/meta/src/ddl/tests/drop_table.rs | 64 +++++++++++++++++++ src/common/meta/src/region_keeper.rs | 7 ++ 5 files changed, 152 insertions(+), 7 deletions(-) diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index e91485ee0c52..5c942725a206 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -20,7 +20,8 @@ pub mod start; use std::any::Any; use std::fmt::Debug; -use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu}; +use common_error::ext::BoxedError; +use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, }; @@ -68,6 +69,11 @@ pub(crate) trait State: Send + Debug { ctx: &mut DropDatabaseContext, ) -> Result<(Box, Status)>; + /// The hook is called during the recovery. + fn on_recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> { + Ok(()) + } + /// Returns as [Any](std::any::Any). fn as_any(&self) -> &dyn Any; } @@ -88,6 +94,11 @@ impl DropDatabaseProcedure { } } + fn on_recover(&mut self) -> Result<()> { + let state = &mut self.state; + state.on_recover(&self.runtime_context) + } + pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult { let DropDatabaseOwnedData { catalog, @@ -96,7 +107,7 @@ impl DropDatabaseProcedure { state, } = serde_json::from_str(json).context(FromJsonSnafu)?; - Ok(Self { + let mut procedure = Self { runtime_context, context: DropDatabaseContext { catalog, @@ -105,7 +116,13 @@ impl DropDatabaseProcedure { tables: None, }, state, - }) + }; + procedure + .on_recover() + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + Ok(procedure) } } diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index acc2d6333156..839c74e1f928 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -64,7 +64,11 @@ impl DropDatabaseExecutor { } impl DropDatabaseExecutor { - fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> { + /// Registers the operating regions. + pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> { + if !self.dropping_regions.is_empty() { + return Ok(()); + } let dropping_regions = operating_leader_regions(&self.physical_region_routes); let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len()); for (region_id, datanode_id) in dropping_regions { @@ -85,6 +89,10 @@ impl DropDatabaseExecutor { #[async_trait::async_trait] #[typetag::serde] impl State for DropDatabaseExecutor { + fn on_recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> { + self.register_dropping_regions(ddl_ctx) + } + async fn next( &mut self, ddl_ctx: &DdlContext, @@ -338,4 +346,38 @@ mod tests { let err = state.next(&ddl_context, &mut ctx).await.unwrap_err(); assert!(err.is_retry_later()); } + + #[tokio::test] + async fn test_on_recovery() { + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await; + let (_, table_route) = ddl_context + .table_metadata_manager + .table_route_manager() + .get_physical_table_route(physical_table_id) + .await + .unwrap(); + { + let mut state = DropDatabaseExecutor::new( + physical_table_id, + physical_table_id, + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + table_route.region_routes.clone(), + DropTableTarget::Physical, + ); + let mut ctx = DropDatabaseContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + drop_if_exists: false, + tables: None, + }; + state.on_recover(&ddl_context).unwrap(); + assert_eq!(state.dropping_regions.len(), 1); + let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); + assert!(!status.need_persist()); + let cursor = state.as_any().downcast_ref::().unwrap(); + assert_eq!(cursor.target, DropTableTarget::Physical); + } + } } diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index fcf1ffb4aa00..b9a8afd82af5 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -16,7 +16,8 @@ pub(crate) mod executor; mod metadata; use async_trait::async_trait; -use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; +use common_error::ext::BoxedError; +use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Result as ProcedureResult, Status, @@ -68,12 +69,26 @@ impl DropTableProcedure { pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?; let executor = data.build_executor(); - Ok(Self { + // Only registers regions if the metadata is deleted. + let register_operating_regions = matches!( + data.state, + DropTableState::DeleteMetadata + | DropTableState::InvalidateTableCache + | DropTableState::DatanodeDropRegions + ); + let mut procedure = Self { context, data, dropping_regions: vec![], executor, - }) + }; + if register_operating_regions { + procedure + .register_dropping_regions() + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + } + Ok(procedure) } pub(crate) async fn on_prepare<'a>(&mut self) -> Result { diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 20034fa06f97..343750a6d446 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -296,3 +296,67 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() { inner_test(new_drop_table_task("s", logical_table_id, false)).await; inner_test(new_drop_table_task("t", physical_table_id, false)).await; } + +#[tokio::test] +async fn test_from_json() { + for (state, num_operating_regions, num_operating_regions_after_recovery) in [ + (DropTableState::DeleteMetadata, 0, 1), + (DropTableState::InvalidateTableCache, 1, 1), + (DropTableState::DatanodeDropRegions, 1, 1), + (DropTableState::DeleteTombstone, 1, 0), + ] { + let cluster_id = 1; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend); + + let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await; + let task = new_drop_table_task("t", physical_table_id, false); + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); + execute_procedure_until(&mut procedure, |p| p.data.state == state).await; + let data = procedure.dump().unwrap(); + assert_eq!( + ddl_context.memory_region_keeper.len(), + num_operating_regions + ); + // Cleans up the keeper. + ddl_context.memory_region_keeper.clear(); + let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap(); + assert_eq!( + ddl_context.memory_region_keeper.len(), + num_operating_regions_after_recovery + ); + assert_eq!( + procedure.dropping_regions.len(), + num_operating_regions_after_recovery + ); + } + + let num_operating_regions = 0; + let num_operating_regions_after_recovery = 0; + let cluster_id = 1; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend); + + let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await; + let task = new_drop_table_task("t", physical_table_id, false); + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); + execute_procedure_until_done(&mut procedure).await; + let data = procedure.dump().unwrap(); + assert_eq!( + ddl_context.memory_region_keeper.len(), + num_operating_regions + ); + // Cleans up the keeper. + ddl_context.memory_region_keeper.clear(); + let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap(); + assert_eq!( + ddl_context.memory_region_keeper.len(), + num_operating_regions_after_recovery + ); + assert_eq!( + procedure.dropping_regions.len(), + num_operating_regions_after_recovery + ); +} diff --git a/src/common/meta/src/region_keeper.rs b/src/common/meta/src/region_keeper.rs index 1002dcc6b8d0..fc5f0531b45e 100644 --- a/src/common/meta/src/region_keeper.rs +++ b/src/common/meta/src/region_keeper.rs @@ -101,9 +101,16 @@ impl MemoryRegionKeeper { inner.len() } + /// Returns true if it's empty. pub fn is_empty(&self) -> bool { self.len() == 0 } + + #[cfg(test)] + pub fn clear(&self) { + let mut inner = self.inner.write().unwrap(); + inner.clear(); + } } #[cfg(test)] From f38c0bf792d7f0e5283d49fdcaf1e51b4e845776 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 6 May 2024 08:25:29 +0000 Subject: [PATCH 2/3] feat: add `recover` to `Procedure` trait --- src/common/procedure/src/local.rs | 106 +++++++++++++++++++++++++- src/common/procedure/src/procedure.rs | 5 ++ 2 files changed, 109 insertions(+), 2 deletions(-) diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 4e13a82b240b..d43c2071ca84 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -488,7 +488,7 @@ impl LocalManager { if message.parent_id.is_none() { // This is the root procedure. We only submit the root procedure as it will // submit sub-procedures to the manager. - let Some(loaded_procedure) = self + let Some(mut loaded_procedure) = self .manager_ctx .load_one_procedure_from_message(*procedure_id, message) else { @@ -515,13 +515,17 @@ impl LocalManager { InitProcedureState::Running => ProcedureState::Running, }; + if let Err(e) = loaded_procedure.procedure.recover() { + logging::error!(e; "Failed to recover procedure {}", procedure_id); + } + if let Err(e) = self.submit_root( *procedure_id, procedure_state, loaded_procedure.step, loaded_procedure.procedure, ) { - logging::error!(e; "Failed to recover procedure {}", procedure_id); + logging::error!(e; "Failed to submit recovered procedure {}", procedure_id); } } } @@ -688,6 +692,7 @@ mod tests { use common_error::mock::MockError; use common_error::status_code::StatusCode; use common_test_util::temp_dir::create_temp_dir; + use tokio::time::timeout; use super::*; use crate::error::{self, Error}; @@ -1141,4 +1146,101 @@ mod tests { .unwrap() .is_none()); } + + #[derive(Debug)] + struct ProcedureToRecover { + content: String, + lock_key: LockKey, + notify: Option>, + } + + #[async_trait] + impl Procedure for ProcedureToRecover { + fn type_name(&self) -> &str { + "ProcedureToRecover" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + Ok(Status::done()) + } + + fn dump(&self) -> Result { + Ok(self.content.clone()) + } + + fn lock_key(&self) -> LockKey { + self.lock_key.clone() + } + + fn recover(&mut self) -> Result<()> { + self.notify.as_ref().unwrap().notify_one(); + Ok(()) + } + } + + impl ProcedureToRecover { + fn new(content: &str) -> ProcedureToRecover { + ProcedureToRecover { + content: content.to_string(), + lock_key: LockKey::default(), + notify: None, + } + } + + fn loader(notify: Arc) -> BoxedProcedureLoader { + let f = move |json: &str| { + let procedure = ProcedureToRecover { + content: json.to_string(), + lock_key: LockKey::default(), + notify: Some(notify.clone()), + }; + Ok(Box::new(procedure) as _) + }; + Box::new(f) + } + } + + #[tokio::test] + async fn test_procedure_recover() { + common_telemetry::init_default_ut_logging(); + let dir = create_temp_dir("procedure_recover"); + let object_store = test_util::new_object_store(&dir); + let config = ManagerConfig { + parent_path: "data/".to_string(), + max_retry_times: 3, + retry_delay: Duration::from_millis(500), + ..Default::default() + }; + let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); + let manager = LocalManager::new(config, state_store); + manager.manager_ctx.start(); + + let notify = Arc::new(Notify::new()); + manager + .register_loader( + "ProcedureToRecover", + ProcedureToRecover::loader(notify.clone()), + ) + .unwrap(); + + // Prepare data + let procedure_store = ProcedureStore::from_object_store(object_store.clone()); + let root: BoxedProcedure = Box::new(ProcedureToRecover::new("test procedure recovery")); + let root_id = ProcedureId::random(); + // Prepare data for the root procedure. + for step in 0..3 { + let type_name = root.type_name().to_string(); + let data = root.dump().unwrap(); + procedure_store + .store_procedure(root_id, step, type_name, data, None) + .await + .unwrap(); + } + + // Recover the manager + manager.recover().await.unwrap(); + timeout(Duration::from_secs(10), notify.notified()) + .await + .unwrap(); + } } diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 709a8ac4e441..6c694315e93b 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -140,6 +140,11 @@ pub trait Procedure: Send { /// Dump the state of the procedure to a string. fn dump(&self) -> Result; + /// The hook is called after the procedure recovery. + fn recover(&mut self) -> Result<()> { + Ok(()) + } + /// Returns the [LockKey] that this procedure needs to acquire. fn lock_key(&self) -> LockKey; } From 50c2a198aa51cd0c54fd3b4f977de0bf9eb52865 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Mon, 6 May 2024 08:46:21 +0000 Subject: [PATCH 3/3] refactor: move recovery to `recover` method --- src/common/meta/src/ddl/create_table.rs | 34 ++++---- src/common/meta/src/ddl/drop_database.rs | 27 ++++--- .../meta/src/ddl/drop_database/executor.rs | 4 +- src/common/meta/src/ddl/drop_table.rs | 36 +++++---- .../meta/src/ddl/tests/drop_database.rs | 79 ++++++++++++++++++- src/common/meta/src/ddl/tests/drop_table.rs | 6 +- 6 files changed, 138 insertions(+), 48 deletions(-) diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 044715b32381..4300ab013a33 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -63,20 +63,13 @@ impl CreateTableProcedure { pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; - let mut creator = TableCreator { - data, - opening_regions: vec![], - }; - - // Only registers regions if the table route is allocated. - if let Some(x) = &creator.data.table_route { - creator.opening_regions = creator - .register_opening_regions(&context, &x.region_routes) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - } - - Ok(CreateTableProcedure { context, creator }) + Ok(CreateTableProcedure { + context, + creator: TableCreator { + data, + opening_regions: vec![], + }, + }) } fn table_info(&self) -> &RawTableInfo { @@ -296,6 +289,19 @@ impl Procedure for CreateTableProcedure { Self::TYPE_NAME } + fn recover(&mut self) -> ProcedureResult<()> { + // Only registers regions if the table route is allocated. + if let Some(x) = &self.creator.data.table_route { + self.creator.opening_regions = self + .creator + .register_opening_regions(&self.context, &x.region_routes) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + } + + Ok(()) + } + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &self.creator.data.state; diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index 5c942725a206..ce62b7d0c316 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -70,7 +70,7 @@ pub(crate) trait State: Send + Debug { ) -> Result<(Box, Status)>; /// The hook is called during the recovery. - fn on_recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> { + fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> { Ok(()) } @@ -94,11 +94,6 @@ impl DropDatabaseProcedure { } } - fn on_recover(&mut self) -> Result<()> { - let state = &mut self.state; - state.on_recover(&self.runtime_context) - } - pub fn from_json(json: &str, runtime_context: DdlContext) -> ProcedureResult { let DropDatabaseOwnedData { catalog, @@ -107,7 +102,7 @@ impl DropDatabaseProcedure { state, } = serde_json::from_str(json).context(FromJsonSnafu)?; - let mut procedure = Self { + Ok(Self { runtime_context, context: DropDatabaseContext { catalog, @@ -116,13 +111,12 @@ impl DropDatabaseProcedure { tables: None, }, state, - }; - procedure - .on_recover() - .map_err(BoxedError::new) - .context(ExternalSnafu)?; + }) + } - Ok(procedure) + #[cfg(test)] + pub(crate) fn state(&self) -> &dyn State { + self.state.as_ref() } } @@ -132,6 +126,13 @@ impl Procedure for DropDatabaseProcedure { Self::TYPE_NAME } + fn recover(&mut self) -> ProcedureResult<()> { + self.state + .recover(&self.runtime_context) + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 839c74e1f928..48b840e8d9bf 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -89,7 +89,7 @@ impl DropDatabaseExecutor { #[async_trait::async_trait] #[typetag::serde] impl State for DropDatabaseExecutor { - fn on_recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> { + fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> { self.register_dropping_regions(ddl_ctx) } @@ -372,7 +372,7 @@ mod tests { drop_if_exists: false, tables: None, }; - state.on_recover(&ddl_context).unwrap(); + state.recover(&ddl_context).unwrap(); assert_eq!(state.dropping_regions.len(), 1); let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); assert!(!status.need_persist()); diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index b9a8afd82af5..dd597e54a87f 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -69,26 +69,13 @@ impl DropTableProcedure { pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?; let executor = data.build_executor(); - // Only registers regions if the metadata is deleted. - let register_operating_regions = matches!( - data.state, - DropTableState::DeleteMetadata - | DropTableState::InvalidateTableCache - | DropTableState::DatanodeDropRegions - ); - let mut procedure = Self { + + Ok(Self { context, data, dropping_regions: vec![], executor, - }; - if register_operating_regions { - procedure - .register_dropping_regions() - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - } - Ok(procedure) + }) } pub(crate) async fn on_prepare<'a>(&mut self) -> Result { @@ -190,6 +177,23 @@ impl Procedure for DropTableProcedure { Self::TYPE_NAME } + fn recover(&mut self) -> ProcedureResult<()> { + // Only registers regions if the metadata is deleted. + let register_operating_regions = matches!( + self.data.state, + DropTableState::DeleteMetadata + | DropTableState::InvalidateTableCache + | DropTableState::DatanodeDropRegions + ); + if register_operating_regions { + self.register_dropping_regions() + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + } + + Ok(()) + } + async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &self.data.state; let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE diff --git a/src/common/meta/src/ddl/tests/drop_database.rs b/src/common/meta/src/ddl/tests/drop_database.rs index 656e6eb914e8..66a5d3e7568b 100644 --- a/src/common/meta/src/ddl/tests/drop_database.rs +++ b/src/common/meta/src/ddl/tests/drop_database.rs @@ -16,9 +16,12 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId}; -use common_procedure_test::MockContextProvider; +use common_procedure_test::{ + execute_procedure_until, execute_procedure_until_done, MockContextProvider, +}; use futures::TryStreamExt; +use crate::ddl::drop_database::executor::DropDatabaseExecutor; use crate::ddl::drop_database::DropDatabaseProcedure; use crate::ddl::test_util::datanode_handler::{NaiveDatanodeHandler, RetryErrorDatanodeHandler}; use crate::ddl::test_util::{create_logical_table, create_physical_table}; @@ -121,3 +124,77 @@ async fn test_drop_database_retryable_error() { } } } + +#[tokio::test] +async fn test_drop_database_recover() { + common_telemetry::init_default_ut_logging(); + let cluster_id = 1; + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + ddl_context + .table_metadata_manager + .schema_manager() + .create( + SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME), + None, + false, + ) + .await + .unwrap(); + // Creates a physical table + let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await; + // Creates a logical tables + create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await; + let mut procedure = DropDatabaseProcedure::new( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + false, + ddl_context.clone(), + ); + let num_operating_regions = 1; + // Before dropping the logical table + execute_procedure_until(&mut procedure, |p| { + p.state() + .as_any() + .downcast_ref::() + .is_some() + }) + .await; + // Dump data + let data = procedure.dump().unwrap(); + assert_eq!(ddl_context.memory_region_keeper.len(), 0); + let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap(); + procedure.recover().unwrap(); + assert_eq!( + ddl_context.memory_region_keeper.len(), + num_operating_regions + ); + ddl_context.memory_region_keeper.clear(); + // Before dropping the physical table + execute_procedure_until(&mut procedure, |p| { + p.state() + .as_any() + .downcast_ref::() + .is_some() + }) + .await; + // Dump data + let data = procedure.dump().unwrap(); + assert_eq!(ddl_context.memory_region_keeper.len(), 0); + let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap(); + procedure.recover().unwrap(); + assert_eq!( + ddl_context.memory_region_keeper.len(), + num_operating_regions + ); + ddl_context.memory_region_keeper.clear(); + execute_procedure_until_done(&mut procedure).await; + let tables = ddl_context + .table_metadata_manager + .table_name_manager() + .tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .try_collect::>() + .await + .unwrap(); + assert!(tables.is_empty()); +} diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 343750a6d446..fd34e2646348 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -321,7 +321,8 @@ async fn test_from_json() { ); // Cleans up the keeper. ddl_context.memory_region_keeper.clear(); - let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap(); + let mut procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap(); + procedure.recover().unwrap(); assert_eq!( ddl_context.memory_region_keeper.len(), num_operating_regions_after_recovery @@ -350,7 +351,8 @@ async fn test_from_json() { ); // Cleans up the keeper. ddl_context.memory_region_keeper.clear(); - let procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap(); + let mut procedure = DropTableProcedure::from_json(&data, ddl_context.clone()).unwrap(); + procedure.recover().unwrap(); assert_eq!( ddl_context.memory_region_keeper.len(), num_operating_regions_after_recovery