Skip to content

Commit

Permalink
feat: Transform Mongo document and arrays to JSON string
Browse files Browse the repository at this point in the history
  • Loading branch information
osalvador committed Dec 29, 2022
1 parent fd35515 commit 4c86a2f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 42 deletions.
55 changes: 31 additions & 24 deletions src/main/java/org/replicadb/manager/MongoDBManager.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package org.replicadb.manager;


import com.mongodb.*;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCompressor;
import com.mongodb.MongoException;
import com.mongodb.client.*;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.BsonArrayCodec;
Expand All @@ -22,9 +22,13 @@
import org.replicadb.manager.util.BandwidthThrottling;
import org.replicadb.rowset.MongoDBRowSetImpl;

import java.sql.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.replicadb.manager.SupportedManagers.MONGODB;
Expand All @@ -33,8 +37,6 @@
public class MongoDBManager extends SqlManager {

private static final Logger LOG = LogManager.getLogger(MongoDBManager.class.getName());
private static final String MONGO_LIMIT = "limit";
private static final String MONGO_SKIP = "skip";

private MongoClient sourceMongoClient;
private MongoClient sinkMongoClient;
Expand Down Expand Up @@ -112,6 +114,11 @@ protected Connection makeSinkConnection () throws SQLException {
public ResultSet readTable (String tableName, String[] columns, int nThread) throws SQLException {
// If table name parameter is null get it from options
String collectionName = tableName == null ? this.options.getSourceTable() : tableName;
// if the chunk size is 0 and the current job is greater than 0, return null
if (chunkSize == 0 && nThread > 0) {
return null;
}

long skip = nThread * chunkSize;
mongoDbResultSet = new MongoDBRowSetImpl();

Expand All @@ -130,14 +137,14 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr

if (this.options.getJobs() == nThread + 1) {
// add skip to the pipeline
pipeline.add(BsonDocument.parse("{ $skip: "+skip+" }"));
pipeline.add(BsonDocument.parse("{ $skip: " + skip + " }"));
} else {
// add skip and limit to the pipeline
pipeline.add(BsonDocument.parse("{ $skip: "+skip+" }"));
pipeline.add(BsonDocument.parse("{ $limit: "+chunkSize+" }"));
pipeline.add(BsonDocument.parse("{ $skip: " + skip + " }"));
pipeline.add(BsonDocument.parse("{ $limit: " + chunkSize + " }"));
}

LOG.info("Using this aggregation query to get data from MongoDB: {}", pipeline);
LOG.info("{}: Using this aggregation query to get data from MongoDB: {}",Thread.currentThread().getName(), pipeline);
// create a MongoCursor to iterate over the results
cursor = collection.aggregate(pipeline).allowDiskUse(true).cursor();
firstDocument = collection.aggregate(pipeline).allowDiskUse(true).first();
Expand All @@ -149,24 +156,24 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr
if (options.getSourceWhere() != null && !options.getSourceWhere().isEmpty()) {
BsonDocument filter = BsonDocument.parse(options.getSourceWhere());
findIterable.filter(filter);
LOG.info("Using this clause to filter data from MongoDB: {}", filter.toJson());
LOG.info("{}: Using this clause to filter data from MongoDB: {}",Thread.currentThread().getName(), filter.toJson());
}
// Source Fields
if (options.getSourceColumns() != null && !options.getSourceColumns().isEmpty()) {
BsonDocument projection = BsonDocument.parse(options.getSourceColumns());
findIterable.projection(projection);
LOG.info("Using this clause to project data from MongoDB: {}", projection.toJson());
LOG.info("{}: Using this clause to project data from MongoDB: {}", Thread.currentThread().getName(), projection.toJson());
}

if (this.options.getJobs() == nThread + 1) {
// add skip to the pipeline
findIterable.skip((int) skip);
LOG.info("Using this clause to skip data from MongoDB: {}", skip);
LOG.info("{}: Skip {} data from source",Thread.currentThread().getName(), skip);
} else {
// add skip and limit to the pipeline
findIterable.skip(Math.toIntExact(skip));
findIterable.limit(Math.toIntExact(chunkSize));
LOG.info("Using this clause to skip and limit data from MongoDB: {} {}", skip, chunkSize);
LOG.info("{}: Skip {}, Limit {} data from source", Thread.currentThread().getName(), skip, chunkSize);
}

findIterable.batchSize(options.getFetchSize());
Expand All @@ -181,7 +188,7 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr
mongoDbResultSet.setMongoCursor(cursor);

} catch (MongoException me) {
LOG.error("MongoDB error: {}", me.getMessage(), me);
LOG.error("{}: MongoDB error: {}", Thread.currentThread().getName(), me.getMessage(), me);
}
return mongoDbResultSet;
}
Expand Down Expand Up @@ -216,7 +223,7 @@ public int insertDataToTable (ResultSet resultSet, int taskId) throws Exception
// unordered bulk write
BulkWriteOptions bulkWriteOptions = new BulkWriteOptions().ordered(false);

if (resultSet.next()) {
if (resultSet != null && resultSet.next()) {
// Create Bandwidth Throttling
BandwidthThrottling bt = new BandwidthThrottling(options.getBandwidthThrottling(), options.getFetchSize(), resultSet);
do {
Expand Down Expand Up @@ -295,11 +302,9 @@ public void preSourceTasks () throws Exception {
}

// set chunk size for each task
this.chunkSize = Math.abs( totalRows / this.options.getJobs());
LOG.info("Total rows: {}, chunk size: {}", totalRows, this.chunkSize);

this.chunkSize = Math.abs(totalRows / this.options.getJobs());
LOG.info("Source collection total rows: {}, chunk size per job: {}", totalRows, this.chunkSize);
}

}

@Override
Expand Down Expand Up @@ -385,7 +390,9 @@ public void close () throws SQLException {
mongoDbResultSet.setFetchSize(0);
mongoDbResultSet.close();
MongoCursor<Document> cursor = mongoDbResultSet.getCursor();
cursor.close();
if (cursor != null) {
cursor.close();
}
}

// Close connection, ignore exceptions
Expand Down
48 changes: 30 additions & 18 deletions src/main/java/org/replicadb/rowset/MongoDBRowSetImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.Document;
import org.bson.json.Converter;
import org.bson.json.JsonWriterSettings;
import org.bson.json.StrictJsonWriter;
import org.bson.types.Binary;

import javax.sql.RowSetMetaData;
import javax.sql.rowset.RowSetMetaDataImpl;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.sql.Types;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -41,7 +48,7 @@ public void execute () throws SQLException {
List<String> fields = new ArrayList<>();

// if the sink database is mongodb, the document will be inserted as is
if (this.isSourceAndSinkMongo) {
if (Boolean.TRUE.equals(this.isSourceAndSinkMongo)) {
rsmd.setColumnCount(1);
rsmd.setColumnName(1, "document");
rsmd.setColumnType(1, java.sql.Types.OTHER);
Expand Down Expand Up @@ -119,8 +126,10 @@ static int getSqlType (String typeString) {
case "class org.bson.types.Binary":
return java.sql.Types.BINARY;
case "class java.util.List":
case "class java.util.ArrayList":
return java.sql.Types.ARRAY;
case "class org.bson.Document":
return Types.STRUCT;
case "class org.bson.types.ObjectId":
case "class java.lang.Object":
default:
Expand Down Expand Up @@ -202,7 +211,7 @@ private void readData () throws SQLException {
break;
case Types.TIMESTAMP_WITH_TIMEZONE:
if (document.getDate(columnName) == null) updateNull(j + 1);
// convert to offsetDateTime
// convert to offsetDateTime
else updateObject(j + 1, document.getDate(columnName).toInstant().atOffset(ZoneOffset.UTC));
break;
case Types.BINARY:
Expand All @@ -215,19 +224,16 @@ private void readData () throws SQLException {
if (document.getBoolean(columnName) == null) updateNull(j + 1);
else updateBoolean(j + 1, document.getBoolean(columnName));
break;
case Types.ARRAY:
if (document.get(columnName) == null) updateNull(j + 1);
// convert to java.sql.Array
else updateString(j + 1, document.get(columnName).toString());
break;
default:
if (document.getString(columnName) == null) updateNull(j + 1);
else updateString(j + 1, document.getString(columnName));
String json = documentToJson(document.get(columnName, org.bson.Document.class));
if (json == null) updateNull(j + 1);
else updateString(j + 1, json);
break;
}
}
}
insertRow();
document.clear();
}
} catch (Exception e) {
LOG.error("MongoDB error: {}", e.getMessage(), e);
Expand All @@ -239,16 +245,22 @@ private void readData () throws SQLException {
beforeFirst();
}

/**
 * BSON-to-JSON date converter that renders BSON dates (epoch milliseconds)
 * as ISO-8601 instant strings in UTC (e.g. {@code "2022-12-29T10:15:30Z"})
 * instead of MongoDB's default extended-JSON {@code {"$date": ...}} form.
 */
public static class JsonDateTimeConverter implements Converter<Long> {
    /** ISO_INSTANT is thread-safe and immutable, so it is safely shared as a constant. */
    static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ISO_INSTANT.withZone(ZoneId.of("UTC"));

    /**
     * Writes {@code value} (epoch milliseconds) to the JSON writer as an
     * ISO-8601 UTC string.
     *
     * @param value  epoch-millisecond timestamp of the BSON date; assumed non-null
     *               by the converter contract — TODO confirm the driver never passes null
     * @param writer destination JSON writer
     */
    @Override
    public void convert (Long value, StrictJsonWriter writer) {
        // Instant.ofEpochMilli(value) is equivalent to new Date(value).toInstant()
        // without the legacy java.util.Date round-trip.
        Instant instant = Instant.ofEpochMilli(value);
        writer.writeString(DATE_TIME_FORMATTER.format(instant));
    }
}

/**
* Checks if the value is empty or null and return a null object
*
* @param value
* @return
*/
private String getStringOrNull (String value) {
if (value == null || value.isEmpty()) value = null;
return value;
/**
 * Serializes a BSON document to a JSON string, rendering BSON dates as
 * ISO-8601 UTC strings via {@link JsonDateTimeConverter}.
 *
 * @param document the document to serialize; may be {@code null}
 * @return the JSON text, or {@code null} when {@code document} is {@code null}
 */
private String documentToJson (Document document) {
    if (document == null) {
        return null;
    }
    JsonWriterSettings settings = JsonWriterSettings.builder()
            .dateTimeConverter(new JsonDateTimeConverter())
            .build();
    return document.toJson(settings);
}

public void setMongoCursor (MongoCursor<Document> cursor) {
Expand Down

0 comments on commit 4c86a2f

Please sign in to comment.