Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add connector props and format options in rw_catalog #19689

Merged
merged 11 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,11 @@
│ │ │ │ │ │ │ └─LogicalProject { exprs: [rw_system_tables.id, rw_system_tables.name, 'system table':Varchar, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ │ └─LogicalSysScan { table: rw_system_tables, columns: [rw_system_tables.id, rw_system_tables.name, rw_system_tables.schema_id, rw_system_tables.owner, rw_system_tables.definition, rw_system_tables.acl] }
│ │ │ │ │ │ └─LogicalProject { exprs: [rw_sources.id, rw_sources.name, 'source':Varchar, rw_sources.schema_id, rw_sources.owner, rw_sources.definition, rw_sources.acl] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version, rw_sources.is_shared] }
│ │ │ │ │ │ └─LogicalSysScan { table: rw_sources, columns: [rw_sources.id, rw_sources.name, rw_sources.schema_id, rw_sources.owner, rw_sources.connector, rw_sources.columns, rw_sources.format, rw_sources.row_encode, rw_sources.append_only, rw_sources.associated_table_id, rw_sources.connection_id, rw_sources.definition, rw_sources.acl, rw_sources.initialized_at, rw_sources.created_at, rw_sources.initialized_at_cluster_version, rw_sources.created_at_cluster_version, rw_sources.is_shared, rw_sources.connector_props, rw_sources.format_encode_options] }
│ │ │ │ │ └─LogicalProject { exprs: [rw_indexes.id, rw_indexes.name, 'index':Varchar, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl] }
│ │ │ │ │ └─LogicalSysScan { table: rw_indexes, columns: [rw_indexes.id, rw_indexes.name, rw_indexes.primary_table_id, rw_indexes.key_columns, rw_indexes.include_columns, rw_indexes.schema_id, rw_indexes.owner, rw_indexes.definition, rw_indexes.acl, rw_indexes.initialized_at, rw_indexes.created_at, rw_indexes.initialized_at_cluster_version, rw_indexes.created_at_cluster_version] }
│ │ │ │ └─LogicalProject { exprs: [rw_sinks.id, rw_sinks.name, 'sink':Varchar, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.definition, rw_sinks.acl] }
│ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version] }
│ │ │ │ └─LogicalSysScan { table: rw_sinks, columns: [rw_sinks.id, rw_sinks.name, rw_sinks.schema_id, rw_sinks.owner, rw_sinks.connector, rw_sinks.sink_type, rw_sinks.connection_id, rw_sinks.definition, rw_sinks.acl, rw_sinks.initialized_at, rw_sinks.created_at, rw_sinks.initialized_at_cluster_version, rw_sinks.created_at_cluster_version, rw_sinks.connector_props, rw_sinks.format_encode_options] }
│ │ │ └─LogicalProject { exprs: [rw_subscriptions.id, rw_subscriptions.name, 'subscription':Varchar, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl] }
│ │ │ └─LogicalSysScan { table: rw_subscriptions, columns: [rw_subscriptions.id, rw_subscriptions.name, rw_subscriptions.schema_id, rw_subscriptions.owner, rw_subscriptions.definition, rw_subscriptions.acl, rw_subscriptions.initialized_at, rw_subscriptions.created_at, rw_subscriptions.initialized_at_cluster_version, rw_subscriptions.created_at_cluster_version] }
│ │ └─LogicalProject { exprs: [rw_materialized_views.id, rw_materialized_views.name, 'materialized view':Varchar, rw_materialized_views.schema_id, rw_materialized_views.owner, rw_materialized_views.definition, rw_materialized_views.acl] }
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/catalog/source_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl From<&PbSource> for SourceCatalog {
.into_iter()
.map(Into::into)
.collect();
let options_with_secrets =
let connector_props_with_secrets =
WithOptionsSecResolved::new(prost.with_properties.clone(), prost.secret_refs.clone());
let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect();
let row_id_index = prost.row_id_index.map(|idx| idx as _);
Expand All @@ -131,7 +131,7 @@ impl From<&PbSource> for SourceCatalog {
owner,
info: prost.info.clone().unwrap(),
row_id_index,
with_properties: options_with_secrets,
with_properties: connector_props_with_secrets,
watermark_descs,
associated_table_id: associated_table_id.map(|x| x.into()),
definition: prost.definition.clone(),
Expand Down
80 changes: 55 additions & 25 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Fields, Timestamptz};
use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::user::grant_privilege::Object;

use crate::catalog::system_catalog::rw_catalog::rw_sources::print_props_with_secret;
use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl};
use crate::error::Result;
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
Expand All @@ -36,6 +38,11 @@ struct RwSink {
created_at: Option<Timestamptz>,
initialized_at_cluster_version: Option<String>,
created_at_cluster_version: Option<String>,

// connector properties in json format
connector_props: JsonbVal,
// format and encode properties in json format
format_encode_options: JsonbVal,
}

