Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ST-Engine][Starter] Add seatunnel own engine starter and e2e #2690

Merged
merged 29 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b41ad33
[Engine][Example] Add seatunnel example module
Hisoka-X Sep 6, 2022
c174877
[Engine][Example] Add seatunnel e2e module
Hisoka-X Sep 6, 2022
581f576
[Engine] [Core] Add seatunnel engine example
Hisoka-X Sep 6, 2022
4798825
[Engine] [Core] Add seatunnel engine example
Hisoka-X Sep 6, 2022
ff0f019
[Engine] [Core] Add seatunnel engine starter
Hisoka-X Sep 7, 2022
2531fd2
[Engine] [Example] Add random cluster name when use local mode
Hisoka-X Sep 7, 2022
e259eab
[Engine] [Starter] Remove SeaTunnel engine Starter spark/flink depend…
Hisoka-X Sep 7, 2022
015118a
[Engine] [Example] Fix review
Hisoka-X Sep 7, 2022
0fcda62
Merge branch 'st-engine-example' into st-engine-starter
Hisoka-X Sep 7, 2022
eac73f9
[Engine] [Starter] Add seatunnel cluster and client entrance
Hisoka-X Sep 7, 2022
0ace7eb
Merge branch 'st-engine-e2e' into st-engine-starter
Hisoka-X Sep 8, 2022
bcb7659
[Engine] [Starter] Add seatunnel engine e2e
Hisoka-X Sep 8, 2022
247c196
Merge branch 'st-engine' into st-engine-starter
Hisoka-X Sep 8, 2022
882f0a5
[Engine] [Starter] Add seatunnel engine e2e
Hisoka-X Sep 8, 2022
51e176b
[Engine] [Starter] Add seatunnel engine e2e
Hisoka-X Sep 8, 2022
b130542
[Engine] [Starter] add miss class file
Hisoka-X Sep 9, 2022
a326c5b
[Engine] [Starter] fix ci problems
Hisoka-X Sep 9, 2022
00bc32e
[Engine] [Starter] rename e2e module
Hisoka-X Sep 9, 2022
1f34c74
[Engine] [Starter] fix license
Hisoka-X Sep 9, 2022
f51c633
[Engine] [Starter] fix license
Hisoka-X Sep 9, 2022
8e6d641
[Engine] [E2E] Change shell name in test container
Hisoka-X Sep 9, 2022
122ae8b
[Engine] [E2E] Fix ci problems
Hisoka-X Sep 13, 2022
9efca4b
Merge branch 'st-engine' into st-engine-starter
Hisoka-X Sep 13, 2022
0dd52e6
Merge branch 'st-engine' into st-engine-starter
Hisoka-X Sep 13, 2022
702da46
[Engine] [Committer] Add committer classloader
Hisoka-X Sep 13, 2022
cbc70de
[Starter] [Core] Change README.md
Hisoka-X Sep 14, 2022
f27f4d7
Merge branch 'st-engine' into st-engine-starter
Hisoka-X Sep 15, 2022
23680c9
[Starter] [Core] Resolved conflict
Hisoka-X Sep 15, 2022
cf23e12
Merge branch 'st-engine' into st-engine-starter
Hisoka-X Sep 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
<commons-io.version>2.11.0</commons-io.version>
<commons-collections4.version>4.4</commons-collections4.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<commons-compress.version>1.20</commons-compress.version>
<protostuff.version>1.8.0</protostuff.version>
<spark.scope>provided</spark.scope>
<flink.scope>provided</flink.scope>
Expand Down Expand Up @@ -629,6 +630,12 @@
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${commons-compress.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions seatunnel-apis/seatunnel-api-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-config-shade</artifactId>
</dependency>
</dependencies>

</project>
5 changes: 4 additions & 1 deletion seatunnel-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ This module is the seatunnel job entrypoint. Seatunnel jobs are started by the b

- seatunnel-core-flink: The flink job starter.
- seatunnel-core-flink-sql: The flink sql job starter.
- seatunnel-core-spark: The spark job starter.
- seatunnel-core-spark: The spark job starter.
EricJoy2048 marked this conversation as resolved.
Show resolved Hide resolved
- seatunnel-spark-starter: The spark job starter for connector-v2.
- seatunnel-flink-starter: The flink job starter for connector-v2.
- seatunnel-starter: The seatunnel engine job starter for connector-v2.
6 changes: 3 additions & 3 deletions seatunnel-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
<module>seatunnel-core-starter</module>
<module>seatunnel-flink-starter</module>
<module>seatunnel-spark-starter</module>
<module>seatunnel-seatunnel-starter</module>
<module>seatunnel-starter</module>
</modules>
</profile>
<profile>
<id>engine-all</id>
<modules>
<module>seatunnel-core-base</module>
<module>seatunnel-core-starter</module>
<module>seatunnel-seatunnel-starter</module>
<module>seatunnel-starter</module>
</modules>
</profile>
<profile>
Expand All @@ -65,7 +65,7 @@
<module>seatunnel-core-starter</module>
<module>seatunnel-flink-starter</module>
<module>seatunnel-spark-starter</module>
<module>seatunnel-seatunnel-starter</module>
<module>seatunnel-starter</module>
</modules>
</profile>
</profiles>
Expand Down
10 changes: 5 additions & 5 deletions seatunnel-core/seatunnel-core-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-flink</artifactId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-spark</artifactId>
<artifactId>seatunnel-plugin-discovery</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-plugin-discovery</artifactId>
<version>${project.version}</version>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>

<dependency>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.spark.SparkEnvironment;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -31,39 +29,28 @@
*
* @param <ENVIRONMENT> environment type
*/
public class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {
public abstract class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {

private static final String PLUGIN_NAME_KEY = "plugin_name";

private final Config config;
private final EngineType engine;

public EnvironmentFactory(Config config, EngineType engine) {
public EnvironmentFactory(Config config) {
this.config = config;
this.engine = engine;
}

// todo:put this method into submodule to avoid dependency on the engine
public synchronized ENVIRONMENT getEnvironment() {
Config envConfig = config.getConfig("env");
boolean enableHive = checkIsContainHive();
ENVIRONMENT env;
switch (engine) {
case SPARK:
env = (ENVIRONMENT) new SparkEnvironment().setEnableHive(enableHive);
break;
case FLINK:
env = (ENVIRONMENT) new FlinkEnvironment();
break;
default:
throw new IllegalArgumentException("Engine: " + engine + " is not supported");
}
ENVIRONMENT env = newEnvironment();
env.setConfig(envConfig)
.setJobMode(getJobMode(envConfig)).prepare();
return env;
}

private boolean checkIsContainHive() {
protected abstract ENVIRONMENT newEnvironment();

protected boolean checkIsContainHive() {
List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
for (Config c : sourceConfigList) {
if (c.getString(PLUGIN_NAME_KEY).toLowerCase().contains("hive")) {
Expand Down

This file was deleted.

6 changes: 0 additions & 6 deletions seatunnel-core/seatunnel-flink-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-translation-flink</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;

import java.util.List;
Expand All @@ -44,7 +43,7 @@ public class FlinkStarter implements Starter {
private final String appJar;

FlinkStarter(String[] args) {
this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args);
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;

public class SeatunnelFlink {

public static void main(String[] args) throws CommandException {
FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args);
Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
.buildCommand(flinkCommandArgs);
Seatunnel.run(flinkCommand);
Expand Down
Loading