Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][e2e] Improved e2e start sleep #2677

Merged
merged 6 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public abstract class FlinkContainer {
private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();

private static final int WAIT_FLINK_JOB_SUBMIT = 5000;

private static final String FLINK_PROPERTIES = String.join(
"\n",
Arrays.asList(
Expand Down Expand Up @@ -124,8 +122,6 @@ public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOE
Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
// wait job start
Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
return execResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
Expand All @@ -39,6 +41,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class FakeSourceToJdbcIT extends FlinkContainer {
Expand All @@ -54,9 +57,11 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
}

private void initializeJdbcTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.flink.v2.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import org.junit.jupiter.api.AfterEach;
Expand All @@ -37,6 +39,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class JdbcSourceToConsoleIT extends FlinkContainer {
Expand All @@ -52,9 +55,11 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
batchInsertData();
}

Expand Down
5 changes: 5 additions & 0 deletions seatunnel-e2e/seatunnel-flink-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.flink.clickhouse;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
Expand All @@ -39,6 +41,7 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class FakeSourceToClickhouseIT extends FlinkContainer {
Expand All @@ -59,9 +62,11 @@ public void startClickhouseContainer() throws InterruptedException {
Startables.deepStart(Stream.of(clickhouseServer)).join();
LOGGER.info("Clickhouse container started");
// wait for clickhouse fully start
Thread.sleep(5000L);
dataSource = createDatasource();
initializeClickhouseTable();
given().ignoreExceptions()
.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeClickhouseTable());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public abstract class FlinkContainer {
private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();

private static final int WAIT_FLINK_JOB_SUBMIT = 5000;

private static final String FLINK_PROPERTIES = String.join(
"\n",
Arrays.asList(
Expand Down Expand Up @@ -123,8 +121,6 @@ public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOE
Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
// wait job start
Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
return execResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ public abstract class FlinkContainer {
private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString();
private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_SQL_JAR).toString();

private static final int WAIT_FLINK_JOB_SUBMIT = 5000;

private static final String FLINK_PROPERTIES = String.join(
"\n",
Arrays.asList(
Expand Down Expand Up @@ -122,8 +120,6 @@ public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile)
Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command));
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
// wait job start
Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
return execResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public abstract class SparkContainer {
private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();

private static final int WAIT_SPARK_JOB_SUBMIT = 5000;

@BeforeEach
public void before() {
master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
Expand Down Expand Up @@ -106,8 +104,6 @@ public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOE
Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
// wait job start
Thread.sleep(WAIT_SPARK_JOB_SUBMIT);
return execResult;
}

Expand Down
5 changes: 5 additions & 0 deletions seatunnel-e2e/seatunnel-spark-e2e/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public abstract class SparkContainer {
private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString();
private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString();

private static final int WAIT_SPARK_JOB_SUBMIT = 5000;

@BeforeEach
public void before() {
master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
Expand Down Expand Up @@ -107,8 +105,6 @@ public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOE
Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command));
LOG.info(execResult.getStdout());
LOG.error(execResult.getStderr());
// wait job start
Thread.sleep(WAIT_SPARK_JOB_SUBMIT);
return execResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.spark.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.spark.SparkContainer;

import com.google.common.collect.Lists;
Expand All @@ -39,6 +41,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class FakeSourceToJdbcIT extends SparkContainer {
Expand All @@ -54,9 +57,11 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
}

private void initializeJdbcTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.e2e.spark.jdbc;

import static org.awaitility.Awaitility.given;

import org.apache.seatunnel.e2e.spark.SparkContainer;

import com.google.common.collect.Lists;
Expand All @@ -38,6 +40,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class JdbcSourceToConsoleIT extends SparkContainer {
Expand All @@ -54,9 +57,11 @@ public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoun
psl.setPortBindings(Lists.newArrayList("33306:3306"));
Startables.deepStart(Stream.of(psl)).join();
LOGGER.info("PostgreSql container started");
Thread.sleep(5000L);
Class.forName(psl.getDriverClassName());
initializeJdbcTable();
given().ignoreExceptions()
.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> initializeJdbcTable());
batchInsertData();
}

Expand Down