Skip to content

Commit

Permalink
88445
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 1, 2023
1 parent ab4a5da commit f396dc3
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.eventmesh.connector.jdbc.common;

public interface EnumeratedValue<T> {

T getValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ public enum DataChangeEventType {
this.code = code;
}

public static DataChangeEventType parseFromCode(String code) {
for (DataChangeEventType type : DataChangeEventType.values()) {
if (type.code.equals(code)) {
return type;
}
}
throw new IllegalArgumentException("Unknown DataChangeEventType code: " + code);
}

public String ofCode() {
return this.code;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@

@Data
@EqualsAndHashCode(callSuper = true)
public class JdbcSinkConfig extends SinkConfig {
public class JdbcSinkConfig extends SinkConfig {

private boolean supportUpsert = true;

private boolean supportDelete = true;

public SinkConnectorConfig sinkConnectorConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@

import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.connector.jdbc.DataChanges;
import org.apache.eventmesh.connector.jdbc.Field;
import org.apache.eventmesh.connector.jdbc.JdbcConnectData;
import org.apache.eventmesh.connector.jdbc.Payload;
import org.apache.eventmesh.connector.jdbc.Schema;
import org.apache.eventmesh.connector.jdbc.common.EnumeratedValue;
import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect;
import org.apache.eventmesh.connector.jdbc.event.DataChangeEventType;
import org.apache.eventmesh.connector.jdbc.sink.config.JdbcSinkConfig;
import org.apache.eventmesh.connector.jdbc.source.SourceMateData;
import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.mysql.MysqlCdcEngine.CdcDmlType;
import org.apache.eventmesh.connector.jdbc.type.Type;
import org.hibernate.SessionFactory;
import org.hibernate.StatelessSession;
Expand All @@ -51,12 +56,15 @@ public class DefaultSinkRecordHandler implements SinkRecordHandler {

private final StatelessSession session;

public DefaultSinkRecordHandler(DatabaseDialect<?> eventMeshDialect, SessionFactory sessionFactory) {
private final JdbcSinkConfig jdbcSinkConfig;

public DefaultSinkRecordHandler(DatabaseDialect<?> eventMeshDialect, SessionFactory sessionFactory, JdbcSinkConfig jdbcSinkConfig) {
this.eventMeshDialect = eventMeshDialect;
this.sessionFactory = sessionFactory;
this.hibernateDialect = sessionFactory.unwrap(SessionFactoryImplementor.class).getJdbcServices().getDialect();
this.session = this.sessionFactory.openStatelessSession();
this.dialectAssemblyLine = new GeneralDialectAssemblyLine(eventMeshDialect, hibernateDialect);
this.jdbcSinkConfig = jdbcSinkConfig;
}

/**
Expand All @@ -75,11 +83,37 @@ public void handle(JdbcConnectData connectData) throws Exception {
applyDatabseAndTableChanges(sql);
} else if (connectData.isDataChanges()) {
//do handle data changes
sql = this.dialectAssemblyLine.getInsertStatement(sourceMateData, connectData.getSchema(), payload.ofDdl());
DataChangeEventType type = DataChangeEventType.parseFromCode(connectData.getPayload().getDataChanges().getType());
switch (type) {
case INSERT:
sql = this.dialectAssemblyLine.getInsertStatement(sourceMateData, connectData.getSchema(), payload.ofDdl());
insert(sql, connectData.getSchema(), payload.ofDataChanges());
break;
case UPDATE:{
if(jdbcSinkConfig.isSupportUpsert()){
sql = this.dialectAssemblyLine.getUpsertStatement(sourceMateData, connectData.getSchema(), payload.ofDdl());
}else{
sql = this.dialectAssemblyLine.getUpdateStatement(sourceMateData, connectData.getSchema(), payload.ofDdl());
//update(sql, connectData.getSchema(), payload.ofDataChanges());
}
break;
}
case DELETE:{
if (!jdbcSinkConfig.isSupportDelete()){
break;
}
sql = this.dialectAssemblyLine.getDeleteStatement(sourceMateData, connectData.getSchema(), payload.ofDdl());
//delete(sql, connectData.getSchema(), payload.ofDataChanges());
break;
}
default: {
log.warn("Unknown data changes type: {}", connectData.getPayload().getDataChanges().getType());
return;
}
}
} else {
log.warn("Unknown connect data type: {}", connectData.getType());
}
log.info("SQL={}", sql);
}

private void applyDatabseAndTableChanges(String sql){
Expand All @@ -96,16 +130,18 @@ private void applyDatabseAndTableChanges(String sql){

}

private void insert(String sql, Schema schema, DataChanges dataChanges) throws SQLException {
@SuppressWarnings("unchecked")
private void insert(String sql, Schema schema, DataChanges dataChanges) throws SQLException {
final Transaction transaction = session.beginTransaction();
try {
LogUtils.debug(log, "execute sql: {}",sql);
final NativeQuery<?> query = session.createNativeQuery(sql, Object.class);
final NativeQuery<?> query = session.createNativeQuery(sql);
AtomicInteger index = new AtomicInteger(1);
Map<String, String> dataChangesAfter = (Map<String, String>) dataChanges.getAfter();
schema.getFields().forEach(field -> {
Map<String, Object> dataChangesAfter = (Map<String, Object>) dataChanges.getAfter();
Field after = schema.getFields().get(0);
after.getFields().forEach(field -> {
Type type = eventMeshDialect.getType(field.getColumn());
query.setParameter(index.getAndIncrement(), type.convert2DatabaseType(dataChangesAfter.get(field.getName())));
query.setParameter(index.getAndIncrement(), type.convert2DatabaseType(dataChangesAfter.get(field.getField())));
});

final int result = query.executeUpdate();
Expand All @@ -120,4 +156,32 @@ private void insert(String sql, Schema schema, DataChanges dataChanges) throws S
throw e;
}
}

public enum DataHandleMode implements EnumeratedValue<String> {
INSERT("insert"),
UPSERT("upsert"),
UPDATE("update"),
DELETE("delete");

private String value;

DataHandleMode(String value) {

this.value = value;
}

public static DataHandleMode forValue(String value) {
for (DataHandleMode mode : DataHandleMode.values()) {
if (mode.getValue().equalsIgnoreCase(value)) {
return mode;
}
}
throw new IllegalArgumentException("No enum constant " + DataHandleMode.class.getName() + "." + value);
}

@Override
public String getValue() {
return value;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public interface DialectAssemblyLine {

String getInsertStatement(SourceMateData sourceMateData, Schema schema, String originStatement);

String getUpsertStatement();
String getUpsertStatement(SourceMateData sourceMateData, Schema schema, String originStatement);

String getDeleteStatement();
String getDeleteStatement(SourceMateData sourceMateData, Schema schema, String originStatement);

String getUpdateStatement();
String getUpdateStatement(SourceMateData sourceMateData, Schema schema, String originStatement);

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public String getInsertStatement(SourceMateData sourceMateData, Schema schema, S
sqlAssembler.appendSqlSlice(" (");
// assemble columns
Field afterField = afterFields.get(0);
List<Column<?>> columns = afterField.getFields().stream().map(item -> item.getColumn()).collect(Collectors.toList());
List<Column<?>> columns = afterField.getFields().stream().map(item -> item.getColumn()).sorted(Comparator.comparingInt(Column::getOrder)).collect(Collectors.toList());
sqlAssembler.appendSqlSliceOfColumns(", ", columns, column -> column.getName());
sqlAssembler.appendSqlSlice(") VALUES (");
//assemble values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ private MysqlSourceMateData buildMysqlSourceMateData(MysqlJdbcContext context, E
return sourceMateData;
}

private enum CdcDmlType {
public enum CdcDmlType {
INSERT,
UPDATE,
DELETE
Expand Down Expand Up @@ -645,6 +645,7 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour
int columnsSize = orderColumnMap.size();
for (Pair<Pair<Serializable[], BitSet>, Pair<Serializable[], BitSet>> pair : rows) {
GeneralDataChangeEvent dataEvent = buildEvent(type, tableId);
builder.withType(dataEvent.getDataChangeEventType().ofCode());
Payload payload = dataEvent.getJdbcConnectData().getPayload();
Schema schema = new Schema();
Pair<Serializable[], BitSet> beforePair = Optional.ofNullable(pair.getLeft()).orElse(new Pair<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.eventmesh.connector.jdbc.type.mysql;

import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import org.apache.eventmesh.connector.jdbc.JdbcDriverMetaData;
Expand Down Expand Up @@ -63,4 +64,12 @@ public String getTypeName(Column<?> column) {
public String getDefaultValue(DatabaseDialect<?> databaseDialect, Column<?> column) {
return column.getDefaultValue() == null ? "NULL" : "'"+column.getDefaultValue()+"'";
}

@Override
public Object convert2DatabaseType(Object value) {
if (value == null) {
return null;
}
return LocalDate.parse((String)value).getYear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.apache.eventmesh.connector.jdbc.utils;

public class ByteArrayUtils {
private static final char[] HEX_CHARS = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};

/**
* Converts a byte array into a hexadecimal string.
*
* @param bytes the byte array to be converted
* @return the hexadecimal string representation of the byte array
* @throws NullPointerException if the byte array is null
*/
public static String bytesToHexString(byte[] bytes) {
if (bytes == null) {
throw new NullPointerException("Parameter to be converted can not be null");
}

char[] converted = new char[bytes.length * 2];
for (int i = 0; i < bytes.length; i++) {
byte b = bytes[i];
converted[i * 2] = HEX_CHARS[b >> 4 & 0x0F];
converted[i * 2 + 1] = HEX_CHARS[b & 0x0F];
}

return String.valueOf(converted);
}


/**
* This method converts a hexadecimal string into an array of bytes.
*
* @param str the hexadecimal string to be converted
* @return the resulting byte array
* @throws IllegalArgumentException if the supplied character array contains an odd number of hex characters
*/
public static byte[] hexStringToBytes(String str) {
final char[] chars = str.toCharArray();
if (chars.length % 2 != 0) {
throw new IllegalArgumentException("The supplied character array must contain an even number of hex chars.");
}

byte[] response = new byte[chars.length / 2];

for (int i = 0; i < response.length; i++) {
int posOne = i * 2;
response[i] = (byte) (toByte(chars, posOne) << 4 | toByte(chars, posOne + 1));
}

return response;
}


private static byte toByte(final char[] chars, final int pos) {
int response = Character.digit(chars[pos], 16);
if (response < 0 || response > 15) {
throw new IllegalArgumentException("Non-hex character '" + chars[pos] + "' at index=" + pos);
}

return (byte) response;
}

}

0 comments on commit f396dc3

Please sign in to comment.