Skip to content

Commit

Permalink
[Core][Flink] refactor flink proxy source/sink
Browse files Browse the repository at this point in the history
  • Loading branch information
TyrantLucifer committed Aug 12, 2024
1 parent d46cf16 commit 31d06b2
Show file tree
Hide file tree
Showing 23 changed files with 103 additions and 724 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,16 @@ public void setTableId(String tableId) {
this.tableId = tableId;
}

/**
* The method will be removed in the future, please use {@link #setKind(RowKind)} instanced of
* it.
*/
@Deprecated
public void setRowKind(RowKind kind) {
setKind(kind);
}

public void setKind(RowKind kind) {
this.kind = kind;
}

Expand All @@ -62,7 +71,13 @@ public String getTableId() {
return tableId;
}

/** The method will be removed in the future, please use {@link #getKind()} instanced of it. */
@Deprecated
public RowKind getRowKind() {
return getKind();
}

public RowKind getKind() {
return this.kind;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID;

/** The util used for Spark/Flink to create to SeaTunnelSource etc. */
@SuppressWarnings("rawtypes")
public class PluginUtil {

protected static final String ENGINE_TYPE = "seatunnel";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

@Data
@AllArgsConstructor
@SuppressWarnings("rawtypes")
public class SourceTableInfo {

private SeaTunnelSource source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@

import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -50,7 +43,6 @@ public FlinkRuntimeEnvironment setConfig(Config config) {
@Override
public FlinkRuntimeEnvironment prepare() {
createStreamEnvironment();
createStreamTableEnvironment();
if (config.hasPath("job.name")) {
jobName = config.getString("job.name");
}
Expand All @@ -63,23 +55,6 @@ public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) {
return this;
}

private void createStreamTableEnvironment() {
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
tableEnvironment =
StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
TableConfig config = tableEnvironment.getConfig();
if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MAX_STATE_RETENTION_TIME)
&& EnvironmentUtil.hasPathAndWaring(
this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
}
// init flink table env config
EnvironmentUtil.initTableEnvironmentConfiguration(this.config, config.getConfiguration());
}

public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;

@SuppressWarnings({"unchecked", "rawtypes"})
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

Expand Down Expand Up @@ -107,12 +108,15 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
// TODO support sink multi sink
SeaTunnelRowType sourceType =
stream.getCatalogTables().get(0).getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
} else {
// TODO support sink multi sink
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
stream.getCatalogTable(),
stream.getCatalogTables().get(0),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
Expand All @@ -134,8 +138,8 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
}
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
.sinkTo(new FlinkSink<>(sink, stream.getCatalogTable()))
.name(sink.getPluginName());
.sinkTo(new FlinkSink<>(sink, stream.getCatalogTables().get(0)))
.name(String.format("%s-Sink", sink.getPluginName()));
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;
import org.apache.seatunnel.core.starter.flink.utils.TableUtil;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
Expand All @@ -37,11 +36,8 @@
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.TernaryBoolean;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -55,10 +51,8 @@
@Slf4j
public abstract class AbstractFlinkRuntimeEnvironment implements RuntimeEnvironment {

protected static final String RESULT_TABLE_NAME = "result_table_name";
protected Config config;
protected StreamExecutionEnvironment environment;
protected StreamTableEnvironment tableEnvironment;
protected JobMode jobMode;
protected String jobName = Constants.LOGO;

Expand All @@ -78,10 +72,6 @@ public CheckResult checkConfig() {
return EnvironmentUtil.checkRestartStrategy(config);
}

public StreamTableEnvironment getStreamTableEnvironment() {
return tableEnvironment;
}

public StreamExecutionEnvironment getStreamExecutionEnvironment() {
return environment;
}
Expand Down Expand Up @@ -228,14 +218,6 @@ private void setTimeCharacteristic() {
}
}

public void registerResultTable(Config config, DataStream<Row> dataStream, String name) {
StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment();
if (!TableUtil.tableExists(tableEnvironment, name)) {
tableEnvironment.createTemporaryView(
name, tableEnvironment.fromChangelogStream(dataStream));
}
}

public boolean isStreaming() {
return JobMode.STREAMING.equals(jobMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@
package org.apache.seatunnel.core.starter.flink.execution;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;

@Data
@AllArgsConstructor
public class DataStreamTableInfo {

private DataStream<Row> dataStream;
private DataStream<SeaTunnelRow> dataStream;

private CatalogTable catalogTable;
private List<CatalogTable> catalogTables;

private String tableName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,13 @@
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
import org.apache.seatunnel.core.starter.flink.utils.TableUtil;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;

import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;

public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment> {

Expand Down Expand Up @@ -84,10 +76,7 @@ public void setRuntimeEnvironment(FlinkRuntimeEnvironment flinkRuntimeEnvironmen
protected Optional<DataStreamTableInfo> fromSourceTable(
Config pluginConfig, List<DataStreamTableInfo> upstreamDataStreams) {
if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
StreamTableEnvironment tableEnvironment =
flinkRuntimeEnvironment.getStreamTableEnvironment();
String tableName = pluginConfig.getString(SOURCE_TABLE_NAME);
Table table = tableEnvironment.from(tableName);
DataStreamTableInfo dataStreamTableInfo =
upstreamDataStreams.stream()
.filter(info -> tableName.equals(info.getTableName()))
Expand All @@ -99,20 +88,13 @@ protected Optional<DataStreamTableInfo> fromSourceTable(
"table %s not found", tableName)));
return Optional.of(
new DataStreamTableInfo(
TableUtil.tableToDataStream(tableEnvironment, table),
dataStreamTableInfo.getCatalogTable(),
dataStreamTableInfo.getDataStream(),
dataStreamTableInfo.getCatalogTables(),
tableName));
}
return Optional.empty();
}

protected void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
String resultTable = pluginConfig.getString(RESULT_TABLE_NAME.key());
flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream, resultTable);
}
}

protected abstract List<T> initializePlugins(
List<URL> jarPaths, List<? extends Config> pluginConfigs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@

import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -50,7 +43,6 @@ public FlinkRuntimeEnvironment setConfig(Config config) {
@Override
public FlinkRuntimeEnvironment prepare() {
createStreamEnvironment();
createStreamTableEnvironment();
if (config.hasPath("job.name")) {
jobName = config.getString("job.name");
}
Expand All @@ -63,23 +55,6 @@ public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) {
return this;
}

private void createStreamTableEnvironment() {
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().build();
tableEnvironment =
StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
TableConfig config = tableEnvironment.getConfig();
if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MAX_STATE_RETENTION_TIME)
&& EnvironmentUtil.hasPathAndWaring(
this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
}
// init flink table env config
EnvironmentUtil.initTableEnvironmentConfiguration(this.config, config.getConfiguration());
}

public static FlinkRuntimeEnvironment getInstance(Config config) {
if (INSTANCE == null) {
synchronized (FlinkRuntimeEnvironment.class) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;

@SuppressWarnings("unchecked,rawtypes")
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

Expand Down Expand Up @@ -108,12 +109,15 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
// TODO sink support multi table
SeaTunnelRowType sourceType =
stream.getCatalogTables().get(0).getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
} else {
// TODO sink support multi table
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
stream.getCatalogTable(),
stream.getCatalogTables().get(0),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
Expand All @@ -137,8 +141,9 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
stream.getDataStream()
.sinkTo(
SinkV1Adapter.wrap(
new FlinkSink<>(sink, stream.getCatalogTable())))
.name(sink.getPluginName());
new FlinkSink<>(
sink, stream.getCatalogTables().get(0))))
.name(String.format("%s-Sink", sink.getPluginName()));
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
Expand Down Expand Up @@ -71,21 +72,20 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
Config pluginConfig = pluginConfigs.get(i);
FlinkSource flinkSource = new FlinkSource<>(internalSource, envConfig);

DataStreamSource sourceStream =
DataStreamSource<SeaTunnelRow> sourceStream =
executionEnvironment.fromSource(
flinkSource,
WatermarkStrategy.noWatermarks(),
String.format("%s-source", internalSource.getPluginName()));
String.format("%s-Source", internalSource.getPluginName()));

if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
}
registerResultTable(pluginConfig, sourceStream);
sources.add(
new DataStreamTableInfo(
sourceStream,
sourceTableInfo.getCatalogTables().get(0),
sourceTableInfo.getCatalogTables(),
pluginConfig.hasPath(RESULT_TABLE_NAME.key())
? pluginConfig.getString(RESULT_TABLE_NAME.key())
: null));
Expand Down
Loading

0 comments on commit 31d06b2

Please sign in to comment.