diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml index 5ccdb00501796..a87b116a658c2 100644 --- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml @@ -241,11 +241,11 @@ │ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] } │ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] } - │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version, rw_sources.is_shared] } + │ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version, rw_sources.is_shared, rw_sources.connector_props, rw_sources.format_encode_options] } │ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] } │ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.key_columns, rw_indexes.include_columns, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] } │ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] } - │ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version] } + │ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version, rw_sinks.connector_props, rw_sinks.format_encode_options] } │ │ │ └─LogicalProject { exprs: [rw_subscriptions.id, rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl] } │ │ │ └─LogicalSysScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl, rw_subscriptions.initialized_at, rw_subscriptions.created_at, rw_subscriptions.initialized_at_cluster_version, rw_subscriptions.created_at_cluster_version] } │ │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] } diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index f453a89204501..86cefe03c1495 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -105,7 +105,7 @@ impl From<&PbSource> for SourceCatalog { .into_iter() .map(Into::into) .collect(); - let options_with_secrets = + let connector_props_with_secrets = WithOptionsSecResolved::new(prost.with_properties.clone(), prost.secret_refs.clone()); let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect(); let row_id_index = prost.row_id_index.map(|idx| idx as _); @@ -131,7 +131,7 @@ impl From<&PbSource> for SourceCatalog { owner, info: prost.info.clone().unwrap(), row_id_index, - with_properties: options_with_secrets, + with_properties: connector_props_with_secrets, watermark_descs, associated_table_id: associated_table_id.map(|x| x.into()), definition: prost.definition.clone(), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs index e382a5b7dfaf3..609850b9951a1 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{Fields, Timestamptz}; +use risingwave_common::types::{Fields, JsonbVal, Timestamptz}; +use risingwave_connector::WithOptionsSecResolved; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; +use crate::catalog::system_catalog::rw_catalog::rw_sources::serialize_props_with_secret; use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl}; use crate::error::Result; use crate::handler::create_source::UPSTREAM_SOURCE_KEY; @@ -36,6 +38,11 @@ struct RwSink { created_at: Option, initialized_at_cluster_version: Option, created_at_cluster_version: Option, + + // connector properties in json format + connector_props: JsonbVal, + // format and encode properties in json format + format_encode_options: JsonbVal, } #[system_catalog(table, "rw_catalog.rw_sinks")] @@ -48,30 +55,53 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result> { Ok(schemas .flat_map(|schema| { - schema.iter_sink().map(|sink| RwSink { - id: sink.id.sink_id as i32, - name: sink.name.clone(), - schema_id: schema.id() as i32, - owner: sink.owner.user_id as i32, - connector: sink - .properties - .get(UPSTREAM_SOURCE_KEY) - .cloned() - .unwrap_or("".to_string()) - .to_uppercase(), - sink_type: sink.sink_type.to_proto().as_str_name().into(), - connection_id: sink.connection_id.map(|id| id.connection_id() as i32), - definition: sink.create_sql(), - acl: get_acl_items( - &Object::SinkId(sink.id.sink_id), - false, - &users, - username_map, - ), - initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()), - created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()), - initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(), - created_at_cluster_version: sink.created_at_cluster_version.clone(), + schema.iter_sink().map(|sink| { + let connector_props = serialize_props_with_secret( + schema, + WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()), + ) + .into(); + let format_encode_options = sink + .format_desc + .as_ref() + .map(|desc| { + serialize_props_with_secret( + schema, + WithOptionsSecResolved::new( + desc.options.clone(), + desc.secret_refs.clone(), + ), + ) + }) + .unwrap_or_else(jsonbb::Value::null) + .into(); + RwSink { + id: sink.id.sink_id as i32, + name: sink.name.clone(), + schema_id: schema.id() as i32, + owner: sink.owner.user_id as i32, + connector: sink + .properties + .get(UPSTREAM_SOURCE_KEY) + .cloned() + .unwrap_or("".to_string()) + .to_uppercase(), + sink_type: sink.sink_type.to_proto().as_str_name().into(), + connection_id: sink.connection_id.map(|id| id.connection_id() as i32), + definition: sink.create_sql(), + acl: get_acl_items( + &Object::SinkId(sink.id.sink_id), + false, + &users, + username_map, + ), + initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()), + created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()), + initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(), + created_at_cluster_version: sink.created_at_cluster_version.clone(), + connector_props, + format_encode_options, + } }) }) .collect()) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs index bdcfe355057a4..303ce645fbd0f 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{Fields, Timestamptz}; +use risingwave_common::catalog::SecretId; +use risingwave_common::types::{Fields, JsonbVal, Timestamptz}; use risingwave_frontend_macro::system_catalog; use risingwave_pb::user::grant_privilege::Object; +use serde_json::{json, Map as JsonMap}; +use crate::catalog::schema_catalog::SchemaCatalog; use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl}; use crate::error::Result; use crate::handler::create_source::UPSTREAM_SOURCE_KEY; +use crate::WithOptionsSecResolved; #[derive(Fields)] struct RwSource { @@ -41,6 +45,10 @@ struct RwSource { initialized_at_cluster_version: Option, created_at_cluster_version: Option, is_shared: bool, + // connector properties in json format + connector_props: JsonbVal, + // format-encode options in json format + format_encode_options: JsonbVal, } #[system_catalog(table, "rw_catalog.rw_sources")] @@ -53,39 +61,83 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result> Ok(schemas .flat_map(|schema| { - schema.iter_source().map(|source| RwSource { - id: source.id as i32, - name: source.name.clone(), - schema_id: schema.id() as i32, - owner: source.owner as i32, - connector: source - .with_properties - .get(UPSTREAM_SOURCE_KEY) - .cloned() - .unwrap_or("".to_string()) - .to_uppercase(), - columns: source.columns.iter().map(|c| c.name().into()).collect(), - format: source - .info - .get_format() - .ok() - .map(|format| format.as_str_name().into()), - row_encode: source - .info - .get_row_encode() - .ok() - .map(|row_encode| row_encode.as_str_name().into()), - append_only: source.append_only, - associated_table_id: source.associated_table_id.map(|id| id.table_id as i32), - connection_id: source.connection_id.map(|id| id as i32), - definition: source.create_sql(), - acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map), - initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()), - created_at: source.created_at_epoch.map(|e| e.as_timestamptz()), - initialized_at_cluster_version: source.initialized_at_cluster_version.clone(), - created_at_cluster_version: source.created_at_cluster_version.clone(), - is_shared: source.info.is_shared(), + schema.iter_source().map(|source| { + let format_encode_props_with_secrets = WithOptionsSecResolved::new( + source.info.format_encode_options.clone(), + source.info.format_encode_secret_refs.clone(), + ); + RwSource { + id: source.id as i32, + name: source.name.clone(), + schema_id: schema.id() as i32, + owner: source.owner as i32, + connector: source + .with_properties + .get(UPSTREAM_SOURCE_KEY) + .cloned() + .unwrap_or("".to_string()) + .to_uppercase(), + columns: source.columns.iter().map(|c| c.name().into()).collect(), + format: source + .info + .get_format() + .ok() + .map(|format| format.as_str_name().into()), + row_encode: source + .info + .get_row_encode() + .ok() + .map(|row_encode| row_encode.as_str_name().into()), + append_only: source.append_only, + associated_table_id: source.associated_table_id.map(|id| id.table_id as i32), + connection_id: source.connection_id.map(|id| id as i32), + definition: source.create_sql(), + acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map), + initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()), + created_at: source.created_at_epoch.map(|e| e.as_timestamptz()), + initialized_at_cluster_version: source.initialized_at_cluster_version.clone(), + created_at_cluster_version: source.created_at_cluster_version.clone(), + is_shared: source.info.is_shared(), + + connector_props: serialize_props_with_secret( + schema, + source.with_properties.clone(), + ) + .into(), + format_encode_options: serialize_props_with_secret( + schema, + format_encode_props_with_secrets, + ) + .into(), + } }) }) .collect()) } + +pub fn serialize_props_with_secret( + schema: &SchemaCatalog, + props_with_secret: WithOptionsSecResolved, +) -> jsonbb::Value { + let (inner, secret_ref) = props_with_secret.into_parts(); + // if not secret, {"some key": {"type": "plaintext", "value": "xxxx"}} + // if secret, {"some key": {"type": "secret", "value": {"value": ""}}} + let mut result: JsonMap = JsonMap::new(); + + for (k, v) in inner { + result.insert(k, json!({"type": "plaintext", "value": v})); + } + for (k, v) in secret_ref { + let secret_name = schema + .get_secret_by_id(&SecretId(v.secret_id)) + .unwrap() + .name + .clone(); + result.insert( + k, + json!({"type": "secret", "value": {"value": secret_name}}), + ); + } + + jsonbb::Value::from(serde_json::Value::Object(result)) +}