Skip to content

Commit

Permalink
[skip travis] Mysql complete sink mode
Browse files Browse the repository at this point in the history
  • Loading branch information
osalvador committed Jun 16, 2021
1 parent cd84800 commit fa81af1
Show file tree
Hide file tree
Showing 5 changed files with 528 additions and 6 deletions.
9 changes: 9 additions & 0 deletions src/main/java/org/replicadb/manager/ManagerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import org.replicadb.manager.cdc.OracleManagerCDC;
import org.replicadb.manager.cdc.SQLServerManagerCDC;

import java.util.Properties;

import static org.replicadb.manager.SupportedManagers.*;

/**
Expand Down Expand Up @@ -71,6 +73,13 @@ public ConnManager accept(ToolOptions options, DataSourceType dsType) {
} else if (S3.isTheManagerTypeOf(options, dsType)) {
return new S3Manager(options, dsType);
} else if (MYSQL.isTheManagerTypeOf(options, dsType)) {
// In MySQL this properties are required
if (dsType.equals(DataSourceType.SINK)){
Properties mysqlProps = new Properties();
mysqlProps.setProperty("characterEncoding", "UTF-8");
mysqlProps.setProperty("allowLoadLocalInfile", "true");
options.setSinkConnectionParams(mysqlProps);
}
return new MySQLManager(options, dsType);
} else {
throw new IllegalArgumentException("The database with scheme "+scheme+" is not supported by ReplicaDB");
Expand Down
150 changes: 144 additions & 6 deletions src/main/java/org/replicadb/manager/MySQLManager.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package org.replicadb.manager;

import com.mysql.cj.jdbc.JdbcPreparedStatement;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.replicadb.cli.ReplicationMode;
import org.replicadb.cli.ToolOptions;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.*;

public class MySQLManager extends SqlManager {

Expand All @@ -30,13 +33,148 @@ public String getDriverClass() {
}

@Override
public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLException {
throw new UnsupportedOperationException("MySQL does not yet support data insertion");
public int insertDataToTable(ResultSet resultSet, int taskId) throws SQLException, IOException {

try {

ResultSetMetaData rsmd = resultSet.getMetaData();
String tableName;

// Get table name and columns
if (options.getMode().equals(ReplicationMode.COMPLETE.getModeText())) {
tableName = getSinkTableName();
} else {
tableName = getQualifiedStagingTableName();
}

String allColumns = getAllSinkColumns(rsmd);

// Get MySQL LOAD DATA manager
String loadDataSql = getLoadDataSql(tableName, allColumns);
PreparedStatement statement = this.connection.prepareStatement(loadDataSql);
JdbcPreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.JdbcPreparedStatement.class);

char unitSeparator = 0x1F;
int columnsNumber = rsmd.getColumnCount();

StringBuilder row = new StringBuilder();
StringBuilder cols = new StringBuilder();

byte[] bytes = "".getBytes();
String colValue;
int rowCounts = 0;
int batchSize = options.getFetchSize();

if (resultSet.next()) {
// Create Bandwidth Throttling
bandwidthThrottlingCreate(resultSet, rsmd);

do {
bandwidthThrottlingAcquiere();

// Get Columns values
for (int i = 1; i <= columnsNumber; i++) {
if (i > 1) cols.append(unitSeparator);

switch (rsmd.getColumnType(i)) {

case Types.CLOB:
colValue = clobToString(resultSet.getClob(i));
break;
//case Types.BINARY:
case Types.BLOB:
//colValue = blobToPostgresHex(resultSet.getBlob(i));
// TODO revisar los BLOB y CLOB
colValue = "";
break;
default:
colValue = resultSet.getString(i);
break;
// TODO revisar los timestamp con timezone
}

if (!resultSet.wasNull() || colValue != null) cols.append(colValue);
}

// Escape special chars
if (this.options.isSinkDisableEscape())
row.append(cols);
else
row.append(cols.toString().replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r").replace("\u0000", ""));

// Row ends with \n
row.append("\n");

// Copy data to mysql
bytes = row.toString().getBytes(StandardCharsets.UTF_8);

if (++rowCounts % batchSize == 0) {
mysqlStatement.setLocalInfileInputStream(new ByteArrayInputStream(bytes));
mysqlStatement.executeUpdate();

// Clear StringBuilders
row.setLength(0); // set length of buffer to 0
row.trimToSize();
rowCounts = 0;
}

// Clear StringBuilders
cols.setLength(0); // set length of buffer to 0
cols.trimToSize();
} while (resultSet.next());
}

// insert remaining records
if (rowCounts != 0) {
mysqlStatement.setLocalInfileInputStream(new ByteArrayInputStream(bytes));
mysqlStatement.executeUpdate();
}

} catch (Exception e) {
this.connection.rollback();
throw e;
}

this.getConnection().commit();
return 0;
}

private String getLoadDataSql(String tableName, String allColumns) {
StringBuilder loadDataSql = new StringBuilder();
loadDataSql.append("LOAD DATA LOCAL INFILE 'dummy' INTO TABLE ");
loadDataSql.append(tableName);
loadDataSql.append(" CHARACTER SET UTF8 FIELDS TERMINATED BY X'1F' ");
if (allColumns != null) {
loadDataSql.append(" (");
loadDataSql.append(allColumns);
loadDataSql.append(")");
}

LOG.info("Loading data with this command: {}", loadDataSql);
return loadDataSql.toString();
}

@Override
protected void createStagingTable() throws SQLException {
throw new UnsupportedOperationException("MySQL does not yet support data insertion");
Statement statement = this.getConnection().createStatement();
String sinkStagingTable = getQualifiedStagingTableName();

// Get sink columns.
String allSinkColumns = null;
if (this.options.getSinkColumns() != null && !this.options.getSinkColumns().isEmpty()) {
allSinkColumns = this.options.getSinkColumns();
} else if (this.options.getSourceColumns() != null && !this.options.getSourceColumns().isEmpty()) {
allSinkColumns = this.options.getSourceColumns();
} else {
allSinkColumns = "*";
}

String sql = " CREATE TABLE " + sinkStagingTable + " AS (SELECT " + allSinkColumns + " FROM " + this.getSinkTableName() + " WHERE 1 = 0 ) ";

LOG.info("Creating staging table with this command: " + sql);
statement.executeUpdate(sql);
statement.close();
this.getConnection().commit();
}

@Override
Expand Down
Loading

0 comments on commit fa81af1

Please sign in to comment.