From 557c796294547164285eb746f069a64ab233dbe5 Mon Sep 17 00:00:00 2001
From: Hisoka
Date: Wed, 29 Jun 2022 15:55:39 +0800
Subject: [PATCH] [API-DRAFT] [MERGE] Fix obvious bugs that don't work
 properly before merge

---
 config/flink.batch.conf.template              | 11 ++++----
 config/spark.streaming.conf.template          |  2 +-
 plugin-mapping.properties                     |  1 +
 pom.xml                                       |  6 ++---
 .../base/config/AbstractExecutionContext.java | 17 +++++-------
 .../seatunnel/core/spark/SparkStarter.java    | 27 ++++++++++++++++---
 .../src/main/assembly/assembly-bin.xml        |  1 +
 7 files changed, 42 insertions(+), 23 deletions(-)

diff --git a/config/flink.batch.conf.template b/config/flink.batch.conf.template
index 45b3e853272..65f614df0b6 100644
--- a/config/flink.batch.conf.template
+++ b/config/flink.batch.conf.template
@@ -26,12 +26,11 @@ env {
 
 source {
   # This is a example input plugin **only for test and demonstrate the feature input plugin**
-  FileSource {
-    path = "hdfs://localhost:9000/output/text"
-    format.type = "text"
-    schema = "string"
-    result_table_name = "test"
-  }
+  FakeSource {
+    result_table_name = "test"
+    field_name = "name,age"
+  }
+
 
   # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
   # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake

diff --git a/config/spark.streaming.conf.template b/config/spark.streaming.conf.template
index 3c87f56e895..b79a9dec64b 100644
--- a/config/spark.streaming.conf.template
+++ b/config/spark.streaming.conf.template
@@ -31,7 +31,7 @@ env {
 
 source {
   # This is a example input plugin **only for test and demonstrate the feature input plugin**
-  fakeStream {
+  FakeStream {
     content = ["Hello World, SeaTunnel"]
   }
 
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 01bb3abc419..5ba2e8121d0 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -45,6 +45,7 @@ flink.sink.Kafka = seatunnel-connector-flink-kafka
 
 spark.source.ElasticSearch = seatunnel-connector-spark-elasticsearch
 spark.source.Fake = seatunnel-connector-spark-fake
+spark.source.FakeStream = seatunnel-connector-spark-fake
 spark.source.FeishuSheet = seatunnel-connector-spark-feishu
 spark.source.File = seatunnel-connector-spark-file
 spark.source.Hbase = seatunnel-connector-spark-hbase

diff --git a/pom.xml b/pom.xml
index 9d21a4343b7..d60f43735ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,15 +82,15 @@
         <module>seatunnel-core</module>
         <module>seatunnel-transforms</module>
         <module>seatunnel-connectors</module>
-        <module>seatunnel-dist</module>
+        <module>seatunnel-connectors-v2</module>
+        <module>seatunnel-connectors-v2-dist</module>
         <module>seatunnel-examples</module>
         <module>seatunnel-e2e</module>
         <module>seatunnel-api</module>
         <module>seatunnel-translation</module>
         <module>seatunnel-plugin-discovery</module>
         <module>seatunnel-formats</module>
-        <module>seatunnel-connectors-v2</module>
-        <module>seatunnel-connectors-v2-dist</module>
+        <module>seatunnel-dist</module>
     </modules>
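Note on the plugin-mapping change above: the new spark.source.FakeStream key is what
lets a Spark job that declares a FakeStream source (see the rename in
spark.streaming.conf.template) resolve to the existing seatunnel-connector-spark-fake
jar. Below is a minimal, self-contained sketch of reading such a key/value mapping;
the install path and the direct use of java.util.Properties are illustrative
assumptions, not SeaTunnel's own discovery code:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Properties;

    public class PluginMappingLookup {
        public static void main(String[] args) throws IOException {
            Properties mapping = new Properties();
            // Hypothetical install location; adjust to the actual SeaTunnel home.
            try (InputStream in = Files.newInputStream(
                    Paths.get("/opt/seatunnel/plugin-mapping.properties"))) {
                mapping.load(in);
            }
            // Key layout: <engine>.<pluginType>.<PluginName> = <connector jar name>
            System.out.println(mapping.getProperty("spark.source.FakeStream"));
            // expected output: seatunnel-connector-spark-fake
        }
    }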
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
index e3ed3d72e6b..92d4232c350 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/AbstractExecutionContext.java
@@ -80,16 +80,13 @@ public JobMode getJobMode() {
 
     @SuppressWarnings("checkstyle:Indentation")
     protected List<PluginIdentifier> getPluginIdentifiers(PluginType... pluginTypes) {
-        return Arrays.stream(pluginTypes).flatMap(new Function<PluginType, Stream<PluginIdentifier>>() {
-            @Override
-            public Stream<PluginIdentifier> apply(PluginType pluginType) {
-                List<? extends Config> configList = config.getConfigList(pluginType.getType());
-                return configList.stream()
-                    .map(pluginConfig -> PluginIdentifier
-                        .of(engine.getEngine(),
-                            pluginType.getType(),
-                            pluginConfig.getString("plugin_name")));
-            }
+        return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
+            List<? extends Config> configList = config.getConfigList(pluginType.getType());
+            return configList.stream()
+                .map(pluginConfig -> PluginIdentifier
+                    .of(engine.getEngine(),
+                        pluginType.getType(),
+                        pluginConfig.getString("plugin_name")));
         }).collect(Collectors.toList());
     }
 }
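Note on the refactor above: the anonymous Function is collapsed into a lambda, with an
explicit (Function<PluginType, Stream<PluginIdentifier>>) cast keeping the same target
type for flatMap. A minimal, self-contained sketch of the same flatMap pattern follows;
the enum values, plugin names, and identifierOf helper are stand-ins for the real
PluginType, job config contents, and PluginIdentifier.of:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class PluginIdentifierSketch {
        enum PluginType { SOURCE, TRANSFORM, SINK }

        // Stand-in for PluginIdentifier.of(engine, type, name).
        static String identifierOf(String engine, String type, String name) {
            return engine + ":" + type + ":" + name;
        }

        public static void main(String[] args) {
            // One stream pipeline per plugin type, flattened into a single list,
            // mirroring the getPluginIdentifiers(PluginType...) shape in the patch.
            List<String> ids = Arrays.stream(PluginType.values())
                    .flatMap((Function<PluginType, Stream<String>>) pluginType ->
                            Stream.of("Fake", "Console")
                                    .map(name -> identifierOf("spark",
                                            pluginType.name().toLowerCase(), name)))
                    .collect(Collectors.toList());
            ids.forEach(System.out::println);
        }
    }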
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 02678f76853..44c4454038b 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -22,12 +22,15 @@
 import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.base.Starter;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.utils.CompressionUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.config.SparkExecutionContext;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -40,6 +43,7 @@
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -50,6 +54,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -220,8 +225,12 @@ private List<Path> getConnectorJarDependencies() {
             return Collections.emptyList();
         }
         Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
-        SparkExecutionContext sparkExecutionContext = new SparkExecutionContext(config, EngineType.SPARK);
-        return sparkExecutionContext.getPluginJars().stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
+        List<URL> pluginJars = new ArrayList<>();
+        SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
+        SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
+        pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
+        pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SINK)));
+        return pluginJars.stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
     }
 
     /**
@@ -313,6 +322,18 @@ protected void appendAppJar(List<String> commands) {
         commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
     }
 
+    @SuppressWarnings("checkstyle:Indentation")
+    private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) {
+        return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
+            List<? extends Config> configList = config.getConfigList(pluginType.getType());
+            return configList.stream()
+                .map(pluginConfig -> PluginIdentifier
+                    .of(EngineType.SPARK.getEngine(),
+                        pluginType.getType(),
+                        pluginConfig.getString("plugin_name")));
+        }).collect(Collectors.toList());
+    }
+
     /**
      * a Starter for building spark-submit commands with client mode options
      */
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index 258d8b916b6..afd0c7150d9 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -146,6 +146,7 @@
                 <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+                <exclude>connector-common*.jar</exclude>
             </excludes>
             <outputDirectory>/connectors/seatunnel</outputDirectory>
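Note on the rewritten getConnectorJarDependencies(): instead of going through
SparkExecutionContext, it now asks SparkSourcePluginDiscovery and
SparkSinkPluginDiscovery for the jar URLs of exactly the plugins named in the job
config, then maps each URL to a filesystem Path with new File(url.getPath()).toPath().
Below is a minimal sketch of that final conversion step; the jar location is made up
for illustration and stands in for what getPluginJarPaths(...) would return:

    import java.io.File;
    import java.net.MalformedURLException;
    import java.net.URL;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;

    public class JarPathSketch {
        public static void main(String[] args) throws MalformedURLException {
            List<URL> pluginJars = new ArrayList<>();
            // Hypothetical connector jar location.
            pluginJars.add(new URL(
                    "file:/opt/seatunnel/connectors/spark/seatunnel-connector-spark-fake-2.1.2.jar"));

            // Same URL -> Path mapping used in getConnectorJarDependencies().
            List<Path> paths = pluginJars.stream()
                    .map(url -> new File(url.getPath()).toPath())
                    .collect(Collectors.toList());
            paths.forEach(System.out::println);
        }
    }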