Skip to content

Commit

Permalink
fix(connector): support more data types in JDBC sink (risingwavelabs#…
Browse files Browse the repository at this point in the history
…8678)

Signed-off-by: tabVersion <tabvision@bupt.icu>
Co-authored-by: tabVersion <tabvision@bupt.icu>
  • Loading branch information
fuyufjh and tabVersion authored Mar 21, 2023
1 parent 694c446 commit bf95473
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 25 deletions.
17 changes: 7 additions & 10 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk
# TODO: Switch to stream_chunk encoding once it's completed, and then remove json encoding as well as this env var.
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=json

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
Expand All @@ -55,7 +56,7 @@ mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXIST
# grant access to `test` for ci test user
mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';"
# create a table named t_remote
mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE TABLE IF NOT EXISTS test.t_remote (id INT, name VARCHAR(255), PRIMARY KEY (id));"
mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql

echo "--- preparing postgresql"

Expand All @@ -65,7 +66,7 @@ export PGPASSWORD=postgres
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
createdb -h db -U postgres test
psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);"
psql -h db -U postgres -d test -c "CREATE TABLE t_remote (id serial PRIMARY KEY, name VARCHAR (50) NOT NULL);"
psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql

node_port=50051
node_timeout=10
Expand Down Expand Up @@ -106,13 +107,9 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
sleep 1

# check sink destination mysql using shell
if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
if ($1 == 1 && $2 == "Alex") c1++;
if ($1 == 3 && $2 == "Carl") c2++;
if ($1 == 4 && $2 == "Doris") c3++;
if ($1 == 5 && $2 == "Eve") c4++;
if ($1 == 6 && $2 == "Frank") c5++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check passed"
else
echo "The output is not as expected."
Expand Down
10 changes: 5 additions & 5 deletions e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
query I
select * from t_remote order by id;
----
1 Alex
3 Carl
4 Doris
5 Eve
6 Frank
1 Alex 28208 281620391 4986480304337356800 28162.0391 2.03 28162.0391 2023-03-20 10:18:30
3 Carl 18300 1702307129 7878292368468104192 17023.07129 23.07 17023.07129 2023-03-20 10:18:32
4 Doris 17250 151951802 3946135584462581760 1519518.02 18.02 1519518.02 2023-03-21 10:18:30
5 Eve 9725 698160808 524334216698825600 69.8160808 69.81 69.8160808 2023-03-21 10:18:31
6 Frank 28131 1233587627 8492820454814063616 123358.7627 58.76 123358.7627 2023-03-21 10:18:32
24 changes: 20 additions & 4 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
statement ok
create table t_remote (id integer primary key, name varchar);
CREATE TABLE t_remote (
    id          INTEGER PRIMARY KEY,
    v_varchar   VARCHAR,
    v_smallint  SMALLINT,
    v_integer   INTEGER,
    v_bigint    BIGINT,
    v_decimal   DECIMAL,
    v_float     FLOAT,
    v_double    DOUBLE,
    v_timestamp TIMESTAMP
);

statement ok
create materialized view mv_remote as select * from t_remote;
Expand All @@ -19,16 +29,22 @@ CREATE SINK s_mysql FROM mv_remote WITH (
);

