From f396dc30e395753b5ca1349eacffb2f2e14dc8b2 Mon Sep 17 00:00:00 2001 From: mxsm Date: Fri, 1 Dec 2023 06:45:51 +0000 Subject: [PATCH] 88445 --- .../jdbc/common/EnumeratedValue.java | 6 ++ .../jdbc/event/DataChangeEventType.java | 9 +++ .../jdbc/sink/config/JdbcSinkConfig.java | 6 +- .../sink/handle/DefaultSinkRecordHandler.java | 80 +++++++++++++++++-- .../jdbc/sink/handle/DialectAssemblyLine.java | 6 +- .../handle/GeneralDialectAssemblyLine.java | 2 +- .../dialect/cdc/mysql/MysqlCdcEngine.java | 3 +- .../connector/jdbc/type/mysql/YearType.java | 9 +++ .../connector/jdbc/utils/ByteArrayUtils.java | 62 ++++++++++++++ 9 files changed, 169 insertions(+), 14 deletions(-) create mode 100644 eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/common/EnumeratedValue.java create mode 100644 eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/utils/ByteArrayUtils.java diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/common/EnumeratedValue.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/common/EnumeratedValue.java new file mode 100644 index 0000000000..09ddff45ef --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/common/EnumeratedValue.java @@ -0,0 +1,6 @@ +package org.apache.eventmesh.connector.jdbc.common; + +public interface EnumeratedValue { + + T getValue(); +} diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/event/DataChangeEventType.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/event/DataChangeEventType.java index be79cb3ebf..e0f8e45be9 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/event/DataChangeEventType.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/event/DataChangeEventType.java @@ -26,6 +26,15 @@ public enum DataChangeEventType { this.code = code; } + public static DataChangeEventType parseFromCode(String code) { + for (DataChangeEventType type : DataChangeEventType.values()) { + if (type.code.equals(code)) { + return type; + } + } + throw new IllegalArgumentException("Unknown DataChangeEventType code: " + code); + } + public String ofCode() { return this.code; } diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/config/JdbcSinkConfig.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/config/JdbcSinkConfig.java index 2c31229ab5..91fdd7b1f6 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/config/JdbcSinkConfig.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/config/JdbcSinkConfig.java @@ -24,7 +24,11 @@ @Data @EqualsAndHashCode(callSuper = true) -public class JdbcSinkConfig extends SinkConfig { +public class JdbcSinkConfig extends SinkConfig { + + private boolean supportUpsert = true; + + private boolean supportDelete = true; public SinkConnectorConfig sinkConnectorConfig; diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java index 240775e4f5..73adbe4257 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java @@ -23,11 +23,16 @@ import org.apache.eventmesh.common.utils.LogUtils; import org.apache.eventmesh.connector.jdbc.DataChanges; +import org.apache.eventmesh.connector.jdbc.Field; import org.apache.eventmesh.connector.jdbc.JdbcConnectData; import org.apache.eventmesh.connector.jdbc.Payload; import org.apache.eventmesh.connector.jdbc.Schema; +import org.apache.eventmesh.connector.jdbc.common.EnumeratedValue; import org.apache.eventmesh.connector.jdbc.dialect.DatabaseDialect; +import org.apache.eventmesh.connector.jdbc.event.DataChangeEventType; +import org.apache.eventmesh.connector.jdbc.sink.config.JdbcSinkConfig; import org.apache.eventmesh.connector.jdbc.source.SourceMateData; +import org.apache.eventmesh.connector.jdbc.source.dialect.cdc.mysql.MysqlCdcEngine.CdcDmlType; import org.apache.eventmesh.connector.jdbc.type.Type; import org.hibernate.SessionFactory; import org.hibernate.StatelessSession; @@ -51,12 +56,15 @@ public class DefaultSinkRecordHandler implements SinkRecordHandler { private final StatelessSession session; - public DefaultSinkRecordHandler(DatabaseDialect eventMeshDialect, SessionFactory sessionFactory) { + private final JdbcSinkConfig jdbcSinkConfig; + + public DefaultSinkRecordHandler(DatabaseDialect eventMeshDialect, SessionFactory sessionFactory, JdbcSinkConfig jdbcSinkConfig) { this.eventMeshDialect = eventMeshDialect; this.sessionFactory = sessionFactory; this.hibernateDialect = sessionFactory.unwrap(SessionFactoryImplementor.class).getJdbcServices().getDialect(); this.session = this.sessionFactory.openStatelessSession(); this.dialectAssemblyLine = new GeneralDialectAssemblyLine(eventMeshDialect, hibernateDialect); + this.jdbcSinkConfig = jdbcSinkConfig; } /** @@ -75,11 +83,37 @@ public void handle(JdbcConnectData connectData) throws Exception { applyDatabseAndTableChanges(sql); } else if (connectData.isDataChanges()) { //do handle data changes - sql = this.dialectAssemblyLine.getInsertStatement(sourceMateData, connectData.getSchema(), payload.ofDdl()); + DataChangeEventType type = DataChangeEventType.parseFromCode(connectData.getPayload().getDataChanges().getType()); + switch (type) { + case INSERT: + sql = this.dialectAssemblyLine.getInsertStatement(sourceMateData, connectData.getSchema(), payload.ofDdl()); + insert(sql, connectData.getSchema(), payload.ofDataChanges()); + break; + case UPDATE:{ + if(jdbcSinkConfig.isSupportUpsert()){ + sql = this.dialectAssemblyLine.getUpsertStatement(sourceMateData, connectData.getSchema(), payload.ofDdl()); + }else{ + sql = this.dialectAssemblyLine.getUpdateStatement(sourceMateData, connectData.getSchema(), payload.ofDdl()); + //update(sql, connectData.getSchema(), payload.ofDataChanges()); + } + break; + } + case DELETE:{ + if (!jdbcSinkConfig.isSupportDelete()){ + break; + } + sql = this.dialectAssemblyLine.getDeleteStatement(sourceMateData, connectData.getSchema(), payload.ofDdl()); + //delete(sql, connectData.getSchema(), payload.ofDataChanges()); + break; + } + default: { + log.warn("Unknown data changes type: {}", connectData.getPayload().getDataChanges().getType()); + return; + } + } } else { log.warn("Unknown connect data type: {}", connectData.getType()); } - log.info("SQL={}", sql); } private void applyDatabseAndTableChanges(String sql){ @@ -96,16 +130,18 @@ private void applyDatabseAndTableChanges(String sql){ } - private void insert(String sql, Schema schema, DataChanges dataChanges) throws SQLException { + @SuppressWarnings("unchecked") + private void insert(String sql, Schema schema, DataChanges dataChanges) throws SQLException { final Transaction transaction = session.beginTransaction(); try { LogUtils.debug(log, "execute sql: {}",sql); - final NativeQuery query = session.createNativeQuery(sql, Object.class); + final NativeQuery query = session.createNativeQuery(sql); AtomicInteger index = new AtomicInteger(1); - Map dataChangesAfter = (Map) dataChanges.getAfter(); - schema.getFields().forEach(field -> { + Map dataChangesAfter = (Map) dataChanges.getAfter(); + Field after = schema.getFields().get(0); + after.getFields().forEach(field -> { Type type = eventMeshDialect.getType(field.getColumn()); - query.setParameter(index.getAndIncrement(), type.convert2DatabaseType(dataChangesAfter.get(field.getName()))); + query.setParameter(index.getAndIncrement(), type.convert2DatabaseType(dataChangesAfter.get(field.getField()))); }); final int result = query.executeUpdate(); @@ -120,4 +156,32 @@ private void insert(String sql, Schema schema, DataChanges dataChanges) throws S throw e; } } + + public enum DataHandleMode implements EnumeratedValue { + INSERT("insert"), + UPSERT("upsert"), + UPDATE("update"), + DELETE("delete"); + + private String value; + + DataHandleMode(String value) { + + this.value = value; + } + + public static DataHandleMode forValue(String value) { + for (DataHandleMode mode : DataHandleMode.values()) { + if (mode.getValue().equalsIgnoreCase(value)) { + return mode; + } + } + throw new IllegalArgumentException("No enum constant " + DataHandleMode.class.getName() + "." + value); + } + + @Override + public String getValue() { + return value; + } + } } \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DialectAssemblyLine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DialectAssemblyLine.java index f60fedef69..6ffe0ef9c3 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DialectAssemblyLine.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DialectAssemblyLine.java @@ -27,10 +27,10 @@ public interface DialectAssemblyLine { String getInsertStatement(SourceMateData sourceMateData, Schema schema, String originStatement); - String getUpsertStatement(); + String getUpsertStatement(SourceMateData sourceMateData, Schema schema, String originStatement); - String getDeleteStatement(); + String getDeleteStatement(SourceMateData sourceMateData, Schema schema, String originStatement); - String getUpdateStatement(); + String getUpdateStatement(SourceMateData sourceMateData, Schema schema, String originStatement); } diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java index 60fd46500d..423d23db77 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java @@ -138,7 +138,7 @@ public String getInsertStatement(SourceMateData sourceMateData, Schema schema, S sqlAssembler.appendSqlSlice(" ("); // assemble columns Field afterField = afterFields.get(0); - List> columns = afterField.getFields().stream().map(item -> item.getColumn()).collect(Collectors.toList()); + List> columns = afterField.getFields().stream().map(item -> item.getColumn()).sorted(Comparator.comparingInt(Column::getOrder)).collect(Collectors.toList()); sqlAssembler.appendSqlSliceOfColumns(", ", columns, column -> column.getName()); sqlAssembler.appendSqlSlice(") VALUES ("); //assemble values diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java index c9c3129cff..52abd13b2c 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/cdc/mysql/MysqlCdcEngine.java @@ -607,7 +607,7 @@ private MysqlSourceMateData buildMysqlSourceMateData(MysqlJdbcContext context, E return sourceMateData; } - private enum CdcDmlType { + public enum CdcDmlType { INSERT, UPDATE, DELETE @@ -645,6 +645,7 @@ private void handleCdcDmlData(MysqlJdbcContext context, MysqlSourceMateData sour int columnsSize = orderColumnMap.size(); for (Pair, Pair> pair : rows) { GeneralDataChangeEvent dataEvent = buildEvent(type, tableId); + builder.withType(dataEvent.getDataChangeEventType().ofCode()); Payload payload = dataEvent.getJdbcConnectData().getPayload(); Schema schema = new Schema(); Pair beforePair = Optional.ofNullable(pair.getLeft()).orElse(new Pair<>()); diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/type/mysql/YearType.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/type/mysql/YearType.java index ef4026755d..ff6c56300a 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/type/mysql/YearType.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/type/mysql/YearType.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.connector.jdbc.type.mysql; +import java.time.LocalDate; import java.util.Arrays; import java.util.List; import org.apache.eventmesh.connector.jdbc.JdbcDriverMetaData; @@ -63,4 +64,12 @@ public String getTypeName(Column column) { public String getDefaultValue(DatabaseDialect databaseDialect, Column column) { return column.getDefaultValue() == null ? "NULL" : "'"+column.getDefaultValue()+"'"; } + + @Override + public Object convert2DatabaseType(Object value) { + if (value == null) { + return null; + } + return LocalDate.parse((String)value).getYear(); + } } diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/utils/ByteArrayUtils.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/utils/ByteArrayUtils.java new file mode 100644 index 0000000000..ad50e23e0b --- /dev/null +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/utils/ByteArrayUtils.java @@ -0,0 +1,62 @@ +package org.apache.eventmesh.connector.jdbc.utils; + +public class ByteArrayUtils { + private static final char[] HEX_CHARS = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; + + /** + * Converts a byte array into a hexadecimal string. + * + * @param bytes the byte array to be converted + * @return the hexadecimal string representation of the byte array + * @throws NullPointerException if the byte array is null + */ + public static String bytesToHexString(byte[] bytes) { + if (bytes == null) { + throw new NullPointerException("Parameter to be converted can not be null"); + } + + char[] converted = new char[bytes.length * 2]; + for (int i = 0; i < bytes.length; i++) { + byte b = bytes[i]; + converted[i * 2] = HEX_CHARS[b >> 4 & 0x0F]; + converted[i * 2 + 1] = HEX_CHARS[b & 0x0F]; + } + + return String.valueOf(converted); + } + + + /** + * This method converts a hexadecimal string into an array of bytes. + * + * @param str the hexadecimal string to be converted + * @return the resulting byte array + * @throws IllegalArgumentException if the supplied character array contains an odd number of hex characters + */ + public static byte[] hexStringToBytes(String str) { + final char[] chars = str.toCharArray(); + if (chars.length % 2 != 0) { + throw new IllegalArgumentException("The supplied character array must contain an even number of hex chars."); + } + + byte[] response = new byte[chars.length / 2]; + + for (int i = 0; i < response.length; i++) { + int posOne = i * 2; + response[i] = (byte) (toByte(chars, posOne) << 4 | toByte(chars, posOne + 1)); + } + + return response; + } + + + private static byte toByte(final char[] chars, final int pos) { + int response = Character.digit(chars[pos], 16); + if (response < 0 || response > 15) { + throw new IllegalArgumentException("Non-hex character '" + chars[pos] + "' at index=" + pos); + } + + return (byte) response; + } + +}