Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): Support Mongodb sink #17102

Merged
merged 27 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8a9e865
save work
ly9chee May 21, 2024
b307b1a
save work
ly9chee May 21, 2024
861a654
save work
ly9chee May 25, 2024
2fda3b7
save work
ly9chee Jun 4, 2024
e23c399
Merge branch 'mongodb-sink' of https://github.com/ly9chee/risingwave …
ly9chee Jun 4, 2024
1901d80
add e2e tests
ly9chee Jun 4, 2024
d5d799c
fix e2e tests
ly9chee Jun 4, 2024
a1763b8
update Cargo.lock
ly9chee Jun 4, 2024
4c36a53
merge main
ly9chee Jun 4, 2024
2404c76
fix fmt
ly9chee Jun 4, 2024
63bc960
fix dylint
ly9chee Jun 4, 2024
05a186a
merge pk checking logic
ly9chee Jun 7, 2024
27bb797
support compound pk & fix upsert
ly9chee Jun 8, 2024
4c87a9b
fix fmt
ly9chee Jun 8, 2024
c7d9151
moving encoding logic to bson.rs & minor refactors
ly9chee Jun 12, 2024
e5f7ad0
add license header
ly9chee Jun 13, 2024
73c88e0
Merge branch 'main' into mongodb-sink
ly9chee Jun 13, 2024
7c4ad31
warning when non-insert op received in append-only mode
ly9chee Jun 13, 2024
2d413a7
refactor is_append_only dispatching logic
ly9chee Jun 14, 2024
b461ce3
add comment for MONGODB_BULK_WRITE_SIZE_LIMIT
ly9chee Jun 14, 2024
b8ec448
Merge branch 'main' into mongodb-sink
wenym1 Jun 17, 2024
6e9d968
remove preserve_order from serde_json introduced by bson
ly9chee Jun 17, 2024
623c7e4
Merge branch 'mongodb-sink' of https://github.com/ly9chee/risingwave …
ly9chee Jun 17, 2024
de04e01
Merge branch 'main' into mongodb-sink
wenym1 Jun 18, 2024
19e43b2
Merge branch 'main' into mongodb-sink
ly9chee Jun 18, 2024
90bc3f3
fix lint
ly9chee Jun 18, 2024
d3d31db
Merge branch 'main' into mongodb-sink
ly9chee Jun 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 228 additions & 42 deletions Cargo.lock

Large diffs are not rendered by default.

64 changes: 32 additions & 32 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ services:
ports:
- 5432
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U postgres" ]
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 5s
timeout: 5s
retries: 5
command: [ "postgres", "-c", "wal_level=logical" ]
command: ["postgres", "-c", "wal_level=logical"]

mysql:
image: mysql:8.0
Expand All @@ -25,11 +25,7 @@ services:
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
healthcheck:
test:
[
"CMD-SHELL",
"mysqladmin ping -h 127.0.0.1 -u root -p123456"
]
test: ["CMD-SHELL", "mysqladmin ping -h 127.0.0.1 -u root -p123456"]
interval: 5s
timeout: 5s
retries: 5
Expand Down Expand Up @@ -100,10 +96,11 @@ services:
- doris-server
- starrocks-fe-server
- starrocks-be-server
- mongodb
- mongodb-setup
volumes:
- ..:/risingwave


rw-build-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240514-1
volumes:
Expand Down Expand Up @@ -173,7 +170,7 @@ services:

redis-server:
container_name: redis-server
image: 'redis:latest'
image: "redis:latest"
expose:
- 6379
ports:
Expand Down Expand Up @@ -208,8 +205,7 @@ services:
container_name: starrocks-fe-server
image: starrocks/fe-ubuntu:3.1.7
hostname: starrocks-fe-server
command:
/opt/starrocks/fe/bin/start_fe.sh
command: /opt/starrocks/fe/bin/start_fe.sh
ports:
- 28030:8030
- 29020:9020
Expand All @@ -236,10 +232,10 @@ services:
depends_on:
- starrocks-fe-server

