diff --git a/src/main/java/org/replicadb/manager/ManagerFactory.java b/src/main/java/org/replicadb/manager/ManagerFactory.java index 75f31a3..9abbed1 100644 --- a/src/main/java/org/replicadb/manager/ManagerFactory.java +++ b/src/main/java/org/replicadb/manager/ManagerFactory.java @@ -7,6 +7,8 @@ import org.replicadb.manager.cdc.OracleManagerCDC; import org.replicadb.manager.cdc.SQLServerManagerCDC; +import java.util.Properties; + import static org.replicadb.manager.SupportedManagers.*; /** @@ -71,6 +73,13 @@ public ConnManager accept(ToolOptions options, DataSourceType dsType) { } else if (S3.isTheManagerTypeOf(options, dsType)) { return new S3Manager(options, dsType); } else if (MYSQL.isTheManagerTypeOf(options, dsType)) { + // In MySQL this properties are required + if (dsType.equals(DataSourceType.SINK)){ + Properties mysqlProps = new Properties(); + mysqlProps.setProperty("characterEncoding", "UTF-8"); + mysqlProps.setProperty("allowLoadLocalInfile", "true"); + options.setSinkConnectionParams(mysqlProps); + } return new MySQLManager(options, dsType); } else { throw new IllegalArgumentException("The database with scheme "+scheme+" is not supported by ReplicaDB"); diff --git a/src/main/java/org/replicadb/manager/MySQLManager.java b/src/main/java/org/replicadb/manager/MySQLManager.java index 7336f47..022b6bb 100644 --- a/src/main/java/org/replicadb/manager/MySQLManager.java +++ b/src/main/java/org/replicadb/manager/MySQLManager.java @@ -1,12 +1,15 @@ package org.replicadb.manager; +import com.mysql.cj.jdbc.JdbcPreparedStatement; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.replicadb.cli.ReplicationMode; import org.replicadb.cli.ToolOptions; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.sql.*; public class MySQLManager extends SqlManager { @@ -30,13 +33,148 @@ public String getDriverClass() { } @Override - public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLException { - throw new UnsupportedOperationException("MySQL does not yet support data insertion"); + public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLException, IOException { + + try { + + ResultSetMetaData rsmd = resultSet.getMetaData(); + String tableName; + + // Get table name and columns + if (options.getMode().equals(ReplicationMode.COMPLETE.getModeText())) { + tableName = getSinkTableName(); + } else { + tableName = getQualifiedStagingTableName(); + } + + String allColumns = getAllSinkColumns(rsmd); + + // Get MySQL LOAD DATA manager + String loadDataSql = getLoadDataSql(tableName, allColumns); + PreparedStatement statement = this.connection.prepareStatement(loadDataSql); + JdbcPreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.JdbcPreparedStatement.class); + + char unitSeparator = 0x1F; + int columnsNumber = rsmd.getColumnCount(); + + StringBuilder row = new StringBuilder(); + StringBuilder cols = new StringBuilder(); + + byte[] bytes = "".getBytes(); + String colValue; + int rowCounts = 0; + int batchSize = options.getFetchSize(); + + if (resultSet.next()) { + // Create Bandwidth Throttling + bandwidthThrottlingCreate(resultSet, rsmd); + + do { + bandwidthThrottlingAcquiere(); + + // Get Columns values + for (int i = 1; i <= columnsNumber; i++) { + if (i > 1) cols.append(unitSeparator); + + switch (rsmd.getColumnType(i)) { + + case Types.CLOB: + colValue = clobToString(resultSet.getClob(i)); + break; + //case Types.BINARY: + case Types.BLOB: + //colValue = blobToPostgresHex(resultSet.getBlob(i)); + // TODO revisar los BLOB y CLOB + colValue = ""; + break; + default: + colValue = resultSet.getString(i); + break; + // TODO revisar los timestamp con timezone + } + + if (!resultSet.wasNull() || colValue != null) cols.append(colValue); + } + + // Escape special chars + if (this.options.isSinkDisableEscape()) + row.append(cols); + else + row.append(cols.toString().replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\u0000", "")); + + // Row ends with \n + row.append("\n"); + + // Copy data to mysql + bytes = row.toString().getBytes(StandardCharsets.UTF_8); + + if (++rowCounts % batchSize == 0) { + mysqlStatement.setLocalInfileInputStream(new ByteArrayInputStream(bytes)); + mysqlStatement.executeUpdate(); + + // Clear StringBuilders + row.setLength(0); // set length of buffer to 0 + row.trimToSize(); + rowCounts = 0; + } + + // Clear StringBuilders + cols.setLength(0); // set length of buffer to 0 + cols.trimToSize(); + } while (resultSet.next()); + } + + // insert remaining records + if (rowCounts != 0) { + mysqlStatement.setLocalInfileInputStream(new ByteArrayInputStream(bytes)); + mysqlStatement.executeUpdate(); + } + + } catch (Exception e) { + this.connection.rollback(); + throw e; + } + + this.getConnection().commit(); + return 0; + } + + private String getLoadDataSql(String tableName, String allColumns) { + StringBuilder loadDataSql = new StringBuilder(); + loadDataSql.append("LOAD DATA LOCAL INFILE 'dummy' INTO TABLE "); + loadDataSql.append(tableName); + loadDataSql.append(" CHARACTER SET UTF8 FIELDS TERMINATED BY X'1F' "); + if (allColumns != null) { + loadDataSql.append(" ("); + loadDataSql.append(allColumns); + loadDataSql.append(")"); + } + + LOG.info("Loading data with this command: {}", loadDataSql); + return loadDataSql.toString(); } @Override protected void createStagingTable() throws SQLException { - throw new UnsupportedOperationException("MySQL does not yet support data insertion"); + Statement statement = this.getConnection().createStatement(); + String sinkStagingTable = getQualifiedStagingTableName(); + + // Get sink columns. + String allSinkColumns = null; + if (this.options.getSinkColumns() != null && !this.options.getSinkColumns().isEmpty()) { + allSinkColumns = this.options.getSinkColumns(); + } else if (this.options.getSourceColumns() != null && !this.options.getSourceColumns().isEmpty()) { + allSinkColumns = this.options.getSourceColumns(); + } else { + allSinkColumns = "*"; + } + + String sql = " CREATE TABLE " + sinkStagingTable + " AS (SELECT " + allSinkColumns + " FROM " + this.getSinkTableName() + " WHERE 1 = 0 ) "; + + LOG.info("Creating staging table with this command: " + sql); + statement.executeUpdate(sql); + statement.close(); + this.getConnection().commit(); } @Override diff --git a/src/test/java/org/replicadb/postgres/Postgres2MySQLTest.java b/src/test/java/org/replicadb/postgres/Postgres2MySQLTest.java new file mode 100644 index 0000000..7b89673 --- /dev/null +++ b/src/test/java/org/replicadb/postgres/Postgres2MySQLTest.java @@ -0,0 +1,227 @@ +package org.replicadb.postgres; + +import org.apache.commons.cli.ParseException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.ClassRule; +import org.junit.jupiter.api.*; +import org.replicadb.ReplicaDB; +import org.replicadb.cli.ReplicationMode; +import org.replicadb.cli.ToolOptions; +import org.replicadb.utils.ScriptRunner; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Paths; +import java.sql.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Testcontainers +class Postgres2MySQLTest { + private static final Logger LOG = LogManager.getLogger(Postgres2MySQLTest.class); + private static final String RESOURECE_DIR = Paths.get("src", "test", "resources").toFile().getAbsolutePath(); + private static final String REPLICADB_CONF_FILE = "/replicadb.conf"; + private static final String POSTGRES_SOURCE_FILE = "/postgres/pg-source.sql"; + private static final String MYSQL_SINK_FILE = "/sinks/mysql-sink.sql"; + private static final String USER_PASSWD_DB = "replicadb"; + + private Connection mysqlConn; + private Connection postgresConn; + + @ClassRule + private static final MySQLContainer mysql = new MySQLContainer("mysql:5.6") + .withDatabaseName(USER_PASSWD_DB) + .withUsername(USER_PASSWD_DB) + .withPassword(USER_PASSWD_DB); + + @ClassRule + public static PostgreSQLContainer postgres = new PostgreSQLContainer("postgres:9.6") + .withDatabaseName(USER_PASSWD_DB) + .withUsername(USER_PASSWD_DB) + .withPassword(USER_PASSWD_DB); + + @BeforeAll + static void setUp() throws SQLException, IOException { + // Start the mysql container + mysql.start(); + postgres.start(); + // Create tables + /*Postgres*/ + Connection con = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); + ScriptRunner runner = new ScriptRunner(con, false, true); + runner.runScript(new BufferedReader(new FileReader(RESOURECE_DIR + POSTGRES_SOURCE_FILE))); + con.close(); + /*MySQL*/ + con = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword()); + runner = new ScriptRunner(con, false, true); + runner.runScript(new BufferedReader(new FileReader(RESOURECE_DIR + MYSQL_SINK_FILE))); + con.close(); + + } + + @BeforeEach + void before() throws SQLException { + this.mysqlConn = DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword()); + this.postgresConn = DriverManager.getConnection(postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); + } + + @AfterEach + void tearDown() throws SQLException { + // Truncate sink table and close connections + mysqlConn.createStatement().execute("TRUNCATE TABLE t_sink"); + this.mysqlConn.close(); + this.postgresConn.close(); + } + + + public int countSinkRows() throws SQLException { + Statement stmt = mysqlConn.createStatement(); + ResultSet rs = stmt.executeQuery("select count(*) from t_sink"); + rs.next(); + int count = rs.getInt(1); + LOG.info(count); + return count; + } + + + @Test + void testMysqlVersion56() throws SQLException { + Statement stmt = mysqlConn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT VERSION()"); + rs.next(); + String version = rs.getString(1); + LOG.info(version); + assertTrue(version.contains("5.6")); + } + + @Test + void testPostgresConnection() throws SQLException { + Statement stmt = postgresConn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT 1"); + rs.next(); + String version = rs.getString(1); + LOG.info(version); + assertTrue(version.contains("1")); + } + + @Test + void testMysqlInit() throws SQLException { + Statement stmt = mysqlConn.createStatement(); + ResultSet rs = stmt.executeQuery("select count(*) from t_source"); + rs.next(); + String version = rs.getString(1); + LOG.info(version); + assertTrue(version.contains("4096")); + } + + @Test + void testPostgres2MySQLComplete() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", postgres.getJdbcUrl(), + "--source-user", postgres.getUsername(), + "--source-password", postgres.getPassword(), + "--sink-connect", mysql.getJdbcUrl(), + "--sink-user", mysql.getUsername(), + "--sink-password", mysql.getPassword() + }; + ToolOptions options = new ToolOptions(args); + Assertions.assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096, countSinkRows()); + } + + @Test + void testMySQL2PostgresCompleteAtomic() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mysql.getJdbcUrl(), + "--source-user", mysql.getUsername(), + "--source-password", mysql.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText() + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096, countSinkRows()); + + } + + @Test + void testMySQL2PostgresIncremental() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mysql.getJdbcUrl(), + "--source-user", mysql.getUsername(), + "--source-password", mysql.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.INCREMENTAL.getModeText() + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096, countSinkRows()); + + } + + @Test + void testMySQL2PostgresCompleteParallel() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mysql.getJdbcUrl(), + "--source-user", mysql.getUsername(), + "--source-password", mysql.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--jobs", "4" + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096, countSinkRows()); + } + + @Test + void testMySQL2PostgresCompleteAtomicParallel() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mysql.getJdbcUrl(), + "--source-user", mysql.getUsername(), + "--source-password", mysql.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.COMPLETE_ATOMIC.getModeText(), + "--jobs", "4" + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096, countSinkRows()); + } + + @Test + void testMySQL2PostgresIncrementalParallel() throws ParseException, IOException, SQLException { + String[] args = { + "--options-file", RESOURECE_DIR + REPLICADB_CONF_FILE, + "--source-connect", mysql.getJdbcUrl(), + "--source-user", mysql.getUsername(), + "--source-password", mysql.getPassword(), + "--sink-connect", postgres.getJdbcUrl(), + "--sink-user", postgres.getUsername(), + "--sink-password", postgres.getPassword(), + "--mode", ReplicationMode.INCREMENTAL.getModeText(), + "--jobs", "4" + }; + ToolOptions options = new ToolOptions(args); + assertEquals(0, ReplicaDB.processReplica(options)); + assertEquals(4096, countSinkRows()); + } +} \ No newline at end of file diff --git a/src/test/resources/postgres/pg-source.sql b/src/test/resources/postgres/pg-source.sql new file mode 100644 index 0000000..506b5a6 --- /dev/null +++ b/src/test/resources/postgres/pg-source.sql @@ -0,0 +1,107 @@ +CREATE OR REPLACE VIEW generate_series_16 +AS SELECT 0 generate_series UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL + SELECT 3 UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL + SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL + SELECT 9 UNION ALL SELECT 10 UNION ALL SELECT 11 UNION ALL + SELECT 12 UNION ALL SELECT 13 UNION ALL SELECT 14 UNION ALL + SELECT 15; + +CREATE OR REPLACE VIEW generate_series_256 +AS SELECT ( ( hi.generate_series << 4 ) | lo.generate_series ) AS generate_series + FROM generate_series_16 lo, generate_series_16 hi; + +CREATE OR REPLACE VIEW generate_series_4k +AS SELECT ( ( hi.generate_series << 8 ) | lo.generate_series ) AS generate_series + FROM generate_series_256 lo, generate_series_16 hi; + +create table t_source +( + /*Exact Numerics*/ + C_INTEGER serial, + C_SMALLINT smallint, + C_BIGINT bigint, + C_NUMERIC numeric, + C_DECIMAL decimal, + /*Approximate Numerics:*/ + C_REAL real, + C_DOUBLE_PRECISION double precision, + C_FLOAT float, + /*Binary Strings:*/ + C_BINARY BYTEA, + C_BINARY_VAR BYTEA, + C_BINARY_LOB BYTEA, + /*Boolean:*/ + C_BOOLEAN boolean, + /*Character Strings:*/ + C_CHARACTER CHAR(35), + C_CHARACTER_VAR VARCHAR(255), + C_CHARACTER_LOB TEXT, + C_NATIONAL_CHARACTER CHAR(35), + C_NATIONAL_CHARACTER_VAR VARCHAR(255), + /*Datetimes:*/ + C_DATE DATE, + C_TIME_WITHOUT_TIMEZONE time without time zone, + C_TIMESTAMP_WITHOUT_TIMEZONE TIMESTAMP without time zone, + C_TIME_WITH_TIMEZONE time with time zone, + C_TIMESTAMP_WITH_TIMEZONE TIMESTAMP with time zone, + /*Intervals:*/ + C_INTERVAL_DAY INTERVAL, + C_INTERVAL_YEAR INTERVAL, + /*Collection Types:*/ + C_ARRAY text[], + C_MULTISET text/*not supported*/, + /*Other Types:*/ + C_XML xml, + C_JSON jsonb, + PRIMARY KEY (C_INTEGER) +); + + +insert into t_source ( + /*C_INTEGER, auto incremented*/ + C_SMALLINT, + C_BIGINT, + C_NUMERIC, + C_DECIMAL, + C_REAL , + C_DOUBLE_PRECISION, + C_FLOAT, + C_BINARY, + C_BINARY_VAR, + C_BINARY_LOB, + C_BOOLEAN, + C_CHARACTER, + C_CHARACTER_VAR, + C_CHARACTER_LOB, + C_NATIONAL_CHARACTER, + C_NATIONAL_CHARACTER_VAR, + C_DATE, + C_TIME_WITHOUT_TIMEZONE, + C_TIMESTAMP_WITHOUT_TIMEZONE, + C_TIME_WITH_TIMEZONE , + C_TIMESTAMP_WITH_TIMEZONE +) +select + (generate_series * random())::integer as C_SMALLINT, + (generate_series * random())::bigint as C_BIGINT, + (generate_series * random())::numeric as C_NUMERIC, + (generate_series * random())::decimal as C_DECIMAL, + (generate_series * random())::real as C_REAL, + (generate_series * random())::double precision as C_DOUBLE_PRECISION, + (generate_series * random())::float as C_FLOAT, + E'\\xDEADBEEF' as C_BINARY, + E'\\xDEADBEEF' as C_BINARY_VAR, + E'\\xDEADBEEF' as C_BINARY_LOB, + TRUE, + MD5((generate_series * random())::text) as C_CHAR, + MD5((generate_series * random())::text) as C_VARCHAR, + MD5((generate_series * random())::text) as C_VARCHAR_LOB, + MD5((generate_series * random())::text) as C_NATIONAL_CHARACTER, + MD5((generate_series * random())::text) as C_NATIONAL_CHARACTER_VAR, + now() as C_DATE, + current_time as C_TIME_WITHOUT_TIMEZONE, + current_timestamp as C_TIMESTAMP_WITHOUT_TIMEZONE, + current_time at time zone 'cet' as C_TIME_WITH_TIMEZONE, + current_timestamp at time zone 'cet' as C_TIMESTAMP_WITH_TIMEZONE +from generate_series_4k; + diff --git a/src/test/resources/sinks/mysql-sink.sql b/src/test/resources/sinks/mysql-sink.sql new file mode 100644 index 0000000..4d2fae7 --- /dev/null +++ b/src/test/resources/sinks/mysql-sink.sql @@ -0,0 +1,41 @@ +create table t_sink +( + /*Exact Numerics*/ + C_INTEGER INTEGER , + C_SMALLINT SMALLINT, + C_BIGINT BIGINT, + C_NUMERIC NUMERIC(65, 30), + C_DECIMAL DECIMAL(65, 30), + /*Approximate Numerics:*/ + C_REAL REAL, + C_DOUBLE_PRECISION DOUBLE PRECISION, + C_FLOAT FLOAT, + /*Binary Strings:*/ + C_BINARY BINARY(35), + C_BINARY_VAR VARBINARY(255), + C_BINARY_LOB BLOB, + /*Boolean:*/ + C_BOOLEAN BOOLEAN, + /*Character Strings:*/ + C_CHARACTER CHAR(35), + C_CHARACTER_VAR VARCHAR(255), + C_CHARACTER_LOB TEXT, + C_NATIONAL_CHARACTER NATIONAL CHARACTER(35), + C_NATIONAL_CHARACTER_VAR NVARCHAR(255), + /*Datetimes:*/ + C_DATE DATE, + C_TIME_WITHOUT_TIMEZONE TIME, + C_TIMESTAMP_WITHOUT_TIMEZONE TIMESTAMP, + C_TIME_WITH_TIMEZONE TIME, + C_TIMESTAMP_WITH_TIMEZONE TIMESTAMP, + /*Intervals:*/ + C_INTERVAL_DAY TEXT /*not supported*/, + C_INTERVAL_YEAR TEXT/*not supported*/, + /*Collection Types:*/ + C_ARRAY TEXT/*not supported*/, + C_MULTISET TEXT/*not supported*/, + /*Other Types:*/ + C_XML TEXT/*not supported*/, + C_JSON TEXT/*not supported*/, + PRIMARY KEY (C_INTEGER) +);