Skip to content

Commit

Permalink
Full support for MariaDB
Browse files Browse the repository at this point in the history
  • Loading branch information
osalvador committed Jun 24, 2021
1 parent 068b9c2 commit fdecef0
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 10 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<img src="https://img.shields.io/github/license/osalvador/replicadb?style=for-the-badge" alt="License"> <img src="https://img.shields.io/github/v/release/osalvador/replicadb?style=for-the-badge" alt="Last Version">
<img src="https://img.shields.io/travis/osalvador/replicadb?style=for-the-badge&logo=travis" alt="Main">
<img src="https://img.shields.io/docker/pulls/osalvador/replicadb.svg?style=for-the-badge&logo=docker" alt="Docker Pull">
<img src="https://img.shields.io/github/downloads/osalvador/replicadb/total?style=for-the-badge&logo=github" alt="Github Downloads">
<img src="https://img.shields.io/github/stars/osalvador/replicadb.svg?style=for-the-badge&logo=github" alt="Github Start">
Expand Down Expand Up @@ -133,6 +132,8 @@ $ replicadb --mode=complete -j=1 \
| Persistent Store | Source | Sink Complete | Sink Complete-Atomic | Sink Incremental | Sink Bandwidth Throttling |
|------------------|:------------------------:|:------------------:|:--------------------:|:------------------:|:-------------------------:|
| Oracle | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| MySQL | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| MariaDB | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| PostgreSQL | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: |
| Denodo | :heavy_check_mark: | N/A | N/A | N/A | N/A |
| CSV | :heavy_check_mark: | :heavy_check_mark: | N/A | :heavy_check_mark: | :heavy_check_mark: |
Expand Down
2 changes: 2 additions & 0 deletions ReplicaDB.iml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<orderEntry type="library" scope="TEST" name="Maven: org.testcontainers:mysql:1.15.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.testcontainers:jdbc:1.15.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.testcontainers:database-commons:1.15.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.testcontainers:mariadb:1.15.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.testcontainers:oracle-xe:1.15.3" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.testcontainers:postgresql:1.15.3" level="project" />
<orderEntry type="library" name="Maven: org.apache.logging.log4j:log4j-api:2.13.0" level="project" />
Expand Down Expand Up @@ -62,6 +63,7 @@
<orderEntry type="library" name="Maven: com.oracle.ojdbc:xmlparserv2:19.3.0.0" level="project" />
<orderEntry type="library" name="Maven: mysql:mysql-connector-java:8.0.21" level="project" />
<orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:3.11.4" level="project" />
<orderEntry type="library" name="Maven: org.mariadb.jdbc:mariadb-java-client:2.7.3" level="project" />
<orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.25" level="project" />
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.12.1" level="project" />
Expand Down
14 changes: 14 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@
<version>${version.testContainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mariadb</artifactId>
<version>${version.testContainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>oracle-xe</artifactId>
Expand Down Expand Up @@ -176,6 +182,14 @@
<version>8.0.21</version>
</dependency>

<!-- MariaDB -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.3</version>
</dependency>


<!--Dependencia con el driver de Denodo-->
<!--<dependency>-->
<!--<groupId>com.denodo</groupId>-->
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/replicadb/manager/JdbcDrivers.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.replicadb.manager;

public enum JdbcDrivers {
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql:"), POSTGRES("org.postgresql.Driver", "jdbc:postgresql:"),
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql:"),MARIADB("org.mariadb.jdbc.Driver", "jdbc:mariadb:"), POSTGRES("org.postgresql.Driver", "jdbc:postgresql:"),
HSQLDB("org.hsqldb.jdbcDriver", "jdbc:hsqldb:"), ORACLE("oracle.jdbc.OracleDriver", "jdbc:oracle:"),
SQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver:"),
JTDS_SQLSERVER("net.sourceforge.jtds.jdbc.Driver", "jdbc:jtds:sqlserver:"),
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/replicadb/manager/ManagerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public ConnManager accept(ToolOptions options, DataSourceType dsType) {
return new SQLServerManager(options, dsType);
} else if (S3.isTheManagerTypeOf(options, dsType)) {
return new S3Manager(options, dsType);
} else if (MYSQL.isTheManagerTypeOf(options, dsType)) {
// In MySQL this properties are required
} else if (MYSQL.isTheManagerTypeOf(options, dsType) || MARIADB.isTheManagerTypeOf(options, dsType)) {
// In MySQL and MariaDB this properties are required
if (dsType.equals(DataSourceType.SINK)){
Properties mysqlProps = new Properties();
mysqlProps.setProperty("characterEncoding", "UTF-8");
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/replicadb/manager/SupportedManagers.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import org.replicadb.cli.ToolOptions;

public enum SupportedManagers {
MYSQL(JdbcDrivers.MYSQL.getSchemePrefix()), POSTGRES(JdbcDrivers.POSTGRES.getSchemePrefix()),
MYSQL(JdbcDrivers.MYSQL.getSchemePrefix()), MARIADB(JdbcDrivers.MARIADB.getSchemePrefix()), POSTGRES(JdbcDrivers.POSTGRES.getSchemePrefix()),
HSQLDB(JdbcDrivers.HSQLDB.getSchemePrefix()), ORACLE(JdbcDrivers.ORACLE.getSchemePrefix()),
SQLSERVER(JdbcDrivers.SQLSERVER.getSchemePrefix()), CUBRID(JdbcDrivers.CUBRID.getSchemePrefix()),
JTDS_SQLSERVER(JdbcDrivers.JTDS_SQLSERVER.getSchemePrefix()), DB2(JdbcDrivers.DB2.getSchemePrefix()),
NETEZZA(JdbcDrivers.NETEZZA.getSchemePrefix()), DENODO(JdbcDrivers.DENODO.getSchemePrefix()),
CSV(JdbcDrivers.CSV.getSchemePrefix()),KAFKA(JdbcDrivers.KAFKA.getSchemePrefix()),S3(JdbcDrivers.S3.getSchemePrefix());
CSV(JdbcDrivers.CSV.getSchemePrefix()), KAFKA(JdbcDrivers.KAFKA.getSchemePrefix()), S3(JdbcDrivers.S3.getSchemePrefix());

private final String schemePrefix;

Expand Down Expand Up @@ -44,9 +44,9 @@ static String extractScheme(ToolOptions options, DataSourceType dsType) {

String connectStr = null;

if (dsType == DataSourceType.SOURCE){
if (dsType == DataSourceType.SOURCE) {
connectStr = options.getSourceConnect();
} else if (dsType == DataSourceType.SINK){
} else if (dsType == DataSourceType.SINK) {
connectStr = options.getSinkConnect();
} else {
LOG.error("DataSourceType must be Source or Sink");
Expand Down
228 changes: 228 additions & 0 deletions src/test/java/org/replicadb/mariadb/MariaDB2PostgresTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package org.replicadb.mariadb;

import org.apache.commons.cli.ParseException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.ClassRule;
import org.junit.jupiter.api.*;
import org.replicadb.ReplicaDB;
import org.replicadb.cli.ReplicationMode;
import org.replicadb.cli.ToolOptions;
import org.replicadb.utils.ScriptRunner;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
import java.sql.*;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Testcontainers
class MariaDB2PostgresTest {
private static final Logger LOG = LogManager.getLogger(MariaDB2PostgresTest.class);
private static final String RESOURECE_DIR = Paths.get("src", "test", "resources").toFile().getAbsolutePath();
private static final String REPLICADB_CONF_FILE = "/replicadb.conf";
private static final String MARIADB_SOURCE_FILE = "/mariadb/mariadb-source.sql";
private static final String POSTGRES_SINK_FILE = "/sinks/pg-sink.sql";
private static final String USER_PASSWD_DB = "replicadb";

private Connection mariadbConn;
private Connection postgresConn;

@ClassRule
private static final MariaDBContainer mariadb = new MariaDBContainer("mariadb:10.2")
.withDatabaseName(USER_PASSWD_DB)
.withUsername(USER_PASSWD_DB)
.withPassword(USER_PASSWD_DB);

@ClassRule
public static PostgreSQLContainer postgres = new PostgreSQLContainer("postgres:9.6")
.withDatabaseName(USER_PASSWD_DB)
.withUsername(USER_PASSWD_DB)
.withPassword(USER_PASSWD_DB);

@BeforeAll
static void setUp() throws SQLException, IOException {
// Start the mariadb container
mariadb.start();
postgres.start();
// Create tables
/*MariaDB*/
Connection con = DriverManager.getConnection(mariadb.getJdbcUrl(), mariadb.getUsername(), mariadb.getPassword());
ScriptRunner runner = new ScriptRunner(con, false, true);
runner.runScript(new BufferedReader(new FileReader(RESOURECE_DIR + MARIADB_SOURCE_FILE)));
LOG.info("Creating MariaDB source tables");
con.close();
/*Postgres*/
con = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
runner = new ScriptRunner(con, false, true);
runner.runScript(new BufferedReader(new FileReader(RESOURECE_DIR + POSTGRES_SINK_FILE)));
LOG.info("Creating Postgres sink tables");
con.close();
}

@BeforeEach
void before() throws SQLException {
this.mariadbConn = DriverManager.getConnection(mariadb.getJdbcUrl(), mariadb.getUsername(), mariadb.getPassword());
this.postgresConn = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword());
}

@AfterEach
void tearDown() throws SQLException {
// Truncate sink table and close connections
postgresConn.createStatement().execute("TRUNCATE TABLE T_SINK");
this.mariadbConn.close();
this.postgresConn.close();
}


public int countSinkRows() throws SQLException {
Statement stmt = postgresConn.createStatement();
ResultSet rs = stmt.executeQuery("select count(*) from t_sink");
rs.next();
int count = rs.getInt(1);
LOG.info(count);
return count;
}


@Test
void testMariadbVersion102() throws SQLException {
Statement stmt = mariadbConn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT VERSION()");
rs.next();
String version = rs.getString(1);
LOG.info(version);
assertTrue(version.contains("10.2"));
}

@Test
void testPostgresConnection() throws SQLException {
Statement stmt = postgresConn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.next();
String version = rs.getString(1);
LOG.info(version);
assertTrue(version.contains("1"));
}

@Test
void testMariadbInit() throws SQLException {
Statement stmt = mariadbConn.createStatement();
ResultSet rs = stmt.executeQuery("select count(*) from t_source");
rs.next();
String version = rs.getString(1);
LOG.info(version);
assertTrue(version.contains("4096"));
}

@Test
void testMariadb2PostgresComplete() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mariadb.getJdbcUrl(),
"--source-user", mariadb.getUsername(),
"--source-password", mariadb.getPassword(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword()
};
ToolOptions options = new ToolOptions(args);
Assertions.assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(4096,countSinkRows());
}

@Test
void testMariadb2PostgresCompleteAtomic() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mariadb.getJdbcUrl(),
"--source-user", mariadb.getUsername(),
"--source-password", mariadb.getPassword(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText()
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(4096,countSinkRows());

}

@Test
void testMariadb2PostgresIncremental() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mariadb.getJdbcUrl(),
"--source-user", mariadb.getUsername(),
"--source-password", mariadb.getPassword(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--mode", ReplicationMode.INCREMENTAL.getModeText()
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(4096,countSinkRows());

}

@Test
void testMariadb2PostgresCompleteParallel() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mariadb.getJdbcUrl(),
"--source-user", mariadb.getUsername(),
"--source-password", mariadb.getPassword(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--jobs", "4"
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(4096,countSinkRows());
}

@Test
void testMariadb2PostgresCompleteAtomicParallel() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mariadb.getJdbcUrl(),
"--source-user", mariadb.getUsername(),
"--source-password", mariadb.getPassword(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText(),
"--jobs", "4"
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(4096,countSinkRows());
}

@Test
void testMariadb2PostgresIncrementalParallel() throws ParseException, IOException, SQLException {
String[] args = {
"--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE,
"--source-connect", mariadb.getJdbcUrl(),
"--source-user", mariadb.getUsername(),
"--source-password", mariadb.getPassword(),
"--sink-connect", postgres.getJdbcUrl(),
"--sink-user", postgres.getUsername(),
"--sink-password", postgres.getPassword(),
"--mode", ReplicationMode.INCREMENTAL.getModeText(),
"--jobs", "4"
};
ToolOptions options = new ToolOptions(args);
assertEquals(0, ReplicaDB.processReplica(options));
assertEquals(4096,countSinkRows());
}
}
Loading

0 comments on commit fdecef0

Please sign in to comment.