Skip to content

Commit

Permalink
feat(meta): support group notification (risingwavelabs#8741)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Mar 24, 2023
1 parent be9723e commit 7302424
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 182 deletions.
22 changes: 16 additions & 6 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,21 @@ message MetaSnapshot {
SnapshotVersion version = 13;
}

message Relation {
oneof relation_info {
catalog.Table table = 1;
catalog.Source source = 2;
catalog.Sink sink = 3;
catalog.Index index = 4;
catalog.View view = 5;
catalog.Function function = 6;
}
}

message RelationGroup {
repeated Relation relations = 1;
}

message SubscribeResponse {
enum Operation {
UNSPECIFIED = 0;
Expand All @@ -265,12 +280,6 @@ message SubscribeResponse {
oneof info {
catalog.Database database = 4;
catalog.Schema schema = 5;
catalog.Table table = 6;
catalog.Source source = 7;
catalog.Sink sink = 8;
catalog.Index index = 9;
catalog.View view = 10;
catalog.Function function = 18;
user.UserInfo user = 11;
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
Expand All @@ -280,6 +289,7 @@ message SubscribeResponse {
backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
SystemParams system_params = 19;
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
}
}

Expand Down
10 changes: 1 addition & 9 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,7 @@ where
};

notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() {
Info::Database(_)
| Info::Schema(_)
| Info::Table(_)
| Info::Source(_)
| Info::Sink(_)
| Info::Index(_)
| Info::View(_)
| Info::Function(_)
| Info::User(_) => {
Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) | Info::User(_) => {
notification.version > info.version.as_ref().unwrap().catalog_version
}
Info::ParallelUnitMapping(_) => {
Expand Down
124 changes: 71 additions & 53 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentParallelUnitMapping, SubscribeResponse};
use tokio::sync::watch::Sender;
Expand Down Expand Up @@ -49,14 +50,7 @@ impl ObserverState for FrontendObserverNode {
};

match info.to_owned() {
Info::Database(_)
| Info::Schema(_)
| Info::Table(_)
| Info::Source(_)
| Info::Index(_)
| Info::Sink(_)
| Info::Function(_)
| Info::View(_) => {
Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) => {
self.handle_catalog_notification(resp);
}
Info::Node(node) => {
Expand Down Expand Up @@ -193,52 +187,76 @@ impl FrontendObserverNode {
Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Table(table) => match resp.operation() {
Operation::Add => catalog_guard.create_table(table),
Operation::Delete => {
catalog_guard.drop_table(table.database_id, table.schema_id, table.id.into())
}
Operation::Update => catalog_guard.update_table(table),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Source(source) => match resp.operation() {
Operation::Add => catalog_guard.create_source(source),
Operation::Delete => {
catalog_guard.drop_source(source.database_id, source.schema_id, source.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Sink(sink) => match resp.operation() {
Operation::Add => catalog_guard.create_sink(sink),
Operation::Delete => {
catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Index(index) => match resp.operation() {
Operation::Add => catalog_guard.create_index(index),
Operation::Delete => {
catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into())
}
Operation::Update => catalog_guard.update_index(index),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::View(view) => match resp.operation() {
Operation::Add => catalog_guard.create_view(view),
Operation::Delete => {
catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
Info::RelationGroup(relation_group) => {
for relation in &relation_group.relations {
let Some(relation) = relation.relation_info.as_ref() else {
continue;
};
match relation {
RelationInfo::Table(table) => match resp.operation() {
Operation::Add => catalog_guard.create_table(table),
Operation::Delete => catalog_guard.drop_table(
table.database_id,
table.schema_id,
table.id.into(),
),
Operation::Update => {
let old_table =
catalog_guard.get_table_by_id(&table.id.into()).unwrap();
catalog_guard.update_table(table);
assert!(old_table.fragment_id != table.fragment_id);
// FIXME: the frontend node delete its fragment for the update
// operation by itself.
self.worker_node_manager
.remove_fragment_mapping(&old_table.fragment_id);
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Source(source) => match resp.operation() {
Operation::Add => catalog_guard.create_source(source),
Operation::Delete => catalog_guard.drop_source(
source.database_id,
source.schema_id,
source.id,
),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Sink(sink) => match resp.operation() {
Operation::Add => catalog_guard.create_sink(sink),
Operation::Delete => {
catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Index(index) => match resp.operation() {
Operation::Add => catalog_guard.create_index(index),
Operation::Delete => catalog_guard.drop_index(
index.database_id,
index.schema_id,
index.id.into(),
),
Operation::Update => catalog_guard.update_index(index),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::View(view) => match resp.operation() {
Operation::Add => catalog_guard.create_view(view),
Operation::Delete => {
catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Function(function) => match resp.operation() {
Operation::Add => catalog_guard.create_function(function),
Operation::Delete => catalog_guard.drop_function(
function.database_id,
function.schema_id,
function.id.into(),
),
_ => panic!("receive an unsupported notify {:?}", resp),
},
}
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Function(function) => match resp.operation() {
Operation::Add => catalog_guard.create_function(function),
Operation::Delete => catalog_guard.drop_function(
function.database_id,
function.schema_id,
function.id.into(),
),
_ => panic!("receive an unsupported notify {:?}", resp),
},
}
_ => unreachable!(),
}
assert!(
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ use risingwave_hummock_sdk::table_stats::{
use risingwave_pb::catalog::Table;
use risingwave_pb::hummock::version_update_payload::Payload;
use risingwave_pb::hummock::PbCompactionGroupInfo;
use risingwave_pb::meta::relation::RelationInfo;

/// Acquire write lock of the lock with `lock_name`.
/// The macro will use macro `function_name` to get the name of the function of method that calls
Expand Down Expand Up @@ -1732,11 +1733,11 @@ where
for table in table_catalogs {
self.env
.notification_manager()
.notify_hummock(Operation::Add, Info::Table(table.clone()))
.notify_hummock_relation_info(Operation::Add, RelationInfo::Table(table.clone()))
.await;
self.env
.notification_manager()
.notify_compactor(Operation::Add, Info::Table(table))
.notify_compactor_relation_info(Operation::Add, RelationInfo::Table(table))
.await;
}

Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ where
// FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments
// with the real table ID, then replace the dummy table ID with the real table ID. This is a
// workaround for not having the version info in the fragment manager.
#[allow(unused_variables)]
let old_table_fragment = table_fragments
.remove(table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
Expand Down Expand Up @@ -344,8 +345,10 @@ where
// Commit changes and notify about the changes.
commit_meta!(self, table_fragments)?;

self.notify_fragment_mapping(&old_table_fragment, Operation::Delete)
.await;
// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
// when they receive table catalog change. self.notify_fragment_mapping(&
// old_table_fragment, Operation::Delete) .await;
self.notify_fragment_mapping(&table_fragment, Operation::Add)
.await;

Expand Down
Loading

0 comments on commit 7302424

Please sign in to comment.