[Core][Flink] refactor flink proxy source/sink
TyrantLucifer committed Aug 9, 2024
1 parent 9216627 commit 84c8357
Showing 18 changed files with 573 additions and 150 deletions.
SeaTunnelRow.java
@@ -54,6 +54,10 @@ public void setRowKind(RowKind kind) {
this.kind = kind;
}

public void setKind(RowKind kind) {
this.kind = kind;
}

public int getArity() {
return fields.length;
}
@@ -66,6 +70,10 @@ public RowKind getRowKind() {
return this.kind;
}

public RowKind getKind() {
return this.kind;
}

public Object[] getFields() {
return fields;
}
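The two additions above are straight aliases of setRowKind/getRowKind, mirroring the getKind/setKind accessor names on Flink's own Row type, presumably so SeaTunnelRow can stand in behind the proxy where those names are expected. A minimal sketch of the equivalence (package and constructor taken from the SeaTunnel API; the demo class itself is hypothetical):

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

public class RowKindAliasDemo {
    public static void main(String[] args) {
        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"id-1", 42});
        row.setKind(RowKind.UPDATE_AFTER); // alias added in this commit
        // Both accessors read the same backing field, so they always agree.
        System.out.println(row.getRowKind() == row.getKind()); // true
    }
}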
PluginUtil.java
@@ -52,6 +52,7 @@
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;

/** The util used for Spark/Flink to create SeaTunnelSource etc. */
@SuppressWarnings("rawtypes")
public class PluginUtil {

protected static final String ENGINE_TYPE = "seatunnel";
SourceTableInfo.java
@@ -27,6 +27,7 @@

@Data
@AllArgsConstructor
@SuppressWarnings("rawtypes")
public class SourceTableInfo {

private SeaTunnelSource source;
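The rawtypes suppressions added here and in PluginUtil follow from holding the generic SeaTunnelSource<T, SplitT, StateT> without type arguments: the concrete types are only known at runtime from the discovered plugin. An illustrative sketch of the pattern (the holder class is hypothetical; only the raw-field idiom comes from the diff):

import org.apache.seatunnel.api.source.SeaTunnelSource;

@SuppressWarnings("rawtypes")
class RawSourceHolder {
    // Raw on purpose: element, split, and state types are resolved at
    // runtime, so a class-level suppression avoids a warning per use site.
    private final SeaTunnelSource source;

    RawSourceHolder(SeaTunnelSource source) {
        this.source = source;
    }
}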
FlinkRuntimeEnvironment.java
@@ -19,26 +19,48 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName;
import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TernaryBoolean;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FlinkRuntimeEnvironment extends AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironment {
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

@Slf4j
public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
private static volatile FlinkRuntimeEnvironment INSTANCE = null;
private Config config;

private StreamExecutionEnvironment environment;

private JobMode jobMode;

private String jobName = Constants.LOGO;

private FlinkRuntimeEnvironment(Config config) {
super(config);
this.initialize(config);
}

@Override
@@ -47,37 +69,235 @@ public FlinkRuntimeEnvironment setConfig(Config config) {
return this;
}

@Override
public Config getConfig() {
return config;
}

@Override
public CheckResult checkConfig() {
return EnvironmentUtil.checkRestartStrategy(config);
}

@Override
public FlinkRuntimeEnvironment prepare() {
createStreamEnvironment();
createStreamTableEnvironment();
if (config.hasPath("job.name")) {
jobName = config.getString("job.name");
}
return this;
}

public String getJobName() {
return jobName;
}

public boolean isStreaming() {
return JobMode.STREAMING.equals(jobMode);
}

@Override
public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) {
this.jobMode = jobMode;
return this;
}

private void createStreamTableEnvironment() {
EnvironmentSettings environmentSettings =
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
tableEnvironment =
StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
TableConfig config = tableEnvironment.getConfig();
if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MAX_STATE_RETENTION_TIME)
&& EnvironmentUtil.hasPathAndWaring(
this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
@Override
public JobMode getJobMode() {
return jobMode;
}

@Override
public void registerPlugin(List<URL> pluginPaths) {
pluginPaths.forEach(url -> log.info("register plugins : {}", url));
List<Configuration> configurations = new ArrayList<>();
try {
configurations.add(
(Configuration)
Objects.requireNonNull(
ReflectionUtils.getDeclaredMethod(
StreamExecutionEnvironment.class,
"getConfiguration"))
.orElseThrow(
() ->
new RuntimeException(
"can't find "
+ "method: getConfiguration"))
.invoke(this.environment));
} catch (Exception e) {
throw new RuntimeException(e);
}
configurations.forEach(
configuration -> {
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
jars = new ArrayList<>();
}
jars.addAll(
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
configuration.set(
PipelineOptions.JARS,
jars.stream().distinct().collect(Collectors.toList()));
List<String> classpath = configuration.get(PipelineOptions.CLASSPATHS);
if (classpath == null) {
classpath = new ArrayList<>();
}
classpath.addAll(
pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
configuration.set(
PipelineOptions.CLASSPATHS,
classpath.stream().distinct().collect(Collectors.toList()));
});
}
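registerPlugin obtains the environment's Configuration reflectively, apparently because StreamExecutionEnvironment#getConfiguration is not publicly accessible in the Flink version targeted here, then appends the plugin jar URLs to both pipeline.jars and pipeline.classpaths, de-duplicating with distinct(). A minimal sketch of just the merge step, assuming a Configuration already in hand (the demo class is hypothetical):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class PluginJarMergeDemo {
    static void appendJars(Configuration configuration, List<URL> pluginPaths) {
        List<String> jars = configuration.get(PipelineOptions.JARS);
        if (jars == null) {
            jars = new ArrayList<>();
        }
        jars.addAll(pluginPaths.stream().map(URL::toString).collect(Collectors.toList()));
        // distinct() keeps repeated registration idempotent.
        configuration.set(
                PipelineOptions.JARS, jars.stream().distinct().collect(Collectors.toList()));
    }
}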

public StreamExecutionEnvironment getStreamExecutionEnvironment() {
return environment;
}

private void createStreamEnvironment() {
Configuration configuration = new Configuration();
EnvironmentUtil.initConfiguration(config, configuration);
environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
setTimeCharacteristic();

setCheckpoint();

EnvironmentUtil.setRestartStrategy(config, environment.getConfig());

if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
long timeout = config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS);
environment.setBufferTimeout(timeout);
}

if (config.hasPath(EnvCommonOptions.PARALLELISM.key())) {
int parallelism = config.getInt(EnvCommonOptions.PARALLELISM.key());
environment.setParallelism(parallelism);
} else if (config.hasPath(ConfigKeyName.PARALLELISM)) {
log.warn(
"the parameter 'execution.parallelism' will be deprecated, please use common parameter 'parallelism' to set it");
int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
environment.setParallelism(parallelism);
}

if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.MAX_PARALLELISM)) {
int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
environment.setMaxParallelism(max);
}

if (this.jobMode.equals(JobMode.BATCH)) {
environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
}
}
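Note the bootstrap order in createStreamEnvironment: SeaTunnel env options are copied onto a Flink Configuration first, which is then handed to getExecutionEnvironment so they take effect at environment creation, and parallelism prefers the common parallelism key before falling back to the deprecated execution.parallelism. A minimal sketch of that order, where CoreOptions.DEFAULT_PARALLELISM is a stand-in for whatever EnvironmentUtil.initConfiguration actually copies:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvBootstrapDemo {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        // Stand-in for EnvironmentUtil.initConfiguration(config, configuration).
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        System.out.println(environment.getParallelism()); // 2 when run locally
    }
}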

private void setTimeCharacteristic() {
if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.TIME_CHARACTERISTIC)) {
String timeType = config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
switch (timeType.toLowerCase()) {
case "event-time":
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
break;
case "ingestion-time":
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
break;
case "processing-time":
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
break;
default:
log.warn(
"set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
timeType);
break;
}
}
}
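For context: setStreamTimeCharacteristic and the TimeCharacteristic enum are deprecated as of Flink 1.12, where event time is the default and processing time is chosen at the operator level instead; the switch above only maps the legacy SeaTunnel config strings onto that enum.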

private void setCheckpoint() {
if (jobMode == JobMode.BATCH) {
log.warn(
"Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode");
}
long interval = 0;
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
} else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
log.warn(
"the parameter 'execution.checkpoint.interval' will be deprecated, please use common parameter 'checkpoint.interval' to set it");
interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
}

if (interval > 0) {
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
environment.enableCheckpointing(interval);

if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.CHECKPOINT_MODE)) {
String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
switch (mode.toLowerCase()) {
case "exactly-once":
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
break;
case "at-least-once":
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
break;
default:
log.warn(
"set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
mode);
break;
}
}

if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
long timeout = config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
checkpointConfig.setCheckpointTimeout(timeout);
} else if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.CHECKPOINT_TIMEOUT)) {
long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
checkpointConfig.setCheckpointTimeout(timeout);
}

if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.CHECKPOINT_DATA_URI)) {
String uri = config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
StateBackend fsStateBackend = new FsStateBackend(uri);
if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.STATE_BACKEND)) {
String stateBackend = config.getString(ConfigKeyName.STATE_BACKEND);
if ("rocksdb".equalsIgnoreCase(stateBackend)) {
StateBackend rocksDBStateBackend =
new RocksDBStateBackend(fsStateBackend, TernaryBoolean.TRUE);
environment.setStateBackend(rocksDBStateBackend);
}
} else {
environment.setStateBackend(fsStateBackend);
}
}

if (EnvironmentUtil.hasPathAndWaring(
config, ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
int max = config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
checkpointConfig.setMaxConcurrentCheckpoints(max);
}

if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
boolean cleanup = config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
if (cleanup) {
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
} else {
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
}

if (EnvironmentUtil.hasPathAndWaring(
config, ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
long minPause = config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
}

if (EnvironmentUtil.hasPathAndWaring(
config, ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
int failNum = config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
}
}
// init flink table env config
EnvironmentUtil.initTableEnvironmentConfiguration(this.config, config.getConfiguration());
}
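Two details in setCheckpoint are worth calling out: the interval prefers the common checkpoint.interval key over the deprecated execution.checkpoint.interval, and the state backend is composed rather than chosen flatly. The checkpoint URI always yields an FsStateBackend, which becomes the checkpoint stream backend inside a RocksDBStateBackend when state.backend is rocksdb, with TernaryBoolean.TRUE turning on incremental checkpoints. A minimal sketch of that wiring using the same legacy Flink APIs (the demo class and useRocksDb flag are hypothetical):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TernaryBoolean;

public class StateBackendWiringDemo {
    static void wire(StreamExecutionEnvironment env, String checkpointUri, boolean useRocksDb) {
        StateBackend fsBackend = new FsStateBackend(checkpointUri);
        // RocksDB keeps working state off-heap; the FsStateBackend still
        // decides where completed checkpoint data lands.
        env.setStateBackend(
                useRocksDb
                        ? new RocksDBStateBackend(fsBackend, TernaryBoolean.TRUE)
                        : fsBackend);
    }
}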

public static FlinkRuntimeEnvironment getInstance(Config config) {
SinkExecuteProcessor.java
@@ -51,6 +51,7 @@
import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;

@SuppressWarnings({"unchecked", "rawtypes"})
public class SinkExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<Optional<? extends Factory>> {

@@ -107,12 +108,14 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
sinkConfig.getString(PLUGIN_NAME.key())),
sinkConfig);
sink.setJobContext(jobContext);
SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType();
// TODO support sink multi sink
SeaTunnelRowType sourceType = stream.getCatalogTable().get(0).getSeaTunnelRowType();
sink.setTypeInfo(sourceType);
} else {
// TODO support sink multi sink
TableSinkFactoryContext context =
TableSinkFactoryContext.replacePlaceholderAndCreate(
stream.getCatalogTable(),
stream.getCatalogTable().get(0),
ReadonlyConfig.fromConfig(sinkConfig),
classLoader,
((TableSinkFactory) factory.get())
@@ -134,7 +137,7 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
}
DataStreamSink<Row> dataStreamSink =
stream.getDataStream()
.sinkTo(new FlinkSink<>(sink, stream.getCatalogTable()))
.sinkTo(new FlinkSink<>(sink, stream.getCatalogTable().get(0)))
.name(sink.getPluginName());
if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key());
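The getCatalogTable().get(0) changes track a signature change in DataStreamTableInfo: the stream now carries a list of catalog tables, and single-table sinks take the first entry until multi-table sink support lands (the TODOs above). A hypothetical guard for that interim pattern:

import org.apache.seatunnel.api.table.catalog.CatalogTable;

import java.util.List;

public class SingleTableGuard {
    // Hypothetical helper: pick the one table a single-table sink can
    // handle, failing loudly if the upstream carries several.
    static CatalogTable firstAndOnly(List<CatalogTable> tables) {
        if (tables.size() != 1) {
            throw new IllegalArgumentException(
                    "sink supports exactly one upstream table, got " + tables.size());
        }
        return tables.get(0);
    }
}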
(remaining file diffs not loaded)