Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo committed Jun 17, 2023
1 parent 6e7b426 commit 9408a78
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ public class MongodbScanFetchTask implements FetchTask<SourceSplitBase> {

private volatile boolean taskRunning = false;

private MongoClient mongoClient;

public MongodbScanFetchTask(SnapshotSplit snapshotSplit) {
this.snapshotSplit = snapshotSplit;
}
Expand Down Expand Up @@ -153,22 +151,19 @@ public void execute(Context context) throws Exception {
dataBackfillTask.execute(taskContext);
}
} catch (Exception e) {
log.error(
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT,
String.format(
"Execute snapshot read subtask for mongo split %s fail", snapshotSplit),
e);
throw e;
"Execute snapshot read subtask for mongodb split %s fail",
snapshotSplit));
} finally {
taskRunning = false;
if (mongoClient != null) {
mongoClient.close();
}
}
}

@NotNull private MongoCursor<RawBsonDocument> getSnapshotCursor(
@NotNull SnapshotSplit snapshotSplit, MongodbSourceConfig sourceConfig) {
mongoClient = createMongoClient(sourceConfig);
MongoClient mongoClient = createMongoClient(sourceConfig);
MongoCollection<RawBsonDocument> collection =
getMongoCollection(mongoClient, snapshotSplit.getTableId(), RawBsonDocument.class);
BsonDocument startKey = (BsonDocument) snapshotSplit.getSplitStart()[1];
Expand Down Expand Up @@ -212,7 +207,9 @@ public boolean isRunning() {
}

@Override
public void shutdown() {}
public void shutdown() {
// taskRunning = false;
}

@Override
public SnapshotSplit getSplit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public MongodbStreamFetchTask(IncrementalSplit streamSplit) {
}

@Override
public void execute(Context context) throws Exception {
public void execute(Context context) {
MongodbFetchTaskContext taskContext = (MongodbFetchTaskContext) context;
this.sourceConfig = taskContext.getSourceConfig();

Expand Down Expand Up @@ -189,16 +189,13 @@ public void execute(Context context) throws Exception {
}
}
} catch (Exception e) {
log.error("Poll change stream records failed ", e);
throw e;
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT, "Poll change stream records failed");
} finally {
taskRunning = false;
if (changeStreamCursor != null) {
changeStreamCursor.close();
}
if (mongoClient != null) {
mongoClient.close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -105,7 +107,7 @@ private static MySqlContainer createMySqlContainer() {
mySqlContainer.withUsername(MYSQL_USER_NAME);
mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
// For local test use
// mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
return mySqlContainer;
}

Expand Down Expand Up @@ -133,7 +135,7 @@ public void startUp() {
log.info("The second stage:Starting Mongodb containers...");
mongodbContainer = new MongoDBContainer(NETWORK);
// For local test use
// mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
Startables.deepStart(Stream.of(mongodbContainer)).join();
mongodbContainer.executeCommandFileInSeparateDatabase(MONGODB_DATABASE);
initConnection();
Expand All @@ -142,41 +144,63 @@ public void startUp() {

@TestTemplate
public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) {
try {
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/mongodbcdc_to_mysql.conf");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
readMongodbData().stream()
.peek(e -> e.remove("_id"))
.map(Document::entrySet)
.map(Set::stream)
.map(
entryStream ->
entryStream
.map(Map.Entry::getValue)
.collect(
Collectors
.toCollection(
ArrayList
::new)))
.collect(Collectors.toList()),
querySql());
});

} catch (Exception e) {
e.printStackTrace();
}
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/mongodbcdc_to_mysql.conf");
container.tearDown();
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException();
}
return null;
});
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
readMongodbData().stream()
.peek(e -> e.remove("_id"))
.map(Document::entrySet)
.map(Set::stream)
.map(
entryStream ->
entryStream
.map(Map.Entry::getValue)
.collect(
Collectors.toCollection(
ArrayList
::new)))
.collect(Collectors.toList()),
querySql());
});

// insert update delete
// upsertDeleteSourceTable();
//
// await().atMost(240000, TimeUnit.MILLISECONDS)
// .untilAsserted(
// () -> {
// Assertions.assertIterableEquals(
// readMongodbData().stream()
// .peek(e -> e.remove("_id"))
// .map(Document::entrySet)
// .map(Set::stream)
// .map(
// entryStream ->
// entryStream
//
// .map(Map.Entry::getValue)
// .collect(
//
// Collectors.toCollection(
//
// ArrayList
//
// ::new)))
// .collect(Collectors.toList()),
// querySql());
// });
}

private Connection getJdbcConnection() throws SQLException {
Expand Down Expand Up @@ -214,8 +238,8 @@ public void initConnection() {
String url =
String.format(
"mongodb://%s:%s@%s:%d/%s?authSource=admin",
"stuser",
"stpw",
"superuser",
"superpw",
ipAddress,
port,
MONGODB_DATABASE + "." + MONGODB_COLLECTION);
Expand All @@ -225,6 +249,7 @@ public void initConnection() {
protected List<Document> readMongodbData() {
MongoCollection<Document> sinkTable =
client.getDatabase(MONGODB_DATABASE).getCollection(MongodbCDCIT.MONGODB_COLLECTION);
// If the cursor has been traversed, it will automatically close without explicitly closing.
MongoCursor<Document> cursor = sinkTable.find().sort(Sorts.ascending("_id")).cursor();
List<Document> documents = new ArrayList<>();
while (cursor.hasNext()) {
Expand All @@ -233,10 +258,13 @@ protected List<Document> readMongodbData() {
return documents;
}

@Override
@AfterAll
@Override
public void tearDown() {
// close Container
if (Objects.nonNull(client)) {
client.close();
}
MYSQL_CONTAINER.close();
if (mongodbContainer != null) {
mongodbContainer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ source {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
username = superuser
password = superpw
schema = {
fields {
"_id": string,
Expand Down

0 comments on commit 9408a78

Please sign in to comment.