# # Temporary workaround for json schema registry test since redpanda only supports
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
# # Temporary workaround for json schema registry test since redpanda only supports
# # protobuf/avro schema registry. Should be removed after the support.
# # Related tracking issue:
# # https://github.com/redpanda-data/redpanda/issues/1878
schemaregistry:
container_name: schemaregistry
hostname: schemaregistry
Expand All @@ -252,7 +248,7 @@ services:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry:8082
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: message_queue:29092
SCHEMA_REGISTRY_DEBUG: 'true'
SCHEMA_REGISTRY_DEBUG: "true"

pulsar-server:
container_name: pulsar-server
Expand All @@ -265,7 +261,7 @@ services:
- "8080"
- "6650"
healthcheck:
test: [ "CMD-SHELL", "bin/pulsar-admin brokers healthcheck"]
test: ["CMD-SHELL", "bin/pulsar-admin brokers healthcheck"]
interval: 5s
timeout: 5s
retries: 5
Expand All @@ -291,33 +287,37 @@ services:
[
"bash",
"-c",
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10"
"sleep 10 && mongo --host mongodb:27017 /config-replica.js && sleep 10",
]
restart: "no"
volumes:
- ./mongodb/config-replica.js:/config-replica.js

mongo_data_generator:
build:
context: .
dockerfile: ./mongodb/Dockerfile.generator
container_name: mongo_data_generator
depends_on:
- mongodb
environment:
MONGO_HOST: mongodb
MONGO_PORT: 27017
MONGO_DB_NAME: random_data
build:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert these unintended changes, maybe by your formatter.

context: .
dockerfile: ./mongodb/Dockerfile.generator
container_name: mongo_data_generator
depends_on:
- mongodb
environment:
MONGO_HOST: mongodb
MONGO_PORT: 27017
MONGO_DB_NAME: random_data
mqtt-server:
image: eclipse-mosquitto
command:
- sh
- -c
- echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
- sh
- -c
- echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf
ports:
- 1883:1883
healthcheck:
test: ["CMD-SHELL", "(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0"]
test:
[
"CMD-SHELL",
"(mosquitto_sub -h localhost -p 1883 -t 'topic' -E -i probe 2>&1 | grep Error) && exit 1 || exit 0",
]
interval: 10s
timeout: 10s
retries: 6
72 changes: 72 additions & 0 deletions ci/scripts/e2e-mongodb-sink-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

while getopts 'p:' opt; do
case ${opt} in
p )
profile=$OPTARG
;;
\? )
echo "Invalid Option: -$OPTARG" 1>&2
exit 1
;;
: )
echo "Invalid option: $OPTARG requires an argument" 1>&2
;;
esac
done
shift $((OPTIND -1))

download_and_prepare_rw "$profile" source

echo "--- starting risingwave cluster"
cargo make ci-start ci-sink-test
sleep 1

# install the mongo shell
wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
wget https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
dpkg -i mongodb-org-shell_4.4.28_amd64.deb

echo '> ping mongodb'
echo 'db.runCommand({ping: 1})' | mongo mongodb://mongodb:27017
echo '> rs config'
echo 'rs.conf()' | mongo mongodb://mongodb:27017
echo '> run mongodb sink test..'

sqllogictest -p 4566 -d dev './e2e_test/sink/mongodb_sink.slt'
sleep 1

