
Commit 4c0a0b4
fix: fix some check and clean for drop database and schema (risingwav…
yezizp2012 authored Apr 6, 2023
1 parent 8f8c191 commit 4c0a0b4
Showing 8 changed files with 86 additions and 37 deletions.
8 changes: 7 additions & 1 deletion src/frontend/src/catalog/database_catalog.rs
@@ -19,7 +19,7 @@ use risingwave_common::catalog::PG_CATALOG_SCHEMA_NAME;
 use risingwave_pb::catalog::{PbDatabase, PbSchema};
 
 use crate::catalog::schema_catalog::SchemaCatalog;
-use crate::catalog::{DatabaseId, SchemaId};
+use crate::catalog::{DatabaseId, SchemaId, TableId};
 
 #[derive(Clone, Debug)]
 pub struct DatabaseCatalog {
@@ -51,6 +51,12 @@ impl DatabaseCatalog {
         self.schema_by_name.keys().cloned().collect_vec()
     }
 
+    pub fn iter_all_table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
+        self.schema_by_name
+            .values()
+            .flat_map(|schema| schema.iter_all().map(|t| t.id()))
+    }
+
     pub fn get_all_schema_info(&self) -> Vec<PbSchema> {
         self.schema_by_name
             .values()
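The new `iter_all_table_ids` flattens every schema in the database into one stream of table ids. A minimal, self-contained sketch of the same `flat_map` shape (hypothetical simplified types, not RisingWave's actual catalog):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for SchemaCatalog and TableId.
struct Schema {
    tables: HashMap<u32, String>, // table id -> table name
}

struct Database {
    schema_by_name: HashMap<String, Schema>,
}

impl Database {
    // Same shape as the new `iter_all_table_ids`: flat_map over every schema.
    fn iter_all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
        self.schema_by_name
            .values()
            .flat_map(|schema| schema.tables.keys().copied())
    }
}

fn main() {
    let mut public = Schema { tables: HashMap::new() };
    public.tables.insert(1, "t1".to_string());
    public.tables.insert(2, "t2".to_string());
    let db = Database {
        schema_by_name: HashMap::from([("public".to_string(), public)]),
    };
    let mut ids: Vec<u32> = db.iter_all_table_ids().collect();
    ids.sort_unstable();
    assert_eq!(ids, vec![1, 2]);
}
```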
34 changes: 21 additions & 13 deletions src/frontend/src/catalog/root_catalog.rs
@@ -94,7 +94,7 @@ pub struct Catalog {
     database_by_name: HashMap<String, DatabaseCatalog>,
     db_name_by_id: HashMap<DatabaseId, String>,
     /// all table catalogs in the cluster identified by universal unique table id.
-    table_by_id: HashMap<TableId, TableCatalog>,
+    table_by_id: HashMap<TableId, Arc<TableCatalog>>,
     connection_by_id: HashMap<ConnectionId, ConnectionCatalog>,
     connection_id_by_name: HashMap<String, ConnectionId>,
 }
@@ -152,12 +152,13 @@ impl Catalog {
     }
 
     pub fn create_table(&mut self, proto: &PbTable) {
-        self.table_by_id.insert(proto.id.into(), proto.into());
-        self.get_database_mut(proto.database_id)
+        let table = self
+            .get_database_mut(proto.database_id)
             .unwrap()
             .get_schema_mut(proto.schema_id)
             .unwrap()
             .create_table(proto);
+        self.table_by_id.insert(proto.id.into(), table);
     }
 
     pub fn create_index(&mut self, proto: &PbIndex) {
@@ -215,7 +216,10 @@
 
     pub fn drop_database(&mut self, db_id: DatabaseId) {
         let name = self.db_name_by_id.remove(&db_id).unwrap();
-        let _database = self.database_by_name.remove(&name).unwrap();
+        let database = self.database_by_name.remove(&name).unwrap();
+        database.iter_all_table_ids().for_each(|table| {
+            self.table_by_id.remove(&table);
+        });
     }
 
     pub fn drop_schema(&mut self, db_id: DatabaseId, schema_id: SchemaId) {
@@ -232,12 +236,13 @@
     }
 
     pub fn update_table(&mut self, proto: &PbTable) {
-        self.table_by_id.insert(proto.id.into(), proto.into());
-        self.get_database_mut(proto.database_id)
+        let table = self
+            .get_database_mut(proto.database_id)
             .unwrap()
             .get_schema_mut(proto.schema_id)
             .unwrap()
             .update_table(proto);
+        self.table_by_id.insert(proto.id.into(), table);
     }
 
     pub fn update_index(&mut self, proto: &PbIndex) {
@@ -363,7 +368,8 @@ impl Catalog {
     }
 
     pub fn get_table_name_by_id(&self, table_id: TableId) -> CatalogResult<String> {
-        self.get_table_by_id(&table_id).map(|table| table.name)
+        self.get_table_by_id(&table_id)
+            .map(|table| table.name.clone())
     }
 
     pub fn get_schema_by_id(
@@ -414,10 +420,9 @@ impl Catalog {
             .ok_or_else(|| CatalogError::NotFound("table", table_name.to_string()))
     }
 
-    pub fn get_table_by_id(&self, table_id: &TableId) -> CatalogResult<TableCatalog> {
+    pub fn get_table_by_id(&self, table_id: &TableId) -> CatalogResult<&Arc<TableCatalog>> {
         self.table_by_id
             .get(table_id)
-            .cloned()
             .ok_or_else(|| CatalogError::NotFound("table id", table_id.to_string()))
     }
 
@@ -439,20 +444,23 @@
         }
 
         if found {
-            let mut table = self.get_table_by_id(table_id).unwrap();
+            let mut table = self
+                .get_table_by_id(table_id)
+                .unwrap()
+                .to_prost(schema_id, database_id);
             table.name = table_name.to_string();
-            self.update_table(&table.to_prost(schema_id, database_id));
+            self.update_table(&table);
         }
     }
 
     #[cfg(test)]
     pub fn insert_table_id_mapping(&mut self, table_id: TableId, fragment_id: super::FragmentId) {
         self.table_by_id.insert(
             table_id,
-            TableCatalog {
+            Arc::new(TableCatalog {
                 fragment_id,
                 ..Default::default()
-            },
+            }),
         );
     }
 
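The substantive fix in this file: `drop_database` previously removed the database entry but left its tables behind in the cluster-wide `table_by_id` index, so id lookups could return stale entries. A standalone miniature of the invariant being restored (toy types, not the real `Catalog`):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical miniature of the frontend catalog: a per-database table list
// plus a cluster-wide `table_by_id` index that must be kept in sync.
#[derive(Default)]
struct Catalog {
    tables_by_db: HashMap<u32, Vec<u32>>,   // database id -> table ids
    table_by_id: HashMap<u32, Arc<String>>, // table id -> shared catalog entry
}

impl Catalog {
    fn create_table(&mut self, db_id: u32, table_id: u32, name: &str) {
        self.tables_by_db.entry(db_id).or_default().push(table_id);
        self.table_by_id.insert(table_id, Arc::new(name.to_string()));
    }

    // The fix: dropping a database must also evict its tables from the
    // global id index, or lookups would keep returning stale entries.
    fn drop_database(&mut self, db_id: u32) {
        for table_id in self.tables_by_db.remove(&db_id).unwrap_or_default() {
            self.table_by_id.remove(&table_id);
        }
    }
}

fn main() {
    let mut catalog = Catalog::default();
    catalog.create_table(1, 42, "t");
    catalog.drop_database(1);
    assert!(catalog.table_by_id.is_empty()); // no stale entry left behind
}
```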
14 changes: 10 additions & 4 deletions src/frontend/src/catalog/schema_catalog.rs
@@ -54,7 +54,7 @@ pub struct SchemaCatalog {
 }
 
 impl SchemaCatalog {
-    pub fn create_table(&mut self, prost: &PbTable) {
+    pub fn create_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
         let name = prost.name.clone();
         let id = prost.id.into();
         let table: TableCatalog = prost.into();
@@ -63,7 +63,8 @@ impl SchemaCatalog {
         self.table_by_name
             .try_insert(name, table_ref.clone())
             .unwrap();
-        self.table_by_id.try_insert(id, table_ref).unwrap();
+        self.table_by_id.try_insert(id, table_ref.clone()).unwrap();
+        table_ref
     }
 
     pub fn create_sys_table(&mut self, sys_table: SystemCatalog) {
@@ -72,7 +73,7 @@
             .unwrap();
     }
 
-    pub fn update_table(&mut self, prost: &PbTable) {
+    pub fn update_table(&mut self, prost: &PbTable) -> Arc<TableCatalog> {
         let name = prost.name.clone();
         let id = prost.id.into();
         let table: TableCatalog = prost.into();
@@ -84,7 +85,8 @@
             self.table_by_name.remove(old_table.name());
         }
         self.table_by_name.insert(name, table_ref.clone());
-        self.table_by_id.insert(id, table_ref);
+        self.table_by_id.insert(id, table_ref.clone());
+        table_ref
     }
 
     pub fn update_index(&mut self, prost: &PbIndex) {
@@ -295,6 +297,10 @@ impl SchemaCatalog {
             .expect("function not found by argument types");
     }
 
+    pub fn iter_all(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
+        self.table_by_name.values()
+    }
+
     pub fn iter_table(&self) -> impl Iterator<Item = &Arc<TableCatalog>> {
         self.table_by_name
             .iter()
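`create_table` and `update_table` now return the freshly inserted `Arc<TableCatalog>`, so the root catalog can index the very same allocation instead of rebuilding a second `TableCatalog` from the proto. A toy sketch of the shared-`Arc` pattern (hypothetical types):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// One Arc shared by the name index, the id index, and the caller,
// instead of three separate copies of the catalog entry.
#[derive(Default)]
struct Schema {
    table_by_name: HashMap<String, Arc<String>>,
    table_by_id: HashMap<u32, Arc<String>>,
}

impl Schema {
    // Mirrors the new signature: insert, then hand the Arc back to the
    // caller (the root catalog) so it can index the same entry by id.
    fn create_table(&mut self, id: u32, name: &str) -> Arc<String> {
        let table_ref = Arc::new(name.to_string());
        self.table_by_name.insert(name.to_string(), table_ref.clone());
        self.table_by_id.insert(id, table_ref.clone());
        table_ref
    }
}

fn main() {
    let mut schema = Schema::default();
    let t = schema.create_table(1, "t1");
    // All three handles point at the same allocation: 1 caller + 2 indexes.
    assert_eq!(Arc::strong_count(&t), 3);
}
```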
10 changes: 6 additions & 4 deletions src/frontend/src/observer/observer_manager.rs
@@ -229,14 +229,16 @@ impl FrontendObserverNode {
                         table.id.into(),
                     ),
                     Operation::Update => {
-                        let old_table =
-                            catalog_guard.get_table_by_id(&table.id.into()).unwrap();
+                        let old_fragment_id = catalog_guard
+                            .get_table_by_id(&table.id.into())
+                            .unwrap()
+                            .fragment_id;
                         catalog_guard.update_table(table);
-                        if old_table.fragment_id != table.fragment_id {
+                        if old_fragment_id != table.fragment_id {
                             // FIXME: the frontend node delete its fragment for the update
                             // operation by itself.
                             self.worker_node_manager
-                                .remove_fragment_mapping(&old_table.fragment_id);
+                                .remove_fragment_mapping(&old_fragment_id);
                         }
                     }
                     _ => panic!("receive an unsupported notify {:?}", resp),
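Because `get_table_by_id` now hands back a reference into the catalog, the old code that kept `old_table` alive across the mutating `update_table` call would no longer borrow-check; copying out the fragment id first ends the shared borrow. A minimal illustration of the pattern (plain `HashMap`; the assumption here is that the fragment id is a `Copy` integer type):

```rust
use std::collections::HashMap;

fn main() {
    // id -> (fragment_id, name); the fragment id is a Copy integer here,
    // mirroring the assumption that `FragmentId` is a plain Copy id type.
    let mut map: HashMap<u32, (u32, String)> = HashMap::new();
    map.insert(1, (7, "t".to_string()));

    // Holding a `&map[&1]` reference across the `insert` below would fail
    // to borrow-check. Copying the fragment id out ends the shared borrow:
    let old_fragment_id = map.get(&1).unwrap().0;
    map.insert(1, (8, "t".to_string())); // mutation is fine now
    assert_ne!(old_fragment_id, map[&1].0);
}
```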
17 changes: 4 additions & 13 deletions src/frontend/src/scheduler/local.rs
@@ -403,17 +403,9 @@ impl LocalQueryExecution {
                     .inner_side_table_desc
                     .as_ref()
                     .expect("no side table desc");
-                let table = self
-                    .front_env
-                    .catalog_reader()
-                    .read_guard()
-                    .get_table_by_id(&side_table_desc.table_id.into())
-                    .context("side table not found")?;
                 let mapping = self
-                    .front_env
-                    .worker_node_manager()
-                    .get_fragment_mapping(&table.fragment_id)
-                    .context("fragment mapping not found")?;
+                    .get_vnode_mapping(&side_table_desc.table_id.into())
+                    .context("side table not found")?;
 
                 // TODO: should we use `pb::ParallelUnitMapping` here?
                 node.inner_side_vnode_mapping = mapping.to_expanded();
@@ -454,9 +446,8 @@ impl LocalQueryExecution {
 
     #[inline(always)]
     fn get_vnode_mapping(&self, table_id: &TableId) -> Option<ParallelUnitMapping> {
-        self.front_env
-            .catalog_reader()
-            .read_guard()
+        let reader = self.front_env.catalog_reader().read_guard();
+        reader
             .get_table_by_id(table_id)
             .map(|table| {
                 self.front_env
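Routing the lookaside-join lookup through the existing `get_vnode_mapping` helper also sidesteps a lifetime issue: the table reference returned by `get_table_by_id` now borrows from the read guard, so the guard must be bound to a local rather than chained as a dropped temporary. A tiny standalone demonstration (std `RwLock`, toy struct):

```rust
use std::sync::RwLock;

struct Catalog {
    name: String,
}

fn main() {
    let lock = RwLock::new(Catalog { name: "t".to_string() });

    // Chaining `lock.read().unwrap()` straight into a longer-lived borrow
    // would not compile: the temporary guard is dropped at the end of the
    // statement. Binding the guard first keeps the borrow valid:
    let guard = lock.read().unwrap();
    let name: &str = &guard.name; // borrow tied to `guard`, which lives on
    assert_eq!(name, "t");
}
```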
2 changes: 2 additions & 0 deletions src/frontend/src/scheduler/plan_fragmenter.rs
@@ -903,6 +903,7 @@ impl BatchPlanFragmenter {
             .catalog_reader
             .read_guard()
             .get_table_by_id(&table_desc.table_id)
+            .cloned()
             .map_err(RwError::from)?;
         let vnode_mapping = self
             .worker_node_manager
@@ -941,6 +942,7 @@
             .catalog_reader
             .read_guard()
             .get_table_by_id(&table_desc.table_id)
+            .cloned()
             .map_err(RwError::from)?;
         let vnode_mapping = self
             .worker_node_manager
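The added `.cloned()` is needed because `get_table_by_id` now yields `CatalogResult<&Arc<TableCatalog>>`, and cloning the `Arc` is only a reference-count bump, not a deep copy of the table catalog. A small sketch of `Result<&T, E>::cloned` (available on stable Rust since 1.59; toy types):

```rust
use std::sync::Arc;

fn main() {
    // `Result<&T, E>::cloned()` turns a borrowed Ok value into an owned
    // one; for `&Arc<T>` that is just a reference-count bump.
    let table = Arc::new("table".to_string());
    let borrowed: Result<&Arc<String>, ()> = Ok(&table);
    let owned: Result<Arc<String>, ()> = borrowed.cloned();
    assert_eq!(Arc::strong_count(&table), 2); // original + cheap clone
    drop(owned);
}
```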
12 changes: 12 additions & 0 deletions src/meta/src/manager/catalog/database.rs
@@ -283,6 +283,18 @@ impl DatabaseManager {
         }
     }
 
+    pub fn has_creation_in_database(&self, database_id: DatabaseId) -> bool {
+        self.in_progress_creation_tracker
+            .iter()
+            .any(|relation_key| relation_key.0 == database_id)
+    }
+
+    pub fn has_creation_in_schema(&self, schema_id: SchemaId) -> bool {
+        self.in_progress_creation_tracker
+            .iter()
+            .any(|relation_key| relation_key.1 == schema_id)
+    }
+
     pub fn has_in_progress_creation(&self, relation: &RelationKey) -> bool {
         self.in_progress_creation_tracker
             .contains(&relation.clone())
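Both new predicates linearly scan the in-progress creation tracker, whose key evidently carries the database and schema ids in its first two fields. A self-contained sketch (the `RelationKey` layout is inferred from the `.0`/`.1` accesses, so treat it as an assumption):

```rust
use std::collections::HashSet;

// Inferred from the diff: (database id, schema id, relation name).
type RelationKey = (u32, u32, String);

#[derive(Default)]
struct DatabaseManager {
    in_progress_creation_tracker: HashSet<RelationKey>,
}

impl DatabaseManager {
    // Any in-flight creation in this database blocks `drop_database`.
    fn has_creation_in_database(&self, database_id: u32) -> bool {
        self.in_progress_creation_tracker
            .iter()
            .any(|key| key.0 == database_id)
    }

    // Any in-flight creation in this schema blocks `drop_schema`.
    fn has_creation_in_schema(&self, schema_id: u32) -> bool {
        self.in_progress_creation_tracker
            .iter()
            .any(|key| key.1 == schema_id)
    }
}

fn main() {
    let mut mgr = DatabaseManager::default();
    mgr.in_progress_creation_tracker.insert((1, 2, "mv1".into()));
    assert!(mgr.has_creation_in_database(1));
    assert!(mgr.has_creation_in_schema(2));
    assert!(!mgr.has_creation_in_database(9));
}
```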
26 changes: 24 additions & 2 deletions src/meta/src/manager/catalog/mod.rs
@@ -231,6 +231,13 @@ where
         let core = &mut *self.core.lock().await;
         let database_core = &mut core.database;
         let user_core = &mut core.user;
+
+        if database_core.has_creation_in_database(database_id) {
+            return Err(MetaError::permission_denied(
+                "Some relations are creating in the target database, try again later".into(),
+            ));
+        }
+
         let mut databases = BTreeMapTransaction::new(&mut database_core.databases);
         let mut schemas = BTreeMapTransaction::new(&mut database_core.schemas);
         let mut sources = BTreeMapTransaction::new(&mut database_core.sources);
@@ -239,6 +246,7 @@
         let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes);
         let mut views = BTreeMapTransaction::new(&mut database_core.views);
         let mut users = BTreeMapTransaction::new(&mut user_core.user_info);
+        let mut functions = BTreeMapTransaction::new(&mut database_core.functions);
 
         /// `drop_by_database_id` provides a wrapper for dropping relations by database id, it will
         /// return the relation ids that dropped.
@@ -267,6 +275,7 @@
         let tables_to_drop = drop_by_database_id!(tables, database_id);
         let indexes_to_drop = drop_by_database_id!(indexes, database_id);
         let views_to_drop = drop_by_database_id!(views, database_id);
+        let functions_to_drop = drop_by_database_id!(functions, database_id);
 
         let objects = std::iter::once(Object::DatabaseId(database_id))
             .chain(
@@ -281,6 +290,11 @@
                     .iter()
                     .map(|source| Object::SourceId(source.id)),
             )
+            .chain(
+                functions_to_drop
+                    .iter()
+                    .map(|function| Object::FunctionId(function.id)),
+            )
             .collect_vec();
         let users_need_update = Self::update_user_privileges(&mut users, &objects);
 
@@ -298,6 +312,7 @@
             )
             .chain(indexes_to_drop.iter().map(|index| index.owner))
             .chain(views_to_drop.iter().map(|view| view.owner))
+            .chain(functions_to_drop.iter().map(|function| function.owner))
             .for_each(|owner_id| user_core.decrease_ref(owner_id));
 
         // Update relation ref count.
@@ -310,7 +325,7 @@
         for view in &views_to_drop {
             database_core.relation_ref_count.remove(&view.id);
         }
-
+        // FIXME: resolve function refer count.
         for user in users_need_update {
             self.notify_frontend(Operation::Update, Info::User(user))
                 .await;
@@ -411,8 +426,15 @@
         if !database_core.schemas.contains_key(&schema_id) {
             return Err(MetaError::catalog_id_not_found("schema", schema_id));
         }
+        if database_core.has_creation_in_schema(schema_id) {
+            return Err(MetaError::permission_denied(
+                "Some relations are creating in the target schema, try again later".into(),
+            ));
+        }
         if !database_core.schema_is_empty(schema_id) {
-            bail!("schema is not empty!");
+            return Err(MetaError::permission_denied(
+                "The schema is not empty, try dropping them first".into(),
+            ));
         }
         let mut schemas = BTreeMapTransaction::new(&mut database_core.schemas);
         let mut users = BTreeMapTransaction::new(&mut user_core.user_info);
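Taken together, `drop_schema` now validates in order: the schema exists, nothing is mid-creation inside it, and it is empty. A condensed, hypothetical model of that check order (simplified error type, not the real `MetaError`):

```rust
// Hypothetical condensed view of the new drop_schema validation order.
#[derive(Debug, PartialEq)]
enum DropError {
    NotFound,
    PermissionDenied(&'static str),
}

fn check_drop_schema(exists: bool, creating: bool, empty: bool) -> Result<(), DropError> {
    if !exists {
        return Err(DropError::NotFound); // schema id must resolve
    }
    if creating {
        // in-flight CREATE wins; the caller should retry later
        return Err(DropError::PermissionDenied(
            "Some relations are creating in the target schema, try again later",
        ));
    }
    if !empty {
        return Err(DropError::PermissionDenied(
            "The schema is not empty, try dropping them first",
        ));
    }
    Ok(())
}

fn main() {
    assert_eq!(
        check_drop_schema(true, true, true),
        Err(DropError::PermissionDenied(
            "Some relations are creating in the target schema, try again later"
        ))
    );
    assert!(check_drop_schema(true, false, true).is_ok());
}
```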
