Skip to content

Commit

Permalink
[Hotfix]Fix mongodb cdc e2e instability (#5128)
Browse files Browse the repository at this point in the history
Co-authored-by: chenzy15 <chenzy15@ziroom.com>
  • Loading branch information
MonsterChenzhuo and chenzy15 authored Jul 21, 2023
1 parent 4cc10e8 commit 6f30b29
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
Expand Down Expand Up @@ -234,7 +235,7 @@ public class MongodbSourceOptions extends SourceOptions {
.withDescription(
"Decides if the table options contains Debezium client properties that start with prefix 'debezium'.");

public static final Option<StartupMode> STARTUP_MODE =
public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
Options.key(SourceOptions.STARTUP_MODE_KEY)
.singleChoice(
StartupMode.class,
Expand All @@ -245,7 +246,7 @@ public class MongodbSourceOptions extends SourceOptions {
"Optional startup mode for CDC source, valid enumerations are "
+ "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");

public static final Option<StopMode> STOP_MODE =
public static final SingleChoiceOption<StopMode> STOP_MODE =
Options.key(SourceOptions.STOP_MODE_KEY)
.singleChoice(StopMode.class, Collections.singletonList(StopMode.NEVER))
.defaultValue(StopMode.NEVER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
Expand Down Expand Up @@ -78,7 +80,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {

// ----------------------------------------------------------------------------
// mysql
private static final String MYSQL_HOST = "mysql_cdc_e2e";
private static final String MYSQL_HOST = "mysql_e2e";

private static final String MYSQL_USER_NAME = "st_user";

Expand All @@ -104,8 +106,10 @@ private static MySqlContainer createMySqlContainer() {
mySqlContainer.withDatabaseName(MYSQL_DATABASE);
mySqlContainer.withUsername(MYSQL_USER_NAME);
mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
mySqlContainer.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image")));
// For local test use
// mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
mySqlContainer.setPortBindings(Collections.singletonList("3310:3306"));
return mySqlContainer;
}

Expand Down Expand Up @@ -134,6 +138,9 @@ public void startUp() {
mongodbContainer = new MongoDBContainer(NETWORK);
// For local test use
mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
mongodbContainer.withLogConsumer(
new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mongodb-Docker-Image")));

Startables.deepStart(Stream.of(mongodbContainer)).join();
mongodbContainer.executeCommandFileInSeparateDatabase(MONGODB_DATABASE);
initConnection();
Expand Down Expand Up @@ -213,6 +220,7 @@ private List<List<Object>> querySql() {
for (int i = 1; i <= columnCount; i++) {
objects.add(resultSet.getObject(i));
}
log.info("Print mysql sink data:" + objects);
result.add(objects);
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set engine configuration here
Expand Down Expand Up @@ -45,11 +42,10 @@ source {

sink {
jdbc {
url = "jdbc:mysql://mysql_cdc_e2e:3306?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8"
url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"

generate_sink_sql = true
# You need to configure both database and table
database = mongodb_cdc
Expand Down

0 comments on commit 6f30b29

Please sign in to comment.