From 730242436b57583292ed2fdd604df674e0157e24 Mon Sep 17 00:00:00 2001 From: Dylan Date: Fri, 24 Mar 2023 18:34:47 +0800 Subject: [PATCH] feat(meta): support group notification (#8741) --- proto/meta.proto | 22 +- .../common_service/src/observer_manager.rs | 10 +- src/frontend/src/observer/observer_manager.rs | 124 ++++---- src/meta/src/hummock/manager/mod.rs | 5 +- src/meta/src/manager/catalog/fragment.rs | 7 +- src/meta/src/manager/catalog/mod.rs | 281 ++++++++++++------ src/meta/src/manager/notification.rs | 56 +++- .../compactor_observer/observer_manager.rs | 26 +- src/storage/src/hummock/observer_manager.rs | 31 +- 9 files changed, 380 insertions(+), 182 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index aa7bece1845b..cd9f044d4f98 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -251,6 +251,21 @@ message MetaSnapshot { SnapshotVersion version = 13; } +message Relation { + oneof relation_info { + catalog.Table table = 1; + catalog.Source source = 2; + catalog.Sink sink = 3; + catalog.Index index = 4; + catalog.View view = 5; + catalog.Function function = 6; + } +} + +message RelationGroup { + repeated Relation relations = 1; +} + message SubscribeResponse { enum Operation { UNSPECIFIED = 0; @@ -265,12 +280,6 @@ message SubscribeResponse { oneof info { catalog.Database database = 4; catalog.Schema schema = 5; - catalog.Table table = 6; - catalog.Source source = 7; - catalog.Sink sink = 8; - catalog.Index index = 9; - catalog.View view = 10; - catalog.Function function = 18; user.UserInfo user = 11; FragmentParallelUnitMapping parallel_unit_mapping = 12; common.WorkerNode node = 13; @@ -280,6 +289,7 @@ message SubscribeResponse { backup_service.MetaBackupManifestId meta_backup_manifest_id = 17; SystemParams system_params = 19; hummock.WriteLimits hummock_write_limits = 20; + RelationGroup relation_group = 21; } } diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 156cd7882974..695e3b4d4411 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -116,15 +116,7 @@ where }; notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() { - Info::Database(_) - | Info::Schema(_) - | Info::Table(_) - | Info::Source(_) - | Info::Sink(_) - | Info::Index(_) - | Info::View(_) - | Info::Function(_) - | Info::User(_) => { + Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) | Info::User(_) => { notification.version > info.version.as_ref().unwrap().catalog_version } Info::ParallelUnitMapping(_) => { diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index 13fd1eb2c387..353e9fe4c2e4 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -20,6 +20,7 @@ use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend}; use risingwave_pb::common::WorkerNode; +use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::{FragmentParallelUnitMapping, SubscribeResponse}; use tokio::sync::watch::Sender; @@ -49,14 +50,7 @@ impl ObserverState for FrontendObserverNode { }; match info.to_owned() { - Info::Database(_) - | Info::Schema(_) - | Info::Table(_) - | Info::Source(_) - | Info::Index(_) - | Info::Sink(_) - | Info::Function(_) - | Info::View(_) => { + Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) => { self.handle_catalog_notification(resp); } Info::Node(node) => { @@ -193,52 +187,76 @@ impl FrontendObserverNode { Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id), _ => panic!("receive an unsupported notify {:?}", resp), }, - Info::Table(table) => match resp.operation() { - Operation::Add => catalog_guard.create_table(table), - Operation::Delete => { - catalog_guard.drop_table(table.database_id, table.schema_id, table.id.into()) - } - Operation::Update => catalog_guard.update_table(table), - _ => panic!("receive an unsupported notify {:?}", resp), - }, - Info::Source(source) => match resp.operation() { - Operation::Add => catalog_guard.create_source(source), - Operation::Delete => { - catalog_guard.drop_source(source.database_id, source.schema_id, source.id) - } - _ => panic!("receive an unsupported notify {:?}", resp), - }, - Info::Sink(sink) => match resp.operation() { - Operation::Add => catalog_guard.create_sink(sink), - Operation::Delete => { - catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id) - } - _ => panic!("receive an unsupported notify {:?}", resp), - }, - Info::Index(index) => match resp.operation() { - Operation::Add => catalog_guard.create_index(index), - Operation::Delete => { - catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into()) - } - Operation::Update => catalog_guard.update_index(index), - _ => panic!("receive an unsupported notify {:?}", resp), - }, - Info::View(view) => match resp.operation() { - Operation::Add => catalog_guard.create_view(view), - Operation::Delete => { - catalog_guard.drop_view(view.database_id, view.schema_id, view.id) + Info::RelationGroup(relation_group) => { + for relation in &relation_group.relations { + let Some(relation) = relation.relation_info.as_ref() else { + continue; + }; + match relation { + RelationInfo::Table(table) => match resp.operation() { + Operation::Add => catalog_guard.create_table(table), + Operation::Delete => catalog_guard.drop_table( + table.database_id, + table.schema_id, + table.id.into(), + ), + Operation::Update => { + let old_table = + catalog_guard.get_table_by_id(&table.id.into()).unwrap(); + catalog_guard.update_table(table); + assert!(old_table.fragment_id != table.fragment_id); + // FIXME: the frontend node delete its fragment for the update + // operation by itself. + self.worker_node_manager + .remove_fragment_mapping(&old_table.fragment_id); + } + _ => panic!("receive an unsupported notify {:?}", resp), + }, + RelationInfo::Source(source) => match resp.operation() { + Operation::Add => catalog_guard.create_source(source), + Operation::Delete => catalog_guard.drop_source( + source.database_id, + source.schema_id, + source.id, + ), + _ => panic!("receive an unsupported notify {:?}", resp), + }, + RelationInfo::Sink(sink) => match resp.operation() { + Operation::Add => catalog_guard.create_sink(sink), + Operation::Delete => { + catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id) + } + _ => panic!("receive an unsupported notify {:?}", resp), + }, + RelationInfo::Index(index) => match resp.operation() { + Operation::Add => catalog_guard.create_index(index), + Operation::Delete => catalog_guard.drop_index( + index.database_id, + index.schema_id, + index.id.into(), + ), + Operation::Update => catalog_guard.update_index(index), + _ => panic!("receive an unsupported notify {:?}", resp), + }, + RelationInfo::View(view) => match resp.operation() { + Operation::Add => catalog_guard.create_view(view), + Operation::Delete => { + catalog_guard.drop_view(view.database_id, view.schema_id, view.id) + } + _ => panic!("receive an unsupported notify {:?}", resp), + }, + RelationInfo::Function(function) => match resp.operation() { + Operation::Add => catalog_guard.create_function(function), + Operation::Delete => catalog_guard.drop_function( + function.database_id, + function.schema_id, + function.id.into(), + ), + _ => panic!("receive an unsupported notify {:?}", resp), + }, + } } - _ => panic!("receive an unsupported notify {:?}", resp), - }, - Info::Function(function) => match resp.operation() { - Operation::Add => catalog_guard.create_function(function), - Operation::Delete => catalog_guard.drop_function( - function.database_id, - function.schema_id, - function.id.into(), - ), - _ => panic!("receive an unsupported notify {:?}", resp), - }, + } _ => unreachable!(), } assert!( diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index ca045bb03cde..c2e1aefabc0c 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -166,6 +166,7 @@ use risingwave_hummock_sdk::table_stats::{ use risingwave_pb::catalog::Table; use risingwave_pb::hummock::version_update_payload::Payload; use risingwave_pb::hummock::PbCompactionGroupInfo; +use risingwave_pb::meta::relation::RelationInfo; /// Acquire write lock of the lock with `lock_name`. /// The macro will use macro `function_name` to get the name of the function of method that calls @@ -1732,11 +1733,11 @@ where for table in table_catalogs { self.env .notification_manager() - .notify_hummock(Operation::Add, Info::Table(table.clone())) + .notify_hummock_relation_info(Operation::Add, RelationInfo::Table(table.clone())) .await; self.env .notification_manager() - .notify_compactor(Operation::Add, Info::Table(table)) + .notify_compactor_relation_info(Operation::Add, RelationInfo::Table(table)) .await; } diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 12bf734001d2..51d823d65d93 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -279,6 +279,7 @@ where // FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments // with the real table ID, then replace the dummy table ID with the real table ID. This is a // workaround for not having the version info in the fragment manager. + #[allow(unused_variables)] let old_table_fragment = table_fragments .remove(table_id) .with_context(|| format!("table_fragment not exist: id={}", table_id))?; @@ -344,8 +345,10 @@ where // Commit changes and notify about the changes. commit_meta!(self, table_fragments)?; - self.notify_fragment_mapping(&old_table_fragment, Operation::Delete) - .await; + // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table + // catalog and need to access the old fragment. Let frontend nodes delete the old fragment + // when they receive table catalog change. self.notify_fragment_mapping(& + // old_table_fragment, Operation::Delete) .await; self.notify_fragment_mapping(&table_fragment, Operation::Add) .await; diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 0e82b6e9734c..85de6c7d08eb 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -90,7 +90,8 @@ macro_rules! commit_meta { pub(crate) use commit_meta; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::expr::expr_node::RexNode; -use risingwave_pb::meta::CreatingJobInfo; +use risingwave_pb::meta::relation::RelationInfo; +use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup}; pub type CatalogManagerRef = Arc>; @@ -452,7 +453,7 @@ where } let version = self - .notify_frontend(Operation::Add, Info::View(view.to_owned())) + .notify_frontend_relation_info(Operation::Add, RelationInfo::View(view.to_owned())) .await; Ok(version) @@ -488,7 +489,7 @@ where .await; } let version = self - .notify_frontend(Operation::Delete, Info::View(view)) + .notify_frontend_relation_info(Operation::Delete, RelationInfo::View(view)) .await; Ok(version) @@ -516,7 +517,10 @@ where user_core.increase_ref(function.owner); let version = self - .notify_frontend(Operation::Add, Info::Function(function.to_owned())) + .notify_frontend_relation_info( + Operation::Add, + RelationInfo::Function(function.to_owned()), + ) .await; Ok(version) @@ -546,7 +550,7 @@ where } let version = self - .notify_frontend(Operation::Delete, Info::Function(function)) + .notify_frontend_relation_info(Operation::Delete, RelationInfo::Function(function)) .await; Ok(version) @@ -577,8 +581,11 @@ where let core = &mut self.core.lock().await.database; core.mark_creating_tables(creating_tables); for table in creating_tables { - self.notify_hummock_and_compactor(Operation::Add, Info::Table(table.to_owned())) - .await; + self.notify_hummock_and_compactor_relation_info( + Operation::Add, + RelationInfo::Table(table.to_owned()), + ) + .await; } } @@ -587,9 +594,10 @@ where core.unmark_creating_tables(creating_table_ids); if need_notify { for table_id in creating_table_ids { - self.notify_hummock_and_compactor( + // TODO: use group notification? + self.notify_hummock_and_compactor_relation_info( Operation::Delete, - Info::Table(Table { + RelationInfo::Table(Table { id: *table_id, ..Default::default() }), @@ -599,15 +607,19 @@ where } } - async fn notify_hummock_and_compactor(&self, operation: Operation, info: Info) { + async fn notify_hummock_and_compactor_relation_info( + &self, + operation: Operation, + relation_info: RelationInfo, + ) { self.env .notification_manager() - .notify_hummock(operation, info.clone()) + .notify_hummock_relation_info(operation, relation_info.clone()) .await; self.env .notification_manager() - .notify_compactor(operation, info) + .notify_compactor_relation_info(operation, relation_info) .await; } @@ -666,13 +678,20 @@ where } commit_meta!(self, tables)?; - for internal_table in internal_tables { - self.notify_frontend(Operation::Add, Info::Table(internal_table)) - .await; - } - let version = self - .notify_frontend(Operation::Add, Info::Table(table.to_owned())) + .notify_frontend( + Operation::Add, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Table(table.to_owned()).into(), + }] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) .await; Ok(version) @@ -778,22 +797,10 @@ where }); user_core.decrease_ref(table.owner); - for index in indexes_removed { - self.notify_frontend(Operation::Delete, Info::Index(index)) - .await; - } - - for index_table in index_tables { + for index_table in &index_tables { for dependent_relation_id in &index_table.dependent_relations { database_core.decrease_ref_count(*dependent_relation_id); } - self.notify_frontend(Operation::Delete, Info::Table(index_table)) - .await; - } - - for internal_table in internal_tables { - self.notify_frontend(Operation::Delete, Info::Table(internal_table)) - .await; } for user in users_need_update { @@ -806,7 +813,28 @@ where } let version = self - .notify_frontend(Operation::Delete, Info::Table(table)) + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: indexes_removed + .into_iter() + .map(|index| Relation { + relation_info: RelationInfo::Index(index).into(), + }) + .chain( + internal_tables + .into_iter() + .chain(index_tables.into_iter()) + .map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + }), + ) + .chain(vec![Relation { + relation_info: RelationInfo::Table(table.to_owned()).into(), + }]) + .collect_vec(), + }), + ) .await; let catalog_deleted_ids = index_table_ids @@ -876,15 +904,25 @@ where .await; } - self.notify_frontend(Operation::Delete, Info::Table(table)) - .await; - for dependent_relation_id in dependent_relations { database_core.decrease_ref_count(dependent_relation_id); } let version = self - .notify_frontend(Operation::Delete, Info::Index(index)) + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: vec![ + Relation { + relation_info: RelationInfo::Table(table.to_owned()) + .into(), + }, + Relation { + relation_info: RelationInfo::Index(index).into(), + }, + ], + }), + ) .await; Ok(version) @@ -944,7 +982,7 @@ where commit_meta!(self, sources)?; let version = self - .notify_frontend(Operation::Add, Info::Source(source.to_owned())) + .notify_frontend_relation_info(Operation::Add, RelationInfo::Source(source.to_owned())) .await; Ok(version) @@ -992,7 +1030,10 @@ where .await; } let version = self - .notify_frontend(Operation::Delete, Info::Source(source)) + .notify_frontend_relation_info( + Operation::Delete, + RelationInfo::Source(source), + ) .await; Ok(version) @@ -1076,18 +1117,27 @@ where } commit_meta!(self, sources, tables)?; - for internal_table in internal_tables { - self.notify_frontend(Operation::Add, Info::Table(internal_table)) - .await; - } - - self.notify_frontend(Operation::Add, Info::Table(mview.to_owned())) - .await; - - // Currently frontend uses source's version let version = self - .notify_frontend(Operation::Add, Info::Source(source.to_owned())) + .notify_frontend( + Operation::Add, + Info::RelationGroup(RelationGroup { + relations: vec![ + Relation { + relation_info: RelationInfo::Table(mview.to_owned()).into(), + }, + Relation { + relation_info: RelationInfo::Source(source.to_owned()).into(), + }, + ] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) .await; + Ok(version) } @@ -1207,17 +1257,10 @@ where user_core.decrease_ref_count(mview.owner, 2); // source and mview. - for index in indexes_removed { - self.notify_frontend(Operation::Delete, Info::Index(index)) - .await; - } - - for index_table in index_tables { + for index_table in &index_tables { for dependent_relation_id in &index_table.dependent_relations { database_core.decrease_ref_count(*dependent_relation_id); } - self.notify_frontend(Operation::Delete, Info::Table(index_table)) - .await; } for &dependent_relation_id in &mview.dependent_relations { @@ -1227,15 +1270,37 @@ where self.notify_frontend(Operation::Update, Info::User(user)) .await; } - self.notify_frontend(Operation::Delete, Info::Table(mview)) - .await; - for internal_table in internal_tables { - self.notify_frontend(Operation::Delete, Info::Table(internal_table)) - .await; - } let version = self - .notify_frontend(Operation::Delete, Info::Source(source)) + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: indexes_removed + .into_iter() + .map(|index| Relation { + relation_info: RelationInfo::Index(index).into(), + }) + .chain( + internal_tables + .into_iter() + .chain(index_tables.into_iter()) + .map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table) + .into(), + }), + ) + .chain(vec![ + Relation { + relation_info: RelationInfo::Table(mview.to_owned()).into(), + }, + Relation { + relation_info: RelationInfo::Source(source.to_owned()) + .into(), + }, + ]) + .collect_vec(), + }), + ) .await; let catalog_deleted_ids = index_table_ids @@ -1332,11 +1397,20 @@ where commit_meta!(self, indexes, tables)?; - self.notify_frontend(Operation::Add, Info::Table(table.to_owned())) - .await; - let version = self - .notify_frontend(Operation::Add, Info::Index(index.to_owned())) + .notify_frontend( + Operation::Add, + Info::RelationGroup(RelationGroup { + relations: vec![ + Relation { + relation_info: RelationInfo::Table(table.to_owned()).into(), + }, + Relation { + relation_info: RelationInfo::Index(index.to_owned()).into(), + }, + ], + }), + ) .await; Ok(version) @@ -1397,13 +1471,20 @@ where } commit_meta!(self, sinks, tables)?; - for internal_table in internal_tables { - self.notify_frontend(Operation::Add, Info::Table(internal_table)) - .await; - } - let version = self - .notify_frontend(Operation::Add, Info::Sink(sink.to_owned())) + .notify_frontend( + Operation::Add, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Sink(sink.to_owned()).into(), + }] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) .await; Ok(version) @@ -1481,13 +1562,20 @@ where database_core.decrease_ref_count(dependent_relation_id); } - for internal_table in internal_tables { - self.notify_frontend(Operation::Delete, Info::Table(internal_table)) - .await; - } - let version = self - .notify_frontend(Operation::Delete, Info::Sink(sink)) + .notify_frontend( + Operation::Delete, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Sink(sink.to_owned()).into(), + }] + .into_iter() + .chain(internal_tables.into_iter().map(|internal_table| Relation { + relation_info: RelationInfo::Table(internal_table).into(), + })) + .collect_vec(), + }), + ) .await; Ok(version) @@ -1584,17 +1672,23 @@ where tables.insert(table.id, table.clone()); commit_meta!(self, tables, indexes)?; - // TODO: support group notification. - let mut version = self - .notify_frontend(Operation::Update, Info::Table(table.to_owned())) + // Group notification + let version = self + .notify_frontend( + Operation::Update, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: RelationInfo::Table(table.to_owned()).into(), + }] + .into_iter() + .chain(updated_indexes.into_iter().map(|index| Relation { + relation_info: RelationInfo::Index(index).into(), + })) + .collect_vec(), + }), + ) .await; - for index in updated_indexes { - version = self - .notify_frontend(Operation::Update, Info::Index(index)) - .await; - } - Ok(version) } @@ -1680,6 +1774,17 @@ where .notify_frontend(operation, info) .await } + + async fn notify_frontend_relation_info( + &self, + operation: Operation, + relation_info: RelationInfo, + ) -> NotificationVersion { + self.env + .notification_manager() + .notify_frontend_relation_info(operation, relation_info) + .await + } } // User related methods diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index a6423e77139e..ffbe9ca70285 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -19,8 +19,11 @@ use std::sync::Arc; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::hummock::CompactTask; +use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{MetaSnapshot, SubscribeResponse, SubscribeType}; +use risingwave_pb::meta::{ + MetaSnapshot, Relation, RelationGroup, SubscribeResponse, SubscribeType, +}; use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::sync::Mutex; use tonic::Status; @@ -170,16 +173,67 @@ where .await } + pub async fn notify_frontend_relation_info( + &self, + operation: Operation, + relation_info: RelationInfo, + ) -> NotificationVersion { + self.notify_with_version( + SubscribeType::Frontend.into(), + operation, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: relation_info.into(), + }], + }), + ) + .await + } + pub async fn notify_hummock(&self, operation: Operation, info: Info) -> NotificationVersion { self.notify_with_version(SubscribeType::Hummock.into(), operation, info) .await } + pub async fn notify_hummock_relation_info( + &self, + operation: Operation, + relation_info: RelationInfo, + ) -> NotificationVersion { + self.notify_with_version( + SubscribeType::Hummock.into(), + operation, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: relation_info.into(), + }], + }), + ) + .await + } + pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion { self.notify_with_version(SubscribeType::Compactor.into(), operation, info) .await } + pub async fn notify_compactor_relation_info( + &self, + operation: Operation, + relation_info: RelationInfo, + ) -> NotificationVersion { + self.notify_with_version( + SubscribeType::Compactor.into(), + operation, + Info::RelationGroup(RelationGroup { + relations: vec![Relation { + relation_info: relation_info.into(), + }], + }), + ) + .await + } + pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion { self.notify_with_version(SubscribeType::Compute.into(), operation, info) .await diff --git a/src/storage/compactor/src/compactor_observer/observer_manager.rs b/src/storage/compactor/src/compactor_observer/observer_manager.rs index 2e9996ffaa69..6b539ecadfa2 100644 --- a/src/storage/compactor/src/compactor_observer/observer_manager.rs +++ b/src/storage/compactor/src/compactor_observer/observer_manager.rs @@ -21,6 +21,7 @@ use risingwave_hummock_sdk::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManagerRef, }; use risingwave_pb::catalog::Table; +use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::SubscribeResponse; @@ -39,17 +40,24 @@ impl ObserverState for CompactorObserverNode { }; match info.to_owned() { - Info::Table(table_catalog) => { - assert!( - resp.version > self.version, - "resp version={:?}, current version={:?}", - resp.version, - self.version - ); + Info::RelationGroup(relation_group) => { + for relation in relation_group.relations { + match relation.relation_info.unwrap() { + RelationInfo::Table(table_catalog) => { + assert!( + resp.version > self.version, + "resp version={:?}, current version={:?}", + resp.version, + self.version + ); - self.handle_catalog_notification(resp.operation(), table_catalog); + self.handle_catalog_notification(resp.operation(), table_catalog); - self.version = resp.version; + self.version = resp.version; + } + _ => panic!("error type notification"), + }; + } } Info::HummockVersionDeltas(_) => {} Info::SystemParams(p) => { diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index 43cd67fb4f89..5065b4f2619b 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -21,6 +21,7 @@ use risingwave_hummock_sdk::filter_key_extractor::{ }; use risingwave_pb::catalog::Table; use risingwave_pb::hummock::version_update_payload; +use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::SubscribeResponse; use tokio::sync::mpsc::UnboundedSender; @@ -46,19 +47,25 @@ impl ObserverState for HummockObserverNode { }; match info.to_owned() { - Info::Table(table_catalog) => { - assert!( - resp.version > self.version, - "resp version={:?}, current version={:?}", - resp.version, - self.version - ); - - self.handle_catalog_notification(resp.operation(), table_catalog); - - self.version = resp.version; + Info::RelationGroup(relation_group) => { + for relation in relation_group.relations { + match relation.relation_info.unwrap() { + RelationInfo::Table(table_catalog) => { + assert!( + resp.version > self.version, + "resp version={:?}, current version={:?}", + resp.version, + self.version + ); + + self.handle_catalog_notification(resp.operation(), table_catalog); + + self.version = resp.version; + } + _ => panic!("error type notification"), + }; + } } - Info::HummockVersionDeltas(hummock_version_deltas) => { let _ = self .version_update_sender