diff --git a/docs/en/connector-v2/source/MongoDB.md b/docs/en/connector-v2/source/MongoDB.md
new file mode 100644
index 000000000000..e587f919a14a
--- /dev/null
+++ b/docs/en/connector-v2/source/MongoDB.md
@@ -0,0 +1,76 @@
+# MongoDb
+
+> MongoDb source connector
+
+## Description
+
+Read data from MongoDB.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|----------------|--------|----------|---------------|
+| uri | string | yes | - |
+| database | string | yes | - |
+| collection | string | yes | - |
+| schema | object | yes | - |
+| common-options | string | yes | - |
+
+### uri [string]
+
+MongoDB uri
+
+### database [string]
+
+MongoDB database
+
+### collection [string]
+
+MongoDB collection
+
+### schema [object]
+
+Because `MongoDB` does not have the concept of `schema`, when engine reads `MongoDB` , it will sample `MongoDB` data and infer the `schema` . In fact, this process will be slow and may be inaccurate. This parameter can be manually specified. Avoid these problems.
+
+such as:
+
+```
+schema {
+ fields {
+ id = int
+ key_aa = string
+ key_bb = string
+ }
+}
+```
+
+### common options [string]
+
+Source Plugin common parameters, refer to [Source Plugin](common-options.md) for details
+
+## Example
+
+```bash
+mongodb {
+ uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&writeConcern=majority"
+ database = "mydatabase"
+ collection = "mycollection"
+ schema {
+ fields {
+ id = int
+ key_aa = string
+ key_bb = string
+ }
+ }
+ result_table_name = "mongodb_result_table"
+}
+```
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 78a201de43f8..c0aecfec7855 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -127,3 +127,5 @@ seatunnel.source.Redis = connector-redis
seatunnel.sink.Redis = connector-redis
seatunnel.sink.DataHub = connector-datahub
seatunnel.sink.Sentry = connector-sentry
+seatunnel.source.MongoDB = connector-mongodb
+
diff --git a/pom.xml b/pom.xml
index cae9457ddb47..4a13fe353928 100644
--- a/pom.xml
+++ b/pom.xml
@@ -256,7 +256,7 @@
commons-collections4
${commons-collections4.version}
-
+
com.beust
jcommander
@@ -344,7 +344,7 @@
slf4j-log4j12
${slf4j.version}
-
+
commons-logging
commons-logging
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index baf154db1516..25bec4a20aa7 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -161,6 +161,11 @@
connector-sentry
${project.version}
+
+ org.apache.seatunnel
+ connector-mongodb
+ ${project.version}
+
diff --git a/seatunnel-connectors-v2/connector-mongodb/pom.xml b/seatunnel-connectors-v2/connector-mongodb/pom.xml
new file mode 100644
index 000000000000..c6a4287de009
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-connectors-v2
+ ${revision}
+
+
+ connector-mongodb
+
+
+ 3.12.11
+
+
+
+
+ org.apache.seatunnel
+ connector-common
+ ${project.version}
+
+
+
+ org.apache.seatunnel
+ seatunnel-format-json
+ ${project.version}
+
+
+
+ org.apache.seatunnel
+ seatunnel-api
+ ${project.version}
+
+
+
+ org.mongodb
+ mongodb-driver
+ ${mongodb.version}
+
+
+
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
new file mode 100644
index 000000000000..45857e85bf54
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
+
+import java.io.Serializable;
+
+/**
+ * The config of mongodb
+ */
+public class MongodbConfig implements Serializable {
+
+ public static final String URI = "uri";
+
+ public static final String DATABASE = "database";
+
+ public static final String COLLECTION = "collection";
+
+ public static final String SCHEMA = "schema";
+
+ public static final String FORMAT = "format";
+
+ public static final String DEFAULT_FORMAT = "json";
+
+}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
new file mode 100644
index 000000000000..713d5421e3e0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbParameters.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class MongodbParameters implements Serializable {
+
+ private String uri;
+
+ private String database;
+
+ private String collection;
+
+}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
new file mode 100644
index 000000000000..0770975690f0
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
+
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.SCHEMA;
+import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class MongodbSource extends AbstractSingleSplitSource {
+
+ private SeaTunnelContext seaTunnelContext;
+
+ private SeaTunnelRowType rowType;
+
+ private MongodbParameters params;
+
+ private DeserializationSchema deserializationSchema;
+
+ @Override
+ public String getPluginName() {
+ return "MongoDB";
+ }
+
+ @Override
+ public void prepare(Config config) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(config, URI, DATABASE, COLLECTION);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+ }
+
+ this.params = ConfigBeanFactory.create(config, MongodbParameters.class);
+
+ if (config.hasPath(SCHEMA)) {
+ Config schema = config.getConfig(SCHEMA);
+ this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ } else {
+ this.rowType = SeaTunnelSchema.buildSimpleTextSchema();
+ }
+
+ // TODO: use format SPI
+ // default use json format
+ String format;
+ if (config.hasPath(FORMAT)) {
+ format = config.getString(FORMAT);
+ this.deserializationSchema = null;
+ } else {
+ format = DEFAULT_FORMAT;
+ this.deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
+ }
+ }
+
+ @Override
+ public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+ this.seaTunnelContext = seaTunnelContext;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelDataType getProducedType() {
+ return this.rowType;
+ }
+
+ @Override
+ public AbstractSingleSplitReader createReader(SingleSplitReaderContext context) throws Exception {
+ return new MongodbSourceReader(context, this.params, this.deserializationSchema);
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
new file mode 100644
index 000000000000..0a4bd95e29b1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mongodb.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCursor;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class MongodbSourceReader extends AbstractSingleSplitReader {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MongodbSourceReader.class);
+
+ private final SingleSplitReaderContext context;
+
+ private MongoClient client;
+
+ private final MongodbParameters params;
+
+ private final DeserializationSchema deserializationSchema;
+
+ MongodbSourceReader(SingleSplitReaderContext context, MongodbParameters params, DeserializationSchema deserializationSchema) {
+ this.context = context;
+ this.params = params;
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public void open() throws Exception {
+ client = MongoClients.create(params.getUri());
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public void pollNext(Collector output) throws Exception {
+ try (MongoCursor mongoCursor = client.getDatabase(params.getDatabase()).getCollection(params.getCollection()).find().iterator()) {
+
+ while (mongoCursor.hasNext()) {
+ Document doc = mongoCursor.next();
+ HashMap map = new HashMap<>(doc.size());
+ Set> entries = doc.entrySet();
+ for (Map.Entry entry : entries) {
+ if (!"_id".equalsIgnoreCase(entry.getKey())) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ map.put(key, value);
+ }
+ }
+ String content = JsonUtils.toJsonString(map);
+ if (deserializationSchema != null) {
+ deserializationSchema.deserialize(content.getBytes(), output);
+ } else {
+ // TODO: use seatunnel-text-format
+ output.collect(new SeaTunnelRow(new Object[]{content}));
+ }
+ }
+ } finally {
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ LOGGER.info("Closed the bounded mongodb source");
+ context.signalNoMoreElement();
+ }
+ }
+ }
+
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index e1c5b683d38b..facea7d474d5 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -53,6 +53,7 @@
connector-redis
connector-datahub
connector-sentry
+ connector-mongodb
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 06d2b3285276..ecb3c9ed0214 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -763,6 +763,9 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Maven Settings (org.apache.maven:maven-settings:3.1.1 - http://maven.apache.org/ref/3.1.1/maven-settings)
(The Apache Software License, Version 2.0) Maven Settings Builder (org.apache.maven:maven-settings-builder:3.1.1 - http://maven.apache.org/ref/3.1.1/maven-settings-builder)
(The Apache Software License, Version 2.0) MongoDB Java Driver (org.mongodb:mongo-java-driver:3.4.2 - http://www.mongodb.org)
+ (The Apache Software License, Version 2.0) MongoDB Java Driver (org.mongodb:mongodb-driver:3.12.11 - http://www.mongodb.org)
+ (The Apache Software License, Version 2.0) MongoDB Java Driver Core (org.mongodb:mongodb-driver-core:3.12.11 - http://www.mongodb.org)
+ (The Apache Software License, Version 2.0) BSON (org.mongodb:bson:3.12.11 - https://bsonspec.org)
(The Apache Software License, Version 2.0) Nimbus JOSE+JWT (com.nimbusds:nimbus-jose-jwt:4.41.1 - https://bitbucket.org/connect2id/nimbus-jose-jwt)
(The Apache Software License, Version 2.0) Okio (com.squareup.okio:okio:1.17.2 - https://github.com/square/okio/)
(The Apache Software License, Version 2.0) Phoenix - Spark (org.apache.phoenix:phoenix-spark:5.0.0-HBase-2.0 - http://www.apache.org/phoenix/phoenix-spark/)
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
new file mode 100644
index 000000000000..a49496c68f4b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-flink-connector-v2-e2e
+ ${revision}
+
+
+ connector-mongodb-flink-e2e
+
+
+
+ org.apache.seatunnel
+ connector-flink-e2e-base
+ ${project.version}
+ tests
+ test-jar
+ test
+
+
+ org.apache.seatunnel
+ connector-mongodb
+ ${project.version}
+ test
+
+
+
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java
new file mode 100644
index 000000000000..1c5dc90ceae9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.mongodb;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class MongodbSourceToConsoleIT extends FlinkContainer {
+
+ private static final String MONGODB_IMAGE = "mongo:latest";
+
+ private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb_source";
+
+ private static final int MONGODB_PORT = 27017;
+
+ private static final String MONGODB_DATABASE = "test_db";
+
+ private static final String MONGODB_COLLECTION = "test_table";
+
+ private GenericContainer> mongodbContainer;
+
+ private MongoClient client;
+
+ @BeforeEach
+ public void startMongoContainer() {
+ DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
+ mongodbContainer = new GenericContainer<>(imageName)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MONGODB_CONTAINER_HOST)
+ .withExposedPorts(MONGODB_PORT)
+ .waitingFor(new HttpWaitStrategy()
+ .forPort(MONGODB_PORT)
+ .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
+ .withStartupTimeout(Duration.ofMinutes(2)))
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ Startables.deepStart(Stream.of(mongodbContainer)).join();
+ log.info("Mongodb container started");
+ Awaitility.given().ignoreExceptions()
+ .await()
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(this::initConnection);
+ this.generateTestData();
+ }
+
+ public void initConnection() {
+ String host = mongodbContainer.getContainerIpAddress();
+ int port = mongodbContainer.getFirstMappedPort();
+ String url = String.format("mongodb://%s:%d/%s", host, port, MONGODB_DATABASE);
+
+ client = MongoClients.create(url);
+ }
+
+ private void generateTestData() {
+ MongoCollection mongoCollection = client
+ .getDatabase(MONGODB_DATABASE)
+ .getCollection(MONGODB_COLLECTION);
+
+ mongoCollection.deleteMany(new Document());
+
+ HashMap map = new HashMap<>();
+ map.put("id", 1);
+ map.put("key_aa", "value_aa");
+ map.put("key_bb", "value_bb");
+ Document doc = new Document(map);
+ mongoCollection.insertOne(doc);
+ }
+
+ @Test
+ public void testMongodbSource() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/mongodb_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @AfterEach
+ public void close() {
+ super.close();
+ if (client != null) {
+ client.close();
+ }
+ if (mongodbContainer != null) {
+ mongodbContainer.close();
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000000..db5d9e512204
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf
new file mode 100644
index 000000000000..3480d955c7cc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/resources/mongodb/mongodb_to_console.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ MongoDB {
+ uri = "mongodb://flink_e2e_mongodb_source:27017/test_db?retryWrites=true&writeConcern=majority"
+ database = "test_db"
+ collection = "test_table"
+ schema {
+ fields {
+ id = int
+ key_aa = string
+ key_bb = string
+ }
+ }
+ result_table_name = "test_table"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/MongoDB
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/transform/Sql
+}
+
+sink {
+ Console {}
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Console
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 737e6c32332e..4f5f037dd7db 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -35,6 +35,7 @@
connector-datahub-flink-e2e
connector-assert-flink-e2e
connector-fake-flink-e2e
+ connector-mongodb-flink-e2e
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 8f4a7db1eafd..d80df589f212 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -434,7 +434,10 @@ memory-0.9.0.jar
metrics-core-4.2.9.jar
minlog-1.3.0.jar
mongo-java-driver-3.4.2.jar
+mongodb-driver-3.12.11.jar
+mongodb-driver-core-3.12.11.jar
mongo-spark-connector_2.11-2.2.0.jar
+bson-3.12.11.jar
moshi-1.8.0.jar
msgpack-core-0.9.0.jar
mybatis-3.5.9.jar