Skip to content

Commit

Permalink
test: created test for MongoDB to Postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
osalvador committed Dec 29, 2022
1 parent 4c86a2f commit 39bbd89
Show file tree
Hide file tree
Showing 7 changed files with 4,527 additions and 2 deletions.
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<version.debezium>1.5.2.Final</version.debezium>
<version.testContainers>1.17.2</version.testContainers>
<version.testContainers>1.17.6</version.testContainers>
</properties>


Expand Down Expand Up @@ -101,6 +101,13 @@
<version>${version.testContainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${version.testContainers}</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>mysql</groupId>
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/org/replicadb/manager/MongoDBManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr
}

findIterable.batchSize(options.getFetchSize());
findIterable.allowDiskUse(true);
// TODO: fail in mongodb 4.0.10
//findIterable.allowDiskUse(true);
cursor = findIterable.cursor();
firstDocument = findIterable.first();

Expand All @@ -189,6 +190,8 @@ public ResultSet readTable (String tableName, String[] columns, int nThread) thr

} catch (MongoException me) {
LOG.error("{}: MongoDB error: {}", Thread.currentThread().getName(), me.getMessage(), me);
// rethrow the exception
throw me;
}
return mongoDbResultSet;
}
Expand Down
78 changes: 78 additions & 0 deletions src/test/java/org/replicadb/config/ReplicadbMongodbContainer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.replicadb.config;

import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.Document;
import org.testcontainers.containers.MongoDBContainer;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class ReplicadbMongodbContainer extends MongoDBContainer {
private static final Logger LOG = LogManager.getLogger(ReplicadbMongodbContainer.class);
private static final String IMAGE_VERSION = "mongo:4.0.10";
private static final String RESOURCE_DIR = Paths.get("src", "test", "resources").toFile().getAbsolutePath();
private static final String SOURCE_FILE = "/mongo/t_source_data.json";
private static final String T_SOURCE_COLLECTION = "t_source";
private static ReplicadbMongodbContainer container;
private ConnectionString connectionString;

private ReplicadbMongodbContainer () {
super(IMAGE_VERSION);
}

public static ReplicadbMongodbContainer getInstance () {
if (container == null) {
container = new ReplicadbMongodbContainer();
container.start();
}
return container;
}

@Override
public void start () {
super.start();
connectionString = new ConnectionString(container.getReplicaSetUrl());

String uri = container.getReplicaSetUrl();
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase(connectionString.getDatabase());
try {
List<String> allLines = Files.readAllLines(Paths.get(RESOURCE_DIR + SOURCE_FILE));

for (String line : allLines) {
// if line is empty or a comment, skip it
if (line.trim().isEmpty() || line.trim().startsWith("//")) {
continue;
}
Document doc = Document.parse(line);
database.getCollection(T_SOURCE_COLLECTION).insertOne(doc);
}
// count inserted documents
long count = database.getCollection(T_SOURCE_COLLECTION).countDocuments();
LOG.info("Inserted " + count + " documents into the '" + T_SOURCE_COLLECTION + "' collection");
// log the first document
LOG.info("First document: " + database.getCollection(T_SOURCE_COLLECTION).find().first().toJson());


} catch (Exception e) {
throw new RuntimeException(e);
}
}
}


@Override
public void stop () {
//do nothing, JVM handles shut down
}

