diff --git a/src/cmd/src/cli/bench/metadata.rs b/src/cmd/src/cli/bench/metadata.rs index 6b5e4812bb80..a1009cfe8d6c 100644 --- a/src/cmd/src/cli/bench/metadata.rs +++ b/src/cmd/src/cli/bench/metadata.rs @@ -107,14 +107,11 @@ impl TableMetadataBencher { .unwrap(); let start = Instant::now(); let table_info = table_info.unwrap(); + let table_route = table_route.unwrap(); let table_id = table_info.table_info.ident.table_id; let _ = self .table_metadata_manager - .delete_table_metadata( - table_id, - &table_info.table_name(), - table_route.unwrap().region_routes().unwrap(), - ) + .delete_table_metadata(table_id, &table_info.table_name(), &table_route) .await; start.elapsed() }, diff --git a/src/common/meta/src/ddl/drop_database/cursor.rs b/src/common/meta/src/ddl/drop_database/cursor.rs index c4caf3522b8d..254a722e71d4 100644 --- a/src/common/meta/src/ddl/drop_database/cursor.rs +++ b/src/common/meta/src/ddl/drop_database/cursor.rs @@ -76,6 +76,7 @@ impl DropDatabaseCursor { .await?; Ok(( Box::new(DropDatabaseExecutor::new( + table_id, table_id, TableName::new(&ctx.catalog, &ctx.schema, &table_name), table_route.region_routes, @@ -86,6 +87,7 @@ impl DropDatabaseCursor { } (DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok(( Box::new(DropDatabaseExecutor::new( + table_id, table_id, TableName::new(&ctx.catalog, &ctx.schema, &table_name), table_route.region_routes, @@ -220,7 +222,7 @@ mod tests { .get_physical_table_route(physical_table_id) .await .unwrap(); - assert_eq!(table_route.region_routes, executor.region_routes); + assert_eq!(table_route.region_routes, executor.physical_region_routes); assert_eq!(executor.target, DropTableTarget::Logical); } diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 16308b948136..76911a4fa8ab 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -26,6 +26,7 @@ use crate::ddl::drop_database::State; use crate::ddl::drop_table::executor::DropTableExecutor; use crate::ddl::DdlContext; use crate::error::{self, Result}; +use crate::key::table_route::TableRouteValue; use crate::region_keeper::OperatingRegionGuard; use crate::rpc::router::{operating_leader_regions, RegionRoute}; use crate::table_name::TableName; @@ -33,8 +34,10 @@ use crate::table_name::TableName; #[derive(Debug, Serialize, Deserialize)] pub(crate) struct DropDatabaseExecutor { table_id: TableId, + physical_table_id: TableId, table_name: TableName, - pub(crate) region_routes: Vec, + /// The physical table region routes. + pub(crate) physical_region_routes: Vec, pub(crate) target: DropTableTarget, #[serde(skip)] dropping_regions: Vec, @@ -44,14 +47,16 @@ impl DropDatabaseExecutor { /// Returns a new [DropDatabaseExecutor]. 
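`DropDatabaseExecutor` now tracks two table ids: for a metric-engine logical table the region routes belong to its physical table, so the executor needs both the id being dropped and the id that owns the routes. A minimal sketch of the intended call shapes, mirroring the constructor below and the tests later in this diff (variable names are illustrative, not from the patch):

```rust
// Physical table: both ids coincide and the routes are its own.
let drop_physical = DropDatabaseExecutor::new(
    physical_table_id,
    physical_table_id,
    TableName::new(catalog, schema, "phy"),
    physical_route.region_routes.clone(),
    DropTableTarget::Physical,
);

// Logical table: the dropped id differs from the id owning the routes.
let drop_logical = DropDatabaseExecutor::new(
    logical_table_id,
    physical_table_id,
    TableName::new(catalog, schema, "metric"),
    physical_route.region_routes.clone(),
    DropTableTarget::Logical,
);
```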
pub fn new( table_id: TableId, + physical_table_id: TableId, table_name: TableName, - region_routes: Vec, + physical_region_routes: Vec, target: DropTableTarget, ) -> Self { Self { - table_name, table_id, - region_routes, + physical_table_id, + table_name, + physical_region_routes, target, dropping_regions: vec![], } @@ -60,7 +65,7 @@ impl DropDatabaseExecutor { impl DropDatabaseExecutor { fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> { - let dropping_regions = operating_leader_regions(&self.region_routes); + let dropping_regions = operating_leader_regions(&self.physical_region_routes); let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len()); for (region_id, datanode_id) in dropping_regions { let guard = ddl_ctx @@ -87,12 +92,18 @@ impl State for DropDatabaseExecutor { ) -> Result<(Box, Status)> { self.register_dropping_regions(ddl_ctx)?; let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true); + // Deletes metadata for table permanently. + let table_route_value = TableRouteValue::new( + self.table_id, + self.physical_table_id, + self.physical_region_routes.clone(), + ); executor - .on_remove_metadata(ddl_ctx, &self.region_routes) + .on_destroy_metadata(ddl_ctx, &table_route_value) .await?; executor.invalidate_table_cache(ddl_ctx).await?; executor - .on_drop_regions(ddl_ctx, &self.region_routes) + .on_drop_regions(ddl_ctx, &self.physical_region_routes) .await?; info!("Table: {}({}) is dropped", self.table_name, self.table_id); @@ -122,7 +133,9 @@ mod tests { use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State}; use crate::ddl::test_util::{create_logical_table, create_physical_table}; use crate::error::{self, Error, Result}; + use crate::key::datanode_table::DatanodeTableKey; use crate::peer::Peer; + use crate::rpc::router::region_distribution; use crate::table_name::TableName; use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager}; @@ -157,6 +170,7 @@ mod tests { .unwrap(); { let mut state = DropDatabaseExecutor::new( + physical_table_id, physical_table_id, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), table_route.region_routes.clone(), @@ -181,9 +195,10 @@ mod tests { tables: None, }; let mut state = DropDatabaseExecutor::new( + physical_table_id, physical_table_id, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), - table_route.region_routes, + table_route.region_routes.clone(), DropTableTarget::Physical, ); let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap(); @@ -207,6 +222,7 @@ mod tests { .unwrap(); { let mut state = DropDatabaseExecutor::new( + logical_table_id, physical_table_id, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"), table_route.region_routes.clone(), @@ -231,8 +247,9 @@ mod tests { tables: None, }; let mut state = DropDatabaseExecutor::new( + logical_table_id, physical_table_id, - TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"), table_route.region_routes, DropTableTarget::Logical, ); @@ -240,6 +257,33 @@ mod tests { assert!(!status.need_persist()); let cursor = state.as_any().downcast_ref::().unwrap(); assert_eq!(cursor.target, DropTableTarget::Logical); + // Checks table info + ddl_context + .table_metadata_manager + .table_info_manager() + .get(physical_table_id) + .await + .unwrap() + .unwrap(); + // Checks table route + let table_route = ddl_context + .table_metadata_manager + 
.table_route_manager() + .table_route_storage() + .get(physical_table_id) + .await + .unwrap() + .unwrap(); + let region_routes = table_route.region_routes().unwrap(); + for datanode_id in region_distribution(region_routes).into_keys() { + ddl_context + .table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey::new(datanode_id, physical_table_id)) + .await + .unwrap() + .unwrap(); + } } #[derive(Clone)] @@ -279,6 +323,7 @@ mod tests { .await .unwrap(); let mut state = DropDatabaseExecutor::new( + physical_table_id, physical_table_id, TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"), table_route.region_routes, diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index e00d1571e27c..14a39b835bd2 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -18,9 +18,11 @@ mod metadata; use async_trait::async_trait; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ - Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, + Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, + Result as ProcedureResult, Status, }; use common_telemetry::info; +use common_telemetry::tracing::warn; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use strum::AsRefStr; @@ -31,9 +33,7 @@ use self::executor::DropTableExecutor; use crate::ddl::utils::handle_retry_error; use crate::ddl::DdlContext; use crate::error::{self, Result}; -use crate::key::table_info::TableInfoValue; use crate::key::table_route::TableRouteValue; -use crate::key::DeserializedValueWithBytes; use crate::lock_key::{CatalogLock, SchemaLock, TableLock}; use crate::metrics; use crate::region_keeper::OperatingRegionGuard; @@ -47,46 +47,50 @@ pub struct DropTableProcedure { pub data: DropTableData, /// The guards of opening regions. pub dropping_regions: Vec, + /// The drop table executor. + executor: DropTableExecutor, } impl DropTableProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::DropTable"; pub fn new(cluster_id: u64, task: DropTableTask, context: DdlContext) -> Self { + let data = DropTableData::new(cluster_id, task); + let executor = data.build_executor(); Self { context, - data: DropTableData::new(cluster_id, task), + data, dropping_regions: vec![], + executor, } } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { - let data = serde_json::from_str(json).context(FromJsonSnafu)?; + let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?; + let executor = data.build_executor(); Ok(Self { context, data, dropping_regions: vec![], + executor, }) } - pub(crate) async fn on_prepare<'a>(&mut self, executor: &DropTableExecutor) -> Result { - if executor.on_prepare(&self.context).await?.stop() { + pub(crate) async fn on_prepare<'a>(&mut self) -> Result { + if self.executor.on_prepare(&self.context).await?.stop() { return Ok(Status::done()); } self.fill_table_metadata().await?; - self.data.state = DropTableState::RemoveMetadata; + self.data.state = DropTableState::DeleteMetadata; Ok(Status::executing(true)) } /// Register dropping regions if doesn't exist. fn register_dropping_regions(&mut self) -> Result<()> { - // Safety: filled in `on_prepare`. 
- let region_routes = self.data.region_routes().unwrap()?; - - let dropping_regions = operating_leader_regions(region_routes); + let dropping_regions = operating_leader_regions(&self.data.physical_region_routes); - if self.dropping_regions.len() == dropping_regions.len() { + if !self.dropping_regions.is_empty() { return Ok(()); } @@ -109,7 +113,7 @@ impl DropTableProcedure { } /// Removes the table metadata. - async fn on_remove_metadata(&mut self, executor: &DropTableExecutor) -> Result { + pub(crate) async fn on_delete_metadata(&mut self) -> Result { self.register_dropping_regions()?; // NOTES: If the meta server is crashed after the `RemoveMetadata`, // Corresponding regions of this table on the Datanode will be closed automatically. @@ -117,12 +121,15 @@ impl DropTableProcedure { // TODO(weny): Considers introducing a RegionStatus to indicate the region is dropping. let table_id = self.data.table_id(); - executor - .on_remove_metadata( - &self.context, - // Safety: filled in `on_prepare`. - self.data.region_routes().unwrap()?, - ) + let table_route_value = &TableRouteValue::new( + self.data.task.table_id, + // Safety: checked + self.data.physical_table_id.unwrap(), + self.data.physical_region_routes.clone(), + ); + // Deletes table metadata logically. + self.executor + .on_delete_metadata(&self.context, table_route_value) .await?; info!("Deleted table metadata for table {table_id}"); self.data.state = DropTableState::InvalidateTableCache; @@ -130,30 +137,33 @@ impl DropTableProcedure { } /// Broadcasts invalidate table cache instruction. - async fn on_broadcast(&mut self, executor: &DropTableExecutor) -> Result { - executor.invalidate_table_cache(&self.context).await?; + async fn on_broadcast(&mut self) -> Result { + self.executor.invalidate_table_cache(&self.context).await?; self.data.state = DropTableState::DatanodeDropRegions; Ok(Status::executing(true)) } - pub async fn on_datanode_drop_regions(&self, executor: &DropTableExecutor) -> Result { - executor - .on_drop_regions( - &self.context, - // Safety: filled in `on_prepare`. - self.data.region_routes().unwrap()?, - ) + pub async fn on_datanode_drop_regions(&mut self) -> Result { + self.executor + .on_drop_regions(&self.context, &self.data.physical_region_routes) .await?; - Ok(Status::done()) + self.data.state = DropTableState::DeleteTombstone; + Ok(Status::executing(true)) } - pub(crate) fn executor(&self) -> DropTableExecutor { - DropTableExecutor::new( - self.data.task.table_name(), - self.data.table_id(), - self.data.task.drop_if_exists, - ) + /// Deletes metadata tombstone. 
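With the new terminal tombstone step, `on_datanode_drop_regions` no longer finishes the procedure; it hands off to `DeleteTombstone`. A sketch of the full progression, restating the transitions that the handlers in this diff set (not actual procedure-framework code):

```rust
// Mirrors the state transitions assigned by the handlers in this diff.
fn next_state(state: DropTableState) -> Option<DropTableState> {
    use DropTableState::*;
    match state {
        Prepare => Some(DeleteMetadata),
        DeleteMetadata => Some(InvalidateTableCache),
        InvalidateTableCache => Some(DatanodeDropRegions),
        DatanodeDropRegions => Some(DeleteTombstone),
        DeleteTombstone => None, // Status::done()
    }
}
```

Rollback is supported in every state after `Prepare`, which is exactly the window in which metadata exists only as tombstones.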
+ async fn on_delete_metadata_tombstone(&self) -> Result { + let table_route_value = &TableRouteValue::new( + self.data.task.table_id, + // Safety: checked + self.data.physical_table_id.unwrap(), + self.data.physical_region_routes.clone(), + ); + self.executor + .on_delete_metadata_tombstone(&self.context, table_route_value) + .await?; + Ok(Status::done()) } } @@ -164,17 +174,17 @@ impl Procedure for DropTableProcedure { } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { - let executor = self.executor(); let state = &self.data.state; let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE .with_label_values(&[state.as_ref()]) .start_timer(); match self.data.state { - DropTableState::Prepare => self.on_prepare(&executor).await, - DropTableState::RemoveMetadata => self.on_remove_metadata(&executor).await, - DropTableState::InvalidateTableCache => self.on_broadcast(&executor).await, - DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions(&executor).await, + DropTableState::Prepare => self.on_prepare().await, + DropTableState::DeleteMetadata => self.on_delete_metadata().await, + DropTableState::InvalidateTableCache => self.on_broadcast().await, + DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await, + DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await, } .map_err(handle_retry_error) } @@ -194,6 +204,28 @@ impl Procedure for DropTableProcedure { LockKey::new(lock_key) } + + fn rollback_supported(&self) -> bool { + !matches!(self.data.state, DropTableState::Prepare) + } + + async fn rollback(&mut self, _: &ProcedureContext) -> ProcedureResult<()> { + warn!( + "Rolling back the drop table procedure, table: {}", + self.data.table_id() + ); + + let table_route_value = &TableRouteValue::new( + self.data.task.table_id, + // Safety: checked + self.data.physical_table_id.unwrap(), + self.data.physical_region_routes.clone(), + ); + self.executor + .on_restore_metadata(&self.context, table_route_value) + .await + .map_err(ProcedureError::external) + } } #[derive(Debug, Serialize, Deserialize)] @@ -201,8 +233,8 @@ pub struct DropTableData { pub state: DropTableState, pub cluster_id: u64, pub task: DropTableTask, - pub table_route_value: Option>, - pub table_info_value: Option>, + pub physical_region_routes: Vec, + pub physical_table_id: Option, } impl DropTableData { @@ -211,8 +243,8 @@ impl DropTableData { state: DropTableState::Prepare, cluster_id, task, - table_route_value: None, - table_info_value: None, + physical_region_routes: vec![], + physical_table_id: None, } } @@ -220,13 +252,17 @@ impl DropTableData { self.task.table_ref() } - fn region_routes(&self) -> Option>> { - self.table_route_value.as_ref().map(|v| v.region_routes()) - } - fn table_id(&self) -> TableId { self.task.table_id } + + fn build_executor(&self) -> DropTableExecutor { + DropTableExecutor::new( + self.task.table_name(), + self.task.table_id, + self.task.drop_if_exists, + ) + } } /// The state of drop table. 
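Each step above rebuilds a `TableRouteValue` from `task.table_id`, the stored `physical_table_id`, and the physical region routes. The constructor, defined in `table_route.rs` further down in this diff, picks the variant from the two ids. A compact restatement of that dispatch (a sketch, assuming the snippet in this diff is complete):

```rust
// Identical ids yield a physical route value; otherwise the value is
// logical, remembering only the physical table id plus this table's
// region ids (same region numbers, different table id).
fn route_value(
    table_id: TableId,
    physical_table_id: TableId,
    routes: Vec<RegionRoute>,
) -> TableRouteValue {
    if table_id == physical_table_id {
        TableRouteValue::physical(routes)
    } else {
        let region_ids = routes
            .iter()
            .map(|r| RegionId::new(table_id, r.region.id.region_number()))
            .collect::<Vec<_>>();
        TableRouteValue::logical(physical_table_id, region_ids)
    }
}
```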
@@ -234,10 +270,12 @@ impl DropTableData { pub enum DropTableState { /// Prepares to drop the table Prepare, - /// Removes metadata - RemoveMetadata, + /// Deletes metadata logically + DeleteMetadata, /// Invalidates Table Cache InvalidateTableCache, /// Drops regions on Datanode DatanodeDropRegions, + /// Deletes metadata tombstone permanently + DeleteTombstone, } diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 37ca7c20c4e8..8be31555533c 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -30,6 +30,7 @@ use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::instruction::CacheIdent; use crate::key::table_name::TableNameKey; +use crate::key::table_route::TableRouteValue; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; use crate::table_name::TableName; @@ -99,14 +100,73 @@ impl DropTableExecutor { Ok(Control::Continue(())) } - /// Removes the table metadata. - pub async fn on_remove_metadata( + /// Deletes the table metadata **logically**. + pub async fn on_delete_metadata( &self, ctx: &DdlContext, - region_routes: &[RegionRoute], + table_route_value: &TableRouteValue, + ) -> Result<()> { + let table_name_key = TableNameKey::new( + &self.table.catalog_name, + &self.table.schema_name, + &self.table.table_name, + ); + if !ctx + .table_metadata_manager + .table_name_manager() + .exists(table_name_key) + .await? + { + return Ok(()); + } + ctx.table_metadata_manager + .delete_table_metadata(self.table_id, &self.table, table_route_value) + .await + } + + /// Deletes the table metadata tombstone **permanently**. + pub async fn on_delete_metadata_tombstone( + &self, + ctx: &DdlContext, + table_route_value: &TableRouteValue, + ) -> Result<()> { + ctx.table_metadata_manager + .delete_table_metadata_tombstone(self.table_id, &self.table, table_route_value) + .await + } + + /// Deletes metadata for table **permanently**. + pub async fn on_destroy_metadata( + &self, + ctx: &DdlContext, + table_route_value: &TableRouteValue, ) -> Result<()> { ctx.table_metadata_manager - .delete_table_metadata(self.table_id, &self.table, region_routes) + .destroy_table_metadata(self.table_id, &self.table, table_route_value) + .await + } + + /// Restores the table metadata. + pub async fn on_restore_metadata( + &self, + ctx: &DdlContext, + table_route_value: &TableRouteValue, + ) -> Result<()> { + let table_name_key = TableNameKey::new( + &self.table.catalog_name, + &self.table.schema_name, + &self.table.table_name, + ); + if ctx + .table_metadata_manager + .table_name_manager() + .exists(table_name_key) + .await? + { + return Ok(()); + } + ctx.table_metadata_manager + .restore_table_metadata(self.table_id, &self.table, table_route_value) .await } diff --git a/src/common/meta/src/ddl/drop_table/metadata.rs b/src/common/meta/src/ddl/drop_table/metadata.rs index e3596eb5837a..52d82a003c2c 100644 --- a/src/common/meta/src/ddl/drop_table/metadata.rs +++ b/src/common/meta/src/ddl/drop_table/metadata.rs @@ -12,35 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_catalog::format_full_table_name; -use snafu::OptionExt; - use crate::ddl::drop_table::DropTableProcedure; -use crate::error::{self, Result}; +use crate::error::Result; impl DropTableProcedure { - /// Fetches the table info and table route. + /// Fetches the table info and physical table route. 
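The executor now exposes four metadata operations, and the existence checks on the table name key make `on_delete_metadata` and `on_restore_metadata` safe to re-execute. A sketch of how they are expected to pair up, going by the doc comments above (illustrative, not the procedure's actual driver):

```rust
// commit path:   on_delete_metadata            // keys moved to tombstones
//                -> on_delete_metadata_tombstone // tombstones purged for good
// abort path:    on_delete_metadata
//                -> on_restore_metadata        // tombstones moved back
// drop-database: on_destroy_metadata           // permanent, no tombstones
async fn delete_then_undo(
    executor: &DropTableExecutor,
    ctx: &DdlContext,
    route: &TableRouteValue,
) -> Result<()> {
    executor.on_delete_metadata(ctx, route).await?;  // logical delete
    executor.on_restore_metadata(ctx, route).await?; // metadata is back
    Ok(())
}
```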
pub(crate) async fn fill_table_metadata(&mut self) -> Result<()> { let task = &self.data.task; - let table_info_value = self - .context - .table_metadata_manager - .table_info_manager() - .get(task.table_id) - .await? - .with_context(|| error::TableInfoNotFoundSnafu { - table: format_full_table_name(&task.catalog, &task.schema, &task.table), - })?; - let (_, table_route_value) = self + let (physical_table_id, physical_table_route_value) = self .context .table_metadata_manager .table_route_manager() - .table_route_storage() - .get_raw_physical_table_route(task.table_id) + .get_physical_table_route(task.table_id) .await?; - self.data.table_info_value = Some(table_info_value); - self.data.table_route_value = Some(table_route_value); + self.data.physical_region_routes = physical_table_route_value.region_routes; + self.data.physical_table_id = Some(physical_table_id); + Ok(()) } } diff --git a/src/common/meta/src/ddl/tests/drop_table.rs b/src/common/meta/src/ddl/tests/drop_table.rs index 9908f604a427..f422c853f061 100644 --- a/src/common/meta/src/ddl/tests/drop_table.rs +++ b/src/common/meta/src/ddl/tests/drop_table.rs @@ -19,17 +19,25 @@ use api::v1::region::{region_request, RegionRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId}; +use common_procedure_test::MockContextProvider; use store_api::storage::RegionId; use tokio::sync::mpsc; +use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::test_util::create_table::test_create_table_task; -use crate::ddl::test_util::datanode_handler::DatanodeWatcher; +use crate::ddl::test_util::datanode_handler::{DatanodeWatcher, NaiveDatanodeHandler}; +use crate::ddl::test_util::{ + create_physical_table_metadata, test_create_logical_table_task, test_create_physical_table_task, +}; +use crate::ddl::{TableMetadata, TableMetadataAllocatorContext}; use crate::key::table_route::TableRouteValue; +use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; use crate::rpc::ddl::DropTableTask; use crate::rpc::router::{Region, RegionRoute}; -use crate::test_util::{new_ddl_context, MockDatanodeManager}; +use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager}; #[tokio::test] async fn test_on_prepare_table_not_exists_err() { @@ -59,8 +67,7 @@ async fn test_on_prepare_table_not_exists_err() { }; let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); - let executor = procedure.executor(); - let err = procedure.on_prepare(&executor).await.unwrap_err(); + let err = procedure.on_prepare().await.unwrap_err(); assert_eq!(err.status_code(), StatusCode::TableNotFound); } @@ -93,8 +100,7 @@ async fn test_on_prepare_table() { // Drop if exists let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); - let executor = procedure.executor(); - procedure.on_prepare(&executor).await.unwrap(); + procedure.on_prepare().await.unwrap(); let task = DropTableTask { catalog: DEFAULT_CATALOG_NAME.to_string(), @@ -106,8 +112,7 @@ async fn test_on_prepare_table() { // Drop table let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); - let executor = procedure.executor(); - procedure.on_prepare(&executor).await.unwrap(); + procedure.on_prepare().await.unwrap(); } #[tokio::test] @@ -162,9 +167,8 @@ async fn 
test_on_datanode_drop_regions() { }; // Drop table let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context); - let executor = procedure.executor(); - procedure.on_prepare(&executor).await.unwrap(); - procedure.on_datanode_drop_regions(&executor).await.unwrap(); + procedure.on_prepare().await.unwrap(); + procedure.on_datanode_drop_regions().await.unwrap(); let check = |peer: Peer, request: RegionRequest, @@ -191,3 +195,97 @@ async fn test_on_datanode_drop_regions() { let (peer, request) = results.remove(0); check(peer, request, 3, RegionId::new(table_id, 3)); } + +#[tokio::test] +async fn test_on_rollback() { + let datanode_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let ddl_context = new_ddl_context_with_kv_backend(datanode_manager, kv_backend.clone()); + let cluster_id = 1; + // Prepares physical table metadata. + let mut create_physical_table_task = test_create_physical_table_task("phy_table"); + let TableMetadata { + table_id, + table_route, + .. + } = ddl_context + .table_metadata_allocator + .create( + &TableMetadataAllocatorContext { cluster_id }, + &create_physical_table_task, + ) + .await + .unwrap(); + create_physical_table_task.set_table_id(table_id); + create_physical_table_metadata( + &ddl_context, + create_physical_table_task.table_info.clone(), + TableRouteValue::Physical(table_route), + ) + .await; + // The create logical table procedure. + let physical_table_id = table_id; + // Creates the logical table metadata. + let task = test_create_logical_table_task("foo"); + let mut procedure = CreateLogicalTablesProcedure::new( + cluster_id, + vec![task], + physical_table_id, + ddl_context.clone(), + ); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + // Triggers procedure to create table metadata + let status = procedure.execute(&ctx).await.unwrap(); + let table_ids = status.downcast_output_ref::>().unwrap(); + assert_eq!(*table_ids, vec![1025]); + + let expected_kvs = kv_backend.dump(); + // Drops the physical table + { + let task = DropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: "phy_table".to_string(), + table_id: physical_table_id, + drop_if_exists: false, + }; + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); + procedure.on_prepare().await.unwrap(); + procedure.on_delete_metadata().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.rollback(&ctx).await.unwrap(); + // Rollback again + procedure.rollback(&ctx).await.unwrap(); + let kvs = kv_backend.dump(); + assert_eq!(kvs, expected_kvs); + } + + // Drops the logical table + let task = DropTableTask { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: "foo".to_string(), + table_id: table_ids[0], + drop_if_exists: false, + }; + let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone()); + procedure.on_prepare().await.unwrap(); + procedure.on_delete_metadata().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.rollback(&ctx).await.unwrap(); + // Rollback again + 
procedure.rollback(&ctx).await.unwrap(); + let kvs = kv_backend.dump(); + assert_eq!(kvs, expected_kvs); +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 87ee269946cf..0d23a1dd26f7 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -421,6 +421,9 @@ pub enum Error { #[snafu(display("Invalid role: {}", role))] InvalidRole { role: i32, location: Location }, + #[snafu(display("Atomic key changed: {err_msg}"))] + CasKeyChanged { err_msg: String, location: Location }, + #[snafu(display("Failed to parse {} from utf8", name))] FromUtf8 { name: String, @@ -440,7 +443,8 @@ impl ErrorExt for Error { | EtcdTxnOpResponse { .. } | EtcdFailed { .. } | EtcdTxnFailed { .. } - | ConnectEtcd { .. } => StatusCode::Internal, + | ConnectEtcd { .. } + | CasKeyChanged { .. } => StatusCode::Internal, SerdeJson { .. } | ParseOption { .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index ca2b5984b495..a38387198b09 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -56,9 +56,12 @@ pub mod table_region; pub mod table_route; #[cfg(any(test, feature = "testing"))] pub mod test_utils; +// TODO(weny): remove it. +#[allow(dead_code)] +mod tombstone; mod txn_helper; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::ops::Deref; use std::sync::Arc; @@ -83,9 +86,13 @@ use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; use self::datanode_table::RegionInfo; use self::schema_name::{SchemaManager, SchemaNameKey, SchemaNameValue}; use self::table_route::{TableRouteManager, TableRouteValue}; +use self::tombstone::TombstoneManager; use crate::ddl::utils::region_storage_path; -use crate::error::{self, Result, SerdeJsonSnafu}; -use crate::kv_backend::txn::{Txn, TxnOpResponse}; +use crate::error::{self, Result, SerdeJsonSnafu, UnexpectedSnafu}; +use crate::key::table_route::TableRouteKey; +use crate::key::tombstone::Key; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute, RegionStatus}; use crate::table_name::TableName; @@ -97,7 +104,6 @@ pub const MAINTENANCE_KEY: &str = "maintenance"; const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; const TABLE_REGION_KEY_PREFIX: &str = "__table_region"; -pub const REMOVED_PREFIX: &str = "__removed"; pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name"; pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name"; @@ -145,6 +151,33 @@ pub trait TableMetaKey { fn as_raw_key(&self) -> Vec; } +pub(crate) trait TableMetaKeyGetTxnOp { + fn build_get_op( + &self, + ) -> ( + TxnOp, + impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option>, + ); +} + +impl TableMetaKey for String { + fn as_raw_key(&self) -> Vec { + self.as_bytes().to_vec() + } +} + +impl TableMetaKeyGetTxnOp for String { + fn build_get_op( + &self, + ) -> ( + TxnOp, + impl for<'a> FnMut(&'a mut TxnOpGetResponseSet) -> Option>, + ) { + let key = self.as_raw_key(); + (TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key)) + } +} + pub trait TableMetaValue { fn try_from_raw_value(raw_value: &[u8]) -> Result where @@ -162,6 +195,7 @@ pub struct TableMetadataManager { catalog_manager: CatalogManager, schema_manager: SchemaManager, table_route_manager: TableRouteManager, + tombstone_manager: TombstoneManager, kv_backend: 
KvBackendRef, } @@ -303,6 +337,7 @@ impl TableMetadataManager { catalog_manager: CatalogManager::new(kv_backend.clone()), schema_manager: SchemaManager::new(kv_backend.clone()), table_route_manager: TableRouteManager::new(kv_backend.clone()), + tombstone_manager: TombstoneManager::new(kv_backend.clone()), kv_backend, } } @@ -363,19 +398,16 @@ impl TableMetadataManager { Option>, Option>, )> { - let (get_table_route_txn, table_route_decoder) = self - .table_route_manager - .table_route_storage() - .build_get_txn(table_id); - let (get_table_info_txn, table_info_decoder) = - self.table_info_manager.build_get_txn(table_id); - - let txn = Txn::merge_all(vec![get_table_route_txn, get_table_info_txn]); - let res = self.kv_backend.txn(txn).await?; - - let table_info_value = table_info_decoder(&res.responses)?; - let table_route_value = table_route_decoder(&res.responses)?; - + let table_info_key = TableInfoKey::new(table_id); + let table_route_key = TableRouteKey::new(table_id); + let (table_info_txn, table_info_filter) = table_info_key.build_get_op(); + let (table_route_txn, table_route_filter) = table_route_key.build_get_op(); + + let txn = Txn::new().and_then(vec![table_info_txn, table_route_txn]); + let mut res = self.kv_backend.txn(txn).await?; + let mut set = TxnOpGetResponseSet::from(&mut res.responses); + let table_info_value = TxnOpGetResponseSet::decode_with(table_info_filter)(&mut set)?; + let table_route_value = TxnOpGetResponseSet::decode_with(table_route_filter)(&mut set)?; Ok((table_info_value, table_route_value)) } @@ -545,47 +577,106 @@ impl TableMetadataManager { Ok(()) } - /// Deletes metadata for table. - /// The caller MUST ensure it has the exclusive access to `TableNameKey`. - pub async fn delete_table_metadata( + fn table_metadata_keys( &self, table_id: TableId, table_name: &TableName, - region_routes: &[RegionRoute], - ) -> Result<()> { - // Deletes table name. + table_route_value: &TableRouteValue, + ) -> Result> { + // Builds keys + let datanode_ids = if table_route_value.is_physical() { + region_distribution(table_route_value.region_routes()?) + .into_keys() + .collect() + } else { + vec![] + }; + let mut keys = Vec::with_capacity(3 + datanode_ids.len()); let table_name = TableNameKey::new( &table_name.catalog_name, &table_name.schema_name, &table_name.table_name, ); + let table_info_key = TableInfoKey::new(table_id); + let table_route_key = TableRouteKey::new(table_id); + let datanode_table_keys = datanode_ids + .into_iter() + .map(|datanode_id| DatanodeTableKey::new(datanode_id, table_id)) + .collect::>(); + + keys.push(Key::compare_and_swap(table_name.as_raw_key())); + keys.push(Key::new(table_info_key.as_raw_key())); + keys.push(Key::new(table_route_key.as_raw_key())); + for key in &datanode_table_keys { + keys.push(Key::new(key.as_raw_key())); + } + Ok(keys) + } - let delete_table_name_txn = self.table_name_manager().build_delete_txn(&table_name)?; - - // Deletes table info. - let delete_table_info_txn = self.table_info_manager().build_delete_txn(table_id)?; - - // Deletes datanode table key value pairs. - let distribution = region_distribution(region_routes); - let delete_datanode_txn = self - .datanode_table_manager() - .build_delete_txn(table_id, distribution)?; + /// Deletes metadata for table **logically**. + /// The caller MUST ensure it has the exclusive access to `TableNameKey`. 
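The rewritten `get_full_table_info` above replaces the per-key transaction builders with `build_get_op` plus `TxnOpGetResponseSet`. A minimal sketch of the pattern, assuming a `kv_backend: KvBackendRef` in scope:

```rust
// Two gets batched into one txn round trip; each filter later consumes
// its own KeyValue from the shared response set, so decoding does not
// depend on response ordering.
let (info_op, info_filter) = TableInfoKey::new(table_id).build_get_op();
let (route_op, route_filter) = TableRouteKey::new(table_id).build_get_op();

let txn = Txn::new().and_then(vec![info_op, route_op]);
let mut res = kv_backend.txn(txn).await?;

let mut set = TxnOpGetResponseSet::from(&mut res.responses);
let table_info_value = TxnOpGetResponseSet::decode_with(info_filter)(&mut set)?;
let table_route_value = TxnOpGetResponseSet::decode_with(route_filter)(&mut set)?;
```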
+ pub async fn delete_table_metadata( + &self, + table_id: TableId, + table_name: &TableName, + table_route_value: &TableRouteValue, + ) -> Result<()> { + let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; + self.tombstone_manager.create(keys).await?; + Ok(()) + } - // Deletes table route. - let delete_table_route_txn = self - .table_route_manager() - .table_route_storage() - .build_delete_txn(table_id)?; + /// Deletes metadata tombstone for table **permanently**. + /// The caller MUST ensure it has the exclusive access to `TableNameKey`. + pub async fn delete_table_metadata_tombstone( + &self, + table_id: TableId, + table_name: &TableName, + table_route_value: &TableRouteValue, + ) -> Result<()> { + let keys = self + .table_metadata_keys(table_id, table_name, table_route_value)? + .into_iter() + .map(|key| key.into_bytes()) + .collect::>(); + self.tombstone_manager.delete(keys).await + } - let txn = Txn::merge_all(vec![ - delete_table_name_txn, - delete_table_info_txn, - delete_datanode_txn, - delete_table_route_txn, - ]); + /// Restores metadata for table. + /// The caller MUST ensure it has the exclusive access to `TableNameKey`. + pub async fn restore_table_metadata( + &self, + table_id: TableId, + table_name: &TableName, + table_route_value: &TableRouteValue, + ) -> Result<()> { + let keys = self.table_metadata_keys(table_id, table_name, table_route_value)?; + self.tombstone_manager.restore(keys).await?; + Ok(()) + } - // It's always successes. - let _ = self.kv_backend.txn(txn).await?; + /// Deletes metadata for table **permanently**. + /// The caller MUST ensure it has the exclusive access to `TableNameKey`. + pub async fn destroy_table_metadata( + &self, + table_id: TableId, + table_name: &TableName, + table_route_value: &TableRouteValue, + ) -> Result<()> { + let operations = self + .table_metadata_keys(table_id, table_name, table_route_value)? + .into_iter() + .map(|key| TxnOp::Delete(key.into_bytes())) + .collect::>(); + + let txn = Txn::new().and_then(operations); + let resp = self.kv_backend.txn(txn).await?; + ensure!( + resp.succeeded, + UnexpectedSnafu { + err_msg: format!("Failed to destroy table metadata: {table_id}") + } + ); Ok(()) } @@ -873,6 +964,38 @@ macro_rules! impl_table_meta_value { } } +macro_rules! impl_table_meta_key_get_txn_op { + ($($key: ty), *) => { + $( + impl $crate::key::TableMetaKeyGetTxnOp for $key { + /// Returns a [TxnOp] to retrieve the corresponding value + /// and a filter to retrieve the value from the [TxnOpGetResponseSet] + fn build_get_op( + &self, + ) -> ( + TxnOp, + impl for<'a> FnMut( + &'a mut TxnOpGetResponseSet, + ) -> Option>, + ) { + let raw_key = self.as_raw_key(); + ( + TxnOp::Get(raw_key.clone()), + TxnOpGetResponseSet::filter(raw_key), + ) + } + } + )* + } +} + +impl_table_meta_key_get_txn_op! { + TableNameKey<'_>, + TableInfoKey, + TableRouteKey, + DatanodeTableKey +} + #[macro_export] macro_rules! 
impl_optional_meta_value { ($($val_ty: ty), *) => { @@ -907,6 +1030,7 @@ mod tests { use std::sync::Arc; use bytes::Bytes; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_time::util::current_time_millis; use futures::TryStreamExt; use store_api::storage::RegionId; @@ -914,6 +1038,7 @@ mod tests { use super::datanode_table::DatanodeTableKey; use super::test_utils; + use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::utils::region_storage_path; use crate::error::Result; use crate::key::datanode_table::RegionInfo; @@ -1155,15 +1280,10 @@ mod tests { table_info.schema_name, table_info.name, ); + let table_route_value = &TableRouteValue::physical(region_routes.clone()); // deletes metadata. table_metadata_manager - .delete_table_metadata(table_id, &table_name, region_routes) - .await - .unwrap(); - - // if metadata was already deleted, it should be ok. - table_metadata_manager - .delete_table_metadata(table_id, &table_name, region_routes) + .delete_table_metadata(table_id, &table_name, table_route_value) .await .unwrap(); @@ -1559,4 +1679,118 @@ mod tests { .await .is_err()); } + + #[tokio::test] + async fn test_destroy_table_metadata() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let table_id = 1025; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + let options = [(0, "test".to_string())].into(); + table_metadata_manager + .create_table_metadata( + task.table_info, + TableRouteValue::physical(vec![ + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5)], + leader_status: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 2)), + leader_peer: Some(Peer::empty(2)), + follower_peers: vec![Peer::empty(4)], + leader_status: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 3)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![], + leader_status: None, + leader_down_since: None, + }, + ]), + options, + ) + .await + .unwrap(); + let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); + let table_route_value = table_metadata_manager + .table_route_manager + .table_route_storage() + .get_raw(table_id) + .await + .unwrap() + .unwrap(); + table_metadata_manager + .destroy_table_metadata(table_id, &table_name, &table_route_value) + .await + .unwrap(); + assert!(mem_kv.is_empty()); + } + + #[tokio::test] + async fn test_restore_table_metadata() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let table_id = 1025; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + let options = [(0, "test".to_string())].into(); + table_metadata_manager + .create_table_metadata( + task.table_info, + TableRouteValue::physical(vec![ + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5)], + leader_status: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 2)), + leader_peer: Some(Peer::empty(2)), + follower_peers: vec![Peer::empty(4)], + leader_status: None, + leader_down_since: None, + }, + RegionRoute { + region: 
Region::new_test(RegionId::new(table_id, 3)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![], + leader_status: None, + leader_down_since: None, + }, + ]), + options, + ) + .await + .unwrap(); + let expected_result = mem_kv.dump(); + let table_route_value = table_metadata_manager + .table_route_manager + .table_route_storage() + .get_raw(table_id) + .await + .unwrap() + .unwrap(); + let region_routes = table_route_value.region_routes().unwrap(); + let table_name = TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); + let table_route_value = TableRouteValue::physical(region_routes.clone()); + table_metadata_manager + .delete_table_metadata(table_id, &table_name, &table_route_value) + .await + .unwrap(); + table_metadata_manager + .restore_table_metadata(table_id, &table_name, &table_route_value) + .await + .unwrap(); + let kvs = mem_kv.dump(); + assert_eq!(kvs, expected_result); + } } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 9b5ec097b59b..82ddebc3206b 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -55,6 +55,7 @@ pub struct RegionInfo { pub region_wal_options: HashMap, } +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct DatanodeTableKey { pub datanode_id: DatanodeId, pub table_id: TableId, diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index a96142ded147..f0fe71b82699 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -18,10 +18,11 @@ use serde::{Deserialize, Serialize}; use table::metadata::{RawTableInfo, TableId}; use table::table_reference::TableReference; -use super::{txn_helper, DeserializedValueWithBytes, TableMetaValue, TABLE_INFO_KEY_PREFIX}; use crate::error::Result; -use crate::key::TableMetaKey; -use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse}; +use crate::key::{ + txn_helper, DeserializedValueWithBytes, TableMetaKey, TableMetaValue, TABLE_INFO_KEY_PREFIX, +}; +use crate::kv_backend::txn::{Txn, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::store::BatchGetRequest; use crate::table_name::TableName; @@ -101,20 +102,6 @@ impl TableInfoManager { Self { kv_backend } } - pub(crate) fn build_get_txn( - &self, - table_id: TableId, - ) -> ( - Txn, - impl FnOnce(&Vec) -> Result>>, - ) { - let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); - let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]); - - (txn, txn_helper::build_txn_response_decoder_fn(raw_key)) - } - /// Builds a create table info transaction, it expected the `__table_info/{table_id}` wasn't occupied. pub(crate) fn build_create_txn( &self, @@ -156,16 +143,6 @@ impl TableInfoManager { Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key))) } - /// Builds a delete table info transaction. - pub(crate) fn build_delete_txn(&self, table_id: TableId) -> Result { - let key = TableInfoKey::new(table_id); - let raw_key = key.as_raw_key(); - - let txn = Txn::new().and_then(vec![TxnOp::Delete(raw_key)]); - - Ok(txn) - } - pub async fn get( &self, table_id: TableId, diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 75b5b86cffd1..83e1cb7fb254 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -194,14 +194,6 @@ impl TableNameManager { Ok(txn) } - /// Builds a delete table name transaction. 
It only executes while the primary keys comparing successes. - pub(crate) fn build_delete_txn(&self, key: &TableNameKey<'_>) -> Result { - let raw_key = key.as_raw_key(); - let txn = Txn::new().and_then(vec![TxnOp::Delete(raw_key)]); - - Ok(txn) - } - pub async fn get(&self, key: TableNameKey<'_>) -> Result> { let raw_key = key.as_raw_key(); self.kv_backend diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 3a87f2eb9df1..15b2c4f69ac1 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -26,7 +26,7 @@ use crate::error::{ UnexpectedLogicalRouteTableSnafu, }; use crate::key::{RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX}; -use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse}; +use crate::kv_backend::txn::{Txn, TxnOpResponse}; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, RegionRoute}; use crate::rpc::store::BatchGetRequest; @@ -61,6 +61,27 @@ pub struct LogicalTableRouteValue { } impl TableRouteValue { + /// Returns a [TableRouteValue::Physical] if `table_id` equals `physical_table_id`. + /// Otherwise returns a [TableRouteValue::Logical]. + pub(crate) fn new( + table_id: TableId, + physical_table_id: TableId, + region_routes: Vec, + ) -> Self { + if table_id == physical_table_id { + TableRouteValue::physical(region_routes) + } else { + let region_routes = region_routes + .into_iter() + .map(|region| { + debug_assert_eq!(region.region.id.table_id(), physical_table_id); + RegionId::new(table_id, region.region.id.region_number()) + }) + .collect::>(); + TableRouteValue::logical(physical_table_id, region_routes) + } + } + pub fn physical(region_routes: Vec) -> Self { Self::Physical(PhysicalTableRouteValue::new(region_routes)) } @@ -425,21 +446,6 @@ impl TableRouteStorage { Self { kv_backend } } - /// Builds a get table route transaction(readonly). - pub(crate) fn build_get_txn( - &self, - table_id: TableId, - ) -> ( - Txn, - impl FnOnce(&Vec) -> Result>>, - ) { - let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); - let txn = Txn::new().and_then(vec![TxnOp::Get(raw_key.clone())]); - - (txn, txn_helper::build_txn_response_decoder_fn(raw_key)) - } - /// Builds a create table route transaction, /// it expected the `__table_route/{table_id}` wasn't occupied. pub fn build_create_txn( @@ -483,17 +489,6 @@ impl TableRouteStorage { Ok((txn, txn_helper::build_txn_response_decoder_fn(raw_key))) } - /// Builds a delete table route transaction, - /// it expected the remote value equals the `table_route_value`. - pub(crate) fn build_delete_txn(&self, table_id: TableId) -> Result { - let key = TableRouteKey::new(table_id); - let raw_key = key.as_raw_key(); - - let txn = Txn::new().and_then(vec![TxnOp::Delete(raw_key)]); - - Ok(txn) - } - /// Returns the [`TableRouteValue`]. pub async fn get(&self, table_id: TableId) -> Result> { let key = TableRouteKey::new(table_id); diff --git a/src/common/meta/src/key/tombstone.rs b/src/common/meta/src/key/tombstone.rs new file mode 100644 index 000000000000..045959b6828f --- /dev/null +++ b/src/common/meta/src/key/tombstone.rs @@ -0,0 +1,544 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use snafu::{ensure, OptionExt};
+
+use super::TableMetaKeyGetTxnOp;
+use crate::error::{self, Result};
+use crate::key::txn_helper::TxnOpGetResponseSet;
+use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
+use crate::kv_backend::KvBackendRef;
+
+/// [TombstoneManager] provides the ability to:
+/// - logically delete values
+/// - restore the deleted values
+pub(crate) struct TombstoneManager {
+    kv_backend: KvBackendRef,
+}
+
+const TOMBSTONE_PREFIX: &str = "__tombstone/";
+
+pub(crate) struct TombstoneKey<T>(T);
+
+fn to_tombstone(key: &[u8]) -> Vec<u8> {
+    [TOMBSTONE_PREFIX.as_bytes(), key].concat()
+}
+
+impl TombstoneKey<&Vec<u8>> {
+    /// Returns the origin key and tombstone key.
+    fn to_keys(&self) -> (Vec<u8>, Vec<u8>) {
+        let key = self.0;
+        let tombstone_key = to_tombstone(key);
+        (key.clone(), tombstone_key)
+    }
+
+    /// Returns the origin key and tombstone key.
+    fn into_keys(self) -> (Vec<u8>, Vec<u8>) {
+        self.to_keys()
+    }
+
+    /// Returns the tombstone key.
+    fn to_tombstone_key(&self) -> Vec<u8> {
+        let key = self.0;
+        to_tombstone(key)
+    }
+}
+
+impl TableMetaKeyGetTxnOp for TombstoneKey<&Vec<u8>> {
+    fn build_get_op(
+        &self,
+    ) -> (
+        TxnOp,
+        impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
+    ) {
+        TxnOpGetResponseSet::build_get_op(to_tombstone(self.0))
+    }
+}
+
+/// The key used in the [TombstoneManager].
+pub(crate) struct Key {
+    bytes: Vec<u8>,
+    // Atomic key: the value of this key must remain unchanged
+    // between the read transaction and the guarded mutation.
+    atomic: bool,
+}
+
+impl Key {
+    /// Returns a new atomic key.
+    pub(crate) fn compare_and_swap<T: Into<Vec<u8>>>(key: T) -> Self {
+        Self {
+            bytes: key.into(),
+            atomic: true,
+        }
+    }
+
+    /// Returns a new normal key.
+    pub(crate) fn new<T: Into<Vec<u8>>>(key: T) -> Self {
+        Self {
+            bytes: key.into(),
+            atomic: false,
+        }
+    }
+
+    /// Consumes the key, returning its bytes.
+    pub(crate) fn into_bytes(self) -> Vec<u8> {
+        self.bytes
+    }
+
+    fn get_inner(&self) -> &Vec<u8> {
+        &self.bytes
+    }
+
+    fn is_atomic(&self) -> bool {
+        self.atomic
+    }
+}
+
+impl TableMetaKeyGetTxnOp for Key {
+    fn build_get_op(
+        &self,
+    ) -> (
+        TxnOp,
+        impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
+    ) {
+        let key = self.get_inner().clone();
+        (TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
+    }
+}
+
+fn format_on_failure_error_message<F: FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>>(
+    mut set: TxnOpGetResponseSet,
+    on_failure_kv_and_filters: Vec<(Vec<u8>, Vec<u8>, F)>,
+) -> String {
+    on_failure_kv_and_filters
+        .into_iter()
+        .flat_map(|(key, value, mut filter)| {
+            let got = filter(&mut set);
+            let Some(got) = got else {
+                return Some(format!(
+                    "For key: {} was expected: {}, but the value does not exist",
+                    String::from_utf8_lossy(&key),
+                    String::from_utf8_lossy(&value),
+                ));
+            };
+
+            if got != value {
+                Some(format!(
+                    "For key: {} was expected: {}, but got: {}",
+                    String::from_utf8_lossy(&key),
+                    String::from_utf8_lossy(&value),
+                    String::from_utf8_lossy(&got),
+                ))
+            } else {
+                None
+            }
+        })
+        .collect::<Vec<_>>()
+        .join("; ")
+}
+
+fn format_keys(keys: &[Key]) -> String {
+    keys.iter()
+        .map(|key| String::from_utf8_lossy(&key.bytes))
+        .collect::<Vec<_>>()
+        .join(", ")
+}
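`TombstoneManager` implements logical deletion by moving each value under the `__tombstone/` prefix; `Key::compare_and_swap` marks keys whose value must not change between the initial read and the guarded move. A sketch of the layout and a hypothetical call (the key names are illustrative):

```rust
// Layout produced by `create`:
//   before: __table_name/greptime/public/foo              = v
//   after:  __tombstone/__table_name/greptime/public/foo  = v
//
// For each atomic key the move is guarded roughly by:
//   when:     !exists(tombstone_key) && origin_key == v
//   and_then: Delete(origin_key), Put(tombstone_key, v)
//   or_else:  Get(origin_key)  // feeds the CasKeyChanged error message
let manager = TombstoneManager::new(kv_backend.clone());
manager
    .create(vec![
        Key::compare_and_swap(b"__table_name/greptime/public/foo".to_vec()),
        Key::new(b"__table_info/1025".to_vec()),
    ])
    .await?;
```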
+impl TombstoneManager {
+    /// Returns [TombstoneManager].
+    pub fn new(kv_backend: KvBackendRef) -> Self {
+        Self { kv_backend }
+    }
+
+    /// Creates tombstones for keys.
+    ///
+    /// Performs the following steps:
+    /// - retrieves all values corresponding to `keys`.
+    /// - stores the tombstone values.
+    pub(crate) async fn create(&self, keys: Vec<Key>) -> Result<()> {
+        // Builds the transaction to retrieve all values.
+        let (operations, mut filters): (Vec<_>, Vec<_>) =
+            keys.iter().map(|key| key.build_get_op()).unzip();
+
+        let txn = Txn::new().and_then(operations);
+        let mut resp = self.kv_backend.txn(txn).await?;
+        ensure!(
+            resp.succeeded,
+            error::UnexpectedSnafu {
+                err_msg: format!(
+                    "Failed to retrieve the metadata, keys: {}",
+                    format_keys(&keys)
+                ),
+            }
+        );
+
+        let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
+        // Builds the create tombstone transaction.
+        let mut tombstone_operations = Vec::with_capacity(keys.len() * 2);
+        let mut tombstone_comparison = vec![];
+        let mut on_failure_operations = vec![];
+        let mut on_failure_kv_and_filters = vec![];
+        for (idx, key) in keys.iter().enumerate() {
+            let filter = &mut filters[idx];
+            let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
+                err_msg: format!(
+                    "Missing value, key: {}",
+                    String::from_utf8_lossy(key.get_inner())
+                ),
+            })?;
+            let (origin_key, tombstone_key) = TombstoneKey(key.get_inner()).into_keys();
+            // Compares the atomic key.
+            if key.is_atomic() {
+                tombstone_comparison.push(Compare::with_not_exist_value(
+                    tombstone_key.clone(),
+                    CompareOp::Equal,
+                ));
+                tombstone_comparison.push(Compare::with_value(
+                    origin_key.clone(),
+                    CompareOp::Equal,
+                    value.clone(),
+                ));
+                let (op, filter) = TxnOpGetResponseSet::build_get_op(origin_key.clone());
+                on_failure_operations.push(op);
+                on_failure_kv_and_filters.push((origin_key.clone(), value.clone(), filter));
+            }
+            tombstone_operations.push(TxnOp::Delete(origin_key));
+            tombstone_operations.push(TxnOp::Put(tombstone_key, value));
+        }
+
+        let txn = if !tombstone_comparison.is_empty() {
+            Txn::new().when(tombstone_comparison)
+        } else {
+            Txn::new()
+        }
+        .and_then(tombstone_operations);
+
+        let txn = if !on_failure_operations.is_empty() {
+            txn.or_else(on_failure_operations)
+        } else {
+            txn
+        };
+
+        let mut resp = self.kv_backend.txn(txn).await?;
+        // TODO(weny): add tests for the atomic-key-changed case.
+        if !resp.succeeded {
+            let set = TxnOpGetResponseSet::from(&mut resp.responses);
+            let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters);
+            return error::CasKeyChangedSnafu { err_msg }.fail();
+        }
+        Ok(())
+    }
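When the guard fails, the `or_else` arm re-reads the guarded origin keys so `format_on_failure_error_message` can report expected-versus-actual values per key. A hypothetical caller-side view, assuming another writer touched an atomic key between the read and the guarded transaction:

```rust
if let Err(err) = manager.create(keys).await {
    // A CasKeyChanged error means an atomic key no longer holds the value
    // read in the first transaction, i.e. someone raced this drop.
    // Its message reads roughly (illustrative):
    //   Atomic key changed: For key: __table_name/greptime/public/foo
    //   was expected: <old value>, but got: <new value>
    warn!("failed to create tombstones: {err}");
}
```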
+    /// Restores tombstones for keys.
+    ///
+    /// Performs the following steps:
+    /// - retrieves all tombstone values corresponding to `keys`.
+    /// - restores the origin values.
+    pub(crate) async fn restore(&self, keys: Vec<Key>) -> Result<()> {
+        // Builds the transaction to retrieve all tombstone values.
+        let tombstone_keys = keys
+            .iter()
+            .map(|key| TombstoneKey(key.get_inner()))
+            .collect::<Vec<_>>();
+        let (operations, mut filters): (Vec<_>, Vec<_>) =
+            tombstone_keys.iter().map(|key| key.build_get_op()).unzip();
+
+        let txn = Txn::new().and_then(operations);
+        let mut resp = self.kv_backend.txn(txn).await?;
+        ensure!(
+            resp.succeeded,
+            error::UnexpectedSnafu {
+                err_msg: format!(
+                    "Failed to retrieve the metadata, keys: {}",
+                    format_keys(&keys)
+                ),
+            }
+        );
+
+        let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
+
+        // Builds the restore tombstone transaction.
+        let mut tombstone_operations = Vec::with_capacity(keys.len() * 2);
+        let mut tombstone_comparison = vec![];
+        let mut on_failure_operations = vec![];
+        let mut on_failure_kv_and_filters = vec![];
+        for (idx, key) in keys.iter().enumerate() {
+            let filter = &mut filters[idx];
+            let value = filter(&mut set).with_context(|| error::UnexpectedSnafu {
+                err_msg: format!(
+                    "Missing value, key: {}",
+                    String::from_utf8_lossy(key.get_inner())
+                ),
+            })?;
+            let (origin_key, tombstone_key) = tombstone_keys[idx].to_keys();
+            // Compares the atomic key.
+            if key.is_atomic() {
+                tombstone_comparison.push(Compare::with_not_exist_value(
+                    origin_key.clone(),
+                    CompareOp::Equal,
+                ));
+                tombstone_comparison.push(Compare::with_value(
+                    tombstone_key.clone(),
+                    CompareOp::Equal,
+                    value.clone(),
+                ));
+                let (op, filter) = tombstone_keys[idx].build_get_op();
+                on_failure_operations.push(op);
+                on_failure_kv_and_filters.push((tombstone_key.clone(), value.clone(), filter));
+            }
+            tombstone_operations.push(TxnOp::Delete(tombstone_key));
+            tombstone_operations.push(TxnOp::Put(origin_key, value));
+        }
+
+        let txn = if !tombstone_comparison.is_empty() {
+            Txn::new().when(tombstone_comparison)
+        } else {
+            Txn::new()
+        }
+        .and_then(tombstone_operations);
+
+        let txn = if !on_failure_operations.is_empty() {
+            txn.or_else(on_failure_operations)
+        } else {
+            txn
+        };
+
+        let mut resp = self.kv_backend.txn(txn).await?;
+        // TODO(weny): add tests for the atomic-key-changed case.
+        if !resp.succeeded {
+            let set = TxnOpGetResponseSet::from(&mut resp.responses);
+            let err_msg = format_on_failure_error_message(set, on_failure_kv_and_filters);
+            return error::CasKeyChangedSnafu { err_msg }.fail();
+        }
+
+        Ok(())
+    }
+
+    /// Deletes tombstones for keys.
+    pub(crate) async fn delete(&self, keys: Vec<Vec<u8>>) -> Result<()> {
+        let operations = keys
+            .iter()
+            .map(|key| TxnOp::Delete(TombstoneKey(key).to_tombstone_key()))
+            .collect::<Vec<_>>();
+
+        let txn = Txn::new().and_then(operations);
+        // Always succeeds.
+ let _ = self.kv_backend.txn(txn).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + use std::sync::Arc; + + use crate::key::tombstone::{Key, TombstoneKey, TombstoneManager}; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::KvBackend; + use crate::rpc::store::PutRequest; + + #[tokio::test] + async fn test_create_tombstone() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + kv_backend + .put(PutRequest::new().with_key("bar").with_value("baz")) + .await + .unwrap(); + kv_backend + .put(PutRequest::new().with_key("foo").with_value("hi")) + .await + .unwrap(); + tombstone_manager + .create(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .await + .unwrap(); + assert!(!kv_backend.exists(b"bar").await.unwrap()); + assert!(!kv_backend.exists(b"foo").await.unwrap()); + assert_eq!( + kv_backend + .get(&TombstoneKey(&"bar".into()).to_tombstone_key()) + .await + .unwrap() + .unwrap() + .value, + b"baz" + ); + assert_eq!( + kv_backend + .get(&TombstoneKey(&"foo".into()).to_tombstone_key()) + .await + .unwrap() + .unwrap() + .value, + b"hi" + ); + assert_eq!(kv_backend.len(), 2); + } + + #[tokio::test] + async fn test_create_tombstone_without_atomic_key() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + kv_backend + .put(PutRequest::new().with_key("bar").with_value("baz")) + .await + .unwrap(); + kv_backend + .put(PutRequest::new().with_key("foo").with_value("hi")) + .await + .unwrap(); + tombstone_manager + .create(vec![Key::new("bar"), Key::new("foo")]) + .await + .unwrap(); + assert!(!kv_backend.exists(b"bar").await.unwrap()); + assert!(!kv_backend.exists(b"foo").await.unwrap()); + assert_eq!( + kv_backend + .get(&TombstoneKey(&"bar".into()).to_tombstone_key()) + .await + .unwrap() + .unwrap() + .value, + b"baz" + ); + assert_eq!( + kv_backend + .get(&TombstoneKey(&"foo".into()).to_tombstone_key()) + .await + .unwrap() + .unwrap() + .value, + b"hi" + ); + assert_eq!(kv_backend.len(), 2); + } + + #[tokio::test] + async fn test_create_tombstone_origin_value_not_found_err() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + + kv_backend + .put(PutRequest::new().with_key("bar").with_value("baz")) + .await + .unwrap(); + kv_backend + .put(PutRequest::new().with_key("foo").with_value("hi")) + .await + .unwrap(); + + let err = tombstone_manager + .create(vec![Key::compare_and_swap("bar"), Key::new("baz")]) + .await + .unwrap_err(); + assert!(err.to_string().contains("Missing value")); + } + + #[tokio::test] + async fn test_restore_tombstone() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = TombstoneManager::new(kv_backend.clone()); + kv_backend + .put(PutRequest::new().with_key("bar").with_value("baz")) + .await + .unwrap(); + kv_backend + .put(PutRequest::new().with_key("foo").with_value("hi")) + .await + .unwrap(); + let expected_kvs = kv_backend.dump(); + tombstone_manager + .create(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .await + .unwrap(); + tombstone_manager + .restore(vec![Key::compare_and_swap("bar"), Key::new("foo")]) + .await + .unwrap(); + assert_eq!(expected_kvs, kv_backend.dump()); + } + + #[tokio::test] + async fn test_restore_tombstone_without_atomic_key() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let tombstone_manager = 
+        kv_backend
+            .put(PutRequest::new().with_key("bar").with_value("baz"))
+            .await
+            .unwrap();
+        kv_backend
+            .put(PutRequest::new().with_key("foo").with_value("hi"))
+            .await
+            .unwrap();
+        let expected_kvs = kv_backend.dump();
+        tombstone_manager
+            .create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
+            .await
+            .unwrap();
+        tombstone_manager
+            .restore(vec![Key::new("bar"), Key::new("foo")])
+            .await
+            .unwrap();
+        assert_eq!(expected_kvs, kv_backend.dump());
+    }
+
+    #[tokio::test]
+    async fn test_restore_tombstone_origin_value_not_found_err() {
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
+        kv_backend
+            .put(PutRequest::new().with_key("bar").with_value("baz"))
+            .await
+            .unwrap();
+        kv_backend
+            .put(PutRequest::new().with_key("foo").with_value("hi"))
+            .await
+            .unwrap();
+        tombstone_manager
+            .create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
+            .await
+            .unwrap();
+        let err = tombstone_manager
+            .restore(vec![Key::new("bar"), Key::new("baz")])
+            .await
+            .unwrap_err();
+        assert!(err.to_string().contains("Missing value"));
+    }
+
+    #[tokio::test]
+    async fn test_delete_tombstone() {
+        let kv_backend = Arc::new(MemoryKvBackend::default());
+        let tombstone_manager = TombstoneManager::new(kv_backend.clone());
+        kv_backend
+            .put(PutRequest::new().with_key("bar").with_value("baz"))
+            .await
+            .unwrap();
+        kv_backend
+            .put(PutRequest::new().with_key("foo").with_value("hi"))
+            .await
+            .unwrap();
+        tombstone_manager
+            .create(vec![Key::compare_and_swap("bar"), Key::new("foo")])
+            .await
+            .unwrap();
+        tombstone_manager
+            .delete(vec![b"bar".to_vec(), b"foo".to_vec()])
+            .await
+            .unwrap();
+        assert!(kv_backend.is_empty());
+    }
+}
diff --git a/src/common/meta/src/key/txn_helper.rs b/src/common/meta/src/key/txn_helper.rs
index d147aa26d87d..b06c8acee98c 100644
--- a/src/common/meta/src/key/txn_helper.rs
+++ b/src/common/meta/src/key/txn_helper.rs
@@ -18,7 +18,69 @@ use serde::Serialize;
 use crate::error::Result;
 use crate::key::{DeserializedValueWithBytes, TableMetaValue};
 use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
+use crate::rpc::KeyValue;
+
+/// The response set of [TxnOpResponse::ResponseGet].
+pub(crate) struct TxnOpGetResponseSet(Vec<KeyValue>);
+
+impl TxnOpGetResponseSet {
+    /// Returns a [TxnOp] to retrieve the value corresponding to `key` and
+    /// a filter to consume the corresponding [KeyValue] from [TxnOpGetResponseSet].
+    pub(crate) fn build_get_op<T: Into<Vec<u8>>>(
+        key: T,
+    ) -> (
+        TxnOp,
+        impl FnMut(&'_ mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
+    ) {
+        let key = key.into();
+        (TxnOp::Get(key.clone()), TxnOpGetResponseSet::filter(key))
+    }
+
+    /// Returns a filter to consume a [KeyValue] where the key equals `key`.
+    pub(crate) fn filter(key: Vec<u8>) -> impl FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>> {
+        move |set| {
+            let pos = set.0.iter().position(|kv| kv.key == key);
+            match pos {
+                Some(pos) => Some(set.0.remove(pos).value),
+                None => None,
+            }
+        }
+    }
+
+    /// Returns a decoder to decode bytes to `DeserializedValueWithBytes<T>`.
+    pub(crate) fn decode_with<F, T>(
+        mut f: F,
+    ) -> impl FnMut(&mut TxnOpGetResponseSet) -> Result<Option<DeserializedValueWithBytes<T>>>
+    where
+        F: FnMut(&mut TxnOpGetResponseSet) -> Option<Vec<u8>>,
+        T: Serialize + DeserializeOwned + TableMetaValue,
+    {
+        move |set| {
+            f(set)
+                .map(|value| DeserializedValueWithBytes::from_inner_slice(&value))
+                .transpose()
+        }
+    }
+}
+
+impl From<&mut Vec<TxnOpResponse>> for TxnOpGetResponseSet {
+    fn from(value: &mut Vec<TxnOpResponse>) -> Self {
+        let value = value
+            .extract_if(|resp| matches!(resp, TxnOpResponse::ResponseGet(_)))
+            .flat_map(|resp| {
+                // Safety: checked
+                let TxnOpResponse::ResponseGet(r) = resp else {
+                    unreachable!()
+                };
+
+                r.kvs
+            })
+            .collect::<Vec<_>>();
+        TxnOpGetResponseSet(value)
+    }
+}
+
+// TODO(weny): use `TxnOpGetResponseSet`.
 pub(crate) fn build_txn_response_decoder_fn<T>(
     raw_key: Vec<u8>,
 ) -> impl FnOnce(&Vec<TxnOpResponse>) -> Result<Option<DeserializedValueWithBytes<T>>>
diff --git a/src/common/meta/src/kv_backend/memory.rs b/src/common/meta/src/kv_backend/memory.rs
index 6c95bb646991..5c269a06a5ec 100644
--- a/src/common/meta/src/kv_backend/memory.rs
+++ b/src/common/meta/src/kv_backend/memory.rs
@@ -70,6 +70,25 @@ impl<T: ErrorExt + Send + Sync> MemoryKvBackend<T> {
         let mut kvs = self.kvs.write().unwrap();
         kvs.clear();
     }
+
+    #[cfg(test)]
+    /// Returns true if the `kvs` is empty.
+    pub fn is_empty(&self) -> bool {
+        self.kvs.read().unwrap().is_empty()
+    }
+
+    #[cfg(test)]
+    /// Returns the `kvs`.
+    pub fn dump(&self) -> BTreeMap<Vec<u8>, Vec<u8>> {
+        let kvs = self.kvs.read().unwrap();
+        kvs.clone()
+    }
+
+    #[cfg(test)]
+    /// Returns the length of `kvs`.
+    pub fn len(&self) -> usize {
+        self.kvs.read().unwrap().len()
+    }
 }
 
 #[async_trait]
diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs
index 3737bd94a7e4..655e6d27c1bd 100644
--- a/src/common/meta/src/lib.rs
+++ b/src/common/meta/src/lib.rs
@@ -16,6 +16,7 @@
 #![feature(btree_extract_if)]
 #![feature(async_closure)]
 #![feature(let_chains)]
+#![feature(extract_if)]
 
 pub mod cache_invalidator;
 pub mod cluster;
diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs
index 295fafd358c7..f1d2f33f11df 100644
--- a/src/common/meta/src/test_util.rs
+++ b/src/common/meta/src/test_util.rs
@@ -27,6 +27,7 @@ use crate::ddl::DdlContext;
 use crate::error::Result;
 use crate::key::TableMetadataManager;
 use crate::kv_backend::memory::MemoryKvBackend;
+use crate::kv_backend::KvBackendRef;
 use crate::peer::Peer;
 use crate::region_keeper::MemoryRegionKeeper;
 use crate::sequence::SequenceBuilder;
@@ -86,6 +87,14 @@ impl<T: MockDatanodeHandler> DatanodeManager for MockDatanodeManager<T> {
 pub fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext {
     let kv_backend = Arc::new(MemoryKvBackend::new());
+    new_ddl_context_with_kv_backend(datanode_manager, kv_backend)
+}
+
+/// Returns a test purpose [DdlContext] with a specified [KvBackendRef].
+pub fn new_ddl_context_with_kv_backend(
+    datanode_manager: DatanodeManagerRef,
+    kv_backend: KvBackendRef,
+) -> DdlContext {
     let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
     DdlContext {
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 0dc9845850b2..d9dc267b3c6a 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -545,9 +545,7 @@ impl RegionServerInner {
         match region_change {
             RegionChange::None => {}
             RegionChange::Register(_, _) | RegionChange::Deregisters => {
-                self.region_map
-                    .remove(&region_id)
-                    .map(|(id, engine)| engine.set_writable(id, false));
+                self.region_map.remove(&region_id);
             }
         }
     }
diff --git a/tests/cases/standalone/common/create/create_metric_table.result b/tests/cases/standalone/common/create/create_metric_table.result
index 5256f9d1b91b..5384723ca7bd 100644
--- a/tests/cases/standalone/common/create/create_metric_table.result
+++ b/tests/cases/standalone/common/create/create_metric_table.result
@@ -71,7 +71,26 @@ DESC TABLE t2;
 | val    | Float64              |     | YES  |         | FIELD         |
 +--------+----------------------+-----+------+---------+---------------+
 
--- TODO(ruihang): add a case that drops phy before t1
+-- should fail
+-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region
+DROP TABLE phy;
+
+Error: 1004(InvalidArguments), Physical region is busy, there are still some logical regions using it
+
+-- metadata should be restored
+DESC TABLE phy;
+
++------------+----------------------+-----+------+---------+---------------+
+| Column     | Type                 | Key | Null | Default | Semantic Type |
++------------+----------------------+-----+------+---------+---------------+
+| ts         | TimestampMillisecond |     | NO   |         | FIELD         |
+| val        | Float64              |     | YES  |         | FIELD         |
+| __table_id | UInt32               | PRI | NO   |         | TAG           |
+| __tsid     | UInt64               | PRI | NO   |         | TAG           |
+| host       | String               | PRI | YES  |         | TAG           |
+| job        | String               | PRI | YES  |         | TAG           |
++------------+----------------------+-----+------+---------+---------------+
+
 DROP TABLE t1;
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/create/create_metric_table.sql b/tests/cases/standalone/common/create/create_metric_table.sql
index a5be70c95502..af1acdac0551 100644
--- a/tests/cases/standalone/common/create/create_metric_table.sql
+++ b/tests/cases/standalone/common/create/create_metric_table.sql
@@ -16,7 +16,11 @@ DESC TABLE t1;
 
 DESC TABLE t2;
 
--- TODO(ruihang): add a case that drops phy before t1
+-- should fail
+-- SQLNESS REPLACE (region\s\d+\(\d+\,\s\d+\)) region
+DROP TABLE phy;
+-- metadata should be restored
+DESC TABLE phy;
 
 DROP TABLE t1;
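The drop-table flow in this patch relies on a three-step tombstone protocol: `create` moves each metadata value from its origin key to a tombstone key (guarded by compare-and-swap for keys marked atomic), `restore` moves the values back when the drop is rolled back, and `delete` purges the tombstones once the drop is final. Below is a minimal, self-contained sketch of that move/restore/purge idea, assuming a plain in-memory map in place of the kv backend; the `Store` type, the tombstone prefix, and the `tombstone`, `restore`, and `purge` names are hypothetical stand-ins, and unlike the real `TombstoneManager` this sketch mutates keys one at a time instead of batching them into a single CAS-guarded transaction.

use std::collections::HashMap;

/// Hypothetical prefix for illustration: tombstoning a key moves its value
/// under this prefix so that a later restore can move it back.
const TOMBSTONE_PREFIX: &str = "__tombstone/";

#[derive(Default)]
struct Store(HashMap<String, String>);

impl Store {
    /// Moves `key`'s value under the tombstone prefix (the `create` step).
    /// Fails when the key is absent, mirroring the "Missing value" error.
    fn tombstone(&mut self, key: &str) -> Result<(), String> {
        let value = self
            .0
            .remove(key)
            .ok_or_else(|| format!("Missing value, key: {key}"))?;
        self.0.insert(format!("{TOMBSTONE_PREFIX}{key}"), value);
        Ok(())
    }

    /// Moves the value back from the tombstone key (the `restore` step).
    fn restore(&mut self, key: &str) -> Result<(), String> {
        let value = self
            .0
            .remove(&format!("{TOMBSTONE_PREFIX}{key}"))
            .ok_or_else(|| format!("Missing value, key: {key}"))?;
        self.0.insert(key.to_string(), value);
        Ok(())
    }

    /// Drops the tombstone permanently (the `delete` step).
    fn purge(&mut self, key: &str) {
        self.0.remove(&format!("{TOMBSTONE_PREFIX}{key}"));
    }
}

fn main() -> Result<(), String> {
    let mut store = Store::default();
    store.0.insert("bar".into(), "baz".into());

    // DROP: the value is parked under the tombstone key.
    store.tombstone("bar")?;
    assert!(!store.0.contains_key("bar"));

    // Rollback: the original key-value pair is back, as in the DESC TABLE phy
    // case above where the metadata is restored after a failed drop.
    store.restore("bar")?;
    assert_eq!(store.0.get("bar").map(String::as_str), Some("baz"));

    // Commit: tombstone the value again, then destroy it for good.
    store.tombstone("bar")?;
    store.purge("bar");
    assert!(store.0.is_empty());
    Ok(())
}

The same shape is visible in the transaction built in tombstone.rs above: the `when` comparisons guard the atomic keys, the paired `TxnOp::Delete`/`TxnOp::Put` operations perform the move, and the `or_else` get operations run only on failure, fetching the conflicting values to build the `CasKeyChanged` error message.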