Skip to content

Commit

Permalink
[Improve][Connector-V2] Add test case for table structure vertify
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Jun 18, 2024
1 parent 379c61a commit 244fe08
Showing 1 changed file with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class MysqlCDCWithSchemaChangeIT extends TestSuiteBase implements TestRes
private static final String MYSQL_USER_PASSWORD = "mysqlpw";

private static final String QUERY = "select * from %s.%s";
private static final String DESC = "desc %s.%s";
private static final String PROJECTION_QUERY =
"select id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s;";

Expand Down Expand Up @@ -148,7 +149,7 @@ public void testMysqlCdcWithSchemaEvolutionCaseExactlyOnce(TestContainer contain
}

private void assertSchemaEvolution(String database, String sourceTable, String sinkTable) {
await().atMost(60000, TimeUnit.MILLISECONDS)
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertIterableEquals(
Expand All @@ -157,7 +158,13 @@ private void assertSchemaEvolution(String database, String sourceTable, String s

// case1 add columns with cdc data at same time
shopDatabase.setTemplateName("add_columns").createAndInitialize();
await().atMost(60000, TimeUnit.MILLISECONDS)
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertIterableEquals(
query(String.format(DESC, database, sourceTable)),
query(String.format(DESC, database, sinkTable))));
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertIterableEquals(
Expand Down Expand Up @@ -198,26 +205,30 @@ private void assertSchemaEvolution(String database, String sourceTable, String s
});

// case2 drop columns with cdc data at same time
shopDatabase.setTemplateName("drop_columns").createAndInitialize();
await().atMost(60000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertIterableEquals(
query(String.format(QUERY, database, sourceTable)),
query(String.format(QUERY, database, sinkTable))));
assertCaseByDdlName("drop_columns", database, sourceTable, sinkTable);

// case3 change column name with cdc data at same time
shopDatabase.setTemplateName("change_columns").createAndInitialize();
await().atMost(60000, TimeUnit.MILLISECONDS)
assertCaseByDdlName("change_columns", database, sourceTable, sinkTable);

// case4 modify column data type with cdc data at same time
assertCaseByDdlName("modify_columns", database, sourceTable, sinkTable);
}

private void assertCaseByDdlName(
String drop_columns, String database, String sourceTable, String sinkTable) {
shopDatabase.setTemplateName(drop_columns).createAndInitialize();
assertTableStructureAndData(database, sourceTable, sinkTable);
}

private void assertTableStructureAndData(
String database, String sourceTable, String sinkTable) {
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertIterableEquals(
query(String.format(QUERY, database, sourceTable)),
query(String.format(QUERY, database, sinkTable))));

// case4 modify column data type with cdc data at same time
shopDatabase.setTemplateName("modify_columns").createAndInitialize();
await().atMost(60000, TimeUnit.MILLISECONDS)
query(String.format(DESC, database, sourceTable)),
query(String.format(DESC, database, sinkTable))));
await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertIterableEquals(
Expand Down

0 comments on commit 244fe08

Please sign in to comment.