Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): introduce sink validation in meta #8417

Merged
merged 10 commits into from
Mar 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) {
ConnectorServiceProto.ValidateSinkResponse.newBuilder()
.setError(
ConnectorServiceProto.ValidationError.newBuilder()
.setErrorMessage(e.toString())
.setErrorMessage(e.getMessage())
.build())
.build());
responseObserver.onCompleted();
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use itertools::Itertools;
use risingwave_common::catalog::{
ColumnCatalog, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::OrderPair;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{Sink as ProstSink, SinkType as ProstSinkType};

#[derive(Clone, Copy, Debug, Default, Hash, PartialOrd, PartialEq, Eq)]
Expand Down Expand Up @@ -178,7 +178,7 @@ impl SinkCatalog {
}

pub fn pk_indices(&self) -> Vec<usize> {
self.pk.iter().map(|k| k.column_idx).collect_vec()
self.pk.iter().map(|k| k.column_index).collect_vec()
}
}

Expand Down
1 change: 0 additions & 1 deletion src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub struct KafkaConfig {

pub format: String, // accept "append_only", "debezium", or "upsert"

#[serde(default)]
pub identifier: String,

#[serde(
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub enum SinkError {

impl From<RpcError> for SinkError {
fn from(value: RpcError) -> Self {
SinkError::Remote(format!("{:?}", value))
SinkError::Remote(format!("{}", value))
}
}

Expand Down
22 changes: 0 additions & 22 deletions src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,4 @@ impl StreamingJob {
None
}
}

/// Returns the optional [`Source`] if this is a `Table` streaming job.
///
/// Only used for registering sources for creating tables with connectors.
pub fn source(&self) -> Option<&Source> {
match self {
Self::MaterializedView(_) => None,
Self::Sink(_) => None,
Self::Table(source, _) => source.as_ref(),
Self::Index(_, _) => None,
}
}

/// Returns the [`Sink`] if this is a `Sink` streaming job.
pub fn sink(&self) -> Option<&Sink> {
match self {
Self::MaterializedView(_) => None,
Self::Sink(sink) => Some(sink),
Self::Table(_, _) => None,
Self::Index(_, _) => None,
}
}
}
17 changes: 11 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,18 @@ where

internal_tables = ctx.internal_tables();

if let Some(source) = stream_job.source() {
// Register the source on the connector node.
self.source_manager.register_source(source).await?;
} else if let Some(sink) = stream_job.sink() {
// Validate the sink on the connector node.
validate_sink(sink, self.env.opts.connector_rpc_endpoint.clone()).await?;
match &stream_job {
StreamingJob::Table(Some(source), _) => {
// Register the source on the connector node.
self.source_manager.register_source(source).await?;
}
StreamingJob::Sink(sink) => {
// Validate the sink on the connector node.
validate_sink(sink, self.env.opts.connector_rpc_endpoint.clone()).await?;
}
_ => {}
}

self.stream_manager
.create_streaming_job(table_fragments, ctx)
.await?;
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/stream/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ pub async fn validate_sink(
connector_rpc_endpoint: Option<String>,
) -> MetaResult<()> {
let sink_catalog = SinkCatalog::from(prost_sink_catalog);
let sink_config = SinkConfig::from_hashmap(sink_catalog.properties.clone())
let mut properties = sink_catalog.properties.clone();
// Insert a value as the `identifier` field to get parsed by serde.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this line mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The identifier field is supposed to be inserted when we create the sink executor. We insert a dummy value here in meta to ensure a successful parsing. (The field is originally defined here: https://github.com/risingwavelabs/risingwave/blob/main/src/connector/src/sink/kafka.rs#L66)

properties.insert("identifier".to_string(), u64::MAX.to_string());
let sink_config = SinkConfig::from_hashmap(properties)
.map_err(|err| MetaError::from(anyhow!(err.to_string())))?;

SinkImpl::validate(sink_config, sink_catalog, connector_rpc_endpoint)
.await
.map_err(|err| MetaError::from(anyhow!(err.to_string())))
Expand Down
7 changes: 6 additions & 1 deletion src/rpc_client/src/connector_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,12 @@ impl ConnectorClient {
.into_inner();
response.error.map_or_else(
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
|| Ok(()), // If there is no error message, return Ok here.
|err| Err(RpcError::Internal(anyhow!(err.error_message))),
|err| {
Err(RpcError::Internal(anyhow!(format!(
"sink cannot pass validation: {}",
err.error_message
))))
},
)
}
}
Expand Down