Skip to content

Commit

Permalink
[Fix](cdc) fix enable-delete option not work (apache#455)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Aug 5, 2024
1 parent 27bf18e commit 43f9e01
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<Strin
private final boolean newSchemaChange;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
private boolean enableDelete = true;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
Expand Down Expand Up @@ -111,6 +112,7 @@ public JsonDebeziumSchemaSerializer(
.getStreamLoadProp()
.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
this.enableDelete = executionOptions.getDeletable();
}
}

Expand Down Expand Up @@ -149,7 +151,8 @@ private void init() {
lineDelimiter,
ignoreUpdateBefore,
targetTablePrefix,
targetTableSuffix);
targetTableSuffix,
enableDelete);
initSchemaChangeInstance(changeContext);
this.dataChange = new JsonDebeziumDataChange(changeContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class JsonDebeziumChangeContext implements Serializable {
private final Pattern pattern;
private final String lineDelimiter;
private final boolean ignoreUpdateBefore;
private final boolean enableDelete;
private final String targetTablePrefix;
private final String targetTableSuffix;

Expand All @@ -55,7 +56,8 @@ public JsonDebeziumChangeContext(
String lineDelimiter,
boolean ignoreUpdateBefore,
String targetTablePrefix,
String targetTableSuffix) {
String targetTableSuffix,
boolean enableDelete) {
this.dorisOptions = dorisOptions;
this.tableMapping = tableMapping;
this.sourceTableName = sourceTableName;
Expand All @@ -65,6 +67,7 @@ public JsonDebeziumChangeContext(
this.pattern = pattern;
this.lineDelimiter = lineDelimiter;
this.ignoreUpdateBefore = ignoreUpdateBefore;
this.enableDelete = enableDelete;
this.targetTablePrefix = targetTablePrefix;
this.targetTableSuffix = targetTableSuffix;
}
Expand Down Expand Up @@ -116,6 +119,10 @@ public String getTargetTableSuffix() {
return targetTableSuffix;
}

public boolean enableDelete() {
return enableDelete;
}

public DorisTableConfig getDorisTableConf() {
return dorisTableConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class JsonDebeziumDataChange extends CdcDataChange {
private final ObjectMapper objectMapper;
private final DorisOptions dorisOptions;
private final boolean ignoreUpdateBefore;
private final boolean enableDelete;
private final String lineDelimiter;
private final JsonDebeziumChangeContext changeContext;

Expand All @@ -59,6 +60,7 @@ public JsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
this.objectMapper = changeContext.getObjectMapper();
this.ignoreUpdateBefore = changeContext.isIgnoreUpdateBefore();
this.lineDelimiter = changeContext.getLineDelimiter();
this.enableDelete = changeContext.enableDelete();
}

public DorisRecord serialize(String record, JsonNode recordRoot, String op) throws IOException {
Expand Down Expand Up @@ -87,7 +89,7 @@ public DorisRecord serialize(String record, JsonNode recordRoot, String op) thro
return DorisRecord.of(dorisTableIdentifier, extractUpdate(recordRoot));
case OP_DELETE:
valueMap = extractBeforeRow(recordRoot);
addDeleteSign(valueMap, true);
addDeleteSign(valueMap, enableDelete);
break;
default:
LOG.error("parse record fail, unknown op {} in {}", op, record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize
private final String sourceTableName;
private String lineDelimiter = LINE_DELIMITER_DEFAULT;
private boolean ignoreUpdateBefore = true;
private boolean enableDelete = true;
// <cdc db.schema.table, doris db.table>
private Map<String, String> tableMapping;
// create table properties
Expand Down Expand Up @@ -90,6 +91,7 @@ public MongoDBJsonDebeziumSchemaSerializer(
.getStreamLoadProp()
.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
this.enableDelete = executionOptions.getDeletable();
}
init();
}
Expand All @@ -107,7 +109,8 @@ private void init() {
lineDelimiter,
ignoreUpdateBefore,
targetTablePrefix,
targetTableSuffix);
targetTableSuffix,
enableDelete);
this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ public class MongoJsonDebeziumDataChange extends CdcDataChange implements Change
public JsonDebeziumChangeContext changeContext;
public ObjectMapper objectMapper;
public Map<String, String> tableMapping;
private final boolean enableDelete;

public MongoJsonDebeziumDataChange(JsonDebeziumChangeContext changeContext) {
this.changeContext = changeContext;
this.dorisOptions = changeContext.getDorisOptions();
this.objectMapper = changeContext.getObjectMapper();
this.lineDelimiter = changeContext.getLineDelimiter();
this.tableMapping = changeContext.getTableMapping();
this.enableDelete = changeContext.enableDelete();
}

@Override
Expand All @@ -93,7 +95,7 @@ public DorisRecord serialize(String record, JsonNode recordRoot, String op) thro
break;
case OP_DELETE:
valueMap = extractDeleteRow(recordRoot);
addDeleteSign(valueMap, true);
addDeleteSign(valueMap, enableDelete);
break;
default:
LOG.error("parse record fail, unknown op {} in {}", op, record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public void setUp() {
lineDelimiter,
ignoreUpdateBefore,
"",
"");
"",
true);
dataChange = new JsonDebeziumDataChange(changeContext);
}

Expand Down Expand Up @@ -113,7 +114,8 @@ public void testSerializeUpdateBefore() throws IOException {
lineDelimiter,
false,
"",
"");
"",
true);
dataChange = new JsonDebeziumDataChange(changeContext);

// update t1 set name='doris-update' WHERE id =1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public void setUp() {
lineDelimiter,
ignoreUpdateBefore,
"",
"");
"",
true);
schemaChange = new JsonDebeziumSchemaChangeImpl(changeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public void setUp() {
lineDelimiter,
ignoreUpdateBefore,
"",
"");
"",
true);
schemaChange = new JsonDebeziumSchemaChangeImplV2(changeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public void setUp() {
lineDelimiter,
ignoreUpdateBefore,
"",
"");
"",
true);
schemaChange = new SQLParserSchemaChange(changeContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,122 @@ public void testMySQL2DorisByDefault() throws Exception {
jobClient.cancel().get();
}

@Test
public void testMySQL2DorisEnableDelete() throws Exception {
printClusterStatus();
initializeMySQLTable();
initializeDorisTable();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
Map<String, String> flinkMap = new HashMap<>();
flinkMap.put("execution.checkpointing.interval", "10s");
flinkMap.put("pipeline.operator-chaining", "false");
flinkMap.put("parallelism.default", "1");

Configuration configuration = Configuration.fromMap(flinkMap);
env.configure(configuration);

String database = DATABASE;
Map<String, String> mysqlConfig = new HashMap<>();
mysqlConfig.put("database-name", DATABASE);
mysqlConfig.put("hostname", MYSQL_CONTAINER.getHost());
mysqlConfig.put("port", MYSQL_CONTAINER.getMappedPort(3306) + "");
mysqlConfig.put("username", MYSQL_USER);
mysqlConfig.put("password", MYSQL_PASSWD);
mysqlConfig.put("server-time-zone", "Asia/Shanghai");
Configuration config = Configuration.fromMap(mysqlConfig);

Map<String, String> sinkConfig = new HashMap<>();
sinkConfig.put("fenodes", getFenodes());
sinkConfig.put("username", USERNAME);
sinkConfig.put("password", PASSWORD);
sinkConfig.put("jdbc-url", String.format(DorisTestBase.URL, DORIS_CONTAINER.getHost()));
sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
sinkConfig.put("sink.check-interval", "5000");
sinkConfig.put("sink.enable-delete", "false");
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");

String includingTables = "tbl1|tbl2|tbl3|tbl5";
String excludingTables = "";
DatabaseSync databaseSync = new MysqlDatabaseSync();
databaseSync
.setEnv(env)
.setDatabase(database)
.setConfig(config)
.setIncludingTables(includingTables)
.setExcludingTables(excludingTables)
.setIgnoreDefaultValue(false)
.setSinkConfig(sinkConf)
.setTableConfig(tableConfig)
.setCreateTableOnly(false)
.setNewSchemaChange(true)
// no single sink
.setSingleSink(false)
.create();
databaseSync.build();
JobClient jobClient = env.executeAsync();
waitForJobStatus(
jobClient,
Collections.singletonList(RUNNING),
Deadline.fromNow(Duration.ofSeconds(10)));

// wait 2 times checkpoint
Thread.sleep(20000);
List<String> expected = Arrays.asList("doris_1,1", "doris_2,2", "doris_3,3", "doris_5,5");
String sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
String query1 =
String.format(
sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
TABLE_5);
checkResult(expected, query1, 2);

// add incremental data
try (Connection connection =
DriverManager.getConnection(
MYSQL_CONTAINER.getJdbcUrl(), MYSQL_USER, MYSQL_PASSWD);
Statement statement = connection.createStatement()) {
statement.execute(
String.format("insert into %s.%s values ('doris_1_1',10)", DATABASE, TABLE_1));
statement.execute(
String.format("insert into %s.%s values ('doris_2_1',11)", DATABASE, TABLE_2));
statement.execute(
String.format("insert into %s.%s values ('doris_3_1',12)", DATABASE, TABLE_3));

statement.execute(
String.format(
"update %s.%s set age=18 where name='doris_1'", DATABASE, TABLE_1));
statement.execute(
String.format("delete from %s.%s where name='doris_2'", DATABASE, TABLE_2));
statement.execute(
String.format("delete from %s.%s where name='doris_3'", DATABASE, TABLE_3));
statement.execute(
String.format("delete from %s.%s where name='doris_5'", DATABASE, TABLE_5));
}

Thread.sleep(20000);
List<String> expected2 =
Arrays.asList(
"doris_1,18",
"doris_1_1,10",
"doris_2,2",
"doris_2_1,11",
"doris_3,3",
"doris_3_1,12",
"doris_5,5");
sql =
"select * from ( select * from %s.%s union all select * from %s.%s union all select * from %s.%s union all select * from %s.%s) res order by 1";
String query2 =
String.format(
sql, DATABASE, TABLE_1, DATABASE, TABLE_2, DATABASE, TABLE_3, DATABASE,
TABLE_5);
checkResult(expected2, query2, 2);
jobClient.cancel().get();
}

private void initializeDorisTable() throws Exception {
try (Connection connection =
DriverManager.getConnection(
Expand Down

0 comments on commit 43f9e01

Please sign in to comment.