diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index 4cad739ac6a..338cb657b3b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -171,69 +171,7 @@ private Iterator splitNormalStream(List batchEve
      * checkpoint-after] [a, b, c, d, e]
      */
     Iterator splitSchemaChangeStream(List batchEvents) {
-        List sourceRecordsSet = new ArrayList<>();
-
-        List sourceRecordList = new ArrayList<>();
-        SourceRecord previousRecord = null;
-        for (int i = 0; i < batchEvents.size(); i++) {
-            DataChangeEvent event = batchEvents.get(i);
-            SourceRecord currentRecord = event.getRecord();
-            if (!shouldEmit(currentRecord)) {
-                continue;
-            }
-
-            if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
-                if (!schemaChangeResolver.support(currentRecord)) {
-                    continue;
-                }
-
-                if (previousRecord == null) {
-                    // add schema-change-before to first
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
-                    sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-                    sourceRecordList = new ArrayList<>();
-                    sourceRecordList.add(currentRecord);
-                } else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
-                    sourceRecordList.add(currentRecord);
-                } else {
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
-                    sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-                    sourceRecordList = new ArrayList<>();
-                    sourceRecordList.add(currentRecord);
-                }
-            } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
-                    || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
-                if (previousRecord == null
-                        || SourceRecordUtils.isDataChangeRecord(previousRecord)
-                        || SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
-                    sourceRecordList.add(currentRecord);
-                } else {
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
-                    sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-                    sourceRecordList = new ArrayList<>();
-                    sourceRecordList.add(currentRecord);
-                }
-            }
-            previousRecord = currentRecord;
-            if (i == batchEvents.size() - 1) {
-                if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
-                    sourceRecordList.add(
-                            WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord));
-                }
-                sourceRecordsSet.add(new SourceRecords(sourceRecordList));
-            }
-        }
-
-        if (sourceRecordsSet.size() > 1) {
-            log.debug(
-                    "Split events stream into {} batches and mark schema checkpoint before/after",
-                    sourceRecordsSet.size());
-        }
-
-        return sourceRecordsSet.iterator();
+        return new SchemaChangeStreamSplitter().split(batchEvents);
     }
 
     private void checkReadException() {
@@ -349,4 +287,112 @@ private void configureFilter() {
         this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
         this.pureBinlogPhaseTables.clear();
     }
+
+    /**
+     * Splits one batch of change events into consecutive {@link SourceRecords} blocks so that
+     * schema-change records never share a block with data-change or heartbeat records.
+     *
+     * <p>A schema-change-before watermark is appended to the block preceding a run of
+     * schema-change records. A schema-change-after watermark is appended to the block holding
+     * the run when a data-change or heartbeat record directly follows it, or when the run is the
+     * last thing kept from the batch.
+     *
+     * <p>An instance accumulates state while splitting, so create a new one for every batch.
+     */
+    class SchemaChangeStreamSplitter {
+        private final List<SourceRecords> blockSet = new ArrayList<>();
+        private List<SourceRecord> currentBlock = new ArrayList<>();
+        private SourceRecord previousRecord;
+
+        public SchemaChangeStreamSplitter() {}
+
+        /**
+         * Partitions {@code batchEvents} into blocks.
+         *
+         * <p>Records rejected by {@code shouldEmit}, and schema changes the resolver does not
+         * support, are skipped and never influence where a block ends.
+         *
+         * @param batchEvents events of one fetched batch, in stream order
+         * @return iterator over the resulting blocks; empty when no record was kept
+         */
+        public Iterator<SourceRecords> split(List<DataChangeEvent> batchEvents) {
+            for (DataChangeEvent event : batchEvents) {
+                SourceRecord currentRecord = event.getRecord();
+                if (!shouldEmit(currentRecord)) {
+                    continue;
+                }
+
+                if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
+                    if (!schemaChangeResolver.support(currentRecord)) {
+                        continue;
+                    }
+
+                    if (previousRecord == null
+                            || !SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
+                        // A schema-change run starts here: close the preceding block with the
+                        // schema-change-before watermark, even if it holds nothing else.
+                        currentBlock.add(
+                                WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
+                        flipBlock();
+                    }
+                    currentBlock.add(currentRecord);
+                } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
+                        || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
+                    boolean continuesDataRun =
+                            previousRecord == null
+                                    || SourceRecordUtils.isDataChangeRecord(previousRecord)
+                                    || SourceRecordUtils.isHeartbeatRecord(previousRecord);
+                    if (!continuesDataRun) {
+                        endBlock(previousRecord);
+                        flipBlock();
+                    }
+                    currentBlock.add(currentRecord);
+                }
+
+                previousRecord = currentRecord;
+            }
+
+            // Flush after the loop, not on the last index, so the final block is still emitted
+            // when the trailing events of the batch were skipped.
+            endLastBlock(previousRecord);
+
+            if (blockSet.size() > 1) {
+                log.debug(
+                        "Split events stream into {} batches and mark schema change checkpoint",
+                        blockSet.size());
+            }
+
+            return blockSet.iterator();
+        }
+
+        /** If the current block is non-empty, moves it into the result and starts a new one. */
+        void flipBlock() {
+            if (!currentBlock.isEmpty()) {
+                blockSet.add(new SourceRecords(currentBlock));
+                currentBlock = new ArrayList<>();
+            }
+        }
+
+        /**
+         * Appends a schema-change-after watermark to the current block, provided the block is
+         * non-empty and {@code lastRecord} is a schema-change event; otherwise does nothing.
+         *
+         * @param lastRecord the most recent record that passed filtering
+         */
+        void endBlock(SourceRecord lastRecord) {
+            if (!currentBlock.isEmpty() && SourceRecordUtils.isSchemaChangeEvent(lastRecord)) {
+                currentBlock.add(WatermarkEvent.createSchemaChangeAfterWatermark(lastRecord));
+            }
+        }
+
+        /**
+         * Closes and emits whatever is still buffered once the whole batch has been consumed.
+         *
+         * @param lastRecord the most recent record that passed filtering; may be {@code null}
+         */
+        void endLastBlock(SourceRecord lastRecord) {
+            endBlock(lastRecord);
+            flipBlock();
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
index a17f7f86738..ee8d4d7e5df 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java
@@ -29,6 +29,7 @@
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
 
 import io.debezium.config.CommonConnectorConfig;
 import io.debezium.config.Configuration;
@@ -61,6 +62,7 @@ public class IncrementalSourceStreamFetcherTest {
                     .with(Heartbeat.HEARTBEAT_INTERVAL, 1)
                     .with(TRANSACTION_TOPIC, "test")
                     .build();
+    private static final String UNKNOWN_SCHEMA_KEY = "UNKNOWN";
 
     @Test
     public void testSplitSchemaChangeStream() throws Exception {
@@ -107,6 +109,7 @@ public void testSplitSchemaChangeStream() throws Exception {
         inputEvents.add(new DataChangeEvent(createDataEvent()));
         inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
         inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
         outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
         outputEvents.forEachRemaining(records::add);
 
@@ -134,6 +137,7 @@ public void testSplitSchemaChangeStream() throws Exception {
         inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
         inputEvents.add(new DataChangeEvent(createDataEvent()));
         inputEvents.add(new DataChangeEvent(createDataEvent()));
+        inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
         outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
         outputEvents.forEachRemaining(records::add);
 
@@ -323,13 +327,21 @@ public void testSplitSchemaChangeStream() throws Exception {
     }
 
     static SourceRecord createSchemaChangeEvent() {
+        return createSchemaChangeEvent("SCHEMA_CHANGE_TOPIC");
+    }
+
+    static SourceRecord createSchemaChangeUnknownEvent() {
+        return createSchemaChangeEvent(UNKNOWN_SCHEMA_KEY);
+    }
+
+    static SourceRecord createSchemaChangeEvent(String topic) {
         Schema keySchema =
                 SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
         SourceRecord record =
                 new SourceRecord(
                         Collections.emptyMap(),
                         Collections.emptyMap(),
-                        null,
+                        topic,
                         keySchema,
                         null,
                         null,
@@ -377,7 +389,14 @@ static SourceRecord createHeartbeatEvent() throws InterruptedException {
 
     static IncrementalSourceStreamFetcher createFetcher() {
         SchemaChangeResolver schemaChangeResolver = mock(SchemaChangeResolver.class);
-        when(schemaChangeResolver.support(any())).thenReturn(true);
+        when(schemaChangeResolver.support(any()))
+                .thenAnswer(
+                        (Answer)
+                                invocationOnMock -> {
+                                    SourceRecord record = invocationOnMock.getArgument(0);
+                                    return record.topic() == null
+                                            || !record.topic().equalsIgnoreCase(UNKNOWN_SCHEMA_KEY);
+                                });
         IncrementalSourceStreamFetcher fetcher =
                 new IncrementalSourceStreamFetcher(null, 0, schemaChangeResolver);
         IncrementalSourceStreamFetcher spy = spy(fetcher);