Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo committed Jun 16, 2023
1 parent e14582c commit e1d876b
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 9 deletions.
4 changes: 2 additions & 2 deletions docs/en/connector-v2/source/MongoDB-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ Source Options
| hosts | String | Yes | - | The comma-separated list of hostname and port pairs of the MongoDB servers. eg. `localhost:27017,localhost:27018` |
| username | String | No | - | Name of the database user to be used when connecting to MongoDB. |
| password | String | No | - | Password to be used when connecting to MongoDB. |
| databases | String | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2` |
| collections | String | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2` |
| database | String | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2` |
| collection | String | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2` |
| connection.options | String | No | - | The ampersand-separated connection options of MongoDB. eg. `replicaSet=test&connectTimeoutMS=300000` |
| batch.size | Long | No | 1024 | The cursor batch size. |
| poll.max.batch.size | Enum | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class MySqlIncrementalSourceFactoryTest {
public class MongodbIncrementalSourceFactoryTest {
@Test
public void testOptionRule() {
Assertions.assertNotNull((new MongodbIncrementalSourceFactory()).optionRule());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;

import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.lifecycle.Startables;

import com.mongodb.client.MongoClient;
Expand Down Expand Up @@ -87,6 +90,9 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
// mysql sink table query sql
private static final String SINK_SQL = "select name,description,weight from products";

private static final String MYSQL_DRIVER_JAR =
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar";

private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw");

Expand All @@ -104,6 +110,18 @@ private static MySqlContainer createMySqlContainer() {
return mySqlContainer;
}

@TestContainerExtension
private final ContainerExtendedFactory extendedFactory =
container -> {
Container.ExecResult extraCommands =
container.execInContainer(
"bash",
"-c",
"mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ MYSQL_DRIVER_JAR);
Assertions.assertEquals(0, extraCommands.getExitCode());
};

@BeforeAll
@Override
public void startUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ env {
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = "inventory"
collection = "inventory.products"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
connection.options = "maxIdleTimeMS=3000&connectTimeoutMS=300000&authSource=admin"
schema = {
fields {
"_id": string,
Expand All @@ -45,7 +46,7 @@ source {

sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306"
url = "jdbc:mysql://mysql_cdc_e2e:3306?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -99,8 +98,6 @@ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger("mysql-docker-image")));
// For local test use
mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
return mySqlContainer;
}

Expand Down

0 comments on commit e1d876b

Please sign in to comment.