Skip to content

Commit

Permalink
add update and upsert logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 11, 2023
1 parent 7c7eaf9 commit f010ed4
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ private void insert(String sql, Schema schema, DataChanges dataChanges) throws S
}
}

@SuppressWarnings("unchecked")
private void update(String sql, Schema schema, DataChanges dataChanges) throws SQLException {
final Transaction transaction = session.beginTransaction();
try {
Expand All @@ -189,13 +190,20 @@ private void update(String sql, Schema schema, DataChanges dataChanges) throws S
AtomicInteger index = new AtomicInteger(1);
Map<String, Object> dataChangesAfter = (Map<String, Object>) dataChanges.getAfter();
Field after = schema.getFields().get(0);
final Map<String, ? extends Column<?>> columnMap = after.getFields().stream().map(field -> field.getColumn())
.collect(Collectors.toMap(Column::getName, column -> column));
final Set<String> keySet = schema.getKeySet();
after.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder))
.filter(column -> !keySet.contains(column.getName())).forEach(column -> {
Type type = eventMeshDialect.getType(column);
query.setParameter(index.getAndIncrement(), type.convert2DatabaseTypeValue(dataChangesAfter.get(column.getName())));
});

schema.getKeySet().stream().forEach(key -> {
if (columnMap.containsKey(key)) {
Type type = eventMeshDialect.getType(columnMap.get(key));
query.setParameter(index.getAndIncrement(), type.convert2DatabaseTypeValue(dataChangesAfter.get(key)));
}
});
final int result = query.executeUpdate();
if (result != 1) {
throw new SQLException("Failed to update row from table");
Expand All @@ -207,8 +215,30 @@ private void update(String sql, Schema schema, DataChanges dataChanges) throws S
}
}

@SuppressWarnings("unchecked")
private void upsert(String sql, Schema schema, DataChanges dataChanges) throws SQLException {

final Transaction transaction = session.beginTransaction();
try {
if (log.isDebugEnabled()) {
log.debug("execute upsert sql: {}", sql);
}
final NativeQuery<?> query = session.createNativeQuery(sql);
AtomicInteger index = new AtomicInteger(1);
Map<String, Object> dataChangesAfter = (Map<String, Object>) dataChanges.getAfter();
Field after = schema.getFields().get(0);
after.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder)).forEach(column -> {
Type type = eventMeshDialect.getType(column);
query.setParameter(index.getAndIncrement(), type.convert2DatabaseTypeValue(dataChangesAfter.get(column.getName())));
});
final int result = query.executeUpdate();
if (result != 1) {
throw new SQLException("Failed to update row from table");
}
transaction.commit();
} catch (SQLException e) {
transaction.rollback();
throw e;
}
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import org.apache.eventmesh.connector.jdbc.utils.JdbcStringUtils;

import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.hibernate.dialect.Dialect;
Expand All @@ -51,22 +54,31 @@ public String getUpsertStatement(SourceMateData sourceMateData, Schema schema, S
final SqlStatementAssembler sqlStatementAssembler = new SqlStatementAssembler();
sqlStatementAssembler.appendSqlSlice(getInsertStatement(sourceMateData, schema, originStatement));
Field afterField = schema.getFields().get(0);
List<Column<?>> columns = afterField.getFields().stream().map(item -> item.getColumn()).sorted(Comparator.comparingInt(Column::getOrder))
.collect(Collectors.toList());
Set<String> keySet = Optional.ofNullable(schema.getKeySet()).orElse(new HashSet<>(0));
List<Column<?>> columns = afterField.getFields().stream().map(item -> item.getColumn()).filter(column -> !keySet.contains(column.getName()))
.sorted(Comparator.comparingInt(Column::getOrder)).collect(Collectors.toList());
if (JdbcStringUtils.compareVersion(getDatabaseDialect().getJdbcDriverMetaData().getDatabaseProductVersion(), "8.0.20") >= 0) {
//mysql doc:https://dev.mysql.com/doc/refman/8.0/en/insert-on-duplicate.html
//Beginning with MySQL 8.0.20, an INSERT ... SELECT ... ON DUPLICATE KEY UPDATE statement that uses VALUES() in the UPDATE clause
sqlStatementAssembler.appendSqlSlice("AS new ON DUPLICATE KEY UPDATE ");
sqlStatementAssembler.appendSqlSliceOfColumns(",", columns, (column) -> {
final String columnName = column.getName();
return columnName + "=new." + columnName;
});
if(schema.containsKey()){
sqlStatementAssembler.appendSqlSliceLists(",", schema.getKeySet(), columnName -> columnName + "=new." + columnName);
}else {
sqlStatementAssembler.appendSqlSliceOfColumns(",", columns, column -> {
final String columnName = column.getName();
return columnName + "=new." + columnName;
});
}
} else {
sqlStatementAssembler.appendSqlSlice(" ON DUPLICATE KEY UPDATE ");
sqlStatementAssembler.appendSqlSliceOfColumns(",", columns, column -> {
final String columnName = column.getName();
return columnName + "=VALUES(" + columnName + ")";
});
if(schema.containsKey()){
sqlStatementAssembler.appendSqlSliceLists(",", schema.getKeySet(), columnName -> columnName + "=VALUES(" + columnName + ")");
}else {
sqlStatementAssembler.appendSqlSliceOfColumns(",", columns, column -> {
final String columnName = column.getName();
return columnName + "=VALUES(" + columnName + ")";
});
}
}
return sqlStatementAssembler.build();
}
Expand Down

0 comments on commit f010ed4

Please sign in to comment.