Skip to content

Commit

Permalink
feat(drop_table): support to rollback table metadata (#3692)
Browse files Browse the repository at this point in the history
* feat: support to rollback table metadata

* refactor: store table route value instead of physical table route

* feat(drop_table): support to rollback table metadata

* test: add rollback tests for drop table

* fix: do not set region to readonly

* test: add sqlness tests

* feat: implement TombstoneManager

* test: add tests for TombstoneManager

* refactor: using TombstoneManager

* chore: remove unused code

* fix: fix typo

* refactor: using `on_restore_metadata`

* refactor: add `executor` to `DropTableProcedure`

* refactor: simplify the `TombstoneManager`

* refactor: refactor `Key`

* refactor: carry more info

* feat: add `destroy_table_metadata`

* refactor: remove redundant table_route_value

* feat: ensure the key is empty

* feat: introcude `table_metadata_keys`

* chore: carry more info

* chore: remove clone

* chore: apply suggestions from CR

* feat: delete metadata tombstone
  • Loading branch information
WenyXu authored Apr 16, 2024
1 parent 64941d8 commit d123791
Show file tree
Hide file tree
Showing 21 changed files with 1,308 additions and 221 deletions.
7 changes: 2 additions & 5 deletions src/cmd/src/cli/bench/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,11 @@ impl TableMetadataBencher {
.unwrap();
let start = Instant::now();
let table_info = table_info.unwrap();
let table_route = table_route.unwrap();
let table_id = table_info.table_info.ident.table_id;
let _ = self
.table_metadata_manager
.delete_table_metadata(
table_id,
&table_info.table_name(),
table_route.unwrap().region_routes().unwrap(),
)
.delete_table_metadata(table_id, &table_info.table_name(), &table_route)
.await;
start.elapsed()
},
Expand Down
4 changes: 3 additions & 1 deletion src/common/meta/src/ddl/drop_database/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl DropDatabaseCursor {
.await?;
Ok((
Box::new(DropDatabaseExecutor::new(
table_id,
table_id,
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_route.region_routes,
Expand All @@ -86,6 +87,7 @@ impl DropDatabaseCursor {
}
(DropTableTarget::Physical, TableRouteValue::Physical(table_route)) => Ok((
Box::new(DropDatabaseExecutor::new(
table_id,
table_id,
TableName::new(&ctx.catalog, &ctx.schema, &table_name),
table_route.region_routes,
Expand Down Expand Up @@ -220,7 +222,7 @@ mod tests {
.get_physical_table_route(physical_table_id)
.await
.unwrap();
assert_eq!(table_route.region_routes, executor.region_routes);
assert_eq!(table_route.region_routes, executor.physical_region_routes);
assert_eq!(executor.target, DropTableTarget::Logical);
}

Expand Down
63 changes: 54 additions & 9 deletions src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ use crate::ddl::drop_database::State;
use crate::ddl::drop_table::executor::DropTableExecutor;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_route::TableRouteValue;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::router::{operating_leader_regions, RegionRoute};
use crate::table_name::TableName;

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DropDatabaseExecutor {
table_id: TableId,
physical_table_id: TableId,
table_name: TableName,
pub(crate) region_routes: Vec<RegionRoute>,
/// The physical table region routes.
pub(crate) physical_region_routes: Vec<RegionRoute>,
pub(crate) target: DropTableTarget,
#[serde(skip)]
dropping_regions: Vec<OperatingRegionGuard>,
Expand All @@ -44,14 +47,16 @@ impl DropDatabaseExecutor {
/// Returns a new [DropDatabaseExecutor].
pub fn new(
table_id: TableId,
physical_table_id: TableId,
table_name: TableName,
region_routes: Vec<RegionRoute>,
physical_region_routes: Vec<RegionRoute>,
target: DropTableTarget,
) -> Self {
Self {
table_name,
table_id,
region_routes,
physical_table_id,
table_name,
physical_region_routes,
target,
dropping_regions: vec![],
}
Expand All @@ -60,7 +65,7 @@ impl DropDatabaseExecutor {

impl DropDatabaseExecutor {
fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
let dropping_regions = operating_leader_regions(&self.region_routes);
let dropping_regions = operating_leader_regions(&self.physical_region_routes);
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
for (region_id, datanode_id) in dropping_regions {
let guard = ddl_ctx
Expand All @@ -87,12 +92,18 @@ impl State for DropDatabaseExecutor {
) -> Result<(Box<dyn State>, Status)> {
self.register_dropping_regions(ddl_ctx)?;
let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
// Deletes metadata for table permanently.
let table_route_value = TableRouteValue::new(
self.table_id,
self.physical_table_id,
self.physical_region_routes.clone(),
);
executor
.on_remove_metadata(ddl_ctx, &self.region_routes)
.on_destroy_metadata(ddl_ctx, &table_route_value)
.await?;
executor.invalidate_table_cache(ddl_ctx).await?;
executor
.on_drop_regions(ddl_ctx, &self.region_routes)
.on_drop_regions(ddl_ctx, &self.physical_region_routes)
.await?;
info!("Table: {}({}) is dropped", self.table_name, self.table_id);

Expand Down Expand Up @@ -122,7 +133,9 @@ mod tests {
use crate::ddl::drop_database::{DropDatabaseContext, DropTableTarget, State};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
use crate::error::{self, Error, Result};
use crate::key::datanode_table::DatanodeTableKey;
use crate::peer::Peer;
use crate::rpc::router::region_distribution;
use crate::table_name::TableName;
use crate::test_util::{new_ddl_context, MockDatanodeHandler, MockDatanodeManager};

Expand Down Expand Up @@ -157,6 +170,7 @@ mod tests {
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes.clone(),
Expand All @@ -181,9 +195,10 @@ mod tests {
tables: None,
};
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
table_route.region_routes.clone(),
DropTableTarget::Physical,
);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
Expand All @@ -207,6 +222,7 @@ mod tests {
.unwrap();
{
let mut state = DropDatabaseExecutor::new(
logical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
table_route.region_routes.clone(),
Expand All @@ -231,15 +247,43 @@ mod tests {
tables: None,
};
let mut state = DropDatabaseExecutor::new(
logical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "metric"),
table_route.region_routes,
DropTableTarget::Logical,
);
let (state, status) = state.next(&ddl_context, &mut ctx).await.unwrap();
assert!(!status.need_persist());
let cursor = state.as_any().downcast_ref::<DropDatabaseCursor>().unwrap();
assert_eq!(cursor.target, DropTableTarget::Logical);
// Checks table info
ddl_context
.table_metadata_manager
.table_info_manager()
.get(physical_table_id)
.await
.unwrap()
.unwrap();
// Checks table route
let table_route = ddl_context
.table_metadata_manager
.table_route_manager()
.table_route_storage()
.get(physical_table_id)
.await
.unwrap()
.unwrap();
let region_routes = table_route.region_routes().unwrap();
for datanode_id in region_distribution(region_routes).into_keys() {
ddl_context
.table_metadata_manager
.datanode_table_manager()
.get(&DatanodeTableKey::new(datanode_id, physical_table_id))
.await
.unwrap()
.unwrap();
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -279,6 +323,7 @@ mod tests {
.await
.unwrap();
let mut state = DropDatabaseExecutor::new(
physical_table_id,
physical_table_id,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
table_route.region_routes,
Expand Down
Loading

0 comments on commit d123791

Please sign in to comment.