From 2561ec6f2a7be92766d3cd9b30dc6113639c59b4 Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 5 Dec 2024 16:01:19 +0800 Subject: [PATCH 1/7] source --- src/frontend/src/catalog/source_catalog.rs | 4 +- .../system_catalog/rw_catalog/rw_sources.rs | 114 +++++++++++++----- 2 files changed, 84 insertions(+), 34 deletions(-) diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index f453a8920450..86cefe03c149 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -105,7 +105,7 @@ impl From<&PbSource> for SourceCatalog { .into_iter() .map(Into::into) .collect(); - let options_with_secrets = + let connector_props_with_secrets = WithOptionsSecResolved::new(prost.with_properties.clone(), prost.secret_refs.clone()); let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect(); let row_id_index = prost.row_id_index.map(|idx| idx as _); @@ -131,7 +131,7 @@ impl From<&PbSource> for SourceCatalog { owner, info: prost.info.clone().unwrap(), row_id_index, - with_properties: options_with_secrets, + with_properties: connector_props_with_secrets, watermark_descs, associated_table_id: associated_table_id.map(|x| x.into()), definition: prost.definition.clone(), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index bdcfe355057a..73d69fc07d3e 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::catalog::SecretId; use risingwave_common::types::{Fields, Timestamptz}; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; +use serde_json::{json, Map as JsonMap, Value}; +use crate::catalog::schema_catalog::SchemaCatalog; use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl}; use crate::error::Result; use crate::handler::create_source::UPSTREAM_SOURCE_KEY; +use crate::WithOptionsSecResolved; #[derive(Fields)] struct RwSource { @@ -41,6 +45,10 @@ struct RwSource { initialized_at_cluster_version: Option, created_at_cluster_version: Option, is_shared: bool, + // connection_props in json format + connection_props: String, + // format-encode options in json format + format_encode_options: String, } #[system_catalog(table, "rw_catalog.rw_sources")] @@ -53,39 +61,81 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> Ok(schemas .flat_map(|schema| { - schema.iter_source().map(|source| RwSource { - id: source.id as i32, - name: source.name.clone(), - schema_id: schema.id() as i32, - owner: source.owner as i32, - connector: source - .with_properties - .get(UPSTREAM_SOURCE_KEY) - .cloned() - .unwrap_or("".to_string()) - .to_uppercase(), - columns: source.columns.iter().map(|c| c.name().into()).collect(), - format: source - .info - .get_format() - .ok() - .map(|format| format.as_str_name().into()), - row_encode: source - .info - .get_row_encode() - .ok() - .map(|row_encode| row_encode.as_str_name().into()), - append_only: source.append_only, - associated_table_id: source.associated_table_id.map(|id| id.table_id as i32), - connection_id: source.connection_id.map(|id| id as i32), - definition: source.create_sql(), - acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map), - initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()), - created_at: source.created_at_epoch.map(|e| e.as_timestamptz()), - initialized_at_cluster_version: source.initialized_at_cluster_version.clone(), - created_at_cluster_version: source.created_at_cluster_version.clone(), - is_shared: source.info.is_shared(), + schema.iter_source().map(|source| { + let format_encode_props_with_secrets = WithOptionsSecResolved::new( + source.info.format_encode_options.clone(), + source.info.format_encode_secret_refs.clone(), + ); + RwSource { + id: source.id as i32, + name: source.name.clone(), + schema_id: schema.id() as i32, + owner: source.owner as i32, + connector: source + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .cloned() + .unwrap_or("".to_string()) + .to_uppercase(), + columns: source.columns.iter().map(|c| c.name().into()).collect(), + format: source + .info + .get_format() + .ok() + .map(|format| format.as_str_name().into()), + row_encode: source + .info + .get_row_encode() + .ok() + .map(|row_encode| row_encode.as_str_name().into()), + append_only: source.append_only, + associated_table_id: source.associated_table_id.map(|id| id.table_id as i32), + connection_id: source.connection_id.map(|id| id as i32), + definition: source.create_sql(), + acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map), + initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()), + created_at: source.created_at_epoch.map(|e| e.as_timestamptz()), + initialized_at_cluster_version: source.initialized_at_cluster_version.clone(), + created_at_cluster_version: source.created_at_cluster_version.clone(), + is_shared: source.info.is_shared(), + + connection_props: handle_props_with_secret( + schema, + source.with_properties.clone(), + ), + format_encode_options: handle_props_with_secret( + schema, + format_encode_props_with_secrets, + ), + } }) }) .collect()) } + +fn handle_props_with_secret( + schema: &SchemaCatalog, + props_with_secret: WithOptionsSecResolved, +) -> String { + let (inner, secret_ref) = props_with_secret.into_parts(); + // if not secret, {"some key": {"type": "plaintext", "value": "xxxx"}} + // if secret, {"some key": {"type": "secret", "value": {"value": ""}}} + let mut result: JsonMap = JsonMap::new(); + + for (k, v) in inner { + result.insert(k, json!({"type": "plaintext", "value": v})); + } + for (k, v) in secret_ref { + let secret_name = schema + .get_secret_by_id(&SecretId(v.secret_id)) + .unwrap() + .name + .clone(); + result.insert( + k, + json!({"type": "secret", "value": {"value": secret_name}}), + ); + } + + serde_json::to_string(&Value::Object(result)).unwrap() +} From 7c48e61985d450850625095bad4b07854e201875 Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 5 Dec 2024 16:05:48 +0800 Subject: [PATCH 2/7] rename --- .../src/catalog/system_catalog/rw_catalog/rw_sources.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index 73d69fc07d3e..b40b0f4071f1 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -45,8 +45,8 @@ struct RwSource { initialized_at_cluster_version: Option, created_at_cluster_version: Option, is_shared: bool, - // connection_props in json format - connection_props: String, + // connector properties in json format + connector_props: String, // format-encode options in json format format_encode_options: String, } @@ -99,7 +99,7 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> created_at_cluster_version: source.created_at_cluster_version.clone(), is_shared: source.info.is_shared(), - connection_props: handle_props_with_secret( + connector_props: handle_props_with_secret( schema, source.with_properties.clone(), ), From 46320e4348fc782185c6854dcbc337c57017cfab Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 5 Dec 2024 16:19:14 +0800 Subject: [PATCH 3/7] sink --- .../system_catalog/rw_catalog/rw_sinks.rs | 76 +++++++++++++------ .../system_catalog/rw_catalog/rw_sources.rs | 2 +- 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs index e382a5b7dfaf..3bd4baff0568 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs @@ -13,9 +13,11 @@ // limitations under the License. use risingwave_common::types::{Fields, Timestamptz}; +use risingwave_connector::WithOptionsSecResolved; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; +use crate::catalog::system_catalog::rw_catalog::rw_sources::handle_props_with_secret; use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl}; use crate::error::Result; use crate::handler::create_source::UPSTREAM_SOURCE_KEY; @@ -36,6 +38,11 @@ struct RwSink { created_at: Option, initialized_at_cluster_version: Option, created_at_cluster_version: Option, + + // connector properties in json format + connector_props: String, + // format and encode properties in json format + format_encode_options: String, } #[system_catalog(table, "rw_catalog.rw_sinks")] @@ -48,30 +55,51 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { Ok(schemas .flat_map(|schema| { - schema.iter_sink().map(|sink| RwSink { - id: sink.id.sink_id as i32, - name: sink.name.clone(), - schema_id: schema.id() as i32, - owner: sink.owner.user_id as i32, - connector: sink - .properties - .get(UPSTREAM_SOURCE_KEY) - .cloned() - .unwrap_or("".to_string()) - .to_uppercase(), - sink_type: sink.sink_type.to_proto().as_str_name().into(), - connection_id: sink.connection_id.map(|id| id.connection_id() as i32), - definition: sink.create_sql(), - acl: get_acl_items( - &Object::SinkId(sink.id.sink_id), - false, - &users, - username_map, - ), - initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()), - created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()), - initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(), - created_at_cluster_version: sink.created_at_cluster_version.clone(), + schema.iter_sink().map(|sink| { + let connector_props = handle_props_with_secret( + schema, + WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()), + ); + let format_encode_options = sink + .format_desc + .as_ref() + .map(|desc| { + handle_props_with_secret( + schema, + WithOptionsSecResolved::new( + desc.options.clone(), + desc.secret_refs.clone(), + ), + ) + }) + .unwrap_or_default(); + RwSink { + id: sink.id.sink_id as i32, + name: sink.name.clone(), + schema_id: schema.id() as i32, + owner: sink.owner.user_id as i32, + connector: sink + .properties + .get(UPSTREAM_SOURCE_KEY) + .cloned() + .unwrap_or("".to_string()) + .to_uppercase(), + sink_type: sink.sink_type.to_proto().as_str_name().into(), + connection_id: sink.connection_id.map(|id| id.connection_id() as i32), + definition: sink.create_sql(), + acl: get_acl_items( + &Object::SinkId(sink.id.sink_id), + false, + &users, + username_map, + ), + initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()), + created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()), + initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(), + created_at_cluster_version: sink.created_at_cluster_version.clone(), + connector_props, + format_encode_options, + } }) }) .collect()) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index b40b0f4071f1..f1d79fddf111 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -113,7 +113,7 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> .collect()) } -fn handle_props_with_secret( +pub fn handle_props_with_secret( schema: &SchemaCatalog, props_with_secret: WithOptionsSecResolved, ) -> String { From 7e71244a93b542b38c1d1aeb9b9a23d5b35ef27f Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 5 Dec 2024 16:27:25 +0800 Subject: [PATCH 4/7] rename --- .../src/catalog/system_catalog/rw_catalog/rw_sinks.rs | 6 +++--- .../src/catalog/system_catalog/rw_catalog/rw_sources.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs index 3bd4baff0568..5e38c26238cf 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs @@ -17,7 +17,7 @@ use risingwave_connector::WithOptionsSecResolved; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; -use crate::catalog::system_catalog::rw_catalog::rw_sources::handle_props_with_secret; +use crate::catalog::system_catalog::rw_catalog::rw_sources::print_props_with_secret; use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl}; use crate::error::Result; use crate::handler::create_source::UPSTREAM_SOURCE_KEY; @@ -56,7 +56,7 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { Ok(schemas .flat_map(|schema| { schema.iter_sink().map(|sink| { - let connector_props = handle_props_with_secret( + let connector_props = print_props_with_secret( schema, WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()), ); @@ -64,7 +64,7 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { .format_desc .as_ref() .map(|desc| { - handle_props_with_secret( + print_props_with_secret( schema, WithOptionsSecResolved::new( desc.options.clone(), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index f1d79fddf111..d5933a875830 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -99,11 +99,11 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> created_at_cluster_version: source.created_at_cluster_version.clone(), is_shared: source.info.is_shared(), - connector_props: handle_props_with_secret( + connector_props: print_props_with_secret( schema, source.with_properties.clone(), ), - format_encode_options: handle_props_with_secret( + format_encode_options: print_props_with_secret( schema, format_encode_props_with_secrets, ), @@ -113,7 +113,7 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> .collect()) } -pub fn handle_props_with_secret( +pub fn print_props_with_secret( schema: &SchemaCatalog, props_with_secret: WithOptionsSecResolved, ) -> String { From a6f27ce246f4eec9ba5ff80f08892847e72ce32e Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 5 Dec 2024 17:06:39 +0800 Subject: [PATCH 5/7] fix --- .../system_catalog/rw_catalog/rw_sinks.rs | 12 ++++++----- .../system_catalog/rw_catalog/rw_sources.rs | 20 ++++++++++--------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs index 5e38c26238cf..c2742068f568 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{Fields, Timestamptz}; +use risingwave_common::types::{Fields, JsonbVal, Timestamptz}; use risingwave_connector::WithOptionsSecResolved; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; @@ -40,9 +40,9 @@ struct RwSink { created_at_cluster_version: Option, // connector properties in json format - connector_props: String, + connector_props: JsonbVal, // format and encode properties in json format - format_encode_options: String, + format_encode_options: JsonbVal, } #[system_catalog(table, "rw_catalog.rw_sinks")] @@ -59,7 +59,8 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { let connector_props = print_props_with_secret( schema, WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()), - ); + ) + .into(); let format_encode_options = sink .format_desc .as_ref() @@ -72,7 +73,8 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { ), ) }) - .unwrap_or_default(); + .unwrap_or_else(jsonbb::Value::null) + .into(); RwSink { id: sink.id.sink_id as i32, name: sink.name.clone(), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index d5933a875830..4d4b413d6772 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -13,10 +13,10 @@ // limitations under the License. use risingwave_common::catalog::SecretId; -use risingwave_common::types::{Fields, Timestamptz}; +use risingwave_common::types::{Fields, JsonbVal, Timestamptz}; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; -use serde_json::{json, Map as JsonMap, Value}; +use serde_json::{json, Map as JsonMap}; use crate::catalog::schema_catalog::SchemaCatalog; use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl}; @@ -46,9 +46,9 @@ struct RwSource { created_at_cluster_version: Option, is_shared: bool, // connector properties in json format - connector_props: String, + connector_props: JsonbVal, // format-encode options in json format - format_encode_options: String, + format_encode_options: JsonbVal, } #[system_catalog(table, "rw_catalog.rw_sources")] @@ -102,11 +102,13 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> connector_props: print_props_with_secret( schema, source.with_properties.clone(), - ), + ) + .into(), format_encode_options: print_props_with_secret( schema, format_encode_props_with_secrets, - ), + ) + .into(), } }) }) @@ -116,11 +118,11 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> pub fn print_props_with_secret( schema: &SchemaCatalog, props_with_secret: WithOptionsSecResolved, -) -> String { +) -> jsonbb::Value { let (inner, secret_ref) = props_with_secret.into_parts(); // if not secret, {"some key": {"type": "plaintext", "value": "xxxx"}} // if secret, {"some key": {"type": "secret", "value": {"value": ""}}} - let mut result: JsonMap = JsonMap::new(); + let mut result: JsonMap = JsonMap::new(); for (k, v) in inner { result.insert(k, json!({"type": "plaintext", "value": v})); @@ -137,5 +139,5 @@ pub fn print_props_with_secret( ); } - serde_json::to_string(&Value::Object(result)).unwrap() + jsonbb::Value::from(serde_json::Value::Object(result)) } From 54d9d9a946287c37d674488562d67ab62760bf4b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 10 Dec 2024 16:35:05 +0800 Subject: [PATCH 6/7] fix planner test --- src/frontend/planner_test/tests/testdata/output/subquery.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index 5ccdb0050179..a87b116a658c 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -241,11 +241,11 @@ │ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] } - │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version, rw_sources.is_shared] } + │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version, rw_sources.is_shared, rw_sources.connector_props, rw_sources.format_encode_options] } │ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] } │ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.key_columns, rw_indexes.include_columns, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] } │ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] } - │ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version] } + │ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version, rw_sinks.connector_props, rw_sinks.format_encode_options] } │ │ │ └─LogicalProject { exprs: [rw_subscriptions.id, rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl] } │ │ │ └─LogicalSysScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl, rw_subscriptions.initialized_at, rw_subscriptions.created_at, rw_subscriptions.initialized_at_cluster_version, rw_subscriptions.created_at_cluster_version] } │ │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] } From eed83644d14cbe801ee6af26284465c8d702235c Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 10 Dec 2024 19:15:51 +0800 Subject: [PATCH 7/7] rename --- .../src/catalog/system_catalog/rw_catalog/rw_sinks.rs | 6 +++--- .../src/catalog/system_catalog/rw_catalog/rw_sources.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs index c2742068f568..609850b9951a 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs @@ -17,7 +17,7 @@ use risingwave_connector::WithOptionsSecResolved; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; -use crate::catalog::system_catalog::rw_catalog::rw_sources::print_props_with_secret; +use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret; use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl}; use crate::error::Result; use crate::handler::create_source::UPSTREAM_SOURCE_KEY; @@ -56,7 +56,7 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { Ok(schemas .flat_map(|schema| { schema.iter_sink().map(|sink| { - let connector_props = print_props_with_secret( + let connector_props = serialize_props_with_secret( schema, WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()), ) @@ -65,7 +65,7 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { .format_desc .as_ref() .map(|desc| { - print_props_with_secret( + serialize_props_with_secret( schema, WithOptionsSecResolved::new( desc.options.clone(), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index 4d4b413d6772..303ce645fbd0 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -99,12 +99,12 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> created_at_cluster_version: source.created_at_cluster_version.clone(), is_shared: source.info.is_shared(), - connector_props: print_props_with_secret( + connector_props: serialize_props_with_secret( schema, source.with_properties.clone(), ) .into(), - format_encode_options: print_props_with_secret( + format_encode_options: serialize_props_with_secret( schema, format_encode_props_with_secrets, ) @@ -115,7 +115,7 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> .collect()) } -pub fn print_props_with_secret( +pub fn serialize_props_with_secret( schema: &SchemaCatalog, props_with_secret: WithOptionsSecResolved, ) -> jsonbb::Value {