From fdecef0637397d80c503a728b00a6aafff99556c Mon Sep 17 00:00:00 2001 From: Oscar Date: Thu, 24 Jun 2021 10:18:42 +0200 Subject: [PATCH] Full support for MariaDB --- README.md | 3 +- ReplicaDB.iml | 2 + pom.xml | 14 ++ .../org/replicadb/manager/JdbcDrivers.java | 2 +- .../org/replicadb/manager/ManagerFactory.java | 4 +- .../replicadb/manager/SupportedManagers.java | 8 +- .../mariadb/MariaDB2PostgresTest.java | 228 ++++++++++++++++++ src/test/resources/mariadb/mariadb-source.sql | 106 ++++++++ src/test/resources/replicadb.conf | 2 - 9 files changed, 359 insertions(+), 10 deletions(-) create mode 100644 src/test/java/org/replicadb/mariadb/MariaDB2PostgresTest.java create mode 100644 src/test/resources/mariadb/mariadb-source.sql diff --git a/README.md b/README.md index 0650025..be3615b 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ License Last Version -Main Docker Pull Github Downloads Github Start @@ -133,6 +132,8 @@ $ replicadb --mode=complete -j=1 \ | Persistent Store | Source | Sink Complete | Sink Complete-Atomic | Sink Incremental | Sink Bandwidth Throttling | |------------------|:------------------------:|:------------------:|:--------------------:|:------------------:|:-------------------------:| | Oracle | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | +| MySQL | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | +| MariaDB | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | | PostgreSQL | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | :heavy_check_mark: | | Denodo | :heavy_check_mark: | N/A | N/A | N/A | N/A | | CSV | :heavy_check_mark: | :heavy_check_mark: | N/A | :heavy_check_mark: | :heavy_check_mark: | diff --git a/ReplicaDB.iml b/ReplicaDB.iml index d401823..58a233d 100644 --- a/ReplicaDB.iml +++ b/ReplicaDB.iml @@ -34,6 +34,7 @@ + @@ -62,6 +63,7 @@ + diff --git a/pom.xml b/pom.xml index a9af759..7eaecbb 100644 --- a/pom.xml +++ b/pom.xml @@ -71,6 +71,12 @@ ${version.testContainers} test + + org.testcontainers + mariadb + ${version.testContainers} + test + org.testcontainers oracle-xe @@ -176,6 +182,14 @@ 8.0.21 + + + org.mariadb.jdbc + mariadb-java-client + 2.7.3 + + + diff --git a/src/main/java/org/replicadb/manager/JdbcDrivers.java b/src/main/java/org/replicadb/manager/JdbcDrivers.java index 0b9775e..d251568 100644 --- a/src/main/java/org/replicadb/manager/JdbcDrivers.java +++ b/src/main/java/org/replicadb/manager/JdbcDrivers.java @@ -1,7 +1,7 @@ package org.replicadb.manager; public enum JdbcDrivers { - MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql:"), POSTGRES("org.postgresql.Driver", "jdbc:postgresql:"), + MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql:"),MARIADB("org.mariadb.jdbc.Driver", "jdbc:mariadb:"), POSTGRES("org.postgresql.Driver", "jdbc:postgresql:"), HSQLDB("org.hsqldb.jdbcDriver", "jdbc:hsqldb:"), ORACLE("oracle.jdbc.OracleDriver", "jdbc:oracle:"), SQLSERVER("com.microsoft.sqlserver.jdbc.SQLServerDriver", "jdbc:sqlserver:"), JTDS_SQLSERVER("net.sourceforge.jtds.jdbc.Driver", "jdbc:jtds:sqlserver:"), diff --git a/src/main/java/org/replicadb/manager/ManagerFactory.java b/src/main/java/org/replicadb/manager/ManagerFactory.java index 9abbed1..ab9a344 100644 --- a/src/main/java/org/replicadb/manager/ManagerFactory.java +++ b/src/main/java/org/replicadb/manager/ManagerFactory.java @@ -72,8 +72,8 @@ public ConnManager accept(ToolOptions options, DataSourceType dsType) { return new SQLServerManager(options, dsType); } else if (S3.isTheManagerTypeOf(options, dsType)) { return new S3Manager(options, dsType); - } else if (MYSQL.isTheManagerTypeOf(options, dsType)) { - // In MySQL this properties are required + } else if (MYSQL.isTheManagerTypeOf(options, dsType) || MARIADB.isTheManagerTypeOf(options, dsType)) { + // In MySQL and MariaDB this properties are required if (dsType.equals(DataSourceType.SINK)){ Properties mysqlProps = new Properties(); mysqlProps.setProperty("characterEncoding", "UTF-8"); diff --git a/src/main/java/org/replicadb/manager/SupportedManagers.java b/src/main/java/org/replicadb/manager/SupportedManagers.java index 322aa73..6de1d7d 100644 --- a/src/main/java/org/replicadb/manager/SupportedManagers.java +++ b/src/main/java/org/replicadb/manager/SupportedManagers.java @@ -5,12 +5,12 @@ import org.replicadb.cli.ToolOptions; public enum SupportedManagers { - MYSQL(JdbcDrivers.MYSQL.getSchemePrefix()), POSTGRES(JdbcDrivers.POSTGRES.getSchemePrefix()), + MYSQL(JdbcDrivers.MYSQL.getSchemePrefix()), MARIADB(JdbcDrivers.MARIADB.getSchemePrefix()), POSTGRES(JdbcDrivers.POSTGRES.getSchemePrefix()), HSQLDB(JdbcDrivers.HSQLDB.getSchemePrefix()), ORACLE(JdbcDrivers.ORACLE.getSchemePrefix()), SQLSERVER(JdbcDrivers.SQLSERVER.getSchemePrefix()), CUBRID(JdbcDrivers.CUBRID.getSchemePrefix()), JTDS_SQLSERVER(JdbcDrivers.JTDS_SQLSERVER.getSchemePrefix()), DB2(JdbcDrivers.DB2.getSchemePrefix()), NETEZZA(JdbcDrivers.NETEZZA.getSchemePrefix()), DENODO(JdbcDrivers.DENODO.getSchemePrefix()), - CSV(JdbcDrivers.CSV.getSchemePrefix()),KAFKA(JdbcDrivers.KAFKA.getSchemePrefix()),S3(JdbcDrivers.S3.getSchemePrefix()); + CSV(JdbcDrivers.CSV.getSchemePrefix()), KAFKA(JdbcDrivers.KAFKA.getSchemePrefix()), S3(JdbcDrivers.S3.getSchemePrefix()); private final String schemePrefix; @@ -44,9 +44,9 @@ static String extractScheme(ToolOptions options, DataSourceType dsType) { String connectStr = null; - if (dsType == DataSourceType.SOURCE){ + if (dsType == DataSourceType.SOURCE) { connectStr = options.getSourceConnect(); - } else if (dsType == DataSourceType.SINK){ + } else if (dsType == DataSourceType.SINK) { connectStr = options.getSinkConnect(); } else { LOG.error("DataSourceType must be Source or Sink"); diff --git a/src/test/java/org/replicadb/mariadb/MariaDB2PostgresTest.java b/src/test/java/org/replicadb/mariadb/MariaDB2PostgresTest.java new file mode 100644 index 0000000..d560b34 --- /dev/null +++ b/src/test/java/org/replicadb/mariadb/MariaDB2PostgresTest.java @@ -0,0 +1,228 @@ +package org.replicadb.mariadb; + +import org.apache.commons.cli.ParseException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.ClassRule; +import org.junit.jupiter.api.*; +import org.replicadb.ReplicaDB; +import org.replicadb.cli.ReplicationMode; +import org.replicadb.cli.ToolOptions; +import org.replicadb.utils.ScriptRunner; +import org.testcontainers.containers.MariaDBContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Paths; +import java.sql.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +class MariaDB2PostgresTest { + private static final Logger LOG = LogManager.getLogger(MariaDB2PostgresTest.class); + private static final String RESOURECE_DIR = Paths.get("src", "test", "resources").toFile().getAbsolutePath(); + private static final String REPLICADB_CONF_FILE = "/replicadb.conf"; + private static final String MARIADB_SOURCE_FILE = "/mariadb/mariadb-source.sql"; + private static final String POSTGRES_SINK_FILE = "/sinks/pg-sink.sql"; + private static final String USER_PASSWD_DB = "replicadb"; + + private Connection mariadbConn; + private Connection postgresConn; + + @ClassRule + private static final MariaDBContainer mariadb = new MariaDBContainer("mariadb:10.2") + .withDatabaseName(USER_PASSWD_DB) + .withUsername(USER_PASSWD_DB) + .withPassword(USER_PASSWD_DB); + + @ClassRule + public static PostgreSQLContainer postgres = new PostgreSQLContainer("postgres:9.6") + .withDatabaseName(USER_PASSWD_DB) + .withUsername(USER_PASSWD_DB) + .withPassword(USER_PASSWD_DB); + + @BeforeAll + static void setUp() throws SQLException, IOException { + // Start the mariadb container + mariadb.start(); + postgres.start(); + // Create tables + /*MariaDB*/ + Connection con = DriverManager.getConnection(mariadb.getJdbcUrl(), mariadb.getUsername(), mariadb.getPassword()); + ScriptRunner runner = new ScriptRunner(con, false, true); + runner.runScript(new BufferedReader(new FileReader(RESOURECE_DIR + MARIADB_SOURCE_FILE))); + LOG.info("Creating MariaDB source tables"); + con.close(); + /*Postgres*/ + con = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); + runner = new ScriptRunner(con, false, true); + runner.runScript(new BufferedReader(new FileReader(RESOURECE_DIR + POSTGRES_SINK_FILE))); + LOG.info("Creating Postgres sink tables"); + con.close(); + } + + @BeforeEach + void before() throws SQLException { + this.mariadbConn = DriverManager.getConnection(mariadb.getJdbcUrl(), mariadb.getUsername(), mariadb.getPassword()); + this.postgresConn = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); + } + + @AfterEach + void tearDown() throws SQLException { + // Truncate sink table and close connections + postgresConn.createStatement().execute("TRUNCATE TABLE T_SINK"); + this.mariadbConn.close(); + this.postgresConn.close(); + } + + + public int countSinkRows() throws SQLException { + Statement stmt = postgresConn.createStatement(); + ResultSet rs = stmt.executeQuery("select count(*) from t_sink"); + rs.next(); + int count = rs.getInt(1); + LOG.info(count); + return count; + } + + + @Test + void testMariadbVersion102() throws SQLException { + Statement stmt = mariadbConn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT VERSION()"); + rs.next(); + String version = rs.getString(1); + LOG.info(version); + assertTrue(version.contains("10.2")); + } + + @Test + void testPostgresConnection() throws SQLException { + Statement stmt = postgresConn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT 1"); + rs.next(); + String version = rs.getString(1); + LOG.info(version); + assertTrue(version.contains("1")); + } + + @Test + void testMariadbInit() throws SQLException { + Statement stmt = mariadbConn.createStatement(); + ResultSet rs = stmt.executeQuery("select count(*) from t_source"); + rs.next(); + String version = rs.getString(1); + LOG.info(version); + assertTrue(version.contains("4096")); + } + + @Test + void testMariadb2PostgresComplete() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mariadb.getJdbcUrl(), + "--source-user", mariadb.getUsername(), + "--source-password", mariadb.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword() + }; + ToolOptions options = new ToolOptions(args); + Assertions.assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096,countSinkRows()); + } + + @Test + void testMariadb2PostgresCompleteAtomic() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mariadb.getJdbcUrl(), + "--source-user", mariadb.getUsername(), + "--source-password", mariadb.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText() + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096,countSinkRows()); + + } + + @Test + void testMariadb2PostgresIncremental() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mariadb.getJdbcUrl(), + "--source-user", mariadb.getUsername(), + "--source-password", mariadb.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.INCREMENTAL.getModeText() + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096,countSinkRows()); + + } + + @Test + void testMariadb2PostgresCompleteParallel() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mariadb.getJdbcUrl(), + "--source-user", mariadb.getUsername(), + "--source-password", mariadb.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--jobs", "4" + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096,countSinkRows()); + } + + @Test + void testMariadb2PostgresCompleteAtomicParallel() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mariadb.getJdbcUrl(), + "--source-user", mariadb.getUsername(), + "--source-password", mariadb.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText(), + "--jobs", "4" + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096,countSinkRows()); + } + + @Test + void testMariadb2PostgresIncrementalParallel() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mariadb.getJdbcUrl(), + "--source-user", mariadb.getUsername(), + "--source-password", mariadb.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.INCREMENTAL.getModeText(), + "--jobs", "4" + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096,countSinkRows()); + } +} \ No newline at end of file diff --git a/src/test/resources/mariadb/mariadb-source.sql b/src/test/resources/mariadb/mariadb-source.sql new file mode 100644 index 0000000..1541749 --- /dev/null +++ b/src/test/resources/mariadb/mariadb-source.sql @@ -0,0 +1,106 @@ + +CREATE OR REPLACE VIEW generate_series_16 +AS SELECT 0 generate_series UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL + SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL + SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL + SELECT 9 UNION ALL SELECT 10 UNION ALL SELECT 11 UNION ALL + SELECT 12 UNION ALL SELECT 13 UNION ALL SELECT 14 UNION ALL + SELECT 15; + +CREATE OR REPLACE VIEW generate_series_256 +AS SELECT ( ( hi.generate_series << 4 ) | lo.generate_series ) AS generate_series + FROM generate_series_16 lo, generate_series_16 hi; + +CREATE OR REPLACE VIEW generate_series_4k +AS SELECT ( ( hi.generate_series << 8 ) | lo.generate_series ) AS generate_series + FROM generate_series_256 lo, generate_series_16 hi; + +create table t_source ( + /*Exact Numerics*/ + C_INTEGER INTEGER AUTO_INCREMENT, + C_SMALLINT SMALLINT, + C_BIGINT BIGINT, + C_NUMERIC NUMERIC(65,30), + C_DECIMAL DECIMAL(65,30), + /*Approximate Numerics:*/ + C_REAL REAL, + C_DOUBLE_PRECISION DOUBLE PRECISION, + C_FLOAT FLOAT, + /*Binary Strings:*/ + C_BINARY BINARY(35), + C_BINARY_VAR VARBINARY(255), + C_BINARY_LOB BLOB, + /*Boolean:*/ + C_BOOLEAN BOOLEAN, + /*Character Strings:*/ + C_CHARACTER CHAR(35), + C_CHARACTER_VAR VARCHAR(255), + C_CHARACTER_LOB TEXT, + C_NATIONAL_CHARACTER NATIONAL CHARACTER(35), + C_NATIONAL_CHARACTER_VAR NVARCHAR(255), + /*Datetimes:*/ + C_DATE DATE, + C_TIME_WITHOUT_TIMEZONE TIME, + C_TIMESTAMP_WITHOUT_TIMEZONE TIMESTAMP, + C_TIME_WITH_TIMEZONE TIME, + C_TIMESTAMP_WITH_TIMEZONE TIMESTAMP, + -- /*Intervals:*/ + -- INTERVAL DAY /*not supported*/ + -- INTERVAL YEAR /*not supported*/ + -- /*Collection Types:*/ + -- ARRAY /*not supported*/ + -- MULTISET /*not supported*/ + -- /*Other Types:*/ + -- ROW /*not supported*/ + -- XML /*not supported*/ + -- JSON /*not supported*/ + PRIMARY KEY (C_INTEGER) +); + +insert into t_source ( + /*C_INTEGER, auto incremented*/ + C_SMALLINT, + C_BIGINT, + C_NUMERIC, + C_DECIMAL, + C_REAL , + C_DOUBLE_PRECISION, + C_FLOAT, + C_BINARY, + C_BINARY_VAR, + C_BINARY_LOB, + C_BOOLEAN, + C_CHARACTER, + C_CHARACTER_VAR, + C_CHARACTER_LOB, + C_NATIONAL_CHARACTER, + C_NATIONAL_CHARACTER_VAR, + C_DATE, + C_TIME_WITHOUT_TIMEZONE, + C_TIMESTAMP_WITHOUT_TIMEZONE, + C_TIME_WITH_TIMEZONE , + C_TIMESTAMP_WITH_TIMEZONE +) +select + CAST(generate_series * rand() as signed) as C_SMALLINT, + CAST(generate_series * rand() as signed) as C_BIGINT, + CAST(generate_series * rand() as decimal(65, 30)) as C_NUMERIC, + CAST(generate_series * rand() as decimal(65, 30)) as C_DECIMAL, + CAST(generate_series * rand() as decimal(25, 10)) as C_REAL, + CAST(generate_series * rand() as decimal(25, 10)) as C_DOUBLE_PRECISION, + CAST(generate_series * rand() as decimal(25, 10)) as C_FLOAT, + BINARY(md5(rand())) as C_BINARY, + BINARY(md5(rand())) as C_BINARY_VAR, + BINARY(md5(rand())) as C_BINARY_LOB, + TRUE, + md5(rand()) as C_CHAR, + md5(rand()) as C_VARCHAR, + md5(rand()) as C_VARCHAR_LOB, + CONVERT(md5(rand()) using utf8) as C_NATIONAL_CHARACTER, + CONVERT(md5(rand()) using utf8) as C_NATIONAL_CHARACTER_VAR, + current_date as C_DATE, + current_time as C_TIME_WITHOUT_TIMEZONE, + current_timestamp as C_TIMESTAMP_WITHOUT_TIMEZONE, + CONVERT_TZ(current_time, '+00:00', '+02:00' ) as C_TIME_WITH_TIMEZONE, + CONVERT_TZ(current_timestamp, '+00:00', '+02:00' ) as C_TIMESTAMP_WITH_TIMEZONE +from generate_series_4k; diff --git a/src/test/resources/replicadb.conf b/src/test/resources/replicadb.conf index 6696486..b9fd11e 100644 --- a/src/test/resources/replicadb.conf +++ b/src/test/resources/replicadb.conf @@ -6,8 +6,6 @@ verbose=true quoted.identifiers=false ############################# Soruce Options ############################# source.table=t_source -source.connect.parameter.oracle.jdbc.timezoneAsRegion=false ############################# Sink Options ############################# sink.table=t_sink -sink.connect.parameter.oracle.jdbc.timezoneAsRegion=false ############################# Other ############################# \ No newline at end of file