Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): Support Mongodb sink #17102

Merged
merged 27 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8a9e865
save work
ly9chee May 21, 2024
b307b1a
save work
ly9chee May 21, 2024
861a654
save work
ly9chee May 25, 2024
2fda3b7
save work
ly9chee Jun 4, 2024
e23c399
Merge branch 'mongodb-sink' of https://github.com/ly9chee/risingwave …
ly9chee Jun 4, 2024
1901d80
add e2e tests
ly9chee Jun 4, 2024
d5d799c
fix e2e tests
ly9chee Jun 4, 2024
a1763b8
update Cargo.lock
ly9chee Jun 4, 2024
4c36a53
merge main
ly9chee Jun 4, 2024
2404c76
fix fmt
ly9chee Jun 4, 2024
63bc960
fix dylint
ly9chee Jun 4, 2024
05a186a
merge pk checking logic
ly9chee Jun 7, 2024
27bb797
support compound pk & fix upsert
ly9chee Jun 8, 2024
4c87a9b
fix fmt
ly9chee Jun 8, 2024
c7d9151
moving encoding logic to bson.rs & minor refactors
ly9chee Jun 12, 2024
e5f7ad0
add license header
ly9chee Jun 13, 2024
73c88e0
Merge branch 'main' into mongodb-sink
ly9chee Jun 13, 2024
7c4ad31
warning when non-insert op received in append-only mode
ly9chee Jun 13, 2024
2d413a7
refactor is_append_only dispatching logic
ly9chee Jun 14, 2024
b461ce3
add comment for MONGODB_BULK_WRITE_SIZE_LIMIT
ly9chee Jun 14, 2024
b8ec448
Merge branch 'main' into mongodb-sink
wenym1 Jun 17, 2024
6e9d968
remove preserve_order from serde_json introduced by bson
ly9chee Jun 17, 2024
623c7e4
Merge branch 'mongodb-sink' of https://github.com/ly9chee/risingwave …
ly9chee Jun 17, 2024
de04e01
Merge branch 'main' into mongodb-sink
wenym1 Jun 18, 2024
19e43b2
Merge branch 'main' into mongodb-sink
ly9chee Jun 18, 2024
90bc3f3
fix lint
ly9chee Jun 18, 2024
d3d31db
Merge branch 'main' into mongodb-sink
ly9chee Jun 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 227 additions & 42 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ services:
- doris-server
- starrocks-fe-server
- starrocks-be-server
- mongodb
- mongodb-setup
- sqlserver-server
volumes:
- ..:/risingwave
Expand Down
72 changes: 72 additions & 0 deletions ci/scripts/e2e-mongodb-sink-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env bash

# End-to-end test for the MongoDB sink.
#
# Starts a RisingWave cluster, installs the legacy `mongo` shell, runs
# e2e_test/sink/mongodb_sink.slt and then verifies the number of documents
# that ended up in each target MongoDB collection.
#
# Usage: ci/scripts/e2e-mongodb-sink-test.sh -p <profile>

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

# The leading ':' switches getopts to silent error reporting. Without it the
# ':' (missing argument) branch is never taken and OPTARG is left unset for an
# unknown option, which would trip `set -u` before the message is printed.
while getopts ':p:' opt; do
    case ${opt} in
        p )
            profile=$OPTARG
            ;;
        \? )
            echo "Invalid Option: -$OPTARG" 1>&2
            exit 1
            ;;
        : )
            echo "Invalid option: $OPTARG requires an argument" 1>&2
            exit 1
            ;;
    esac
done
shift $((OPTIND -1))

# Fail with a readable message instead of an "unbound variable" error.
if [ -z "${profile:-}" ]; then
    echo "Missing required option: -p <profile>" 1>&2
    exit 1
fi

