diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml index c8bb6374e78..a800ca3e1a4 100644 --- a/seatunnel-e2e/pom.xml +++ b/seatunnel-e2e/pom.xml @@ -56,6 +56,11 @@ ${junit4.version} test + + org.awaitility + awaitility + test + diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java index b126831cc47..704d23228ac 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/FakeSourceToIoTDBIT.java @@ -70,9 +70,11 @@ public void startIoTDBContainer() throws Exception { // wait for IoTDB fully start session = createSession(); given().ignoreExceptions() - .await() - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> session.open()); + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> session.open()); initIoTDBTimeseries(); } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java index 9bc241bf477..f837815719e 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.e2e.flink.v2.jdbc; +import static org.awaitility.Awaitility.given; + import org.apache.seatunnel.e2e.flink.FlinkContainer; import com.google.common.collect.Lists; @@ -39,6 +41,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; public class FakeSourceToJdbcIT extends FlinkContainer { @@ -54,9 +57,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun .withLogConsumer(new Slf4jLogConsumer(LOGGER)); Startables.deepStart(Stream.of(psl)).join(); LOGGER.info("PostgreSql container started"); - Thread.sleep(5000L); Class.forName(psl.getDriverClassName()); - initializeJdbcTable(); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcTable()); } private void initializeJdbcTable() { diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java index b88aab09350..861200526e6 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.e2e.flink.v2.jdbc; -import static org.testcontainers.shaded.org.awaitility.Awaitility.given; +import static org.awaitility.Awaitility.given; import org.apache.seatunnel.e2e.flink.FlinkContainer; diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java index 715441032e1..ac436650165 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.e2e.flink.v2.jdbc; -import static org.testcontainers.shaded.org.awaitility.Awaitility.given; +import static org.awaitility.Awaitility.given; import org.apache.seatunnel.e2e.flink.FlinkContainer; @@ -76,9 +76,11 @@ public void startGreenplumContainer() throws ClassNotFoundException, SQLExceptio // wait for Greenplum fully start Class.forName(GREENPLUM_DRIVER); given().ignoreExceptions() - .await() - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(() -> initializeJdbcConnection()); + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcConnection()); initializeJdbcTable(); batchInsertData(); } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java index 14eba119d06..bee52537353 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.e2e.flink.v2.jdbc; +import static org.awaitility.Awaitility.given; + import org.apache.seatunnel.e2e.flink.FlinkContainer; import org.junit.jupiter.api.AfterEach; @@ -37,6 +39,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; public class JdbcSourceToConsoleIT extends FlinkContainer { @@ -52,9 +55,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun .withLogConsumer(new Slf4jLogConsumer(LOGGER)); Startables.deepStart(Stream.of(psl)).join(); LOGGER.info("PostgreSql container started"); - Thread.sleep(5000L); Class.forName(psl.getDriverClassName()); - initializeJdbcTable(); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcTable()); batchInsertData(); } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java index 1c5dc90ceae..2ce0d42413e 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbSourceToConsoleIT.java @@ -26,6 +26,7 @@ import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.bson.Document; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -36,7 +37,6 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; import java.io.IOException; @@ -77,7 +77,8 @@ public void startMongoContainer() { Startables.deepStart(Stream.of(mongodbContainer)).join(); log.info("Mongodb container started"); Awaitility.given().ignoreExceptions() - .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) .atMost(180, TimeUnit.SECONDS) .untilAsserted(this::initConnection); this.generateTestData(); diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java index 9fd929d2f71..de2e8950fc8 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-redis-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/redis/RedisIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.e2e.flink.v2.redis; -import static org.testcontainers.shaded.org.awaitility.Awaitility.given; +import static org.awaitility.Awaitility.given; import org.apache.seatunnel.e2e.flink.FlinkContainer; @@ -56,9 +56,11 @@ public void startRedisContainer() { Startables.deepStart(Stream.of(redisContainer)).join(); log.info("Redis container started"); given().ignoreExceptions() - .await() - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initJedis); + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initJedis); this.generateTestData(); } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml index 61d7911a548..f9f46ffa507 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml @@ -45,10 +45,5 @@ ${project.version} test - - org.awaitility - awaitility - test - \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java index 7340f9b5322..4790c2bb7b7 100644 --- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.e2e.flink.clickhouse; +import static org.awaitility.Awaitility.given; + import org.apache.seatunnel.e2e.flink.FlinkContainer; import com.google.common.collect.Lists; @@ -39,6 +41,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; public class FakeSourceToClickhouseIT extends FlinkContainer { @@ -59,9 +62,13 @@ public void startClickhouseContainer() throws InterruptedException { Startables.deepStart(Stream.of(clickhouseServer)).join(); LOGGER.info("Clickhouse container started"); // wait for clickhouse fully start - Thread.sleep(5000L); dataSource = createDatasource(); - initializeClickhouseTable(); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> initializeClickhouseTable()); } /** diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java index 66b044fb044..40a9a8ce6b1 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/FakeSourceToIoTDBIT.java @@ -69,9 +69,11 @@ public void startIoTDBContainer() throws Exception { // wait for IoTDB fully start session = createSession(); given().ignoreExceptions() - .await() - .atMost(30, TimeUnit.SECONDS) - .untilAsserted(() -> session.open()); + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> session.open()); initIoTDBTimeseries(); } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java index d1eb5043d9e..9845c3c20fe 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.e2e.spark.v2.jdbc; -import static org.testcontainers.shaded.org.awaitility.Awaitility.given; +import static org.awaitility.Awaitility.given; import org.apache.seatunnel.e2e.spark.SparkContainer; diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java index 7910a0dde5d..b11910eeef1 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.e2e.spark.v2.jdbc; -import static org.testcontainers.shaded.org.awaitility.Awaitility.given; +import static org.awaitility.Awaitility.given; import org.apache.seatunnel.e2e.spark.SparkContainer; @@ -76,9 +76,11 @@ public void startGreenplumContainer() throws ClassNotFoundException, SQLExceptio // wait for Greenplum fully start Class.forName(GREENPLUM_DRIVER); given().ignoreExceptions() - .await() - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(() -> initializeJdbcConnection()); + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcConnection()); initializeJdbcTable(); batchInsertData(); } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java index 00b84531746..7efdd438a47 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-redis-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/redis/RedisIT.java @@ -17,7 +17,7 @@ package org.apache.seatunnel.e2e.spark.v2.redis; -import static org.testcontainers.shaded.org.awaitility.Awaitility.given; +import static org.awaitility.Awaitility.given; import org.apache.seatunnel.e2e.spark.SparkContainer; @@ -56,9 +56,11 @@ public void startRedisContainer() { Startables.deepStart(Stream.of(redisContainer)).join(); log.info("Redis container started"); given().ignoreExceptions() - .await() - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initJedis); + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::initJedis); this.generateTestData(); } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml index 898bf04c930..02dcbe620da 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml @@ -43,12 +43,6 @@ ${project.version} test - - - org.awaitility - awaitility - test - \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java index 061de4e1ab2..c2d2a142661 100644 --- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java +++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.e2e.spark.jdbc; +import static org.awaitility.Awaitility.given; + import org.apache.seatunnel.e2e.spark.SparkContainer; import com.google.common.collect.Lists; @@ -39,6 +41,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; public class FakeSourceToJdbcIT extends SparkContainer { @@ -54,9 +57,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun .withLogConsumer(new Slf4jLogConsumer(LOGGER)); Startables.deepStart(Stream.of(psl)).join(); LOGGER.info("PostgreSql container started"); - Thread.sleep(5000L); Class.forName(psl.getDriverClassName()); - initializeJdbcTable(); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcTable()); } private void initializeJdbcTable() { diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java index f430070197d..cf9be5e9ea6 100644 --- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.e2e.spark.jdbc; +import static org.awaitility.Awaitility.given; + import org.apache.seatunnel.e2e.spark.SparkContainer; import com.google.common.collect.Lists; @@ -38,6 +40,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; public class JdbcSourceToConsoleIT extends SparkContainer { @@ -54,9 +57,13 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun psl.setPortBindings(Lists.newArrayList("33306:3306")); Startables.deepStart(Stream.of(psl)).join(); LOGGER.info("PostgreSql container started"); - Thread.sleep(5000L); Class.forName(psl.getDriverClassName()); - initializeJdbcTable(); + given().ignoreExceptions() + .await() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> initializeJdbcTable()); batchInsertData(); }