diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java index 75f3564c6c6..6f36f4be830 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java @@ -65,6 +65,7 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; public class MongoDBConnectorDeserializationSchema @@ -154,17 +155,6 @@ private SeaTunnelRow extractRowData(BsonDocument document) { return (SeaTunnelRow) physicalConverter.convert(document); } - private BsonDocument extractBsonDocument( - Struct value, @Nonnull Schema valueSchema, String fieldName) { - if (valueSchema.field(fieldName) != null) { - String docString = value.getString(fieldName); - if (docString != null) { - return BsonDocument.parse(docString); - } - } - return null; - } - // ------------------------------------------------------------------------------------- // Runtime Converters // ------------------------------------------------------------------------------------- diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java index d9aee5ef979..fa0931a8070 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java @@ -27,10 +27,13 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset; import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.bson.BsonDocument; +import org.bson.BsonInt64; +import org.bson.BsonString; import org.bson.BsonType; import org.bson.BsonValue; @@ -50,12 +53,21 @@ import java.util.stream.Collectors; import static org.apache.seatunnel.common.exception.CommonErrorCode.ILLEGAL_ARGUMENT; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.BsonUtils.compareBsonValue; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.buildSourceRecord; +import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getDocumentKey; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.getResumeToken; import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils.createMongoClient; @@ -172,9 +184,27 @@ public void rewriteOutputBuffer( switch (OperationType.fromString(operationType)) { case INSERT: + outputBuffer.put(key, changeRecord); + break; case UPDATE: case REPLACE: - outputBuffer.put(key, changeRecord); + Schema valueSchema = changeRecord.valueSchema(); + BsonDocument fullDocument = + extractBsonDocument(value, valueSchema, FULL_DOCUMENT); + if (fullDocument == null) { + break; + } + BsonDocument valueDocument = normalizeSnapshotDocument(fullDocument, value); + SourceRecord record = + buildSourceRecord( + changeRecord.sourcePartition(), + changeRecord.sourceOffset(), + changeRecord.topic(), + changeRecord.kafkaPartition(), + changeRecord.keySchema(), + changeRecord.key(), + valueDocument); + outputBuffer.put(key, record); break; case DELETE: outputBuffer.remove(key); @@ -202,6 +232,30 @@ record -> { .collect(Collectors.toList()); } + private BsonDocument normalizeSnapshotDocument( + @Nonnull final BsonDocument fullDocument, Struct value) { + return new BsonDocument() + .append(ID_FIELD, new BsonString(value.getString(DOCUMENT_KEY))) + .append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT)) + .append( + NS_FIELD, + new BsonDocument( + DB_FIELD, + new BsonString( + value.getStruct(NS_FIELD).getString(DB_FIELD))) + .append( + COLL_FIELD, + new BsonString( + value.getStruct(NS_FIELD).getString(COLL_FIELD)))) + .append(DOCUMENT_KEY, new BsonString(value.getString(DOCUMENT_KEY))) + .append(FULL_DOCUMENT, fullDocument) + .append(TS_MS_FIELD, new BsonInt64(value.getInt64(TS_MS_FIELD))) + .append( + SOURCE_FIELD, + new BsonDocument(SNAPSHOT_FIELD, new BsonString(SNAPSHOT_TRUE)) + .append(TS_MS_FIELD, new BsonInt64(0L))); + } + @Override public void close() { Runtime.getRuntime() diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java index c4d51c59e41..1e9ab577229 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/utils/MongodbRecordUtils.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils; import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -66,7 +67,18 @@ public static BsonDocument getResumeToken(SourceRecord sourceRecord) { public static BsonDocument getDocumentKey(@Nonnull SourceRecord sourceRecord) { Struct value = (Struct) sourceRecord.value(); - return BsonDocument.parse(value.getString(DOCUMENT_KEY)); + return extractBsonDocument(value, sourceRecord.valueSchema(), DOCUMENT_KEY); + } + + public static BsonDocument extractBsonDocument( + Struct value, @Nonnull Schema valueSchema, String fieldName) { + if (valueSchema.field(fieldName) != null) { + String docString = value.getString(fieldName); + if (docString != null) { + return BsonDocument.parse(docString); + } + } + return null; } public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String key) { @@ -139,6 +151,30 @@ public static String getOffsetValue(@Nonnull SourceRecord sourceRecord, String k valueSchemaAndValue.value()); } + public static @Nonnull SourceRecord buildSourceRecord( + Map sourcePartition, + Map sourceOffset, + String topicName, + Integer partition, + Schema keySchema, + Object key, + BsonDocument valueDocument) { + BsonValueToSchemaAndValue schemaAndValue = + new BsonValueToSchemaAndValue(new DefaultJson().getJsonWriterSettings()); + SchemaAndValue valueSchemaAndValue = + schemaAndValue.toSchemaAndValue(fromJson(OUTPUT_SCHEMA), valueDocument); + + return new SourceRecord( + sourcePartition, + sourceOffset, + topicName, + partition, + keySchema, + key, + valueSchemaAndValue.schema(), + valueSchemaAndValue.value()); + } + public static @Nonnull Map createSourceOffsetMap( @Nonnull BsonDocument idDocument, boolean isSnapshotRecord) { Map sourceOffset = new HashMap<>();