diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java index 6f1b12e00b7..531f6f03f96 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCWithSchemaChangeIT.java @@ -73,6 +73,7 @@ public class MysqlCDCWithSchemaChangeIT extends TestSuiteBase implements TestRes private static final String MYSQL_USER_PASSWORD = "mysqlpw"; private static final String QUERY = "select * from %s.%s"; + private static final String DESC = "desc %s.%s"; private static final String PROJECTION_QUERY = "select id,name,description,weight,add_column1,add_column2,add_column3 from %s.%s;"; @@ -148,7 +149,7 @@ public void testMysqlCdcWithSchemaEvolutionCaseExactlyOnce(TestContainer contain } private void assertSchemaEvolution(String database, String sourceTable, String sinkTable) { - await().atMost(60000, TimeUnit.MILLISECONDS) + await().atMost(30000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertIterableEquals( @@ -157,7 +158,13 @@ private void assertSchemaEvolution(String database, String sourceTable, String s // case1 add columns with cdc data at same time shopDatabase.setTemplateName("add_columns").createAndInitialize(); - await().atMost(60000, TimeUnit.MILLISECONDS) + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertIterableEquals( + query(String.format(DESC, database, sourceTable)), + query(String.format(DESC, database, sinkTable)))); + await().atMost(30000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { Assertions.assertIterableEquals( @@ -198,26 +205,30 @@ private void assertSchemaEvolution(String database, String sourceTable, String s }); // case2 drop columns with cdc data at same time - shopDatabase.setTemplateName("drop_columns").createAndInitialize(); - await().atMost(60000, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> - Assertions.assertIterableEquals( - query(String.format(QUERY, database, sourceTable)), - query(String.format(QUERY, database, sinkTable)))); + assertCaseByDdlName("drop_columns", database, sourceTable, sinkTable); // case3 change column name with cdc data at same time - shopDatabase.setTemplateName("change_columns").createAndInitialize(); - await().atMost(60000, TimeUnit.MILLISECONDS) + assertCaseByDdlName("change_columns", database, sourceTable, sinkTable); + + // case4 modify column data type with cdc data at same time + assertCaseByDdlName("modify_columns", database, sourceTable, sinkTable); + } + + private void assertCaseByDdlName( + String drop_columns, String database, String sourceTable, String sinkTable) { + shopDatabase.setTemplateName(drop_columns).createAndInitialize(); + assertTableStructureAndData(database, sourceTable, sinkTable); + } + + private void assertTableStructureAndData( + String database, String sourceTable, String sinkTable) { + await().atMost(30000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertIterableEquals( - query(String.format(QUERY, database, sourceTable)), - query(String.format(QUERY, database, sinkTable)))); - - // case4 modify column data type with cdc data at same time - shopDatabase.setTemplateName("modify_columns").createAndInitialize(); - await().atMost(60000, TimeUnit.MILLISECONDS) + query(String.format(DESC, database, sourceTable)), + query(String.format(DESC, database, sinkTable)))); + await().atMost(30000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertIterableEquals(