diff --git a/docs/en/connector-v2/source/MongoDB.md b/docs/en/connector-v2/source/MongoDB.md new file mode 100644 index 000000000000..e587f919a14a --- /dev/null +++ b/docs/en/connector-v2/source/MongoDB.md @@ -0,0 +1,76 @@ +# MongoDB + +> MongoDB source connector + +## Description + +Read data from MongoDB. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------|--------|----------|---------------| +| uri | string | yes | - | +| database | string | yes | - | +| collection | string | yes | - | +| schema | object | yes | - | +| common-options | string | yes | - | + +### uri [string] + +MongoDB uri + +### database [string] + +MongoDB database + +### collection [string] + +MongoDB collection + +### schema [object] + +Because `MongoDB` does not have the concept of `schema`, when the engine reads `MongoDB`, it will sample `MongoDB` data and infer the `schema`. This process is slow and may be inaccurate. Specify this parameter manually to avoid these problems. 
+ +such as: + +``` +schema { + fields { + id = int + key_aa = string + key_bb = string + } +} +``` + +### common options [string] + +Source Plugin common parameters, refer to [Source Plugin](common-options.md) for details + +## Example + +```bash +mongodb { + uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&writeConcern=majority" + database = "mydatabase" + collection = "mycollection" + schema { + fields { + id = int + key_aa = string + key_bb = string + } + } + result_table_name = "mongodb_result_table" +} +``` diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 78922c4eb586..e90f025348ce 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -126,3 +126,5 @@ seatunnel.source.Redis = connector-redis seatunnel.sink.Redis = connector-redis seatunnel.sink.DataHub = connector-datahub seatunnel.sink.Sentry = connector-sentry +seatunnel.source.MongoDB = connector-mongodb + diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml index baf154db1516..25bec4a20aa7 100644 --- a/seatunnel-connectors-v2-dist/pom.xml +++ b/seatunnel-connectors-v2-dist/pom.xml @@ -161,6 +161,11 @@ connector-sentry ${project.version} + + org.apache.seatunnel + connector-mongodb + ${project.version} + diff --git a/seatunnel-connectors-v2/connector-mongodb/pom.xml b/seatunnel-connectors-v2/connector-mongodb/pom.xml new file mode 100644 index 000000000000..c6a4287de009 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/pom.xml @@ -0,0 +1,61 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connectors-v2 + ${revision} + + + connector-mongodb + + + 3.12.11 + + + + + org.apache.seatunnel + connector-common + ${project.version} + + + + org.apache.seatunnel + seatunnel-format-json + ${project.version} + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + + org.mongodb + mongodb-driver + ${mongodb.version} + + + diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java new file mode 100644 index 000000000000..45857e85bf54 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Option keys (and the default format name) read from the MongoDB connector config.
 *
 * <p>This class only holds constants and is not meant to be instantiated or extended.
 * It stays {@link Serializable} so that code which already treats it as such keeps compiling.
 */
public final class MongodbConfig implements Serializable {

    /** Key of the MongoDB connection URI option. */
    public static final String URI = "uri";

    /** Key of the option naming the MongoDB database to read from. */
    public static final String DATABASE = "database";

    /** Key of the option naming the MongoDB collection to read from. */
    public static final String COLLECTION = "collection";

    /** Key of the option block describing the fields of the produced rows. */
    public static final String SCHEMA = "schema";

    /** Key of the option selecting how documents are deserialized into rows. */
    public static final String FORMAT = "format";

    /** Format assumed when {@link #FORMAT} is not configured. */
    public static final String DEFAULT_FORMAT = "json";

    private MongodbConfig() {
        // constants holder, no instances
    }

}
/**
 * Connection parameters of the MongoDB source.
 *
 * <p>{@code MongodbSource#prepare} fills an instance from the plugin config with
 * {@code ConfigBeanFactory.create}, and {@code MongodbSourceReader} reads the values back
 * through the Lombok-generated getters. The field names match the config keys
 * {@code uri}, {@code database} and {@code collection}.
 */
@Data
public class MongodbParameters implements Serializable {

    // Connection string handed to MongoClients.create(...) when the reader opens.
    private String uri;

    // Name of the database the reader selects with client.getDatabase(...).
    private String database;

