Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): introduce sink validation in meta #8417

Merged
merged 10 commits into from
Mar 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,8 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) {
.build());
responseObserver.onCompleted();
}

responseObserver.onNext(ConnectorServiceProto.ValidateSinkResponse.newBuilder().build());
responseObserver.onCompleted();
}
}
23 changes: 11 additions & 12 deletions src/common/src/util/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::anyhow;
use risingwave_pb::common::HostAddress as ProstHostAddress;

use crate::error::{internal_error, Result};

/// General host address and port.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HostAddr {
Expand All @@ -41,33 +40,33 @@ impl From<SocketAddr> for HostAddr {
}

impl TryFrom<&str> for HostAddr {
type Error = crate::error::RwError;
type Error = anyhow::Error;
xx01cyx marked this conversation as resolved.
Show resolved Hide resolved

fn try_from(s: &str) -> Result<Self> {
let addr = url::Url::parse(&format!("http://{}", s))
.map_err(|e| internal_error(format!("{}: {}", e, s)))?;
fn try_from(s: &str) -> Result<Self, Self::Error> {
let addr =
url::Url::parse(&format!("http://{}", s)).map_err(|e| anyhow!("{}: {}", e, s))?;
Ok(HostAddr {
host: addr
.host()
.ok_or_else(|| internal_error("invalid host"))?
.ok_or_else(|| anyhow!("invalid host"))?
.to_string(),
port: addr.port().ok_or_else(|| internal_error("invalid port"))?,
port: addr.port().ok_or_else(|| anyhow!("invalid port"))?,
})
}
}

impl TryFrom<&String> for HostAddr {
type Error = crate::error::RwError;
type Error = anyhow::Error;

fn try_from(s: &String) -> Result<Self> {
fn try_from(s: &String) -> Result<Self, Self::Error> {
Self::try_from(s.as_str())
}
}

impl FromStr for HostAddr {
type Err = crate::error::RwError;
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::try_from(s)
}
}
Expand Down
17 changes: 16 additions & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pub mod desc;
use std::collections::HashMap;

use itertools::Itertools;
use risingwave_common::catalog::{ColumnCatalog, DatabaseId, SchemaId, TableId, UserId};
use risingwave_common::catalog::{
ColumnCatalog, DatabaseId, Field, Schema, SchemaId, TableId, UserId,
};
use risingwave_common::util::sort_util::OrderPair;
use risingwave_pb::catalog::{Sink as ProstSink, SinkType as ProstSinkType};

Expand Down Expand Up @@ -165,6 +167,19 @@ impl SinkCatalog {
sink_type: self.sink_type.to_proto() as i32,
}
}

pub fn schema(&self) -> Schema {
let fields = self
.columns
.iter()
.map(|column| Field::from(column.column_desc.clone()))
.collect_vec();
Schema { fields }
}

pub fn pk_indices(&self) -> Vec<usize> {
self.pk.iter().map(|k| k.column_idx).collect_vec()
}
}

impl From<ProstSink> for SinkCatalog {
Expand Down
33 changes: 32 additions & 1 deletion src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;
pub use tracing;

use self::catalog::SinkType;
use self::catalog::{SinkCatalog, SinkType};
use crate::sink::console::{ConsoleConfig, ConsoleSink, CONSOLE_SINK};
use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK};
use crate::sink::redis::{RedisConfig, RedisSink};
Expand Down Expand Up @@ -160,6 +160,37 @@ impl SinkImpl {
SinkConfig::BlackHole => SinkImpl::Blackhole,
})
}

pub async fn validate(
cfg: SinkConfig,
sink_catalog: SinkCatalog,
connector_rpc_endpoint: Option<String>,
) -> Result<()> {
match cfg {
SinkConfig::Redis(cfg) => RedisSink::new(cfg, sink_catalog.schema()).map(|_| ()),
SinkConfig::Kafka(cfg) => {
// We simply call `KafkaSink::new` here to validate a Kafka sink.
if sink_catalog.sink_type.is_append_only() {
KafkaSink::<true>::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices())
.await
.map(|_| ())
wenym1 marked this conversation as resolved.
Show resolved Hide resolved
} else {
KafkaSink::<false>::new(*cfg, sink_catalog.schema(), sink_catalog.pk_indices())
.await
.map(|_| ())
}
}
SinkConfig::Remote(cfg) => {
if sink_catalog.sink_type.is_append_only() {
RemoteSink::<true>::validate(cfg, sink_catalog, connector_rpc_endpoint).await
} else {
RemoteSink::<false>::validate(cfg, sink_catalog, connector_rpc_endpoint).await
}
}
SinkConfig::Console(_) => Ok(()),
SinkConfig::BlackHole => Ok(()),
}
}
}

macro_rules! impl_sink {
Expand Down
Loading