Skip to content

Commit

Permalink
[Feature][Transform-v2] Add metadata transform
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Oct 29, 2024
1 parent 37dfc00 commit ddafc8f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ public static Long getDelay(SeaTunnelRow row) {
}

public static String getDatabase(SeaTunnelRow row) {
TablePath tablePath = TablePath.of(row.getTableId());
TablePath tablePath =
row.getTableId() == null ? TablePath.DEFAULT : TablePath.of(row.getTableId());
return tablePath.getDatabaseName();
}

public static String getTable(SeaTunnelRow row) {
TablePath tablePath = TablePath.of(row.getTableId());
TablePath tablePath =
row.getTableId() == null ? TablePath.DEFAULT : TablePath.of(row.getTableId());
return tablePath.getTableName();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;

Expand Down Expand Up @@ -105,18 +107,27 @@ public void deserialize(@Nonnull SourceRecord record, Collector<SeaTunnelRow> ou
log.debug("Ignore newly added table {}", tableId);
return;
}

Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
long delay = -1L;
if (fetchTimestamp != null && messageTimestamp != null) {
delay = fetchTimestamp - messageTimestamp;
}
switch (op) {
case INSERT:
SeaTunnelRow insert = extractRowData(tableRowConverter, fullDocument);
insert.setRowKind(RowKind.INSERT);
insert.setTableId(tableId);
MetadataUtil.setDelay(insert, delay);
MetadataUtil.setEventTime(insert, fetchTimestamp);
emit(record, insert, out);
break;
case DELETE:
SeaTunnelRow delete = extractRowData(tableRowConverter, documentKey);
delete.setRowKind(RowKind.DELETE);
delete.setTableId(tableId);
MetadataUtil.setDelay(delete, delay);
MetadataUtil.setEventTime(delete, fetchTimestamp);
emit(record, delete, out);
break;
case UPDATE:
Expand All @@ -126,12 +137,16 @@ public void deserialize(@Nonnull SourceRecord record, Collector<SeaTunnelRow> ou
SeaTunnelRow updateAfter = extractRowData(tableRowConverter, fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
updateAfter.setTableId(tableId);
MetadataUtil.setDelay(updateAfter, delay);
MetadataUtil.setEventTime(updateAfter, fetchTimestamp);
emit(record, updateAfter, out);
break;
case REPLACE:
SeaTunnelRow replaceAfter = extractRowData(tableRowConverter, fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
replaceAfter.setTableId(tableId);
MetadataUtil.setDelay(replaceAfter, delay);
MetadataUtil.setEventTime(replaceAfter, fetchTimestamp);
emit(record, replaceAfter, out);
break;
case INVALIDATE:
Expand Down

0 comments on commit ddafc8f

Please sign in to comment.