Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo committed Jun 16, 2023
1 parent 61b23ff commit 6e7b426
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.bson.BsonValue;
import org.jetbrains.annotations.NotNull;

import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.OperationType;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
Expand Down Expand Up @@ -198,6 +199,14 @@ record -> {

@Override
public void close() {
createMongoClient(sourceConfig).close();
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
MongoClient mongoClient = createMongoClient(sourceConfig);
if (mongoClient != null) {
mongoClient.close();
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class MongodbScanFetchTask implements FetchTask<SourceSplitBase> {

private volatile boolean taskRunning = false;

private MongoClient mongoClient;

public MongodbScanFetchTask(SnapshotSplit snapshotSplit) {
this.snapshotSplit = snapshotSplit;
}
Expand Down Expand Up @@ -158,12 +160,15 @@ public void execute(Context context) throws Exception {
throw e;
} finally {
taskRunning = false;
if (mongoClient != null) {
mongoClient.close();
}
}
}

@NotNull private MongoCursor<RawBsonDocument> getSnapshotCursor(
@NotNull SnapshotSplit snapshotSplit, MongodbSourceConfig sourceConfig) {
MongoClient mongoClient = createMongoClient(sourceConfig);
mongoClient = createMongoClient(sourceConfig);
MongoCollection<RawBsonDocument> collection =
getMongoCollection(mongoClient, snapshotSplit.getTableId(), RawBsonDocument.class);
BsonDocument startKey = (BsonDocument) snapshotSplit.getSplitStart()[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ public void execute(Context context) throws Exception {
if (changeStreamCursor != null) {
changeStreamCursor.close();
}
if (mongoClient != null) {
mongoClient.close();
}
}
}

Expand All @@ -205,7 +208,9 @@ public boolean isRunning() {
}

@Override
public void shutdown() {}
public void shutdown() {
taskRunning = false;
}

@Override
public IncrementalSplit getSplit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,35 +142,41 @@ public void startUp() {

@TestTemplate
public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) {
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/mongodbcdc_to_mysql.conf");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
readMongodbData().stream()
.peek(e -> e.remove("_id"))
.map(Document::entrySet)
.map(Set::stream)
.map(
entryStream ->
entryStream
.map(Map.Entry::getValue)
.collect(
Collectors.toCollection(
ArrayList
::new)))
.collect(Collectors.toList()),
querySql());
});
try {
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/mongodbcdc_to_mysql.conf");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
return null;
});
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
readMongodbData().stream()
.peek(e -> e.remove("_id"))
.map(Document::entrySet)
.map(Set::stream)
.map(
entryStream ->
entryStream
.map(Map.Entry::getValue)
.collect(
Collectors
.toCollection(
ArrayList
::new)))
.collect(Collectors.toList()),
querySql());
});

} catch (Exception e) {
e.printStackTrace();
}
}

private Connection getJdbcConnection() throws SQLException {
Expand Down

0 comments on commit 6e7b426

Please sign in to comment.