diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java index f1a7e47f0b..b4658e22c5 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/DefaultSinkRecordHandler.java @@ -120,7 +120,7 @@ private void dataChangesHandle(JdbcConnectData connectData, SourceMateData sourc sql = this.dialectAssemblyLine.getDeleteStatement(sourceMateData, connectData.getSchema(), payload.ofDdl()); delete(sql, connectData.getSchema(), payload.ofDataChanges()); } else { - LogUtils.warn(log, "No support for DELETE"); + log.warn("No support for DELETE"); } break; case NONE: @@ -155,7 +155,9 @@ private void applyDatabaseAndTableChanges(String sql) { private void insert(String sql, Schema schema, DataChanges dataChanges) throws SQLException { final Transaction transaction = session.beginTransaction(); try { - LogUtils.debug(log, "execute insert sql: {}", sql); + if (log.isDebugEnabled()) { + log.debug("execute insert sql: {}", sql); + } final NativeQuery query = session.createNativeQuery(sql); AtomicInteger index = new AtomicInteger(1); Map dataChangesAfter = (Map) dataChanges.getAfter(); @@ -176,7 +178,28 @@ private void insert(String sql, Schema schema, DataChanges dataChanges) throws S } private void update(String sql, Schema schema, DataChanges dataChanges) throws SQLException { - + final Transaction transaction = session.beginTransaction(); + try { + if (log.isDebugEnabled()) { + log.debug("execute update sql: {}", sql); + } + final NativeQuery query = session.createNativeQuery(sql); + AtomicInteger index = new AtomicInteger(1); + Map dataChangesAfter = (Map) dataChanges.getAfter(); + Field after = schema.getFields().get(0); + after.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder)).forEach(column -> { + Type type = eventMeshDialect.getType(column); + query.setParameter(index.getAndIncrement(), type.convert2DatabaseTypeValue(dataChangesAfter.get(column.getName()))); + }); + final int result = query.executeUpdate(); + if (result != 1) { + throw new SQLException("Failed to update row from table"); + } + transaction.commit(); + } catch (SQLException e) { + transaction.rollback(); + throw e; + } } private void upsert(String sql, Schema schema, DataChanges dataChanges) throws SQLException { @@ -194,7 +217,7 @@ private void delete(String sql, Schema schema, DataChanges dataChanges) throws S } final NativeQuery query = session.createNativeQuery(sql); AtomicInteger index = new AtomicInteger(1); - Map dataChangesAfter = (Map) dataChanges.getAfter(); + Map dataChangesAfter = (Map) dataChanges.getBefore(); final Map> columnMap = schema.getFields().get(0).getFields().stream().map(field -> field.getColumn()) .collect(Collectors.toMap(Column::getName, column -> column)); schema.getKeySet().stream().forEach(columnName -> { diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java index ec7958ee55..703b1a3bd9 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/handle/GeneralDialectAssemblyLine.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.hibernate.dialect.Dialect; @@ -143,19 +144,22 @@ public String getDeleteStatement(SourceMateData sourceMateData, Schema schema, S public String getUpdateStatement(SourceMateData sourceMateData, Schema schema, String originStatement) { final SqlStatementAssembler sqlStatementAssembler = new SqlStatementAssembler(); final TableId tableId = sourceMateData.ofTableId(); + //primary key set + final Set keySet = schema.getKeySet(); + Field tableColumns = schema.getFields().get(0); sqlStatementAssembler.appendSqlSlice("UPDATE "); sqlStatementAssembler.appendSqlSlice(((AbstractGeneralDatabaseDialect) databaseDialect).getQualifiedTableName(tableId)); sqlStatementAssembler.appendSqlSlice(" SET "); - sqlStatementAssembler.appendSqlSliceLists(", ", schema.getFields().stream().map(Field::getName).collect(Collectors.toList()), + sqlStatementAssembler.appendSqlSliceLists(", ", + tableColumns.getFields().stream().filter(field -> keySet.contains(field.getName())).map(Field::getName).collect(Collectors.toList()), (columnName) -> columnName + " =?"); if (schema.containsKey()) { sqlStatementAssembler.appendSqlSlice(" WHERE "); - sqlStatementAssembler.appendSqlSliceLists(" AND ", schema.getKeySet(), (columnName) -> columnName + " =?"); + sqlStatementAssembler.appendSqlSliceLists(" AND ", keySet, (columnName) -> columnName + " =?"); } else { - Field after = schema.getFields().get(0); sqlStatementAssembler.appendSqlSlice(" WHERE "); sqlStatementAssembler.appendSqlSliceOfColumns(" AND ", - after.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder)) + tableColumns.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder)) .collect(Collectors.toList()), column -> column.getName() + " =?"); } return sqlStatementAssembler.build(); diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/AbstractSnapshotEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/AbstractSnapshotEngine.java index 4c5198c1f5..3a272dca19 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/AbstractSnapshotEngine.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/AbstractSnapshotEngine.java @@ -217,10 +217,10 @@ private void createDataEvents(Jc context, SnapshotContext snapshot private Callable createSnapshotDataEvent4TableCallable(Jc context, SnapshotContext snapshotContext, Queue connectionPool, String sql, TableId tableId) { UniversalJdbcContext universalJdbcContext = (UniversalJdbcContext) context; - universalJdbcContext.withTableId(tableId); + //universalJdbcContext.withTableId(tableId); return () -> { JdbcConnection connection = connectionPool.poll(); - SourceMateData sourceMateData = buildSourceMateData(context, snapshotContext); + SourceMateData sourceMateData = buildSourceMateData(context, snapshotContext,tableId); TableSchema tableSchema = universalJdbcContext.getCatalogTableSet().getTableSchema(tableId); Field field = new Field().withField("after").withName("payload.after").withRequired(false); List columns = tableSchema.getColumns(); @@ -277,9 +277,10 @@ private Queue createConnectionPool(final SnapshotContext snapshotContext); + protected abstract SourceMateData buildSourceMateData(Jc context, SnapshotContext snapshotContext,TableId tableId); /** * Pre-snapshot preparations. diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java index e871bd8741..aa138a1bd8 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/dialect/snapshot/mysql/MysqlSnapshotEngine.java @@ -89,14 +89,15 @@ public void close() throws Exception { * * @param context The context. * @param snapshotContext The snapshot context. + * @param tableId The table id * @return The source metadata. */ @Override - protected SourceMateData buildSourceMateData(MysqlJdbcContext context, SnapshotContext snapshotContext) { + protected SourceMateData buildSourceMateData(MysqlJdbcContext context, SnapshotContext snapshotContext,TableId tableId) { MysqlSourceMateData sourceMateData = MysqlSourceMateData.newBuilder() .name(sourceConnectorConfig.getName()) - .withTableId(context.ofCurrentTableId()) + .withTableId(tableId) .serverId(sourceConnectorConfig.getMysqlConfig().getServerId()) .snapshot(true) .position(context.getSourceInfo().getCurrentBinlogPosition()) diff --git a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/resources/server-config.yml b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/resources/server-config.yml index d9416e1ed2..0cd7b5b5ab 100644 --- a/eventmesh-connectors/eventmesh-connector-jdbc/src/main/resources/server-config.yml +++ b/eventmesh-connectors/eventmesh-connector-jdbc/src/main/resources/server-config.yml @@ -15,5 +15,5 @@ # limitations under the License. # -sourceEnable: false -sinkEnable: true +sourceEnable: true +sinkEnable: false