statement ok
INSERT INTO t_remote VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carl');
INSERT INTO t_remote (id, v_varchar, v_smallint, v_integer, v_bigint, v_decimal, v_float, v_double, v_timestamp) VALUES
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
(2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'),
(3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32');

statement ok
INSERT INTO t_remote VALUES (4, 'Doris'), (5, 'Eve'), (6, 'Frank');
INSERT INTO t_remote (id, v_varchar, v_smallint, v_integer, v_bigint, v_decimal, v_float, v_double, v_timestamp) VALUES
(4, 'Doris', 17250, 151951802, 3946135584462581863, 1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'),
(5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'),
(6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32');

statement ok
FLUSH;

statement ok
UPDATE t_remote SET name = 'Alex' WHERE id = 1;
UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1;

statement ok
DELETE FROM t_remote WHERE id = 2;
Expand Down
11 changes: 11 additions & 0 deletions e2e_test/sink/remote/mysql_create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Target table for the MySQL side of the JDBC sink e2e test.
-- NOTE: DECIMAL with no precision/scale is DECIMAL(10,0) in MySQL, so v_decimal
-- keeps no fractional digits; mysql_expected_result.tsv lists the rounded values.
CREATE TABLE t_remote (
    id          INTEGER PRIMARY KEY,
    v_varchar   VARCHAR(100),
    v_smallint  SMALLINT,
    v_integer   INTEGER,
    v_bigint    BIGINT,
    v_decimal   DECIMAL,
    v_float     FLOAT,
    v_double    DOUBLE,
    v_timestamp TIMESTAMP
);
5 changes: 5 additions & 0 deletions e2e_test/sink/remote/mysql_expected_result.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1 Alex 28208 281620391 4986480304337356800 28162 2.03 28162.0391 2023-03-20 10:18:30
3 Carl 18300 1702307129 7878292368468104192 17023 23.07 17023.07129 2023-03-20 10:18:32
4 Doris 17250 151951802 3946135584462581760 1519518 18.02 1519518.02 2023-03-21 10:18:30
5 Eve 9725 698160808 524334216698825600 70 69.81 69.8160808 2023-03-21 10:18:31
6 Frank 28131 1233587627 8492820454814063616 123359 58.76 123358.7627 2023-03-21 10:18:32
11 changes: 11 additions & 0 deletions e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Target table for the PostgreSQL side of the JDBC sink e2e test.
-- The single- and double-precision columns use PostgreSQL's REAL and
-- DOUBLE PRECISION type names.
CREATE TABLE t_remote (
    id          INTEGER PRIMARY KEY,
    v_varchar   VARCHAR(100),
    v_smallint  SMALLINT,
    v_integer   INTEGER,
    v_bigint    BIGINT,
    v_decimal   DECIMAL,
    v_float     REAL,
    v_double    DOUBLE PRECISION,
    v_timestamp TIMESTAMP
);
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
import com.risingwave.proto.Data;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Map;

public class JsonDeserializer implements Deserializer {
Expand All @@ -31,6 +33,8 @@ public JsonDeserializer(TableSchema tableSchema) {
this.tableSchema = tableSchema;
}

// Encoding here should be consistent with `datum_to_json_object()` in
// src/connector/src/sink/mod.rs
@Override
public CloseableIterator<SinkRow> deserialize(
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) {
Expand Down Expand Up @@ -113,6 +117,19 @@ private static Double castDouble(Object value) {
}
}

/**
 * Converts a JSON-decoded value into a {@link BigDecimal}.
 *
 * <p>Accepts either a {@code String} holding the decimal's textual form or a value that is
 * already a {@code BigDecimal}.
 *
 * @param value the decoded JSON value of a DECIMAL column
 * @return the value as a {@code BigDecimal}
 * @throws NumberFormatException if {@code value} is a string that is not a valid decimal
 * @throws io.grpc.StatusRuntimeException with status INVALID_ARGUMENT if {@code value} is
 *     {@code null} or of any other type
 */
private static BigDecimal castDecimal(Object value) {
    if (value instanceof String) {
        // FIXME(eric): See `datum_to_json_object()` in src/connector/src/sink/mod.rs
        return new BigDecimal((String) value);
    } else if (value instanceof BigDecimal) {
        return (BigDecimal) value;
    } else {
        // Name the real target type ("decimal", not "double"), and describe a null value
        // explicitly instead of failing with a NullPointerException on value.getClass().
        String actualType = (value == null) ? "null" : String.valueOf(value.getClass());
        throw io.grpc.Status.INVALID_ARGUMENT
                .withDescription("unable to cast into decimal from " + actualType)
                .asRuntimeException();
    }
}

private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
switch (typeName) {
case INT16:
Expand All @@ -132,13 +149,24 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
return castDouble(value);
case FLOAT:
return castDouble(value).floatValue();
case DECIMAL:
return castDecimal(value);
case BOOLEAN:
if (!(value instanceof Boolean)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected boolean, got " + value.getClass())
.asRuntimeException();
}
return value;
case TIMESTAMP:
case TIMESTAMPTZ:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription(
"Expected timestamp in string, got " + value.getClass())
.asRuntimeException();
}
return Timestamp.valueOf((String) value);
default:
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unsupported type " + typeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public void validate(
jdbcPk.add(pkResultSet.getString("COLUMN_NAME"));
}
} catch (SQLException e) {
throw Status.INTERNAL.withCause(e).asRuntimeException();
throw Status.INVALID_ARGUMENT
.withDescription("failed to connect to target database: " + e.getSQLState())
.asRuntimeException();
}

if (!jdbcTableNames.contains(tableName)) {
Expand Down
5 changes: 0 additions & 5 deletions java/tools/maven/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,6 @@ This file is based on the checkstyle file of Apache Beam.
<!-- Enforce Java-style array declarations -->
<module name="ArrayTypeStyle"/>

<module name="TodoComment">
<!-- Checks that disallowed strings are not used in comments. -->
<property name="format" value="(FIXME)|(XXX)|(@author)"/>
</module>

<!--
IMPORT CHECKS
Expand Down

0 comments on commit bf95473

Please sign in to comment.