Skip to content

Commit

Permalink
fix some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 10, 2023
1 parent 25c9401 commit 694e090
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void dataChangesHandle(JdbcConnectData connectData, SourceMateData sourc
sql = this.dialectAssemblyLine.getDeleteStatement(sourceMateData, connectData.getSchema(), payload.ofDdl());
delete(sql, connectData.getSchema(), payload.ofDataChanges());
} else {
LogUtils.warn(log, "No support for DELETE");
log.warn("No support for DELETE");
}
break;
case NONE:
Expand Down Expand Up @@ -155,7 +155,9 @@ private void applyDatabaseAndTableChanges(String sql) {
private void insert(String sql, Schema schema, DataChanges dataChanges) throws SQLException {
final Transaction transaction = session.beginTransaction();
try {
LogUtils.debug(log, "execute insert sql: {}", sql);
if (log.isDebugEnabled()) {
log.debug("execute insert sql: {}", sql);
}
final NativeQuery<?> query = session.createNativeQuery(sql);
AtomicInteger index = new AtomicInteger(1);
Map<String, Object> dataChangesAfter = (Map<String, Object>) dataChanges.getAfter();
Expand All @@ -176,7 +178,28 @@ private void insert(String sql, Schema schema, DataChanges dataChanges) throws S
}

private void update(String sql, Schema schema, DataChanges dataChanges) throws SQLException {

final Transaction transaction = session.beginTransaction();
try {
if (log.isDebugEnabled()) {
log.debug("execute update sql: {}", sql);
}
final NativeQuery<?> query = session.createNativeQuery(sql);
AtomicInteger index = new AtomicInteger(1);
Map<String, Object> dataChangesAfter = (Map<String, Object>) dataChanges.getAfter();
Field after = schema.getFields().get(0);
after.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder)).forEach(column -> {
Type type = eventMeshDialect.getType(column);
query.setParameter(index.getAndIncrement(), type.convert2DatabaseTypeValue(dataChangesAfter.get(column.getName())));
});
final int result = query.executeUpdate();
if (result != 1) {
throw new SQLException("Failed to update row from table");
}
transaction.commit();
} catch (SQLException e) {
transaction.rollback();
throw e;
}
}

private void upsert(String sql, Schema schema, DataChanges dataChanges) throws SQLException {
Expand All @@ -194,7 +217,7 @@ private void delete(String sql, Schema schema, DataChanges dataChanges) throws S
}
final NativeQuery<?> query = session.createNativeQuery(sql);
AtomicInteger index = new AtomicInteger(1);
Map<String, Object> dataChangesAfter = (Map<String, Object>) dataChanges.getAfter();
Map<String, Object> dataChangesAfter = (Map<String, Object>) dataChanges.getBefore();
final Map<String, ? extends Column<?>> columnMap = schema.getFields().get(0).getFields().stream().map(field -> field.getColumn())
.collect(Collectors.toMap(Column::getName, column -> column));
schema.getKeySet().stream().forEach(columnName -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.hibernate.dialect.Dialect;
Expand Down Expand Up @@ -143,19 +144,22 @@ public String getDeleteStatement(SourceMateData sourceMateData, Schema schema, S
public String getUpdateStatement(SourceMateData sourceMateData, Schema schema, String originStatement) {
final SqlStatementAssembler sqlStatementAssembler = new SqlStatementAssembler();
final TableId tableId = sourceMateData.ofTableId();
//primary key set
final Set<String> keySet = schema.getKeySet();
Field tableColumns = schema.getFields().get(0);
sqlStatementAssembler.appendSqlSlice("UPDATE ");
sqlStatementAssembler.appendSqlSlice(((AbstractGeneralDatabaseDialect<?, ?>) databaseDialect).getQualifiedTableName(tableId));
sqlStatementAssembler.appendSqlSlice(" SET ");
sqlStatementAssembler.appendSqlSliceLists(", ", schema.getFields().stream().map(Field::getName).collect(Collectors.toList()),
sqlStatementAssembler.appendSqlSliceLists(", ",
tableColumns.getFields().stream().filter(field -> keySet.contains(field.getName())).map(Field::getName).collect(Collectors.toList()),
(columnName) -> columnName + " =?");
if (schema.containsKey()) {
sqlStatementAssembler.appendSqlSlice(" WHERE ");
sqlStatementAssembler.appendSqlSliceLists(" AND ", schema.getKeySet(), (columnName) -> columnName + " =?");
sqlStatementAssembler.appendSqlSliceLists(" AND ", keySet, (columnName) -> columnName + " =?");
} else {
Field after = schema.getFields().get(0);
sqlStatementAssembler.appendSqlSlice(" WHERE ");
sqlStatementAssembler.appendSqlSliceOfColumns(" AND ",
after.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder))
tableColumns.getFields().stream().map(field -> field.getColumn()).sorted(Comparator.comparingInt(Column::getOrder))
.collect(Collectors.toList()), column -> column.getName() + " =?");
}
return sqlStatementAssembler.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ private void createDataEvents(Jc context, SnapshotContext<Part, Offset> snapshot
private Callable<Void> createSnapshotDataEvent4TableCallable(Jc context, SnapshotContext<Part, Offset> snapshotContext,
Queue<JdbcConnection> connectionPool, String sql, TableId tableId) {
UniversalJdbcContext<?, ?, ?> universalJdbcContext = (UniversalJdbcContext<?, ?, ?>) context;
universalJdbcContext.withTableId(tableId);
//universalJdbcContext.withTableId(tableId);
return () -> {
JdbcConnection connection = connectionPool.poll();
SourceMateData sourceMateData = buildSourceMateData(context, snapshotContext);
SourceMateData sourceMateData = buildSourceMateData(context, snapshotContext,tableId);
TableSchema tableSchema = universalJdbcContext.getCatalogTableSet().getTableSchema(tableId);
Field field = new Field().withField("after").withName("payload.after").withRequired(false);
List<? extends Column> columns = tableSchema.getColumns();
Expand Down Expand Up @@ -277,9 +277,10 @@ private Queue<JdbcConnection> createConnectionPool(final SnapshotContext<Part, O
*
* @param context The context.
* @param snapshotContext The snapshot context.
* @param tableId The table id
* @return The source metadata.
*/
protected abstract SourceMateData buildSourceMateData(Jc context, SnapshotContext<Part, Offset> snapshotContext);
protected abstract SourceMateData buildSourceMateData(Jc context, SnapshotContext<Part, Offset> snapshotContext,TableId tableId);

/**
* Pre-snapshot preparations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ public void close() throws Exception {
*
* @param context The context.
* @param snapshotContext The snapshot context.
* @param tableId The table id
* @return The source metadata.
*/
@Override
protected SourceMateData buildSourceMateData(MysqlJdbcContext context, SnapshotContext<MysqlPartition, MysqlOffsetContext> snapshotContext) {
protected SourceMateData buildSourceMateData(MysqlJdbcContext context, SnapshotContext<MysqlPartition, MysqlOffsetContext> snapshotContext,TableId tableId) {

MysqlSourceMateData sourceMateData = MysqlSourceMateData.newBuilder()
.name(sourceConnectorConfig.getName())
.withTableId(context.ofCurrentTableId())
.withTableId(tableId)
.serverId(sourceConnectorConfig.getMysqlConfig().getServerId())
.snapshot(true)
.position(context.getSourceInfo().getCurrentBinlogPosition())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
# limitations under the License.
#

sourceEnable: false
sinkEnable: true
sourceEnable: true
sinkEnable: false

0 comments on commit 694e090

Please sign in to comment.