From 7e50617433d46f926a40bbe3a0cfc3132b98ae36 Mon Sep 17 00:00:00 2001 From: Zongwen Li Date: Fri, 9 Sep 2022 16:00:48 +0800 Subject: [PATCH] [Improve][e2e] Container only copy required connector jars (#2675) * [Improve][e2e] flink container only copy required connector jars * rename flink-e2e-common * remove useless imported * [Improve][e2e] flink sql container refactoring * remove useless imported * remove useless * [Improve][e2e] spark container only copy required connector jars * change for code review * Use e2e-common module directly * checkstyle * code format --- .../api/configuration/ReadonlyConfig.java | 12 +- seatunnel-e2e/pom.xml | 8 +- seatunnel-e2e/seatunnel-e2e-common/pom.xml | 55 ++++++ .../e2e/common/AbstractContainer.java | 103 +++++++++++ .../e2e/common/AbstractFlinkContainer.java | 121 +++++++++++++ .../e2e/common/AbstractSparkContainer.java | 87 +++++++++ .../seatunnel/e2e/common/ContainerUtil.java | 165 ++++++++++++++++++ .../connector-flink-e2e-base/pom.xml | 11 +- .../seatunnel/e2e/flink/FlinkContainer.java | 150 +++------------- .../seatunnel-flink-connector-v2-e2e/pom.xml | 2 +- .../pom.xml | 12 +- .../seatunnel/e2e/flink/FlinkContainer.java | 149 ++-------------- .../pom.xml | 11 +- .../e2e/flink/sql/FlinkContainer.java | 129 ++------------ .../flink/sql/fake/DatagenToConsoleIT.java | 5 +- .../connector-spark-e2e-base/pom.xml | 11 +- .../seatunnel/e2e/spark/SparkContainer.java | 143 ++------------- .../seatunnel-spark-connector-v2-e2e/pom.xml | 2 +- .../pom.xml | 11 +- .../seatunnel/e2e/spark/SparkContainer.java | 139 ++------------- 20 files changed, 679 insertions(+), 647 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-e2e-common/pom.xml create mode 100644 seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java create mode 100644 seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java create mode 100644 seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java create mode 100644 seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java index beb9d08a9f01..32e8db2734e5 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ReadonlyConfig.java @@ -66,18 +66,24 @@ public T get(Option option) { return getOptional(option).orElseGet(option::defaultValue); } - @SuppressWarnings("MagicNumber") public Map toMap() { if (confData.isEmpty()) { return Collections.emptyMap(); } + Map result = new HashMap<>(); + toMap(result); + return result; + } + + public void toMap(Map result) { + if (confData.isEmpty()) { + return; + } Map flatteningMap = flatteningMap(confData); - Map result = new HashMap<>((flatteningMap.size() << 2) / 3 + 1); for (Map.Entry entry : flatteningMap.entrySet()) { result.put(entry.getKey(), convertToJsonString(entry.getValue())); } - return result; } @SuppressWarnings("unchecked") diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml index c9b0b975a776..c8bb6374e787 100644 --- a/seatunnel-e2e/pom.xml +++ b/seatunnel-e2e/pom.xml @@ -27,15 +27,17 @@ pom - seatunnel-flink-e2e - seatunnel-spark-e2e + seatunnel-e2e-common seatunnel-flink-connector-v2-e2e - seatunnel-spark-connector-v2-e2e + seatunnel-flink-e2e seatunnel-flink-sql-e2e + seatunnel-spark-connector-v2-e2e + seatunnel-spark-e2e 4.13.2 + 2.4 diff --git a/seatunnel-e2e/seatunnel-e2e-common/pom.xml b/seatunnel-e2e/seatunnel-e2e-common/pom.xml new file mode 100644 index 000000000000..3ddc46cb5c16 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/pom.xml @@ -0,0 +1,55 @@ + + + + + seatunnel-e2e + org.apache.seatunnel + ${revision} + + 4.0.0 + + seatunnel-e2e-common + + + + org.apache.seatunnel + seatunnel-api + ${project.version} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + false + + + + + test-jar + + + + + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java new file mode 100644 index 000000000000..f4fb9d1169f9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.common; + +import static org.apache.seatunnel.e2e.common.ContainerUtil.PROJECT_ROOT_PATH; +import static org.apache.seatunnel.e2e.common.ContainerUtil.adaptPathForWin; +import static org.apache.seatunnel.e2e.common.ContainerUtil.copyConfigFileToContainer; +import static org.apache.seatunnel.e2e.common.ContainerUtil.copyConnectorJarToContainer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +public abstract class AbstractContainer { + protected static final Logger LOG = LoggerFactory.getLogger(AbstractContainer.class); + protected static final String START_ROOT_MODULE_NAME = "seatunnel-core"; + + protected final String startModuleName; + + protected final String startModuleFullPath; + + public AbstractContainer() { + String[] modules = getStartModulePath().split(File.separator); + this.startModuleName = modules[modules.length - 1]; + this.startModuleFullPath = PROJECT_ROOT_PATH + File.separator + + START_ROOT_MODULE_NAME + File.separator + getStartModulePath(); + } + + protected abstract String getDockerImage(); + + protected abstract String getStartModulePath(); + + protected abstract String getStartShellName(); + + protected abstract String getConnectorModulePath(); + + protected abstract String getConnectorType(); + + protected abstract String getConnectorNamePrefix(); + + protected abstract String getSeaTunnelHomeInContainer(); + + protected abstract List getExtraStartShellCommands(); + + protected void copySeaTunnelStarter(GenericContainer container) { + String[] modules = getStartModulePath().split(File.separator); + final String startModuleName = modules[modules.length - 1]; + ContainerUtil.copySeaTunnelStarter(container, + startModuleName, + PROJECT_ROOT_PATH + File.separator + START_ROOT_MODULE_NAME + File.separator + getStartModulePath(), + getSeaTunnelHomeInContainer(), + getStartShellName()); + } + + protected Container.ExecResult executeJob(GenericContainer container, String confFile) throws IOException, InterruptedException { + final String confInContainerPath = copyConfigFileToContainer(container, confFile); + // copy connectors + copyConnectorJarToContainer(container, + confFile, + getConnectorModulePath(), + getConnectorNamePrefix(), + getConnectorType(), + getSeaTunnelHomeInContainer()); + return executeCommand(container, confInContainerPath); + } + + protected Container.ExecResult executeCommand(GenericContainer container, String configPath) throws IOException, InterruptedException { + final List command = new ArrayList<>(); + String binPath = Paths.get(getSeaTunnelHomeInContainer(), "bin", getStartShellName()).toString(); + // base command + command.add(adaptPathForWin(binPath)); + command.add("--config"); + command.add(adaptPathForWin(configPath)); + command.addAll(getExtraStartShellCommands()); + + Container.ExecResult execResult = container.execInContainer("bash", "-c", String.join(" ", command)); + LOG.info(execResult.getStdout()); + LOG.error(execResult.getStderr()); + return execResult; + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java new file mode 100644 index 000000000000..1f06648eb917 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.common; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** + * This class is the base class of FlinkEnvironment test. + * The before method will create a Flink cluster, and after method will close the Flink cluster. + * You can use {@link AbstractFlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job. + */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractFlinkContainer extends AbstractContainer { + + protected static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkContainer.class); + + protected static final String FLINK_SEATUNNEL_HOME = "/tmp/flink/seatunnel"; + + protected static final Network NETWORK = Network.newNetwork(); + + protected static final List DEFAULT_FLINK_PROPERTIES = Arrays.asList( + "jobmanager.rpc.address: jobmanager", + "taskmanager.numberOfTaskSlots: 10", + "parallelism.default: 4", + "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"); + + protected static final String DEFAULT_DOCKER_IMAGE = "flink:1.13.6-scala_2.11"; + + protected GenericContainer jobManager; + protected GenericContainer taskManager; + + @Override + protected String getDockerImage() { + return DEFAULT_DOCKER_IMAGE; + } + + @Override + protected String getSeaTunnelHomeInContainer() { + return FLINK_SEATUNNEL_HOME; + } + + @BeforeAll + public void before() { + final String dockerImage = getDockerImage(); + final String properties = String.join("\n", getFlinkProperties()); + jobManager = new GenericContainer<>(dockerImage) + .withCommand("jobmanager") + .withNetwork(NETWORK) + .withNetworkAliases("jobmanager") + .withExposedPorts() + .withEnv("FLINK_PROPERTIES", properties) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + taskManager = + new GenericContainer<>(dockerImage) + .withCommand("taskmanager") + .withNetwork(NETWORK) + .withNetworkAliases("taskmanager") + .withEnv("FLINK_PROPERTIES", properties) + .dependsOn(jobManager) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + Startables.deepStart(Stream.of(jobManager)).join(); + Startables.deepStart(Stream.of(taskManager)).join(); + copySeaTunnelStarter(jobManager); + LOG.info("Flink containers are started."); + } + + protected List getFlinkProperties() { + return DEFAULT_FLINK_PROPERTIES; + } + + @AfterAll + public void close() { + if (taskManager != null) { + taskManager.stop(); + } + if (jobManager != null) { + jobManager.stop(); + } + } + + @Override + protected List getExtraStartShellCommands() { + return Collections.emptyList(); + } + + public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException { + return executeJob(jobManager, confFile); + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java new file mode 100644 index 000000000000..fc16846c1db7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.common; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class AbstractSparkContainer extends AbstractContainer { + private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkContainer.class); + + private static final String SPARK_SEATUNNEL_HOME = "/tmp/spark/seatunnel"; + private static final String DEFAULT_DOCKER_IMAGE = "bitnami/spark:2.4.3"; + public static final Network NETWORK = Network.newNetwork(); + + protected GenericContainer master; + + @Override + protected String getDockerImage() { + return DEFAULT_DOCKER_IMAGE; + } + + @Override + protected String getSeaTunnelHomeInContainer() { + return SPARK_SEATUNNEL_HOME; + } + + @BeforeAll + public void before() { + master = new GenericContainer<>(getDockerImage()) + .withNetwork(NETWORK) + .withNetworkAliases("spark-master") + .withExposedPorts() + .withEnv("SPARK_MODE", "master") + .withLogConsumer(new Slf4jLogConsumer(LOG)); + // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to + // start a worker. + Startables.deepStart(Stream.of(master)).join(); + copySeaTunnelStarter(master); + LOG.info("Spark container started"); + } + + @AfterAll + public void close() { + if (master != null) { + master.stop(); + } + } + + @Override + protected List getExtraStartShellCommands() { + return Arrays.asList("--master local", + "--deploy-mode client"); + } + + public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException { + return executeJob(master, confFile); + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java new file mode 100644 index 000000000000..237e30f0911c --- /dev/null +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/ContainerUtil.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.common; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.MountableFile; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; + +public final class ContainerUtil { + + public static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties"; + + /** + * An error occurs when the user is not a submodule of seatunnel-e2e. + */ + public static final String PROJECT_ROOT_PATH = System.getProperty("user.dir").split("/seatunnel-e2e/")[0]; + + public static void copyConnectorJarToContainer(GenericContainer container, + String confFile, + String connectorsRootPath, + String connectorPrefix, + String connectorType, + String seatunnelHome) { + Config jobConfig = getConfig(getConfigFile(confFile)); + Config connectorsMapping = getConfig(new File(PROJECT_ROOT_PATH + File.separator + PLUGIN_MAPPING_FILE)); + if (!connectorsMapping.hasPath(connectorType) || connectorsMapping.getConfig(connectorType).isEmpty()) { + return; + } + Config connectors = connectorsMapping.getConfig(connectorType); + Set connectorNames = getConnectors(jobConfig, connectors, "source"); + connectorNames.addAll(getConnectors(jobConfig, connectors, "sink")); + File module = new File(PROJECT_ROOT_PATH + File.separator + connectorsRootPath); + + List connectorFiles = getConnectorFiles(module, connectorNames, connectorPrefix); + connectorFiles.forEach(jar -> + container.copyFileToContainer( + MountableFile.forHostPath(jar.getAbsolutePath()), + Paths.get(Paths.get(seatunnelHome, "connectors").toString(), connectorType, jar.getName()).toString())); + } + + public static String copyConfigFileToContainer(GenericContainer container, String confFile) { + final String targetConfInContainer = Paths.get("/tmp", confFile).toString(); + container.copyFileToContainer(MountableFile.forHostPath(getConfigFile(confFile).getAbsolutePath()), targetConfInContainer); + return targetConfInContainer; + } + + public static void copySeaTunnelStarter(GenericContainer container, + String startModuleName, + String startModulePath, + String seatunnelHomeInContainer, + String startShellName) { + final String startJarName = startModuleName + ".jar"; + // copy lib + final String startJarPath = startModulePath + File.separator + "target" + File.separator + startJarName; + container.copyFileToContainer( + MountableFile.forHostPath(startJarPath), + Paths.get(Paths.get(seatunnelHomeInContainer, "lib").toString(), startJarName).toString()); + + // copy bin + final String startBinPath = startModulePath + File.separator + "/src/main/bin/" + startShellName; + container.copyFileToContainer( + MountableFile.forHostPath(startBinPath), + Paths.get(Paths.get(seatunnelHomeInContainer, "bin").toString(), startShellName).toString()); + + // copy plugin-mapping.properties + container.copyFileToContainer( + MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"), + Paths.get(Paths.get(seatunnelHomeInContainer, "connectors").toString(), PLUGIN_MAPPING_FILE).toString()); + } + + public static String adaptPathForWin(String path) { + // Running IT use cases under Windows requires replacing \ with / + return path == null ? "" : path.replaceAll("\\\\", "/"); + } + + private static List getConnectorFiles(File currentModule, Set connectorNames, String connectorPrefix) { + List connectorFiles = new ArrayList<>(); + for (File file : Objects.requireNonNull(currentModule.listFiles())) { + getConnectorFiles(file, connectorNames, connectorPrefix, connectorFiles); + } + return connectorFiles; + } + + private static void getConnectorFiles(File currentModule, Set connectorNames, String connectorPrefix, List connectors) { + if (currentModule.isFile() || connectorNames.size() == connectors.size()) { + return; + } + if (connectorNames.contains(currentModule.getName())) { + File targetPath = new File(currentModule.getAbsolutePath() + File.separator + "target"); + for (File file : Objects.requireNonNull(targetPath.listFiles())) { + if (file.getName().startsWith(currentModule.getName()) && !file.getName().endsWith("javadoc.jar")) { + connectors.add(file); + return; + } + } + } + + if (currentModule.getName().startsWith(connectorPrefix)) { + for (File file : Objects.requireNonNull(currentModule.listFiles())) { + getConnectorFiles(file, connectorNames, connectorPrefix, connectors); + } + } + } + + private static Set getConnectors(Config jobConfig, Config connectorsMap, String pluginType) { + List connectorConfigList = jobConfig.getConfigList(pluginType); + Map connectors = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + ReadonlyConfig.fromConfig(connectorsMap.getConfig(pluginType)).toMap(connectors); + return connectorConfigList.stream() + .map(config -> config.getString("plugin_name")) + .filter(connectors::containsKey) + .map(connectors::get) + .collect(Collectors.toSet()); + } + + public static Path getCurrentModulePath() { + return Paths.get(System.getProperty("user.dir")); + } + + private static File getConfigFile(String confFile) { + File file = new File(getCurrentModulePath() + "/src/test/resources" + confFile); + if (file.exists()) { + return file; + } + throw new IllegalArgumentException(confFile + " doesn't exist"); + } + + private static Config getConfig(File file) { + return ConfigFactory + .parseFile(file) + .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true)) + .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true)); + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml index 57a12fa6cca8..d2f48c420095 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/pom.xml @@ -25,9 +25,14 @@ connector-flink-e2e-base - - 2.4 - + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + test-jar + + diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java index 4fe612a09c0c..6ecaa77cbb93 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java @@ -17,152 +17,42 @@ package org.apache.seatunnel.e2e.flink; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Stream; +import org.apache.seatunnel.e2e.common.AbstractFlinkContainer; /** * This class is the base class of FlinkEnvironment test for new seatunnel connector API. * The before method will create a Flink cluster, and after method will close the Flink cluster. * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job. */ -public abstract class FlinkContainer { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class); - - private static final String FLINK_DOCKER_IMAGE = "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27"; - protected static final Network NETWORK = Network.newNetwork(); - - protected GenericContainer jobManager; - protected GenericContainer taskManager; - private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent(); - private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-new-connector.sh"; - private static final String SEATUNNEL_FLINK_JAR = "seatunnel-flink-starter.jar"; - private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties"; - private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel"; - private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString(); - private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString(); - private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString(); - - private static final int WAIT_FLINK_JOB_SUBMIT = 5000; - - private static final String FLINK_PROPERTIES = String.join( - "\n", - Arrays.asList( - "jobmanager.rpc.address: jobmanager", - "taskmanager.numberOfTaskSlots: 10", - "parallelism.default: 4", - "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false")); - - @BeforeEach - public void before() { - jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE) - .withCommand("jobmanager") - .withNetwork(NETWORK) - .withNetworkAliases("jobmanager") - .withExposedPorts() - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .withLogConsumer(new Slf4jLogConsumer(LOG)); +public abstract class FlinkContainer extends AbstractFlinkContainer { - taskManager = - new GenericContainer<>(FLINK_DOCKER_IMAGE) - .withCommand("taskmanager") - .withNetwork(NETWORK) - .withNetworkAliases("taskmanager") - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .dependsOn(jobManager) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - Startables.deepStart(Stream.of(jobManager)).join(); - Startables.deepStart(Stream.of(taskManager)).join(); - copySeaTunnelFlinkFile(); - LOG.info("Flink containers are started."); + @Override + protected String getDockerImage() { + return "tyrantlucifer/flink:1.13.6-scala_2.11_hadoop27"; } - @AfterEach - public void close() { - if (taskManager != null) { - taskManager.stop(); - } - if (jobManager != null) { - jobManager.stop(); - } + @Override + protected String getStartModulePath() { + return "seatunnel-flink-starter"; } - public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException { - final String confPath = getResource(confFile); - if (!new File(confPath).exists()) { - throw new IllegalArgumentException(confFile + " doesn't exist"); - } - final String targetConfInContainer = Paths.get("/tmp", confFile).toString(); - jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer); - - // Running IT use cases under Windows requires replacing \ with / - String conf = targetConfInContainer.replaceAll("\\\\", "/"); - String binPath = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString().replaceAll("\\\\", "/"); - final List command = new ArrayList<>(); - command.add(binPath); - command.add("--config " + conf); - - Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command)); - LOG.info(execResult.getStdout()); - LOG.error(execResult.getStderr()); - // wait job start - Thread.sleep(WAIT_FLINK_JOB_SUBMIT); - return execResult; + @Override + protected String getStartShellName() { + return "start-seatunnel-flink-new-connector.sh"; } - protected void copySeaTunnelFlinkFile() { - // copy lib - String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH - + "/seatunnel-core/seatunnel-flink-starter/target/" + SEATUNNEL_FLINK_JAR; - jobManager.copyFileToContainer( - MountableFile.forHostPath(seatunnelCoreFlinkJarPath), - Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString()); - - // copy bin - String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-flink-starter/src/main/bin/" + SEATUNNEL_FLINK_BIN; - jobManager.copyFileToContainer( - MountableFile.forHostPath(seatunnelFlinkBinPath), - Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString()); - - // copy connectors - File jars = new File(PROJECT_ROOT_PATH + - "/seatunnel-connectors-v2-dist/target/lib"); - Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("connector-")))) - .forEach(jar -> - jobManager.copyFileToContainer( - MountableFile.forHostPath(jar.getAbsolutePath()), - getConnectorPath(jar.getName()))); - - // copy plugin-mapping.properties - jobManager.copyFileToContainer( - MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"), - Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString()); + @Override + protected String getConnectorType() { + return "seatunnel"; } - private String getResource(String confFile) { - return System.getProperty("user.dir") + "/src/test/resources" + confFile; + @Override + protected String getConnectorModulePath() { + return "seatunnel-connectors-v2"; } - private String getConnectorPath(String fileName) { - return Paths.get(SEATUNNEL_CONNECTORS, "seatunnel", fileName).toString(); + @Override + protected String getConnectorNamePrefix() { + return "connector-"; } } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml index 4f5f037dd7db..045579f37016 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml @@ -41,7 +41,7 @@ org.apache.seatunnel - seatunnel-core-flink + seatunnel-flink-starter ${project.version} test diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml index 32f26486dfad..7968e8526bd5 100644 --- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/pom.xml @@ -25,10 +25,14 @@ seatunnel-connector-flink-e2e-base - - 2.4 - - + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + test-jar + + diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java index f3308dead785..a1ec702690e0 100644 --- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java +++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java @@ -17,152 +17,37 @@ package org.apache.seatunnel.e2e.flink; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Stream; +import org.apache.seatunnel.e2e.common.AbstractFlinkContainer; /** * This class is the base class of FlinkEnvironment test. * The before method will create a Flink cluster, and after method will close the Flink cluster. * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob} to submit a seatunnel config and run a seatunnel job. */ -public abstract class FlinkContainer { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class); - - private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11"; - protected static final Network NETWORK = Network.newNetwork(); - - protected GenericContainer jobManager; - protected GenericContainer taskManager; - private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent(); - private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink.sh"; - private static final String SEATUNNEL_FLINK_JAR = "seatunnel-core-flink.jar"; - private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties"; - private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel"; - private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString(); - private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString(); - private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString(); - - private static final int WAIT_FLINK_JOB_SUBMIT = 5000; - - private static final String FLINK_PROPERTIES = String.join( - "\n", - Arrays.asList( - "jobmanager.rpc.address: jobmanager", - "taskmanager.numberOfTaskSlots: 10", - "parallelism.default: 4", - "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false")); - - @BeforeEach - public void before() { - jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE) - .withCommand("jobmanager") - .withNetwork(NETWORK) - .withNetworkAliases("jobmanager") - .withExposedPorts() - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - taskManager = - new GenericContainer<>(FLINK_DOCKER_IMAGE) - .withCommand("taskmanager") - .withNetwork(NETWORK) - .withNetworkAliases("taskmanager") - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .dependsOn(jobManager) - .withLogConsumer(new Slf4jLogConsumer(LOG)); +public abstract class FlinkContainer extends AbstractFlinkContainer { - Startables.deepStart(Stream.of(jobManager)).join(); - Startables.deepStart(Stream.of(taskManager)).join(); - copySeaTunnelFlinkFile(); - LOG.info("Flink containers are started."); + @Override + protected String getStartModulePath() { + return "seatunnel-core-flink"; } - @AfterEach - public void close() { - if (taskManager != null) { - taskManager.stop(); - } - if (jobManager != null) { - jobManager.stop(); - } + @Override + protected String getStartShellName() { + return "start-seatunnel-flink.sh"; } - public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException { - final String confPath = getResource(confFile); - if (!new File(confPath).exists()) { - throw new IllegalArgumentException(confFile + " doesn't exist"); - } - final String targetConfInContainer = Paths.get("/tmp", confFile).toString(); - jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer); - - // Running IT use cases under Windows requires replacing \ with / - String conf = targetConfInContainer.replaceAll("\\\\", "/"); - final List command = new ArrayList<>(); - command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-flink.sh").toString()); - command.add("--config " + conf); - - Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command)); - LOG.info(execResult.getStdout()); - LOG.error(execResult.getStderr()); - // wait job start - Thread.sleep(WAIT_FLINK_JOB_SUBMIT); - return execResult; - } - - protected void copySeaTunnelFlinkFile() { - // copy lib - String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH - + "/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar"; - jobManager.copyFileToContainer( - MountableFile.forHostPath(seatunnelCoreFlinkJarPath), - Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString()); - - // copy bin - String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh"; - jobManager.copyFileToContainer( - MountableFile.forHostPath(seatunnelFlinkBinPath), - Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString()); - - // copy connectors - File jars = new File(PROJECT_ROOT_PATH + - "/seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib"); - Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> f.getName().startsWith("seatunnel-connector-flink")))) - .forEach(jar -> - jobManager.copyFileToContainer( - MountableFile.forHostPath(jar.getAbsolutePath()), - getConnectorPath(jar.getName()))); - - // copy plugin-mapping.properties - jobManager.copyFileToContainer( - MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"), - Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString()); + @Override + protected String getConnectorType() { + return "seatunnel"; } - private String getResource(String confFile) { - return System.getProperty("user.dir") + "/src/test/resources" + confFile; + @Override + protected String getConnectorModulePath() { + return "seatunnel-connectors/seatunnel-connectors-flink"; } - private String getConnectorPath(String fileName) { - return Paths.get(SEATUNNEL_CONNECTORS, "flink", fileName).toString(); + @Override + protected String getConnectorNamePrefix() { + return "seatunnel-connector-flink-"; } - } diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml index 2cd8cacdb0cf..5de92cec77d8 100644 --- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/pom.xml @@ -25,9 +25,14 @@ setunnel-connector-flink-sql-e2e-base - - 2.4 - + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + test-jar + + diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java index 082500856ca1..82a7159c17ce 100644 --- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java +++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-e2e-base/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java @@ -17,128 +17,37 @@ package org.apache.seatunnel.e2e.flink.sql; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; - -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Stream; +import org.apache.seatunnel.e2e.common.AbstractFlinkContainer; /** * This class is the base class of FlinkEnvironment test. * The before method will create a Flink cluster, and after method will close the Flink cluster. - * You can use {@link FlinkContainer#executeSeaTunnelFlinkSqlJob(String)} to submit a seatunnel config and run a seatunnel job. + * You can use {@link FlinkContainer#executeSeaTunnelFlinkJob(String)} to submit a seatunnel config and run a seatunnel job. */ -public abstract class FlinkContainer { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkContainer.class); - - private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11"; - protected static final Network NETWORK = Network.newNetwork(); - - protected GenericContainer jobManager; - protected GenericContainer taskManager; - private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent(); - private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-sql.sh"; - private static final String SEATUNNEL_FLINK_SQL_JAR = "seatunnel-core-flink-sql.jar"; - private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel"; - private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_FLINK_BIN).toString(); - private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_FLINK_SQL_JAR).toString(); - - private static final int WAIT_FLINK_JOB_SUBMIT = 5000; - - private static final String FLINK_PROPERTIES = String.join( - "\n", - Arrays.asList( - "jobmanager.rpc.address: jobmanager", - "taskmanager.numberOfTaskSlots: 10", - "parallelism.default: 4", - "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false")); +public abstract class FlinkContainer extends AbstractFlinkContainer { - @BeforeEach - public void before() { - jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE) - .withCommand("jobmanager") - .withNetwork(NETWORK) - .withNetworkAliases("jobmanager") - .withExposedPorts() - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - taskManager = - new GenericContainer<>(FLINK_DOCKER_IMAGE) - .withCommand("taskmanager") - .withNetwork(NETWORK) - .withNetworkAliases("taskmanager") - .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES) - .dependsOn(jobManager) - .withLogConsumer(new Slf4jLogConsumer(LOG)); - - Startables.deepStart(Stream.of(jobManager)).join(); - Startables.deepStart(Stream.of(taskManager)).join(); - copySeaTunnelFlinkFile(); - LOG.info("Flink containers are started."); + @Override + protected String getStartModulePath() { + return "seatunnel-core-flink-sql"; } - @AfterEach - public void close() { - if (taskManager != null) { - taskManager.stop(); - } - if (jobManager != null) { - jobManager.stop(); - } + @Override + protected String getStartShellName() { + return "start-seatunnel-sql.sh"; } - public Container.ExecResult executeSeaTunnelFlinkSqlJob(String confFile) - throws IOException, InterruptedException, URISyntaxException { - final String confPath = Paths.get(FlinkContainer.class.getResource(confFile).toURI()).toString(); - if (!new File(confPath).exists()) { - throw new IllegalArgumentException(confFile + " doesn't exist"); - } - final String targetConfInContainer = Paths.get("/tmp", confFile).toString(); - jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer); - - // Running IT use cases under Windows requires replacing \ with / - String conf = targetConfInContainer.replaceAll("\\\\", "/"); - final List command = new ArrayList<>(); - command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-sql.sh").toString()); - command.add("--config " + conf); - - Container.ExecResult execResult = jobManager.execInContainer("bash", "-c", String.join(" ", command)); - LOG.info(execResult.getStdout()); - LOG.error(execResult.getStderr()); - // wait job start - Thread.sleep(WAIT_FLINK_JOB_SUBMIT); - return execResult; + @Override + protected String getConnectorType() { + return "flink-sql"; } - protected void copySeaTunnelFlinkFile() { - // copy lib - String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR; - jobManager.copyFileToContainer( - MountableFile.forHostPath(seatunnelCoreFlinkJarPath), - FLINK_JAR_PATH); - - // copy bin - String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh"; - jobManager.copyFileToContainer( - MountableFile.forHostPath(seatunnelFlinkBinPath), - Paths.get(SEATUNNEL_BIN).toString()); + @Override + protected String getConnectorModulePath() { + return "seatunnel-connectors/seatunnel-connectors-flink-sql"; } + @Override + protected String getConnectorNamePrefix() { + return "flink-sql-connector-"; + } } diff --git a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java index 05c5eb09c4ac..b9f57920c682 100644 --- a/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-flink-sql-e2e/setunnel-connector-flink-sql-fake-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/fake/DatagenToConsoleIT.java @@ -24,14 +24,13 @@ import org.testcontainers.containers.Container; import java.io.IOException; -import java.net.URISyntaxException; public class DatagenToConsoleIT extends FlinkContainer { @Test - public void testDatagenToConsole() throws IOException, URISyntaxException, InterruptedException { + public void testDatagenToConsole() throws IOException, InterruptedException { final String configFile = "/fake/flink.sql.conf"; - Container.ExecResult execResult = executeSeaTunnelFlinkSqlJob(configFile); + Container.ExecResult execResult = executeSeaTunnelFlinkJob(configFile); Assertions.assertEquals(0, execResult.getExitCode()); } } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml index cbc8495c5944..fcb3569ed31a 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/pom.xml @@ -25,9 +25,14 @@ connector-spark-e2e-base - - 2.4 - + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + test-jar + + diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java index 2f74483f0069..7ad7c69f5939 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java @@ -17,145 +17,36 @@ package org.apache.seatunnel.e2e.spark; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import org.apache.seatunnel.e2e.common.AbstractSparkContainer; /** * This class is the base class of SparkEnvironment test. The before method will create a Spark master, and after method will close the Spark master. * You can use {@link SparkContainer#executeSeaTunnelSparkJob} to submit a seatunnel conf and a seatunnel spark job. */ -public abstract class SparkContainer { - - private static final Logger LOG = LoggerFactory.getLogger(SparkContainer.class); - - private static final String SPARK_DOCKER_IMAGE = "bitnami/spark:2.4.3"; - public static final Network NETWORK = Network.newNetwork(); - - protected GenericContainer master; - private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent(); - private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-new-connector.sh"; - private static final String SEATUNNEL_SPARK_JAR = "seatunnel-spark-starter.jar"; - private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties"; - private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel"; - private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString(); - private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString(); - private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString(); - - private static final int WAIT_SPARK_JOB_SUBMIT = 5000; - - @BeforeEach - public void before() { - master = new GenericContainer<>(SPARK_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases("spark-master") - .withExposedPorts() - .withEnv("SPARK_MODE", "master") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to - // start a worker. - - Startables.deepStart(Stream.of(master)).join(); - copySeaTunnelSparkFile(); - LOG.info("Spark container started"); - } - - @AfterEach - public void close() { - if (master != null) { - master.stop(); - } - } - - public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException { - final String confPath = getResource(confFile); - if (!new File(confPath).exists()) { - throw new IllegalArgumentException(confFile + " doesn't exist"); - } - final String targetConfInContainer = Paths.get("/tmp", confFile).toString(); - master.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer); - - // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar. - final List command = new ArrayList<>(); - String sparkBinPath = Paths.get(SEATUNNEL_HOME, "bin", SEATUNNEL_SPARK_BIN).toString(); - command.add(adaptPathForWin(sparkBinPath)); - command.add("--master"); - command.add("local"); - command.add("--deploy-mode"); - command.add("client"); - command.add("--config " + adaptPathForWin(targetConfInContainer)); - - Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command)); - LOG.info(execResult.getStdout()); - LOG.error(execResult.getStderr()); - // wait job start - Thread.sleep(WAIT_SPARK_JOB_SUBMIT); - return execResult; - } - - protected void copySeaTunnelSparkFile() { - // copy lib - String seatunnelCoreSparkJarPath = Paths.get(PROJECT_ROOT_PATH.toString(), - "seatunnel-core", "seatunnel-spark-starter", "target", SEATUNNEL_SPARK_JAR).toString(); - master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH); - - // copy bin - String seatunnelSparkBinPath = Paths.get(PROJECT_ROOT_PATH.toString(), - "seatunnel-core", "seatunnel-spark-starter", "src", "main", "bin", SEATUNNEL_SPARK_BIN).toString(); - master.copyFileToContainer( - MountableFile.forHostPath(seatunnelSparkBinPath), - Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString()); - - // copy connectors - getConnectorJarFiles() - .forEach(jar -> - master.copyFileToContainer( - MountableFile.forHostPath(jar.getAbsolutePath()), - getConnectorPath(jar.getName()))); +public abstract class SparkContainer extends AbstractSparkContainer { - // copy plugin-mapping.properties - master.copyFileToContainer( - MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"), - Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString()); + @Override + protected String getStartModulePath() { + return "seatunnel-spark-starter"; } - private String getResource(String confFile) { - return System.getProperty("user.dir") + "/src/test/resources" + confFile; + @Override + protected String getStartShellName() { + return "start-seatunnel-spark-new-connector.sh"; } - private String getConnectorPath(String fileName) { - return Paths.get(CONNECTORS_PATH, "seatunnel", fileName).toString(); + @Override + protected String getConnectorType() { + return "seatunnel"; } - private List getConnectorJarFiles() { - File jars = new File(PROJECT_ROOT_PATH + - "/seatunnel-connectors-v2-dist/target/lib"); - return Arrays.stream( - Objects.requireNonNull( - jars.listFiles( - f -> f.getName().contains("connector-")))) - .collect(Collectors.toList()); + @Override + protected String getConnectorModulePath() { + return "seatunnel-connectors-v2"; } - private String adaptPathForWin(String path) { - return path == null ? "" : path.replaceAll("\\\\", "/"); + @Override + protected String getConnectorNamePrefix() { + return "connector-"; } } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml index a7a084bac140..b5aef0ec7862 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml @@ -39,7 +39,7 @@ org.apache.seatunnel - seatunnel-core-spark + seatunnel-spark-starter ${project.version} test diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml index 7fa7dfaa0352..2ff76f7db643 100644 --- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/pom.xml @@ -25,9 +25,14 @@ seatunnel-connector-spark-e2e-base - - 2.4 - + + + org.apache.seatunnel + seatunnel-e2e-common + ${project.version} + test-jar + + diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java index 6dca627ac0c2..8811c06a4903 100644 --- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java +++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-e2e-base/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java @@ -17,141 +17,36 @@ package org.apache.seatunnel.e2e.spark; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.Network; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.MountableFile; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import org.apache.seatunnel.e2e.common.AbstractSparkContainer; /** * This class is the base class of SparkEnvironment test. The before method will create a Spark master, and after method will close the Spark master. * You can use {@link SparkContainer#executeSeaTunnelSparkJob} to submit a seatunnel conf and a seatunnel spark job. */ -public abstract class SparkContainer { - - private static final Logger LOG = LoggerFactory.getLogger(SparkContainer.class); - - private static final String SPARK_DOCKER_IMAGE = "bitnami/spark:2.4.3"; - public static final Network NETWORK = Network.newNetwork(); - - protected GenericContainer master; - private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent(); - private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark.sh"; - private static final String SEATUNNEL_SPARK_JAR = "seatunnel-core-spark.jar"; - private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties"; - private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel"; - private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString(); - private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, "lib", SEATUNNEL_SPARK_JAR).toString(); - private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, "connectors").toString(); - - private static final int WAIT_SPARK_JOB_SUBMIT = 5000; - - @BeforeEach - public void before() { - master = new GenericContainer<>(SPARK_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases("spark-master") - .withExposedPorts() - .withEnv("SPARK_MODE", "master") - .withLogConsumer(new Slf4jLogConsumer(LOG)); - // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to - // start a worker. - - Startables.deepStart(Stream.of(master)).join(); - copySeaTunnelSparkFile(); - LOG.info("Spark container started"); - } +public abstract class SparkContainer extends AbstractSparkContainer { - @AfterEach - public void close() { - if (master != null) { - master.stop(); - } + @Override + protected String getStartModulePath() { + return "seatunnel-core-spark"; } - public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException { - final String confPath = getResource(confFile); - if (!new File(confPath).exists()) { - throw new IllegalArgumentException(confFile + " doesn't exist"); - } - final String targetConfInContainer = Paths.get("/tmp", confFile).toString(); - master.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer); - - // TODO: use start-seatunnel-spark.sh to run the spark job. Need to modified the SparkStarter can find the seatunnel-core-spark.jar. - // Running IT use cases under Windows requires replacing \ with / - String conf = targetConfInContainer.replaceAll("\\\\", "/"); - final List command = new ArrayList<>(); - command.add(Paths.get(SEATUNNEL_HOME, "bin/start-seatunnel-spark.sh").toString()); - command.add("--master"); - command.add("local"); - command.add("--deploy-mode"); - command.add("client"); - command.add("--config " + conf); - - Container.ExecResult execResult = master.execInContainer("bash", "-c", String.join(" ", command)); - LOG.info(execResult.getStdout()); - LOG.error(execResult.getStderr()); - // wait job start - Thread.sleep(WAIT_SPARK_JOB_SUBMIT); - return execResult; - } - - protected void copySeaTunnelSparkFile() { - // copy lib - String seatunnelCoreSparkJarPath = PROJECT_ROOT_PATH - + "/seatunnel-core/seatunnel-core-spark/target/seatunnel-core-spark.jar"; - master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath), SPARK_JAR_PATH); - - // copy bin - String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh"; - master.copyFileToContainer( - MountableFile.forHostPath(seatunnelFlinkBinPath), - Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString()); - - // copy connectors - getConnectorJarFiles() - .forEach(jar -> - master.copyFileToContainer( - MountableFile.forHostPath(jar.getAbsolutePath()), - getConnectorPath(jar.getName()))); - - // copy plugin-mapping.properties - master.copyFileToContainer( - MountableFile.forHostPath(PROJECT_ROOT_PATH + "/plugin-mapping.properties"), - Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString()); + @Override + protected String getStartShellName() { + return "start-seatunnel-spark.sh"; } - private String getResource(String confFile) { - return System.getProperty("user.dir") + "/src/test/resources" + confFile; + @Override + protected String getConnectorType() { + return "spark"; } - private String getConnectorPath(String fileName) { - return Paths.get(CONNECTORS_PATH, "spark", fileName).toString(); + @Override + protected String getConnectorModulePath() { + return "seatunnel-connectors/seatunnel-connectors-spark"; } - private List getConnectorJarFiles() { - File jars = new File(PROJECT_ROOT_PATH + - "/seatunnel-connectors/seatunnel-connectors-spark-dist/target/lib"); - return Arrays.stream( - Objects.requireNonNull( - jars.listFiles( - f -> f.getName().contains("seatunnel-connector-spark")))) - .collect(Collectors.toList()); + @Override + protected String getConnectorNamePrefix() { + return "seatunnel-connector-spark-"; } }