Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][CDC] Fix split schema change stream #7003

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,69 +171,7 @@ private Iterator<SourceRecords> splitNormalStream(List<DataChangeEvent> batchEve
* checkpoint-after] [a, b, c, d, e]
*/
Iterator<SourceRecords> splitSchemaChangeStream(List<DataChangeEvent> batchEvents) {
    // Defect fixed: the previous inline implementation remained above the new delegating
    // return, leaving an unreachable statement (two consecutive `return`s), which does not
    // compile in Java. The old body is removed; splitting now lives entirely in
    // SchemaChangeStreamSplitter.
    //
    // A fresh splitter instance is created per batch so no partially-built block or
    // previous-record state leaks between successive poll batches.
    return new SchemaChangeStreamSplitter().split(batchEvents);
}

private void checkReadException() {
Expand Down Expand Up @@ -349,4 +287,97 @@ private void configureFilter() {
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
this.pureBinlogPhaseTables.clear();
}

/**
 * Splits one polled batch of change events into consecutive {@link SourceRecords} blocks,
 * inserting schema-change-before / schema-change-after watermark records around runs of
 * schema-change events so downstream checkpoints can bracket a schema change.
 *
 * <p>Inner (non-static) class: it relies on the enclosing fetcher's {@code shouldEmit},
 * {@code schemaChangeResolver} and {@code log}. One instance handles exactly one batch;
 * state is reset only by constructing a new instance.
 */
class SchemaChangeStreamSplitter {
    // Completed blocks, in input order; each entry becomes one emitted SourceRecords batch.
    private List<SourceRecords> blockSet;
    // Records accumulated for the block currently being built.
    private List<SourceRecord> currentBlock;
    // Most recent record that was actually emitted (null until the first non-skipped record).
    // May differ from the batch's last element when trailing records are filtered out.
    private SourceRecord previousRecord;

    public SchemaChangeStreamSplitter() {
        blockSet = new ArrayList<>();
        currentBlock = new ArrayList<>();
        previousRecord = null;
    }

    /**
     * Partitions {@code batchEvents} into watermark-bracketed blocks.
     *
     * @param batchEvents one polled batch of change events
     * @return iterator over the resulting blocks, in original event order
     */
    public Iterator<SourceRecords> split(List<DataChangeEvent> batchEvents) {
        for (int i = 0; i < batchEvents.size(); i++) {
            DataChangeEvent event = batchEvents.get(i);
            SourceRecord currentRecord = event.getRecord();
            // Records filtered out by the fetcher are dropped entirely; note they also do
            // NOT update previousRecord, so block boundaries see only emitted records.
            if (!shouldEmit(currentRecord)) {
                continue;
            }

            if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) {
                // Schema-change events the resolver cannot handle are skipped as well.
                if (!schemaChangeResolver.support(currentRecord)) {
                    continue;
                }

                if (previousRecord == null) {
                    // add schema-change-before to first
                    // Batch starts with a schema change: close a block containing only the
                    // "before" watermark, then start a new block with the event itself.
                    currentBlock.add(
                            WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
                    flipBlock();

                    currentBlock.add(currentRecord);
                } else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) {
                    // Consecutive schema changes stay together in the same block.
                    currentBlock.add(currentRecord);
                } else {
                    // Transition data/heartbeat -> schema change: append the "before"
                    // watermark, seal the data block, and open a new schema-change block.
                    currentBlock.add(
                            WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord));
                    flipBlock();

                    currentBlock.add(currentRecord);
                }
            } else if (SourceRecordUtils.isDataChangeRecord(currentRecord)
                    || SourceRecordUtils.isHeartbeatRecord(currentRecord)) {
                if (previousRecord == null
                        || SourceRecordUtils.isDataChangeRecord(previousRecord)
                        || SourceRecordUtils.isHeartbeatRecord(previousRecord)) {
                    // Plain data/heartbeat continuing a data block (or opening the batch).
                    currentBlock.add(currentRecord);
                } else {
                    // Transition schema change -> data: close the schema-change block with
                    // an "after" watermark before starting the data block.
                    endBlock(previousRecord);
                    flipBlock();

                    currentBlock.add(currentRecord);
                }
            }

            previousRecord = currentRecord;
            // If the batch's last event was emitted, finalize immediately so a trailing
            // schema change still gets its "after" watermark.
            if (i == batchEvents.size() - 1) {
                endBlock(currentRecord);
                flipBlock();
            }
        }

        // Safety net for the hotfix case: when the final event(s) were skipped via
        // `continue`, the in-loop tail check above never ran, so finalize using the last
        // record that was actually emitted. Idempotent: endBlock/flipBlock no-op on an
        // empty currentBlock.
        endLastBlock(previousRecord);

        if (blockSet.size() > 1) {
            log.debug(
                    "Split events stream into {} batches and mark schema change checkpoint",
                    blockSet.size());
        }

        return blockSet.iterator();
    }

    // Seals the in-progress block into blockSet and starts an empty one; no-op when the
    // current block is empty (prevents emitting empty SourceRecords).
    void flipBlock() {
        if (!currentBlock.isEmpty()) {
            blockSet.add(new SourceRecords(currentBlock));
            currentBlock = new ArrayList<>();
        }
    }

    // Appends a schema-change-after watermark when the block being closed ends with a
    // schema-change event; data/heartbeat blocks close without a watermark.
    void endBlock(SourceRecord lastRecord) {
        if (!currentBlock.isEmpty()) {
            if (SourceRecordUtils.isSchemaChangeEvent(lastRecord)) {
                currentBlock.add(WatermarkEvent.createSchemaChangeAfterWatermark(lastRecord));
            }
        }
    }

    // Final flush after the loop: close (watermark if needed) and seal whatever remains.
    void endLastBlock(SourceRecord lastRecord) {
        endBlock(lastRecord);
        flipBlock();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class IncrementalSourceStreamFetcherTest {
.with(Heartbeat.HEARTBEAT_INTERVAL, 1)
.with(TRANSACTION_TOPIC, "test")
.build();
private static final String UNKNOWN_SCHEMA_KEY = "UNKNOWN";

@Test
public void testSplitSchemaChangeStream() throws Exception {
Expand Down Expand Up @@ -107,6 +109,7 @@ public void testSplitSchemaChangeStream() throws Exception {
inputEvents.add(new DataChangeEvent(createDataEvent()));
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
outputEvents.forEachRemaining(records::add);

Expand Down Expand Up @@ -134,6 +137,7 @@ public void testSplitSchemaChangeStream() throws Exception {
inputEvents.add(new DataChangeEvent(createSchemaChangeEvent()));
inputEvents.add(new DataChangeEvent(createDataEvent()));
inputEvents.add(new DataChangeEvent(createDataEvent()));
inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent()));
outputEvents = fetcher.splitSchemaChangeStream(inputEvents);
outputEvents.forEachRemaining(records::add);

Expand Down Expand Up @@ -323,13 +327,21 @@ public void testSplitSchemaChangeStream() throws Exception {
}

/**
 * Builds a schema-change record on the default test topic, which the mocked
 * {@code SchemaChangeResolver} reports as supported.
 */
static SourceRecord createSchemaChangeEvent() {
    final String supportedTopic = "SCHEMA_CHANGE_TOPIC";
    return createSchemaChangeEvent(supportedTopic);
}

/**
 * Builds a schema-change record whose topic ({@code UNKNOWN_SCHEMA_KEY}) the mocked
 * {@code SchemaChangeResolver} reports as unsupported, so the splitter must skip it.
 */
static SourceRecord createSchemaChangeUnknownEvent() {
    final String unsupportedTopic = UNKNOWN_SCHEMA_KEY;
    return createSchemaChangeEvent(unsupportedTopic);
}

static SourceRecord createSchemaChangeEvent(String topic) {
Schema keySchema =
SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build();
SourceRecord record =
new SourceRecord(
Collections.emptyMap(),
Collections.emptyMap(),
null,
topic,
keySchema,
null,
null,
Expand Down Expand Up @@ -377,7 +389,14 @@ static SourceRecord createHeartbeatEvent() throws InterruptedException {

static IncrementalSourceStreamFetcher createFetcher() {
SchemaChangeResolver schemaChangeResolver = mock(SchemaChangeResolver.class);
when(schemaChangeResolver.support(any())).thenReturn(true);
when(schemaChangeResolver.support(any()))
.thenAnswer(
(Answer<Boolean>)
invocationOnMock -> {
SourceRecord record = invocationOnMock.getArgument(0);
return record.topic() == null
|| !record.topic().equalsIgnoreCase(UNKNOWN_SCHEMA_KEY);
});
IncrementalSourceStreamFetcher fetcher =
new IncrementalSourceStreamFetcher(null, 0, schemaChangeResolver);
IncrementalSourceStreamFetcher spy = spy(fetcher);
Expand Down
Loading