Skip to content

Commit

Permalink
Add plugin discovery module (#1881)
Browse files Browse the repository at this point in the history
* Add plugin discovery module
* Remove PluginFactory
  • Loading branch information
ruanwenjun authored May 15, 2022
1 parent af7f6e0 commit 5f79f45
Show file tree
Hide file tree
Showing 24 changed files with 863 additions and 285 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
<module>seatunnel-e2e</module>
<module>seatunnel-api</module>
<module>seatunnel-translation</module>
<module>seatunnel-plugin-discovery</module>
</modules>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ private Common() {
throw new IllegalStateException("Utility class");
}

/**
* Used to set the size when create a new collection(just to pass the checkstyle).
*/
public static final int COLLECTION_SIZE = 16;

private static final List<String> ALLOWED_MODES = Arrays.stream(DeployMode.values())
.map(DeployMode::getName).collect(Collectors.toList());
.map(DeployMode::getName).collect(Collectors.toList());

private static Optional<String> MODE = Optional.empty();

Expand Down
6 changes: 6 additions & 0 deletions seatunnel-core/seatunnel-core-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-plugin-discovery</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,35 @@
import org.apache.seatunnel.apis.base.api.BaseTransform;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The ExecutionContext contains all configuration needed to run the job.
*
* @param <ENVIRONMENT> environment type.
*/
public class ExecutionContext<ENVIRONMENT extends RuntimeEnv> {
public abstract class AbstractExecutionContext<ENVIRONMENT extends RuntimeEnv> {

private final Config config;
private final EngineType engine;

private final ENVIRONMENT environment;
private final JobMode jobMode;
private final List<BaseSource<ENVIRONMENT>> sources;
private final List<BaseTransform<ENVIRONMENT>> transforms;
private final List<BaseSink<ENVIRONMENT>> sinks;

public ExecutionContext(Config config, EngineType engine) {
public AbstractExecutionContext(Config config, EngineType engine) {
this.config = config;
this.engine = engine;
this.environment = new EnvironmentFactory<ENVIRONMENT>(config, engine).getEnvironment();
this.jobMode = environment.getJobMode();
PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config, engine);
this.environment.registerPlugin(pluginFactory.getPluginJarPaths());
this.sources = pluginFactory.createPlugins(PluginType.SOURCE);
this.transforms = pluginFactory.createPlugins(PluginType.TRANSFORM);
this.sinks = pluginFactory.createPlugins(PluginType.SINK);
}

public Config getRootConfig() {
Expand All @@ -71,15 +69,26 @@ public JobMode getJobMode() {
return jobMode;
}

public List<BaseSource<ENVIRONMENT>> getSources() {
return sources;
}
public abstract List<BaseSource<ENVIRONMENT>> getSources();

public List<BaseTransform<ENVIRONMENT>> getTransforms() {
return transforms;
}
public abstract List<BaseTransform<ENVIRONMENT>> getTransforms();

public abstract List<BaseSink<ENVIRONMENT>> getSinks();

public abstract List<URL> getPluginJars();

public List<BaseSink<ENVIRONMENT>> getSinks() {
return sinks;
@SuppressWarnings("checkstyle:Indentation")
protected List<PluginIdentifier> getPluginIdentifiers(PluginType... pluginTypes) {
return Arrays.stream(pluginTypes).flatMap(new Function<PluginType, Stream<PluginIdentifier>>() {
@Override
public Stream<PluginIdentifier> apply(PluginType pluginType) {
List<? extends Config> configList = config.getConfigList(pluginType.getType());
return configList.stream()
.map(pluginConfig -> PluginIdentifier
.of(engine.getEngine(),
pluginType.getType(),
pluginConfig.getString("plugin_name")));
}
}).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
/**
* Used to build the {@link Config} from file.
*
* @param <ENVIRONMENT> environment type.
*/
public class ConfigBuilder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {

private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionFactory.class);

public ExecutionContext<ENVIRONMENT> executionContext;
public AbstractExecutionContext<ENVIRONMENT> executionContext;

public ExecutionFactory(ExecutionContext<ENVIRONMENT> executionContext) {
public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT> executionContext) {
this.executionContext = executionContext;
}

Expand Down

This file was deleted.

Loading

0 comments on commit 5f79f45

Please sign in to comment.