# Fails the test unless the given collection holds exactly one document.
#   $1 - database name
#   $2 - collection name
#   $3 - message printed when the count differs from 1
expect_single_document() {
    local db=$1
    local coll=$2
    local message=$3
    local count
    # Declared separately from the assignment so that a failing `mongo`
    # invocation is still caught by `set -e` / `pipefail`.
    count=$(mongo mongodb://mongodb:27017 --eval "db.getSiblingDB(\"${db}\").${coll}.countDocuments({})" | tail -n 1)
    if [ "$count" != "1" ]; then
        echo "$message"
        exit 1
    fi
}

download_and_prepare_rw "$profile" source

echo "--- starting risingwave cluster"
cargo make ci-start ci-sink-test
sleep 1

# install the mongo shell
wget http://archive.ubuntu.com/ubuntu/pool/main/o/openssl/libssl1.1_1.1.1f-1ubuntu2_amd64.deb
wget https://repo.mongodb.org/apt/ubuntu/dists/focal/mongodb-org/4.4/multiverse/binary-amd64/mongodb-org-shell_4.4.28_amd64.deb
dpkg -i libssl1.1_1.1.1f-1ubuntu2_amd64.deb
dpkg -i mongodb-org-shell_4.4.28_amd64.deb

echo '> ping mongodb'
echo 'db.runCommand({ping: 1})' | mongo mongodb://mongodb:27017
echo '> rs config'
echo 'rs.conf()' | mongo mongodb://mongodb:27017
echo '> run mongodb sink test..'

sqllogictest -p 4566 -d dev './e2e_test/sink/mongodb_sink.slt'
sleep 1

# append-only sink (t1_sink)
expect_single_document "demo" "t1" "The append-only output is not as expected."

# upsert sink with a per-row collection name (t2_sink): one document in the
# default collection and one in the collection named by the row itself
expect_single_document "demo" "t2" "The upsert output is not as expected."
expect_single_document "shard_2024_01" "tenant_1" "The upsert output is not as expected."

# upsert sink with a compound primary key (t3_sink)
expect_single_document "demo" "t3" "The upsert output is not as expected."

echo "Mongodb sink check passed"

echo "--- Kill cluster"
risedev ci-kill
19 changes: 19 additions & 0 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,25 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

# MongoDB sink end-to-end test (main-cron pipeline).
# Runs when CI_STEPS is unset and the PR does not carry the
# "ci/main-cron/skip-ci" label, or when explicitly requested through the
# "ci/run-e2e-mongodb-sink-tests" label or a CI_STEPS entry named
# e2e-mongodb-sink-test(s).
- label: "end-to-end mongodb sink test"
key: "e2e-mongodb-sink-tests"
command: "ci/scripts/e2e-mongodb-sink-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-mongodb-sink-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-mongodb-sink-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v5.1.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "connector node integration test Java {{matrix.java_version}}"
key: "connector-node-integration-test"
command: "ci/scripts/connector-node-integration-test.sh -p ci-release -v {{matrix.java_version}}"
Expand Down
15 changes: 15 additions & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,21 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

# MongoDB sink end-to-end test (pull-request pipeline). Opt-in only: runs when
# the PR carries the "ci/run-e2e-mongodb-sink-tests" label or when CI_STEPS
# contains an entry named e2e-mongodb-sink-test(s).
# The step name must follow "(^|,)" directly in the regex; a space there would
# require a literal space in CI_STEPS and the step could never be selected.
- label: "end-to-end mongodb sink test"
  if: build.pull_request.labels includes "ci/run-e2e-mongodb-sink-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-mongodb-sink-tests?(,|$$)/
  command: "ci/scripts/e2e-mongodb-sink-test.sh -p ci-dev"
  depends_on:
    - "build"
    - "build-other"
  plugins:
    - docker-compose#v5.1.0:
        run: sink-test-env
        config: ci/docker-compose.yml
        mount-buildkite-agent: true
    - ./ci/plugins/upload-failure-logs
  timeout_in_minutes: 10
  retry: *auto-retry

- label: "e2e java-binding test"
if: build.pull_request.labels includes "ci/run-java-binding-tests" || build.env("CI_STEPS") =~ /(^|,)java-binding-tests?(,|$$)/
command: "ci/scripts/java-binding-test.sh -p ci-dev"
Expand Down
106 changes: 106 additions & 0 deletions e2e_test/sink/mongodb_sink.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Case 1: append-only sink.
# t1 has one column per data type exercised by this test (integers, rw_int256,
# floats, varchar, bytea, date/time types, interval, struct, array, jsonb).
statement ok
create table t1(
a smallint,
b int,
c bigint,
d rw_int256,
e real,
f double precision,
g varchar,
h bytea,
i date,
j time,
k timestamp,
l timestamptz,
m interval,
n STRUCT <b STRUCT<c INTEGER>, d INTEGER>,
o varchar[],
p jsonb
) append only;

# Writes t1 to the MongoDB collection `demo.t1` in append-only mode.
statement ok
create sink t1_sink from t1
with (
connector='mongodb',
type = 'append-only',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t1',
mongodb.bulk_write.max_entries = '1024'
);

# A single row; ci/scripts/e2e-mongodb-sink-test.sh expects exactly one
# document in demo.t1 afterwards.
statement ok
insert into t1 values(1, 2, 3, 4, 5.0, 6.0, '7', '\xDe00BeEf', date '2022-04-08', time '18:20:49',
'2022-03-13 01:00:00'::timestamp, '2022-03-13 01:00:00Z'::timestamptz, interval '4 hour',
ROW(ROW(8), 9), ARRAY['a', 'b', 'c'], '{"a": [{"b": 1}], "c": true}'::jsonb);

# Case 2: upsert sink with a per-row target collection.
# `collection_name` carries the `db.collection` a row should be written to.
statement ok
create table t2(
_id bigint primary key,
collection_name varchar,
value varchar
);

# `collection.name` is the default target; `collection.name.field` names the
# column holding the per-row target and `collection.name.field.drop` is enabled
# for that column.
statement ok
create sink t2_sink from t2
with (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t2',
mongodb.bulk_write.max_entries = '1024',
collection.name.field = 'collection_name',
collection.name.field.drop = 'true',
primary_key='_id'
);

# Row with an explicit target; the CI script expects one document in
# shard_2024_01.tenant_1.
statement ok
insert into t2 values(1, 'shard_2024_01.tenant_1', 'data');

# Row with an empty collection name; the CI script expects one document in
# demo.t2, i.e. this row is expected to land in the default collection.
statement ok
insert into t2 values(2, '', 'data');

# Case 3: upsert sink with a compound primary key (a, b).
statement ok
create table t3(
a int,
b int,
value text,
primary key (a,b)
);

# Both key columns are listed in `primary_key`.
statement ok
create sink t3_sink from t3
with (
connector='mongodb',
type = 'upsert',
mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0',
collection.name = 'demo.t3',
mongodb.bulk_write.max_entries = '1024',
primary_key='a,b'
);

# Delete issued while t3 is still empty, i.e. before the key exists.
statement ok
delete from t3 where a = 1 and b = 2;

# The CI script expects exactly one document in demo.t3 after this insert.
statement ok
insert into t3 values(1, 2, 'abc');

# Flush before tearing anything down (presumably so that the rows inserted
# above are delivered to the sinks before they are dropped; confirm FLUSH
# semantics).
statement ok
FLUSH;

# Cleanup: each sink is dropped before the table it reads from.
statement ok
DROP SINK t1_sink;

statement ok
DROP TABLE t1;

statement ok
DROP SINK t2_sink;

statement ok
DROP TABLE t2;

statement ok
DROP SINK t3_sink;

statement ok
DROP TABLE t3;
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
mongodb = { version = "2.8.2", features = ["tokio-runtime"] }
mysql_async = { version = "0.34", default-features = false, features = [
"default",
] }
Expand Down
21 changes: 21 additions & 0 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,3 +744,24 @@ pub(crate) fn load_private_key(
.ok_or_else(|| anyhow!("No private key found"))?;
Ok(cert?.into())
}

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct MongodbCommon {
xxhZs marked this conversation as resolved.
Show resolved Hide resolved
/// The URL of MongoDB
#[serde(rename = "mongodb.url")]
pub connect_uri: String,
/// The collection name where data should be written to or read from. For sinks, the format is
/// `db_name.collection_name`. Data can also be written to dynamic collections, see `collection.name.field`
/// for more information.
#[serde(rename = "collection.name")]
pub collection_name: String,
}

impl MongodbCommon {
    /// Creates a [`mongodb::Client`] from the configured `mongodb.url` connection string.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `mongodb::Client::with_uri_str`, converted into the
    /// connector error type.
    pub(crate) async fn build_client(&self) -> ConnectorResult<mongodb::Client> {
        mongodb::Client::with_uri_str(&self.connect_uri)
            .await
            .map_err(Into::into)
    }
}
2 changes: 1 addition & 1 deletion src/connector/src/connector_common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService};
pub mod common;
pub use common::{
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaPrivateLinkCommon, KinesisCommon,
NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon,
MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon,
PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
};
1 change: 1 addition & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def_anyhow_newtype! {
rumqttc::tokio_rustls::rustls::Error => "TLS error",
rumqttc::v5::ClientError => "MQTT error",
rumqttc::v5::OptionError => "MQTT error",
mongodb::error::Error => "Mongodb error",

openssl::error::ErrorStack => "OpenSSL error",
}
Expand Down
Loading
Loading