Skip to content

Commit

Permalink
fix: ban types when creating sink (risingwavelabs#9008)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <tabvision@bupt.icu>
  • Loading branch information
tabVersion authored Apr 6, 2023
1 parent 4c0a0b4 commit b19e651
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 10 deletions.
2 changes: 2 additions & 0 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export PGPASSWORD=postgres
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
createdb -h db -U postgres test
psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);"
psql -h db -U postgres -d test -c "create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float4, v5 float8, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);"
psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql

node_port=50051
Expand Down Expand Up @@ -98,6 +99,7 @@ echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/append_only_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/create_sink_as.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/blackhole_sink.slt'
sqllogictest -p 4566 -d dev './e2e_test/sink/remote/types.slt'
sleep 1

# check sink destination postgres
Expand Down
30 changes: 30 additions & 0 deletions e2e_test/sink/remote/types.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
statement ok
create table t5 (v1 smallint primary key, v2 int, v3 bigint, v4 float, v5 double, v6 decimal, v7 varchar, v8 timestamp, v9 boolean);

statement ok
create sink s from t5 with (
connector = 'jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name = 't5',
type = 'upsert'
);

statement ok
drop sink s;

statement ok
drop table t5;

statement ok
create table t6 (v1 smallint primary key, v2 int, v3 bigint, v4 float, v5 double, v6 decimal, v7 varchar, v8 timestamp, v9 boolean, v10 date, v11 struct<v12 time, v13 timestamptz>, v14 varchar[]);

statement error
create sink s from t6 with (
connector = 'jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name = 't6',
type = 'upsert'
);

statement ok
drop table t6;
41 changes: 31 additions & 10 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use risingwave_common::array::StreamChunk;
#[cfg(test)]
use risingwave_common::catalog::Field;
use risingwave_common::catalog::Schema;
#[cfg(test)]
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::connector_service::sink_stream_request::write_batch::json_payload::RowOp;
Expand Down Expand Up @@ -187,6 +186,37 @@ impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
sink_catalog: SinkCatalog,
connector_rpc_endpoint: Option<String>,
) -> Result<()> {
// FIXME: support struct and array in stream sink
let columns = sink_catalog
.columns
.iter()
.map(|column| {
if matches!(
column.column_desc.data_type,
DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Float32
| DataType::Float64
| DataType::Boolean
| DataType::Decimal
| DataType::Timestamp
| DataType::Varchar
) {
Ok( Column {
name: column.column_desc.name.clone(),
data_type: column.column_desc.data_type.to_protobuf().type_name,
})
} else {
Err(SinkError::Remote(format!(
"remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Timestamp and Varchar, got {:?}: {:?}",
column.column_desc.name,
column.column_desc.data_type
)))
}
})
.collect::<Result<Vec<_>>>()?;

let address = connector_rpc_endpoint.ok_or_else(|| {
SinkError::Remote("connector sink endpoint not specified".parse().unwrap())
})?;
Expand All @@ -197,15 +227,6 @@ impl<const APPEND_ONLY: bool> RemoteSink<APPEND_ONLY> {
&address, err
))
})?;

let columns = sink_catalog
.columns
.iter()
.map(|column| Column {
name: column.column_desc.name.clone(),
data_type: column.column_desc.data_type.to_protobuf().type_name,
})
.collect_vec();
let table_schema = TableSchema {
columns,
pk_indices: sink_catalog
Expand Down

0 comments on commit b19e651

Please sign in to comment.