From bf954734311b4eb791f23e5be634ce1f0a0879eb Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 21 Mar 2023 18:14:20 +0800 Subject: [PATCH] fix(connector): support more data types in JDBC sink (#8678) Signed-off-by: tabVersion Co-authored-by: tabVersion --- ci/scripts/e2e-sink-test.sh | 17 +++++------ e2e_test/sink/remote/jdbc.check.pg.slt | 10 +++---- e2e_test/sink/remote/jdbc.load.slt | 24 +++++++++++++--- e2e_test/sink/remote/mysql_create_table.sql | 11 ++++++++ .../sink/remote/mysql_expected_result.tsv | 5 ++++ e2e_test/sink/remote/pg_create_table.sql | 11 ++++++++ .../connector/JsonDeserializer.java | 28 +++++++++++++++++++ .../risingwave/connector/JDBCSinkFactory.java | 4 ++- java/tools/maven/checkstyle.xml | 5 ---- 9 files changed, 90 insertions(+), 25 deletions(-) create mode 100644 e2e_test/sink/remote/mysql_create_table.sql create mode 100644 e2e_test/sink/remote/mysql_expected_result.tsv create mode 100644 e2e_test/sink/remote/pg_create_table.sql diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh index 4c04bdba70cf..2b481feb38de 100755 --- a/ci/scripts/e2e-sink-test.sh +++ b/ci/scripts/e2e-sink-test.sh @@ -31,7 +31,8 @@ mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug -export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk +# TODO: Switch to stream_chunk encoding once it's completed, and then remove json encoding as well as this env var. +export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=json echo "--- Download connector node package" buildkite-agent artifact download risingwave-connector.tar.gz ./ @@ -55,7 +56,7 @@ mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXIST # grant access to `test` for ci test user mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';" # create a table named t_remote -mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE TABLE IF NOT EXISTS test.t_remote (id INT, name VARCHAR(255), PRIMARY KEY (id));" +mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql echo "--- preparing postgresql" @@ -65,7 +66,7 @@ export PGPASSWORD=postgres psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';" createdb -h db -U postgres test psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);" -psql -h db -U postgres -d test -c "CREATE TABLE t_remote (id serial PRIMARY KEY, name VARCHAR (50) NOT NULL);" +psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql node_port=50051 node_timeout=10 @@ -106,13 +107,9 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt' sleep 1 # check sink destination mysql using shell -if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{ -if ($1 == 1 && $2 == "Alex") c1++; - if ($1 == 3 && $2 == "Carl") c2++; - if ($1 == 4 && $2 == "Doris") c3++; - if ($1 == 5 && $2 == "Eve") c4++; - if ($1 == 6 && $2 == "Frank") c5++; } - END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then +diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \ +<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id") +if [ $? -eq 0 ]; then echo "mysql sink check passed" else echo "The output is not as expected." diff --git a/e2e_test/sink/remote/jdbc.check.pg.slt b/e2e_test/sink/remote/jdbc.check.pg.slt index 601a0b29e57b..bd00a7938a89 100644 --- a/e2e_test/sink/remote/jdbc.check.pg.slt +++ b/e2e_test/sink/remote/jdbc.check.pg.slt @@ -3,8 +3,8 @@ query I select * from t_remote order by id; ---- -1 Alex -3 Carl -4 Doris -5 Eve -6 Frank \ No newline at end of file +1 Alex 28208 281620391 4986480304337356800 28162.0391 2.03 28162.0391 2023-03-20 10:18:30 +3 Carl 18300 1702307129 7878292368468104192 17023.07129 23.07 17023.07129 2023-03-20 10:18:32 +4 Doris 17250 151951802 3946135584462581760 1519518.02 18.02 1519518.02 2023-03-21 10:18:30 +5 Eve 9725 698160808 524334216698825600 69.8160808 69.81 69.8160808 2023-03-21 10:18:31 +6 Frank 28131 1233587627 8492820454814063616 123358.7627 58.76 123358.7627 2023-03-21 10:18:32 diff --git a/e2e_test/sink/remote/jdbc.load.slt b/e2e_test/sink/remote/jdbc.load.slt index 88bec6048187..9adbb40a1b97 100644 --- a/e2e_test/sink/remote/jdbc.load.slt +++ b/e2e_test/sink/remote/jdbc.load.slt @@ -1,5 +1,15 @@ statement ok -create table t_remote (id integer primary key, name varchar); +create table t_remote ( + id integer primary key, + v_varchar varchar, + v_smallint smallint, + v_integer integer, + v_bigint bigint, + v_decimal decimal, + v_float float, + v_double double, + v_timestamp timestamp +); statement ok create materialized view mv_remote as select * from t_remote; @@ -19,16 +29,22 @@ CREATE SINK s_mysql FROM mv_remote WITH ( ); statement ok -INSERT INTO t_remote VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carl'); +INSERT INTO t_remote VALUES + (1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'), + (2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'), + (3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32'); statement ok -INSERT INTO t_remote VALUES (4, 'Doris'), (5, 'Eve'), (6, 'Frank'); +INSERT INTO t_remote VALUES + (4, 'Doris', 17250, 151951802, 3946135584462581863, 1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'), + (5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'), + (6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32'); statement ok FLUSH; statement ok -UPDATE t_remote SET name = 'Alex' WHERE id = 1; +UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1; statement ok DELETE FROM t_remote WHERE id = 2; diff --git a/e2e_test/sink/remote/mysql_create_table.sql b/e2e_test/sink/remote/mysql_create_table.sql new file mode 100644 index 000000000000..491072eb59d6 --- /dev/null +++ b/e2e_test/sink/remote/mysql_create_table.sql @@ -0,0 +1,11 @@ +CREATE TABLE t_remote ( + id integer PRIMARY KEY, + v_varchar varchar(100), + v_smallint smallint, + v_integer integer, + v_bigint bigint, + v_decimal decimal, + v_float float, + v_double double, + v_timestamp timestamp +); \ No newline at end of file diff --git a/e2e_test/sink/remote/mysql_expected_result.tsv b/e2e_test/sink/remote/mysql_expected_result.tsv new file mode 100644 index 000000000000..8e738579032d --- /dev/null +++ b/e2e_test/sink/remote/mysql_expected_result.tsv @@ -0,0 +1,5 @@ +1 Alex 28208 281620391 4986480304337356800 28162 2.03 28162.0391 2023-03-20 10:18:30 +3 Carl 18300 1702307129 7878292368468104192 17023 23.07 17023.07129 2023-03-20 10:18:32 +4 Doris 17250 151951802 3946135584462581760 1519518 18.02 1519518.02 2023-03-21 10:18:30 +5 Eve 9725 698160808 524334216698825600 70 69.81 69.8160808 2023-03-21 10:18:31 +6 Frank 28131 1233587627 8492820454814063616 123359 58.76 123358.7627 2023-03-21 10:18:32 diff --git a/e2e_test/sink/remote/pg_create_table.sql b/e2e_test/sink/remote/pg_create_table.sql new file mode 100644 index 000000000000..c20e3386e8d0 --- /dev/null +++ b/e2e_test/sink/remote/pg_create_table.sql @@ -0,0 +1,11 @@ +CREATE TABLE t_remote ( + id integer PRIMARY KEY, + v_varchar varchar(100), + v_smallint smallint, + v_integer integer, + v_bigint bigint, + v_decimal decimal, + v_float real, + v_double double precision, + v_timestamp timestamp +); \ No newline at end of file diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java index 097872dcef75..837989120eab 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java @@ -22,6 +22,8 @@ import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload; import com.risingwave.proto.Data; +import java.math.BigDecimal; +import java.sql.Timestamp; import java.util.Map; public class JsonDeserializer implements Deserializer { @@ -31,6 +33,8 @@ public JsonDeserializer(TableSchema tableSchema) { this.tableSchema = tableSchema; } + // Encoding here should be consistent with `datum_to_json_object()` in + // src/connector/src/sink/mod.rs @Override public CloseableIterator deserialize( ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) { @@ -113,6 +117,19 @@ private static Double castDouble(Object value) { } } + private static BigDecimal castDecimal(Object value) { + if (value instanceof String) { + // FIXME(eric): See `datum_to_json_object()` in src/connector/src/sink/mod.rs + return new BigDecimal((String) value); + } else if (value instanceof BigDecimal) { + return (BigDecimal) value; + } else { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription("unable to cast into double from " + value.getClass()) + .asRuntimeException(); + } + } + private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) { switch (typeName) { case INT16: @@ -132,6 +149,8 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj return castDouble(value); case FLOAT: return castDouble(value).floatValue(); + case DECIMAL: + return castDecimal(value); case BOOLEAN: if (!(value instanceof Boolean)) { throw io.grpc.Status.INVALID_ARGUMENT @@ -139,6 +158,15 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj .asRuntimeException(); } return value; + case TIMESTAMP: + case TIMESTAMPTZ: + if (!(value instanceof String)) { + throw io.grpc.Status.INVALID_ARGUMENT + .withDescription( + "Expected timestamp in string, got " + value.getClass()) + .asRuntimeException(); + } + return Timestamp.valueOf((String) value); default: throw io.grpc.Status.INVALID_ARGUMENT .withDescription("unsupported type " + typeName) diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java index 1bdd8d4cd271..8e03db003243 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java @@ -75,7 +75,9 @@ public void validate( jdbcPk.add(pkResultSet.getString("COLUMN_NAME")); } } catch (SQLException e) { - throw Status.INTERNAL.withCause(e).asRuntimeException(); + throw Status.INVALID_ARGUMENT + .withDescription("failed to connect to target database: " + e.getSQLState()) + .asRuntimeException(); } if (!jdbcTableNames.contains(tableName)) { diff --git a/java/tools/maven/checkstyle.xml b/java/tools/maven/checkstyle.xml index 33649434a326..4bd0d510e0fa 100644 --- a/java/tools/maven/checkstyle.xml +++ b/java/tools/maven/checkstyle.xml @@ -165,11 +165,6 @@ This file is based on the checkstyle file of Apache Beam. - - - - -