#[system_catalog(table, "rw_catalog.rw_sinks")]
Expand All @@ -48,30 +55,53 @@ fn read_rw_sinks_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSink>> {

Ok(schemas
.flat_map(|schema| {
schema.iter_sink().map(|sink| RwSink {
id: sink.id.sink_id as i32,
name: sink.name.clone(),
schema_id: schema.id() as i32,
owner: sink.owner.user_id as i32,
connector: sink
.properties
.get(UPSTREAM_SOURCE_KEY)
.cloned()
.unwrap_or("".to_string())
.to_uppercase(),
sink_type: sink.sink_type.to_proto().as_str_name().into(),
connection_id: sink.connection_id.map(|id| id.connection_id() as i32),
definition: sink.create_sql(),
acl: get_acl_items(
&Object::SinkId(sink.id.sink_id),
false,
&users,
username_map,
),
initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
created_at_cluster_version: sink.created_at_cluster_version.clone(),
schema.iter_sink().map(|sink| {
let connector_props = print_props_with_secret(
schema,
WithOptionsSecResolved::new(sink.properties.clone(), sink.secret_refs.clone()),
)
.into();
let format_encode_options = sink
.format_desc
.as_ref()
.map(|desc| {
print_props_with_secret(
schema,
WithOptionsSecResolved::new(
desc.options.clone(),
desc.secret_refs.clone(),
),
)
})
.unwrap_or_else(jsonbb::Value::null)
.into();
RwSink {
id: sink.id.sink_id as i32,
name: sink.name.clone(),
schema_id: schema.id() as i32,
owner: sink.owner.user_id as i32,
connector: sink
.properties
.get(UPSTREAM_SOURCE_KEY)
.cloned()
.unwrap_or("".to_string())
.to_uppercase(),
sink_type: sink.sink_type.to_proto().as_str_name().into(),
connection_id: sink.connection_id.map(|id| id.connection_id() as i32),
definition: sink.create_sql(),
acl: get_acl_items(
&Object::SinkId(sink.id.sink_id),
false,
&users,
username_map,
),
initialized_at: sink.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: sink.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: sink.initialized_at_cluster_version.clone(),
created_at_cluster_version: sink.created_at_cluster_version.clone(),
connector_props,
format_encode_options,
}
})
})
.collect())
Expand Down
118 changes: 85 additions & 33 deletions src/frontend/src/catalog/system_catalog/rw_catalog/rw_sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Fields, Timestamptz};
use risingwave_common::catalog::SecretId;
use risingwave_common::types::{Fields, JsonbVal, Timestamptz};
use risingwave_frontend_macro::system_catalog;
use risingwave_pb::user::grant_privilege::Object;
use serde_json::{json, Map as JsonMap};

use crate::catalog::schema_catalog::SchemaCatalog;
use crate::catalog::system_catalog::{get_acl_items, SysCatalogReaderImpl};
use crate::error::Result;
use crate::handler::create_source::UPSTREAM_SOURCE_KEY;
use crate::WithOptionsSecResolved;

#[derive(Fields)]
struct RwSource {
Expand All @@ -41,6 +45,10 @@ struct RwSource {
initialized_at_cluster_version: Option<String>,
created_at_cluster_version: Option<String>,
is_shared: bool,
// connector properties in json format
connector_props: JsonbVal,
// format-encode options in json format
format_encode_options: JsonbVal,
}

#[system_catalog(table, "rw_catalog.rw_sources")]
Expand All @@ -53,39 +61,83 @@ fn read_rw_sources_info(reader: &SysCatalogReaderImpl) -> Result<Vec<RwSource>>

Ok(schemas
.flat_map(|schema| {
schema.iter_source().map(|source| RwSource {
id: source.id as i32,
name: source.name.clone(),
schema_id: schema.id() as i32,
owner: source.owner as i32,
connector: source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.cloned()
.unwrap_or("".to_string())
.to_uppercase(),
columns: source.columns.iter().map(|c| c.name().into()).collect(),
format: source
.info
.get_format()
.ok()
.map(|format| format.as_str_name().into()),
row_encode: source
.info
.get_row_encode()
.ok()
.map(|row_encode| row_encode.as_str_name().into()),
append_only: source.append_only,
associated_table_id: source.associated_table_id.map(|id| id.table_id as i32),
connection_id: source.connection_id.map(|id| id as i32),
definition: source.create_sql(),
acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
created_at_cluster_version: source.created_at_cluster_version.clone(),
is_shared: source.info.is_shared(),
schema.iter_source().map(|source| {
let format_encode_props_with_secrets = WithOptionsSecResolved::new(
source.info.format_encode_options.clone(),
source.info.format_encode_secret_refs.clone(),
);
RwSource {
id: source.id as i32,
name: source.name.clone(),
schema_id: schema.id() as i32,
owner: source.owner as i32,
connector: source
.with_properties
.get(UPSTREAM_SOURCE_KEY)
.cloned()
.unwrap_or("".to_string())
.to_uppercase(),
columns: source.columns.iter().map(|c| c.name().into()).collect(),
format: source
.info
.get_format()
.ok()
.map(|format| format.as_str_name().into()),
row_encode: source
.info
.get_row_encode()
.ok()
.map(|row_encode| row_encode.as_str_name().into()),
append_only: source.append_only,
associated_table_id: source.associated_table_id.map(|id| id.table_id as i32),
connection_id: source.connection_id.map(|id| id as i32),
definition: source.create_sql(),
acl: get_acl_items(&Object::SourceId(source.id), false, &users, username_map),
initialized_at: source.initialized_at_epoch.map(|e| e.as_timestamptz()),
created_at: source.created_at_epoch.map(|e| e.as_timestamptz()),
initialized_at_cluster_version: source.initialized_at_cluster_version.clone(),
created_at_cluster_version: source.created_at_cluster_version.clone(),
is_shared: source.info.is_shared(),

connector_props: print_props_with_secret(
schema,
source.with_properties.clone(),
)
.into(),
format_encode_options: print_props_with_secret(
schema,
format_encode_props_with_secrets,
)
.into(),
}
})
})
.collect())
}

pub fn print_props_with_secret(
tabVersion marked this conversation as resolved.
Show resolved Hide resolved
schema: &SchemaCatalog,
props_with_secret: WithOptionsSecResolved,
) -> jsonbb::Value {
let (inner, secret_ref) = props_with_secret.into_parts();
// if not secret, {"some key": {"type": "plaintext", "value": "xxxx"}}
// if secret, {"some key": {"type": "secret", "value": {"value": "<secret name>"}}}
let mut result: JsonMap<String, serde_json::Value> = JsonMap::new();

for (k, v) in inner {
result.insert(k, json!({"type": "plaintext", "value": v}));
}
for (k, v) in secret_ref {
let secret_name = schema
.get_secret_by_id(&SecretId(v.secret_id))
.unwrap()
.name
.clone();
result.insert(
k,
json!({"type": "secret", "value": {"value": secret_name}}),
);
}

jsonbb::Value::from(serde_json::Value::Object(result))
}
Loading