Skip to content

Commit

Permalink
[Improve][Connector-v2][Mongodb]Optimize reading logic (apache#5001)
Browse files Browse the repository at this point in the history
Co-authored-by: chenqqq11 <chenzy15@ziroom.com>
  • Loading branch information
2 people authored and liunaijie committed Jul 13, 2023
1 parent aa5f86a commit c46cd40
Showing 1 changed file with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import org.bson.BsonDocument;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -38,6 +37,8 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** MongoReader reads MongoDB by splits (queries). */
@Slf4j
Expand Down Expand Up @@ -70,7 +71,7 @@ public MongodbReader(
}

@Override
public void open() throws Exception {
public void open() {
if (cursor != null) {
cursor.close();
}
Expand All @@ -87,26 +88,18 @@ public void close() {
public void pollNext(Collector<SeaTunnelRow> output) {
synchronized (output.getCheckpointLock()) {
MongoSplit currentSplit = pendingSplits.poll();
if (null != currentSplit) {
if (currentSplit != null) {
if (cursor != null) {
// current split is in-progress
return;
}
log.info("Prepared to read split {}", currentSplit.splitId());
FindIterable<BsonDocument> rs =
clientProvider
.getDefaultCollection()
.find(currentSplit.getQuery())
.projection(currentSplit.getProjection())
.batchSize(readOptions.getFetchSize())
.noCursorTimeout(readOptions.isNoCursorTimeout())
.maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES);
cursor = rs.iterator();
while (cursor.hasNext()) {
SeaTunnelRow deserialize = deserializer.deserialize(cursor.next());
output.collect(deserialize);
try {
getCursor(currentSplit);
cursorToStream().map(deserializer::deserialize).forEach(output::collect);
} finally {
closeCurrentSplit();
}
closeCurrentSplit();
}
if (noMoreSplit && pendingSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
Expand All @@ -116,6 +109,23 @@ public void pollNext(Collector<SeaTunnelRow> output) {
}
}

private void getCursor(MongoSplit split) {
cursor =
clientProvider
.getDefaultCollection()
.find(split.getQuery())
.projection(split.getProjection())
.batchSize(readOptions.getFetchSize())
.noCursorTimeout(readOptions.isNoCursorTimeout())
.maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES)
.iterator();
}

private Stream<BsonDocument> cursorToStream() {
Iterable<BsonDocument> iterable = () -> cursor;
return StreamSupport.stream(iterable.spliterator(), false);
}

@Override
public List<MongoSplit> snapshotState(long checkpointId) {
return new ArrayList<>(pendingSplits);
Expand Down

0 comments on commit c46cd40

Please sign in to comment.