public ConnectionString getMongoConnectionString () {
return connectionString;
}
}
188 changes: 188 additions & 0 deletions src/test/java/org/replicadb/mongo/Mongo2PostgresTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package org.replicadb.mongo;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.apache.commons.cli.ParseException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.Document;
import org.junit.Rule;
import org.junit.jupiter.api.*;
import org.replicadb.ReplicaDB;
import org.replicadb.cli.ReplicationMode;
import org.replicadb.cli.ToolOptions;
import org.replicadb.config.ReplicadbMongodbContainer;
import org.replicadb.config.ReplicadbPostgresqlContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.file.Paths;
import java.sql.*;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Testcontainers
class Mongo2PostgresTest {
private static final Logger LOG = LogManager.getLogger(Mongo2PostgresTest.class);
private static final String RESOURCE_DIR = Paths.get("src", "test", "resources").toFile().getAbsolutePath();
private static final String REPLICADB_CONF_FILE = "/replicadb.conf";
private static final int EXPECTED_ROWS = 4096;
private static final String SINK_COLLECTION = "t_sink";
private static final String SOURCE_COLUMNS= "{_id:0,C_INTEGER:1,C_SMALLINT:1,C_BIGINT:1,C_NUMERIC:1,C_DECIMAL:1,C_REAL:1,C_DOUBLE_PRECISION:1,C_FLOAT:1,C_BINARY:1,C_BINARY_VAR:1,C_BINARY_LOB:1,C_BOOLEAN:1,C_CHARACTER:1,C_CHARACTER_VAR:1,C_CHARACTER_LOB:1,C_NATIONAL_CHARACTER:1,C_NATIONAL_CHARACTER_VAR:1,C_DATE:1,C_TIMESTAMP_WITH_TIMEZONE:1,C_OBJECT:1}";
private static final String SINK_COLUMNS= "C_REAL, C_JSON, C_FLOAT, C_DOUBLE_PRECISION, C_BIGINT, C_CHARACTER_VAR, C_CHARACTER, C_BINARY, C_NUMERIC, C_SMALLINT, C_TIMESTAMP_WITH_TIMEZONE, C_NATIONAL_CHARACTER_VAR, C_NATIONAL_CHARACTER, C_CHARACTER_LOB, C_DECIMAL, C_INTEGER, C_DATE, C_BOOLEAN, C_BINARY_LOB, C_BINARY_VAR";

private MongoClient mongoClient;
private String mongoDatabaseName;
private Connection postgresConn;

@Rule
public static ReplicadbMongodbContainer mongoContainer = ReplicadbMongodbContainer.getInstance();

@Rule
public static PostgreSQLContainer<ReplicadbPostgresqlContainer> postgres = ReplicadbPostgresqlContainer.getInstance();

@BeforeAll
static void setUp(){
}

@BeforeEach
void before() throws SQLException {
this.mongoClient= MongoClients.create(mongoContainer.getMongoConnectionString());
this.mongoDatabaseName = mongoContainer.getMongoConnectionString().getDatabase();
this.postgresConn = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
}

@AfterEach
void tearDown() throws SQLException {
// Truncate sink table and close connections
mongoClient.getDatabase(mongoDatabaseName).getCollection(SINK_COLLECTION).deleteMany(new Document());
this.mongoClient.close();
this.postgresConn.close();
}


public int countSinkRows() throws SQLException {
Statement stmt = postgresConn.createStatement();
ResultSet rs = stmt.executeQuery("select count(*) from t_sink");
rs.next();
return rs.getInt(1);
}


@Test
void testPostgresConnection() throws SQLException {
Statement stmt = postgresConn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.next();
String version = rs.getString(1);
LOG.info(version);
assertTrue(version.contains("1"));
}


@Test
void testMongodb2PostgresComplete() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURCE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mongoContainer.getReplicaSetUrl(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--source-columns", SOURCE_COLUMNS,
"--sink-columns", SINK_COLUMNS
};
ToolOptions options = new ToolOptions(args);
Assertions.assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(EXPECTED_ROWS,countSinkRows());
}
//
// @Test
// void testMongodb2PostgresCompleteAtomic() throws ParseException, IOException, SQLException {
// String[] args = {
// "--options-file", RESOURCE_DIR + REPLICADB_CONF_FILE,
// "--source-connect", mongoContainer.getJdbcUrl(),
// "--source-user", mongoContainer.getUsername(),
// "--source-password", mongoContainer.getPassword(),
// "--sink-connect", postgres.getJdbcUrl(),
// "--sink-user", postgres.getUsername(),
// "--sink-password", postgres.getPassword(),
// "--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText()
// };
// ToolOptions options = new ToolOptions(args);
// assertEquals(0, ReplicaDB.processReplica(options));
// assertEquals(EXPECTED_ROWS,countSinkRows());//
// }
//
@Test
void testMongodb2PostgresIncremental() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURCE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mongoContainer.getReplicaSetUrl(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--source-columns", SOURCE_COLUMNS,
"--sink-columns", SINK_COLUMNS,
"--mode", ReplicationMode.INCREMENTAL.getModeText()
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(EXPECTED_ROWS,countSinkRows());

}

@Test
void testMongodb2PostgresCompleteParallel() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURCE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mongoContainer.getReplicaSetUrl(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--source-columns", SOURCE_COLUMNS,
"--sink-columns", SINK_COLUMNS,
"--jobs", "4"
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(EXPECTED_ROWS,countSinkRows());
}
//
// @Test
// void testMongodb2PostgresCompleteAtomicParallel() throws ParseException, IOException, SQLException {
// String[] args = {
// "--options-file", RESOURCE_DIR + REPLICADB_CONF_FILE,
// "--source-connect", mongoContainer.getJdbcUrl(),
// "--source-user", mongoContainer.getUsername(),
// "--source-password", mongoContainer.getPassword(),
// "--sink-connect", postgres.getJdbcUrl(),
// "--sink-user", postgres.getUsername(),
// "--sink-password", postgres.getPassword(),
// "--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText(),
// "--jobs", "4"
// };
// ToolOptions options = new ToolOptions(args);
// assertEquals(0, ReplicaDB.processReplica(options));
// assertEquals(EXPECTED_ROWS,countSinkRows());
// }
//
@Test
void testMongodb2PostgresIncrementalParallel() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURCE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mongoContainer.getReplicaSetUrl(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--source-columns", SOURCE_COLUMNS,
"--sink-columns", SINK_COLUMNS,
"--mode", ReplicationMode.INCREMENTAL.getModeText(),
"--jobs", "4"
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(EXPECTED_ROWS,countSinkRows());
}
}
7 changes: 7 additions & 0 deletions src/test/resources/mongo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Mongo Data generator

I used this command to generate data for the mongo tests using [mgodatagen](https://github.com/feliixx/mgodatagen)

```bash
mgodatagen -f mongoDatagen.json --output=stdout.dat
```
Loading

0 comments on commit 39bbd89

Please sign in to comment.