append_only_result=$(mongo mongodb://mongodb:27017 --eval 'db.getSiblingDB("demo").t1.countDocuments({})' | tail -n 1)
if [ "$append_only_result" != "1" ]; then
echo "The append-only output is not as expected."
exit 1
fi

upsert_and_dynamic_coll_result1=$(mongo mongodb://mongodb:27017 --eval 'db.getSiblingDB("demo").t2.countDocuments({})' | tail -n 1)
if [ "$upsert_and_dynamic_coll_result1" != "1" ]; then
echo "The upsert output is not as expected."
exit 1
fi

upsert_and_dynamic_coll_result2=$(mongo mongodb://mongodb:27017 --eval 'db.getSiblingDB("shard_2024_01").tenant_1.countDocuments({})' | tail -n 1)
if [ "$upsert_and_dynamic_coll_result2" != "1" ]; then
echo "The upsert output is not as expected."
exit 1
fi

compound_pk_result=$(mongo mongodb://mongodb:27017 --eval 'db.getSiblingDB("demo").t3.countDocuments({})' | tail -n 1)
if [ "$compound_pk_result" != "1" ]; then
echo "The upsert output is not as expected."
exit 1
fi

echo "Mongodb sink check passed"

echo "--- Kill cluster"
risedev ci-kill
19 changes: 19 additions & 0 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,25 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end mongodb sink test"
key: "e2e-mongodb-sink-tests"
command: "ci/scripts/e2e-mongodb-sink-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-mongodb-sink-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-mongodb-sink-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "connector node integration test Java {{matrix.java_version}}"
key: "connector-node-integration-test"
command: "ci/scripts/connector-node-integration-test.sh -p ci-release -v {{matrix.java_version}}"
Expand Down
15 changes: 15 additions & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,21 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end mongodb sink test"
if: build.pull_request.labels includes "ci/run-e2e-mongodb-sink-tests" || build.env("CI_STEPS") =~ /(^|,) e2e-mongodb-sink-tests?(,|$$)/
command: "ci/scripts/e2e-mongodb-sink-test.sh -p ci-dev"
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "e2e java-binding test"
if: build.pull_request.labels includes "ci/run-java-binding-tests" || build.env("CI_STEPS") =~ /(^|,)java-binding-tests?(,|$$)/
command: "ci/scripts/java-binding-test.sh -p ci-dev"
Expand Down
106 changes: 106 additions & 0 deletions e2e_test/sink/mongodb_sink.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
statement ok
create table t1(
a smallint,
b int,
c bigint,
d rw_int256,
e real,
f double precision,
g varchar,
h bytea,
i date,
j time,
k timestamp,
l timestamptz,
m interval,
n STRUCT <b STRUCT<c INTEGER>, d INTEGER>,
o varchar[],
p jsonb
) append only;

statement ok
create sink t1_sink from t1
with (
connector='mongodb',
type = 'append-only',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t1',
mongodb.bulk_write.max_entries = '1024'
);

statement ok
insert into t1 values(1, 2, 3, 4, 5.0, 6.0, '7', '\xDe00BeEf', date '2022-04-08', time '18:20:49',
'2022-03-13 01:00:00'::timestamp, '2022-03-13 01:00:00Z'::timestamptz, interval '4 hour',
ROW(ROW(8), 9), ARRAY['a', 'b', 'c'], '{"a": [{"b": 1}], "c": true}'::jsonb);

statement ok
create table t2(
_id bigint primary key,
collection_name varchar,
value varchar
);

statement ok
create sink t2_sink from t2
with (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t2',
mongodb.bulk_write.max_entries = '1024',
collection.name.field = 'collection_name',
collection.name.field.drop = 'true',
primary_key='_id'
);

statement ok
insert into t2 values(1, 'shard_2024_01.tenant_1', 'data');

statement ok
insert into t2 values(2, '', 'data');

statement ok
create table t3(
a int,
b int,
value text,
primary key (a,b)
);

statement ok
create sink t3_sink from t3
with (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t3',
mongodb.bulk_write.max_entries = '1024',
primary_key='a,b'
);

statement ok
delete from t3 where a = 1 and b = 2;

statement ok
insert into t3 values(1, 2, 'abc');

statement ok
FLUSH;

statement ok
DROP SINK t1_sink;

statement ok
DROP TABLE t1;

statement ok
DROP SINK t2_sink;

statement ok
DROP TABLE t2;

statement ok
DROP SINK t3_sink;

statement ok
DROP TABLE t3;
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
mysql_async = { version = "0.34", default-features = false, features = [
"default",
] }
Expand Down
Loading
Loading