    // Name of the collection the reader scans with getCollection(...).find().
    private String collection;

}
+ */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.source; + +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION; +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DEFAULT_FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.FORMAT; +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.SCHEMA; +import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelContext; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters; +import org.apache.seatunnel.format.json.JsonDeserializationSchema; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory; + +import 
com.google.auto.service.AutoService; + +@AutoService(SeaTunnelSource.class) +public class MongodbSource extends AbstractSingleSplitSource { + + private SeaTunnelContext seaTunnelContext; + + private SeaTunnelRowType rowType; + + private MongodbParameters params; + + private DeserializationSchema deserializationSchema; + + @Override + public String getPluginName() { + return "MongoDB"; + } + + @Override + public void prepare(Config config) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(config, URI, DATABASE, COLLECTION); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + + this.params = ConfigBeanFactory.create(config, MongodbParameters.class); + + if (config.hasPath(SCHEMA)) { + Config schema = config.getConfig(SCHEMA); + this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType(); + } else { + this.rowType = SeaTunnelSchema.buildSimpleTextSchema(); + } + + // TODO: use format SPI + // default use json format + String format; + if (config.hasPath(FORMAT)) { + format = config.getString(FORMAT); + this.deserializationSchema = null; + } else { + format = DEFAULT_FORMAT; + this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType); + } + } + + @Override + public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) { + this.seaTunnelContext = seaTunnelContext; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.BOUNDED; + } + + @Override + public SeaTunnelDataType getProducedType() { + return this.rowType; + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext context) throws Exception { + return new MongodbSourceReader(context, this.params, this.deserializationSchema); + } + +} diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java new file mode 100644 index 000000000000..0a4bd95e29b1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.mongodb.source; + +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCursor; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class MongodbSourceReader extends AbstractSingleSplitReader { + + private static final Logger LOGGER = LoggerFactory.getLogger(MongodbSourceReader.class); + + private final SingleSplitReaderContext context; + + private MongoClient client; + + private final MongodbParameters params; + + private final DeserializationSchema deserializationSchema; + + MongodbSourceReader(SingleSplitReaderContext context, MongodbParameters params, DeserializationSchema deserializationSchema) { + this.context = context; + this.params = params; + this.deserializationSchema = deserializationSchema; + } + + @Override + public void open() throws Exception { + client = MongoClients.create(params.getUri()); + } + + @Override + public void close() throws IOException { + if (client != null) { + client.close(); + } + } + + @Override + public void pollNext(Collector output) throws Exception { + try (MongoCursor mongoCursor = client.getDatabase(params.getDatabase()).getCollection(params.getCollection()).find().iterator()) { + + while (mongoCursor.hasNext()) { + 
Document doc = mongoCursor.next(); + HashMap map = new HashMap<>(doc.size()); + Set> entries = doc.entrySet(); + for (Map.Entry entry : entries) { + if (!"_id".equalsIgnoreCase(entry.getKey())) { + String key = entry.getKey(); + Object value = entry.getValue(); + map.put(key, value); + } + } + String content = JsonUtils.toJsonString(map); + if (deserializationSchema != null) { + deserializationSchema.deserialize(content.getBytes(), output); + } else { + // TODO: use seatunnel-text-format + output.collect(new SeaTunnelRow(new Object[]{content})); + } + } + } finally { + if (Boundedness.BOUNDED.equals(context.getBoundedness())) { + // signal to the source that we have reached the end of the data. + LOGGER.info("Closed the bounded mongodb source"); + context.signalNoMoreElement(); + } + } + } + +} diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml index e1c5b683d38b..facea7d474d5 100644 --- a/seatunnel-connectors-v2/pom.xml +++ b/seatunnel-connectors-v2/pom.xml @@ -53,6 +53,7 @@ connector-redis connector-datahub connector-sentry + connector-mongodb diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE index 06d2b3285276..ecb3c9ed0214 100644 --- a/seatunnel-dist/release-docs/LICENSE +++ b/seatunnel-dist/release-docs/LICENSE @@ -763,6 +763,9 @@ The text of each license is the standard Apache 2.0 license. 
(The Apache Software License, Version 2.0) Maven Settings (org.apache.maven:maven-settings:3.1.1 - http://maven.apache.org/ref/3.1.1/maven-settings) (The Apache Software License, Version 2.0) Maven Settings Builder (org.apache.maven:maven-settings-builder:3.1.1 - http://maven.apache.org/ref/3.1.1/maven-settings-builder) (The Apache Software License, Version 2.0) MongoDB Java Driver (org.mongodb:mongo-java-driver:3.4.2 - http://www.mongodb.org) + (The Apache Software License, Version 2.0) MongoDB Java Driver (org.mongodb:mongodb-driver:3.12.11 - http://www.mongodb.org) + (The Apache Software License, Version 2.0) MongoDB Java Driver Core (org.mongodb:mongodb-driver-core:3.12.11 - http://www.mongodb.org) + (The Apache Software License, Version 2.0) BSON (org.mongodb:bson:3.12.11 - https://bsonspec.org) (The Apache Software License, Version 2.0) Nimbus JOSE+JWT (com.nimbusds:nimbus-jose-jwt:4.41.1 - https://bitbucket.org/connect2id/nimbus-jose-jwt) (The Apache Software License, Version 2.0) Okio (com.squareup.okio:okio:1.17.2 - https://github.com/square/okio/) (The Apache Software License, Version 2.0) Phoenix - Spark (org.apache.phoenix:phoenix-spark:5.0.0-HBase-2.0 - http://www.apache.org/phoenix/phoenix-spark/) diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml new file mode 100644 index 000000000000..a49496c68f4b --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml @@ -0,0 +1,45 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-flink-connector-v2-e2e + ${revision} + + + connector-mongodb-flink-e2e + + + + org.apache.seatunnel + connector-flink-e2e-base + ${project.version} + tests + test-jar + test + + + org.apache.seatunnel + connector-mongodb + ${project.version} + test + + + diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java new file mode 100644 index 000000000000..1c5dc90ceae9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.flink.v2.mongodb; + +import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import lombok.extern.slf4j.Slf4j; +import org.bson.Document; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class MongodbSourceToConsoleIT extends FlinkContainer { + + private static final String MONGODB_IMAGE = "mongo:latest"; + + private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb_source"; + + private static final int MONGODB_PORT = 27017; + + private static final String MONGODB_DATABASE = "test_db"; + + private static final String MONGODB_COLLECTION = "test_table"; + + private GenericContainer mongodbContainer; + + private MongoClient client; + + @BeforeEach + public void startMongoContainer() { + DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE); + mongodbContainer = new GenericContainer<>(imageName) + .withNetwork(NETWORK) + .withNetworkAliases(MONGODB_CONTAINER_HOST) + .withExposedPorts(MONGODB_PORT) + .waitingFor(new HttpWaitStrategy() + .forPort(MONGODB_PORT) + .forStatusCodeMatching(response -> 
response == HTTP_OK || response == HTTP_UNAUTHORIZED) + .withStartupTimeout(Duration.ofMinutes(2))) + .withLogConsumer(new Slf4jLogConsumer(log)); + Startables.deepStart(Stream.of(mongodbContainer)).join(); + log.info("Mongodb container started"); + Awaitility.given().ignoreExceptions() + .await() + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initConnection); + this.generateTestData(); + } + + public void initConnection() { + String host = mongodbContainer.getContainerIpAddress(); + int port = mongodbContainer.getFirstMappedPort(); + String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE); + + client = MongoClients.create(url); + } + + private void generateTestData() { + MongoCollection mongoCollection = client + .getDatabase(MONGODB_DATABASE) + .getCollection(MONGODB_COLLECTION); + + mongoCollection.deleteMany(new Document()); + + HashMap map = new HashMap<>(); + map.put("id", 1); + map.put("key_aa", "value_aa"); + map.put("key_bb", "value_bb"); + Document doc = new Document(map); + mongoCollection.insertOne(doc); + } + + @Test + public void testMongodbSource() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @AfterEach + public void close() { + super.close(); + if (client != null) { + client.close(); + } + if (mongodbContainer != null) { + mongodbContainer.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties new file mode 100644 index 000000000000..db5d9e512204 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more 
+# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Set everything to be logged to the console +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf new file mode 100644 index 000000000000..3480d955c7cc --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + MongoDB { + uri = "mongodb://flink_e2e_mongodb_source:27017/test_db?retryWrites=true&writeConcern=majority" + database = "test_db" + collection = "test_table" + schema { + fields { + id = int + key_aa = string + key_bb = string + } + } + result_table_name = "test_table" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/MongoDB +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql +} + +sink { + Console {} + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Console +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml index 737e6c32332e..4f5f037dd7db 100644 --- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml @@ -35,6 +35,7 @@ connector-datahub-flink-e2e connector-assert-flink-e2e connector-fake-flink-e2e + connector-mongodb-flink-e2e diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 8f4a7db1eafd..d80df589f212 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -434,7 +434,10 @@ memory-0.9.0.jar metrics-core-4.2.9.jar minlog-1.3.0.jar mongo-java-driver-3.4.2.jar +mongodb-driver-3.12.11.jar +mongodb-driver-core-3.12.11.jar mongo-spark-connector_2.11-2.2.0.jar +bson-3.12.11.jar moshi-1.8.0.jar msgpack-core-0.9.0.jar mybatis-3.5.9.jar