From 31d06b2538257f0bd87d6489d43ab8fdeed66416 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Sun, 11 Aug 2024 09:51:07 +0800 Subject: [PATCH] [Core][Flink] refactor flink proxy source/sink --- .../api/table/type/SeaTunnelRow.java | 15 ++ .../core/starter/execution/PluginUtil.java | 1 + .../starter/execution/SourceTableInfo.java | 1 + .../execution/FlinkRuntimeEnvironment.java | 25 --- .../flink/execution/SinkExecuteProcessor.java | 12 +- .../AbstractFlinkRuntimeEnvironment.java | 18 -- .../flink/execution/DataStreamTableInfo.java | 8 +- .../FlinkAbstractPluginExecuteProcessor.java | 22 +- .../execution/FlinkRuntimeEnvironment.java | 25 --- .../flink/execution/SinkExecuteProcessor.java | 13 +- .../execution/SourceExecuteProcessor.java | 8 +- .../execution/TransformExecuteProcessor.java | 53 ++--- .../engine/e2e/UnifyEnvParameterIT.java | 16 -- .../pom.xml | 6 + .../resources/examples/fake_to_console.conf | 12 +- .../discovery/AbstractPluginDiscovery.java | 1 + .../flink/utils/TypeConverterUtilsTest.java | 161 -------------- .../serialization/FlinkRowConverter.java | 154 ------------- .../flink/sink/FlinkSinkWriter.java | 16 +- .../flink/source/FlinkRowCollector.java | 21 +- .../translation/flink/source/FlinkSource.java | 17 +- .../flink/source/FlinkSourceReader.java | 12 +- .../flink/utils/TypeConverterUtils.java | 210 ------------------ 23 files changed, 103 insertions(+), 724 deletions(-) delete mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java delete mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java delete mode 100644 seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 95a36b796c4..11388dbb6a7 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -50,7 +50,16 @@ public void setTableId(String tableId) { this.tableId = tableId; } + /** + * The method will be removed in the future, please use {@link #setKind(RowKind)} instead of + * it. + */ + @Deprecated public void setRowKind(RowKind kind) { + setKind(kind); + } + + public void setKind(RowKind kind) { this.kind = kind; } @@ -62,7 +71,13 @@ public String getTableId() { return tableId; } + /** The method will be removed in the future, please use {@link #getKind()} instead of it. */ + @Deprecated public RowKind getRowKind() { + return getKind(); + } + + public RowKind getKind() { return this.kind; } diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java index 166e581e2d9..c47ea0b1215 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/PluginUtil.java @@ -50,6 +50,7 @@ import static org.apache.seatunnel.api.table.factory.FactoryUtil.DEFAULT_ID; /** The util used for Spark/Flink to create to SeaTunnelSource etc. 
*/ +@SuppressWarnings("rawtypes") public class PluginUtil { protected static final String ENGINE_TYPE = "seatunnel"; diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java index 529b9b42078..43642f57352 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/SourceTableInfo.java @@ -27,6 +27,7 @@ @Data @AllArgsConstructor +@SuppressWarnings("rawtypes") public class SourceTableInfo { private SeaTunnelSource source; diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index fcc25a6b9e6..e3428c751e6 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -21,13 +21,6 @@ import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment; -import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName; -import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
import lombok.extern.slf4j.Slf4j; @@ -50,7 +43,6 @@ public FlinkRuntimeEnvironment setConfig(Config config) { @Override public FlinkRuntimeEnvironment prepare() { createStreamEnvironment(); - createStreamTableEnvironment(); if (config.hasPath("job.name")) { jobName = config.getString("job.name"); } @@ -63,23 +55,6 @@ public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) { return this; } - private void createStreamTableEnvironment() { - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); - tableEnvironment = - StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings); - TableConfig config = tableEnvironment.getConfig(); - if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MAX_STATE_RETENTION_TIME) - && EnvironmentUtil.hasPathAndWaring( - this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) { - long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME); - long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME); - config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max)); - } - // init flink table env config - EnvironmentUtil.initTableEnvironmentConfiguration(this.config, config.getConfiguration()); - } - public static FlinkRuntimeEnvironment getInstance(Config config) { if (INSTANCE == null) { synchronized (FlinkRuntimeEnvironment.class) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 6a272aadb21..51586beaf0f 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -51,6 +51,7 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +@SuppressWarnings({"unchecked", "rawtypes"}) public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor> { @@ -107,12 +108,15 @@ public List execute(List upstreamDataS sinkConfig.getString(PLUGIN_NAME.key())), sinkConfig); sink.setJobContext(jobContext); - SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType(); + // TODO sink support multi table + SeaTunnelRowType sourceType = + stream.getCatalogTables().get(0).getSeaTunnelRowType(); sink.setTypeInfo(sourceType); } else { + // TODO sink support multi table TableSinkFactoryContext context = TableSinkFactoryContext.replacePlaceholderAndCreate( - stream.getCatalogTable(), + stream.getCatalogTables().get(0), ReadonlyConfig.fromConfig(sinkConfig), classLoader, ((TableSinkFactory) factory.get()) @@ -134,8 +138,8 @@ public List execute(List upstreamDataS } DataStreamSink dataStreamSink = stream.getDataStream() - .sinkTo(new FlinkSink<>(sink, stream.getCatalogTable())) - .name(sink.getPluginName()); + .sinkTo(new FlinkSink<>(sink, stream.getCatalogTables().get(0))) + .name(String.format("%s-Sink", sink.getPluginName())); if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key()); dataStreamSink.setParallelism(parallelism); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java index 
d805c286f86..34d91842771 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java @@ -27,7 +27,6 @@ import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment; import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName; import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil; -import org.apache.seatunnel.core.starter.flink.utils.TableUtil; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; @@ -37,11 +36,8 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; import org.apache.flink.util.TernaryBoolean; import lombok.extern.slf4j.Slf4j; @@ -55,10 +51,8 @@ @Slf4j public abstract class AbstractFlinkRuntimeEnvironment implements RuntimeEnvironment { - protected static final String RESULT_TABLE_NAME = "result_table_name"; protected Config config; protected StreamExecutionEnvironment environment; - protected StreamTableEnvironment tableEnvironment; protected JobMode jobMode; protected String jobName = Constants.LOGO; @@ -78,10 +72,6 @@ public CheckResult checkConfig() { return EnvironmentUtil.checkRestartStrategy(config); } - public StreamTableEnvironment getStreamTableEnvironment() { - return tableEnvironment; - } - public 
StreamExecutionEnvironment getStreamExecutionEnvironment() { return environment; } @@ -228,14 +218,6 @@ private void setTimeCharacteristic() { } } - public void registerResultTable(Config config, DataStream dataStream, String name) { - StreamTableEnvironment tableEnvironment = this.getStreamTableEnvironment(); - if (!TableUtil.tableExists(tableEnvironment, name)) { - tableEnvironment.createTemporaryView( - name, tableEnvironment.fromChangelogStream(dataStream)); - } - } - public boolean isStreaming() { return JobMode.STREAMING.equals(jobMode); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java index 7b158ee60b9..a80a09b5067 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/DataStreamTableInfo.java @@ -18,20 +18,22 @@ package org.apache.seatunnel.core.starter.flink.execution; import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.types.Row; import lombok.AllArgsConstructor; import lombok.Data; +import java.util.List; + @Data @AllArgsConstructor public class DataStreamTableInfo { - private DataStream dataStream; + private DataStream dataStream; - private CatalogTable catalogTable; + private List catalogTables; private String tableName; } diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java index 565b7379bf2..57956db56c1 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java @@ -23,12 +23,6 @@ import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor; -import org.apache.seatunnel.core.starter.flink.utils.TableUtil; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.types.Row; import java.net.URL; import java.net.URLClassLoader; @@ -36,8 +30,6 @@ import java.util.Optional; import java.util.function.BiConsumer; -import static org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME; - public abstract class FlinkAbstractPluginExecuteProcessor implements PluginExecuteProcessor { @@ -84,10 +76,7 @@ public void setRuntimeEnvironment(FlinkRuntimeEnvironment flinkRuntimeEnvironmen protected Optional fromSourceTable( Config pluginConfig, List upstreamDataStreams) { if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) { - StreamTableEnvironment tableEnvironment = - flinkRuntimeEnvironment.getStreamTableEnvironment(); String tableName = pluginConfig.getString(SOURCE_TABLE_NAME); - Table table = tableEnvironment.from(tableName); 
DataStreamTableInfo dataStreamTableInfo = upstreamDataStreams.stream() .filter(info -> tableName.equals(info.getTableName())) @@ -99,20 +88,13 @@ protected Optional fromSourceTable( "table %s not found", tableName))); return Optional.of( new DataStreamTableInfo( - TableUtil.tableToDataStream(tableEnvironment, table), - dataStreamTableInfo.getCatalogTable(), + dataStreamTableInfo.getDataStream(), + dataStreamTableInfo.getCatalogTables(), tableName)); } return Optional.empty(); } - protected void registerResultTable(Config pluginConfig, DataStream dataStream) { - if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) { - String resultTable = pluginConfig.getString(RESULT_TABLE_NAME.key()); - flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream, resultTable); - } - } - protected abstract List initializePlugins( List jarPaths, List pluginConfigs); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index c1de8ff4f71..e3428c751e6 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -21,13 +21,6 @@ import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment; -import org.apache.seatunnel.core.starter.flink.utils.ConfigKeyName; -import org.apache.seatunnel.core.starter.flink.utils.EnvironmentUtil; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.table.api.EnvironmentSettings; -import 
org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import lombok.extern.slf4j.Slf4j; @@ -50,7 +43,6 @@ public FlinkRuntimeEnvironment setConfig(Config config) { @Override public FlinkRuntimeEnvironment prepare() { createStreamEnvironment(); - createStreamTableEnvironment(); if (config.hasPath("job.name")) { jobName = config.getString("job.name"); } @@ -63,23 +55,6 @@ public FlinkRuntimeEnvironment setJobMode(JobMode jobMode) { return this; } - private void createStreamTableEnvironment() { - EnvironmentSettings environmentSettings = - EnvironmentSettings.newInstance().inStreamingMode().build(); - tableEnvironment = - StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings); - TableConfig config = tableEnvironment.getConfig(); - if (EnvironmentUtil.hasPathAndWaring(this.config, ConfigKeyName.MAX_STATE_RETENTION_TIME) - && EnvironmentUtil.hasPathAndWaring( - this.config, ConfigKeyName.MIN_STATE_RETENTION_TIME)) { - long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME); - long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME); - config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max)); - } - // init flink table env config - EnvironmentUtil.initTableEnvironmentConfiguration(this.config, config.getConfiguration()); - } - public static FlinkRuntimeEnvironment getInstance(Config config) { if (INSTANCE == null) { synchronized (FlinkRuntimeEnvironment.class) { diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java index 14247464551..c713593821e 100644 --- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java @@ -52,6 +52,7 @@ import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME; import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED; +@SuppressWarnings({"unchecked", "rawtypes"}) public class SinkExecuteProcessor extends FlinkAbstractPluginExecuteProcessor> { @@ -108,12 +109,15 @@ public List execute(List upstreamDataS sinkConfig.getString(PLUGIN_NAME.key())), sinkConfig); sink.setJobContext(jobContext); - SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType(); + // TODO sink support multi table + SeaTunnelRowType sourceType = + stream.getCatalogTables().get(0).getSeaTunnelRowType(); sink.setTypeInfo(sourceType); } else { + // TODO sink support multi table TableSinkFactoryContext context = TableSinkFactoryContext.replacePlaceholderAndCreate( - stream.getCatalogTable(), + stream.getCatalogTables().get(0), ReadonlyConfig.fromConfig(sinkConfig), classLoader, ((TableSinkFactory) factory.get()) @@ -137,8 +141,9 @@ public List execute(List upstreamDataS stream.getDataStream() .sinkTo( SinkV1Adapter.wrap( - new FlinkSink<>(sink, stream.getCatalogTable()))) - .name(sink.getPluginName()); + new FlinkSink<>( + sink, stream.getCatalogTables().get(0)))) + .name(String.format("%s-Sink", sink.getPluginName())); if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = sinkConfig.getInt(CommonOptions.PARALLELISM.key()); dataStreamSink.setParallelism(parallelism); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java index 20b74f4b71e..eeb757a8536 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.core.starter.enums.PluginType; import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.core.starter.execution.SourceTableInfo; @@ -71,21 +72,20 @@ public List execute(List upstreamDataS Config pluginConfig = pluginConfigs.get(i); FlinkSource flinkSource = new FlinkSource<>(internalSource, envConfig); - DataStreamSource sourceStream = + DataStreamSource sourceStream = executionEnvironment.fromSource( flinkSource, WatermarkStrategy.noWatermarks(), - String.format("%s-source", internalSource.getPluginName())); + String.format("%s-Source", internalSource.getPluginName())); if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) { int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key()); sourceStream.setParallelism(parallelism); } - registerResultTable(pluginConfig, sourceStream); sources.add( new DataStreamTableInfo( sourceStream, - sourceTableInfo.getCatalogTables().get(0), + sourceTableInfo.getCatalogTables(), pluginConfig.hasPath(RESULT_TABLE_NAME.key()) ? 
pluginConfig.getString(RESULT_TABLE_NAME.key()) : null)); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index 1ff2cf64372..c92eaf42a9a 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -25,19 +25,15 @@ import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.core.starter.exception.TaskExecuteException; import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; -import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter; -import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; -import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.types.Row; +import org.apache.flink.streaming.api.operators.StreamMap; import java.net.URL; import java.util.Collections; @@ -47,6 +43,7 @@ import static 
org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME; +@SuppressWarnings({"unchecked", "rawtypes"}) public class TransformExecuteProcessor extends FlinkAbstractPluginExecuteProcessor { @@ -97,21 +94,20 @@ public List execute(List upstreamDataS TableTransformFactory factory = plugins.get(i); TableTransformFactoryContext context = new TableTransformFactoryContext( - Collections.singletonList(stream.getCatalogTable()), + stream.getCatalogTables(), ReadonlyConfig.fromConfig(pluginConfig), classLoader); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); SeaTunnelTransform transform = factory.createTransform(context).createTransform(); - SeaTunnelRowType sourceType = stream.getCatalogTable().getSeaTunnelRowType(); transform.setJobContext(jobContext); - DataStream inputStream = - flinkTransform(sourceType, transform, stream.getDataStream()); - registerResultTable(pluginConfig, inputStream); + DataStream inputStream = + flinkTransform(transform, stream.getDataStream()); + // TODO transform support multi tables upstreamDataStreams.add( new DataStreamTableInfo( inputStream, - transform.getProducedCatalogTable(), + Collections.singletonList(transform.getProducedCatalogTable()), pluginConfig.hasPath(RESULT_TABLE_NAME.key()) ? 
pluginConfig.getString(RESULT_TABLE_NAME.key()) : null)); @@ -126,28 +122,17 @@ public List execute(List upstreamDataS return upstreamDataStreams; } - protected DataStream flinkTransform( - SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream stream) { - TypeInformation rowTypeInfo = - TypeConverterUtils.convert( - transform.getProducedCatalogTable().getSeaTunnelRowType()); - FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType); - FlinkRowConverter transformOutputRowConverter = - new FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType()); - DataStream output = - stream.flatMap( - (FlatMapFunction) - (value, out) -> { - SeaTunnelRow seaTunnelRow = - transformInputRowConverter.reconvert(value); - SeaTunnelRow dataRow = - (SeaTunnelRow) transform.map(seaTunnelRow); - if (dataRow != null) { - Row copy = transformOutputRowConverter.convert(dataRow); - out.collect(copy); - } - }, - rowTypeInfo); - return output; + protected DataStream flinkTransform( + SeaTunnelTransform transform, DataStream stream) { + return stream.transform( + String.format("%s-Transform", transform.getPluginName()), + TypeInformation.of(SeaTunnelRow.class), + new StreamMap<>( + flinkRuntimeEnvironment + .getStreamExecutionEnvironment() + .clean( + row -> + ((SeaTunnelTransform) transform) + .map(row)))); } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java index ad5c6365f2f..48485eb69c3 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/UnifyEnvParameterIT.java @@ -126,22 +126,6 @@ public void 
testUnifiedFlinkTableEnvParam(AbstractTestFlinkContainer container) } Assertions.assertNotNull(jobInfoReference.get()); }); - Map jobInfo = jobInfoReference.get(); - - /** - * 'table.exec.resource.default-parallelism' has a higher priority than 'parallelism', so - * one of these nodes must have a parallelism of 2. - */ - Map plan = (Map) jobInfo.get("plan"); - List> nodes = (List>) plan.get("nodes"); - boolean tableExecParallelism = false; - for (Map node : nodes) { - int parallelism = (int) node.get("parallelism"); - if (!tableExecParallelism && parallelism == 2) { - tableExecParallelism = true; - } - } - Assertions.assertTrue(tableExecParallelism); } public void genericTest(String configPath, AbstractTestFlinkContainer container) diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml index 99c75d324a8..ef801bdb9c0 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml @@ -112,6 +112,12 @@ ${flink.1.15.3.version} + + org.apache.flink + flink-runtime-web + ${flink.1.15.3.version} + + com.squareup.okhttp3 mockwebserver diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf index 799dac79960..12c1f9f2811 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf @@ -19,7 +19,7 @@ ###### env { - job.mode = "BATCH" + job.mode = "STREAMING" parallelism = 2 } @@ -41,15 +41,21 @@ source { } transform { - + Copy { + source_table_name = "fake" + result_table_name = "fake1" + fields { + name1 = name + } + } # If you would like to get more information about how to 
configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/category/transform-v2 } sink { Console { + source_table_name = "fake1" } - # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/category/sink-v2 } diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index d4bd43c3d1c..4b62895f18c 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -66,6 +66,7 @@ import java.util.stream.Collectors; @Slf4j +@SuppressWarnings("unchecked") public abstract class AbstractPluginDiscovery implements PluginDiscovery { private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties"; diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java deleted file mode 100644 index 95cfa335e7d..00000000000 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-13/src/test/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtilsTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.translation.flink.utils; - -import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.DecimalType; - -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class TypeConverterUtilsTest { - // -------------------------------------------------------------- - // basic types test - // -------------------------------------------------------------- - - @Test - public void convertStringType() { - Assertions.assertEquals( - BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(BasicType.STRING_TYPE)); - } - - @Test - public void convertIntegerType() { - Assertions.assertEquals( - BasicTypeInfo.INT_TYPE_INFO, TypeConverterUtils.convert(BasicType.INT_TYPE)); - } - - @Test - public void convertBooleanType() { - Assertions.assertEquals( - BasicTypeInfo.BOOLEAN_TYPE_INFO, - TypeConverterUtils.convert(BasicType.BOOLEAN_TYPE)); - } - - @Test - public void convertDoubleType() { - Assertions.assertEquals( - BasicTypeInfo.DOUBLE_TYPE_INFO, TypeConverterUtils.convert(BasicType.DOUBLE_TYPE)); - } - - @Test - public void convertLongType() { - Assertions.assertEquals( - BasicTypeInfo.LONG_TYPE_INFO, 
TypeConverterUtils.convert(BasicType.LONG_TYPE)); - } - - @Test - public void convertFloatType() { - Assertions.assertEquals( - BasicTypeInfo.FLOAT_TYPE_INFO, TypeConverterUtils.convert(BasicType.FLOAT_TYPE)); - } - - @Test - public void convertByteType() { - Assertions.assertEquals( - BasicTypeInfo.BYTE_TYPE_INFO, TypeConverterUtils.convert(BasicType.BYTE_TYPE)); - } - - @Test - public void convertShortType() { - Assertions.assertEquals( - BasicTypeInfo.SHORT_TYPE_INFO, TypeConverterUtils.convert(BasicType.SHORT_TYPE)); - } - - @Test - public void convertBigDecimalType() { - /** - * To solve lost precision and scale of {@link - * org.apache.seatunnel.api.table.type.DecimalType}, use {@link - * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the convert - * result of {@link org.apache.seatunnel.api.table.type.DecimalType} instance. - */ - Assertions.assertEquals( - BasicTypeInfo.STRING_TYPE_INFO, TypeConverterUtils.convert(new DecimalType(30, 2))); - } - - @Test - public void convertNullType() { - Assertions.assertEquals( - BasicTypeInfo.VOID_TYPE_INFO, TypeConverterUtils.convert(BasicType.VOID_TYPE)); - } - - // -------------------------------------------------------------- - // array types test - // -------------------------------------------------------------- - - @Test - public void convertBooleanArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO, - TypeConverterUtils.convert(ArrayType.BOOLEAN_ARRAY_TYPE)); - } - - @Test - public void convertStringArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, - TypeConverterUtils.convert(ArrayType.STRING_ARRAY_TYPE)); - } - - @Test - public void convertDoubleArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, - TypeConverterUtils.convert(ArrayType.DOUBLE_ARRAY_TYPE)); - } - - @Test - public void convertIntegerArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, - 
TypeConverterUtils.convert(ArrayType.INT_ARRAY_TYPE)); - } - - @Test - public void convertLongArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO, - TypeConverterUtils.convert(ArrayType.LONG_ARRAY_TYPE)); - } - - @Test - public void convertFloatArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO, - TypeConverterUtils.convert(ArrayType.FLOAT_ARRAY_TYPE)); - } - - @Test - public void convertByteArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO, - TypeConverterUtils.convert(ArrayType.BYTE_ARRAY_TYPE)); - } - - @Test - public void convertShortArrayType() { - Assertions.assertEquals( - BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO, - TypeConverterUtils.convert(ArrayType.SHORT_ARRAY_TYPE)); - } -} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java deleted file mode 100644 index b24cb96dfef..00000000000 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/serialization/FlinkRowConverter.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.translation.flink.serialization; - -import org.apache.seatunnel.api.table.type.DecimalType; -import org.apache.seatunnel.api.table.type.MapType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.table.type.SqlType; -import org.apache.seatunnel.translation.serialization.RowConverter; - -import org.apache.flink.types.Row; -import org.apache.flink.types.RowKind; - -import lombok.extern.slf4j.Slf4j; - -import java.io.IOException; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.util.HashMap; -import java.util.Map; -import java.util.function.BiFunction; - -/** - * The row converter between {@link Row} and {@link SeaTunnelRow}, used to convert or reconvert - * between flink row and seatunnel row - */ -@Slf4j -public class FlinkRowConverter extends RowConverter { - - public FlinkRowConverter(SeaTunnelDataType dataType) { - super(dataType); - } - - @Override - public Row convert(SeaTunnelRow seaTunnelRow) throws IOException { - validate(seaTunnelRow); - return (Row) convert(seaTunnelRow, dataType); - } - - private static Object convert(Object field, SeaTunnelDataType dataType) { - if (field == null) { - return null; - } - SqlType sqlType = dataType.getSqlType(); - switch (sqlType) { - case ROW: - SeaTunnelRow seaTunnelRow = (SeaTunnelRow) field; - SeaTunnelRowType rowType = (SeaTunnelRowType) dataType; - int arity = 
rowType.getTotalFields(); - Row engineRow = new Row(arity); - for (int i = 0; i < arity; i++) { - engineRow.setField( - i, convert(seaTunnelRow.getField(i), rowType.getFieldType(i))); - } - engineRow.setKind(RowKind.fromByteValue(seaTunnelRow.getRowKind().toByteValue())); - return engineRow; - case MAP: - return convertMap( - (Map) field, (MapType) dataType, FlinkRowConverter::convert); - - /** - * To solve lost precision and scale of {@link - * org.apache.seatunnel.api.table.type.DecimalType}, use {@link java.lang.String} as - * the convert result of {@link java.math.BigDecimal} instance. - */ - case DECIMAL: - BigDecimal decimal = (BigDecimal) field; - return decimal.toString(); - default: - return field; - } - } - - private static Object convertMap( - Map mapData, - MapType mapType, - BiFunction, Object> convertFunction) { - if (mapData == null || mapData.isEmpty()) { - return mapData; - } - - Map newMap = new HashMap<>(mapData.size()); - mapData.forEach( - (key, value) -> { - SeaTunnelDataType keyType = mapType.getKeyType(); - SeaTunnelDataType valueType = mapType.getValueType(); - newMap.put( - convertFunction.apply(key, keyType), - convertFunction.apply(value, valueType)); - }); - return newMap; - } - - @Override - public SeaTunnelRow reconvert(Row engineRow) throws IOException { - return (SeaTunnelRow) reconvert(engineRow, dataType); - } - - private static Object reconvert(Object field, SeaTunnelDataType dataType) { - if (field == null) { - return null; - } - SqlType sqlType = dataType.getSqlType(); - switch (sqlType) { - case ROW: - Row engineRow = (Row) field; - SeaTunnelRowType rowType = (SeaTunnelRowType) dataType; - int arity = rowType.getTotalFields(); - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(arity); - for (int i = 0; i < arity; i++) { - seaTunnelRow.setField( - i, reconvert(engineRow.getField(i), rowType.getFieldType(i))); - } - seaTunnelRow.setRowKind( - org.apache.seatunnel.api.table.type.RowKind.fromByteValue( - 
engineRow.getKind().toByteValue())); - return seaTunnelRow; - case MAP: - return convertMap( - (Map) field, (MapType) dataType, FlinkRowConverter::reconvert); - - /** - * To solve lost precision and scale of {@link - * org.apache.seatunnel.api.table.type.DecimalType}, create {@link - * java.math.BigDecimal} instance from {@link java.lang.String} type field. - */ - case DECIMAL: - DecimalType decimalType = (DecimalType) dataType; - String decimalData = (String) field; - BigDecimal decimal = new BigDecimal(decimalData); - decimal.setScale(decimalType.getScale(), RoundingMode.HALF_UP); - return decimal; - default: - return field; - } - } -} diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java index 3c949f64802..725bf606f93 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java @@ -25,11 +25,9 @@ import org.apache.seatunnel.api.sink.SupportResourceShare; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.api.connector.sink.SinkWriter; -import org.apache.flink.types.Row; import lombok.extern.slf4j.Slf4j; @@ -54,7 +52,6 @@ public class FlinkSinkWriter private final org.apache.seatunnel.api.sink.SinkWriter sinkWriter; - private final FlinkRowConverter rowSerialization; private final 
Counter sinkWriteCount; @@ -73,7 +70,6 @@ public class FlinkSinkWriter MetricsContext metricsContext) { this.sinkWriter = sinkWriter; this.checkpointId = checkpointId; - this.rowSerialization = new FlinkRowConverter(dataType); this.sinkWriteCount = metricsContext.counter(MetricNames.SINK_WRITE_COUNT); this.sinkWriteBytes = metricsContext.counter(MetricNames.SINK_WRITE_BYTES); this.sinkWriterQPS = metricsContext.meter(MetricNames.SINK_WRITE_QPS); @@ -86,15 +82,17 @@ public class FlinkSinkWriter @Override public void write(InputT element, SinkWriter.Context context) throws IOException { - if (element instanceof Row) { - SeaTunnelRow seaTunnelRow = rowSerialization.reconvert((Row) element); - sinkWriter.write(seaTunnelRow); + if (element == null) { + return; + } + if (element instanceof SeaTunnelRow) { + sinkWriter.write((SeaTunnelRow) element); sinkWriteCount.inc(); - sinkWriteBytes.inc(seaTunnelRow.getBytesSize()); + sinkWriteBytes.inc(((SeaTunnelRow) element).getBytesSize()); sinkWriterQPS.markEvent(); } else { throw new InvalidClassException( - "only support Flink Row at now, the element Class is " + element.getClass()); + "only support SeaTunnelRow at now, the element Class is " + element.getClass()); } } diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java index 39b14d17d03..2ea584029e5 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java @@ -25,26 +25,18 @@ import 
org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; -import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter; import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.types.Row; import lombok.extern.slf4j.Slf4j; -/** - * The implementation of {@link Collector} for flink engine, as a container for {@link SeaTunnelRow} - * and convert {@link SeaTunnelRow} to {@link Row}. - */ +/** The implementation of {@link Collector} for flink engine. */ @Slf4j public class FlinkRowCollector implements Collector { - private ReaderOutput readerOutput; - - private final FlinkRowConverter rowSerialization; + private ReaderOutput readerOutput; private final FlowControlGate flowControlGate; @@ -54,9 +46,7 @@ public class FlinkRowCollector implements Collector { private final Meter sourceReadQPS; - public FlinkRowCollector( - SeaTunnelRowType seaTunnelRowType, Config envConfig, MetricsContext metricsContext) { - this.rowSerialization = new FlinkRowConverter(seaTunnelRowType); + public FlinkRowCollector(Config envConfig, MetricsContext metricsContext) { this.flowControlGate = FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig)); this.sourceReadCount = metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT); this.sourceReadBytes = metricsContext.counter(MetricNames.SOURCE_RECEIVED_BYTES); @@ -67,8 +57,7 @@ public FlinkRowCollector( public void collect(SeaTunnelRow record) { flowControlGate.audit(record); try { - Row row = rowSerialization.convert(record); - readerOutput.collect(row); + readerOutput.collect(record); sourceReadCount.inc(); sourceReadBytes.inc(record.getBytesSize()); sourceReadQPS.markEvent(); @@ 
-82,7 +71,7 @@ public Object getCheckpointLock() { return this; } - public FlinkRowCollector withReaderOutput(ReaderOutput readerOutput) { + public FlinkRowCollector withReaderOutput(ReaderOutput readerOutput) { this.readerOutput = readerOutput; return this; } diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java index adf54eef4c5..7868e6d3efd 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java @@ -24,9 +24,7 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer; -import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; @@ -37,7 +35,6 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.types.Row; import java.io.Serializable; @@ -48,7 +45,8 @@ * @param The generic type of enumerator state */ public class FlinkSource - implements Source, EnumStateT>, ResultTypeQueryable { + implements Source, EnumStateT>, + ResultTypeQueryable { private 
final SeaTunnelSource source; @@ -68,14 +66,13 @@ public Boundedness getBoundedness() { } @Override - public SourceReader> createReader(SourceReaderContext readerContext) - throws Exception { + public SourceReader> createReader( + SourceReaderContext readerContext) throws Exception { org.apache.seatunnel.api.source.SourceReader.Context context = new FlinkSourceReaderContext(readerContext, source); org.apache.seatunnel.api.source.SourceReader reader = source.createReader(context); - return new FlinkSourceReader<>( - reader, context, envConfig, (SeaTunnelRowType) source.getProducedType()); + return new FlinkSourceReader<>(reader, context, envConfig); } @Override @@ -110,7 +107,7 @@ public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() } @Override - public TypeInformation getProducedType() { - return (TypeInformation) TypeConverterUtils.convert(source.getProducedType()); + public TypeInformation getProducedType() { + return TypeInformation.of(SeaTunnelRow.class); } } diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java index 65dc4324779..c2f9cde5005 100644 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java +++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java @@ -21,13 +21,11 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.flink.api.connector.source.ReaderOutput; import 
org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +41,7 @@ * @param */ public class FlinkSourceReader - implements SourceReader> { + implements SourceReader> { private final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceReader.class); @@ -58,12 +56,10 @@ public class FlinkSourceReader public FlinkSourceReader( org.apache.seatunnel.api.source.SourceReader sourceReader, org.apache.seatunnel.api.source.SourceReader.Context context, - Config envConfig, - SeaTunnelRowType seaTunnelRowType) { + Config envConfig) { this.sourceReader = sourceReader; this.context = context; - this.flinkRowCollector = - new FlinkRowCollector(seaTunnelRowType, envConfig, context.getMetricsContext()); + this.flinkRowCollector = new FlinkRowCollector(envConfig, context.getMetricsContext()); } @Override @@ -76,7 +72,7 @@ public void start() { } @Override - public InputStatus pollNext(ReaderOutput output) throws Exception { + public InputStatus pollNext(ReaderOutput output) throws Exception { if (!((FlinkSourceReaderContext) context).isSendNoMoreElementEvent()) { sourceReader.pollNext(flinkRowCollector.withReaderOutput(output)); } else { diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java deleted file mode 100644 index ebb77da2688..00000000000 --- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/utils/TypeConverterUtils.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.translation.flink.utils; - -import org.apache.seatunnel.api.table.type.ArrayType; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.DecimalType; -import org.apache.seatunnel.api.table.type.LocalTimeType; -import org.apache.seatunnel.api.table.type.MapType; -import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; - -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; - -import java.math.BigDecimal; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - 
-public class TypeConverterUtils { - - private static final Map, BridgedType> BRIDGED_TYPES = new HashMap<>(32); - - static { - // basic types - BRIDGED_TYPES.put( - String.class, - BridgedType.of(BasicType.STRING_TYPE, BasicTypeInfo.STRING_TYPE_INFO)); - BRIDGED_TYPES.put( - Boolean.class, - BridgedType.of(BasicType.BOOLEAN_TYPE, BasicTypeInfo.BOOLEAN_TYPE_INFO)); - BRIDGED_TYPES.put( - Byte.class, BridgedType.of(BasicType.BYTE_TYPE, BasicTypeInfo.BYTE_TYPE_INFO)); - BRIDGED_TYPES.put( - Short.class, BridgedType.of(BasicType.SHORT_TYPE, BasicTypeInfo.SHORT_TYPE_INFO)); - BRIDGED_TYPES.put( - Integer.class, BridgedType.of(BasicType.INT_TYPE, BasicTypeInfo.INT_TYPE_INFO)); - BRIDGED_TYPES.put( - Long.class, BridgedType.of(BasicType.LONG_TYPE, BasicTypeInfo.LONG_TYPE_INFO)); - BRIDGED_TYPES.put( - Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO)); - BRIDGED_TYPES.put( - Double.class, - BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO)); - BRIDGED_TYPES.put( - Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO)); - /** - * To solve lost precision and scale of {@link - * org.apache.seatunnel.api.table.type.DecimalType}, use {@link - * org.apache.flink.api.common.typeinfo.BasicTypeInfo#STRING_TYPE_INFO} as the payload of - * {@link org.apache.seatunnel.api.table.type.DecimalType}. 
- */ - BRIDGED_TYPES.put( - BigDecimal.class, - BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.STRING_TYPE_INFO)); - - // data time types - BRIDGED_TYPES.put( - LocalDate.class, - BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE)); - BRIDGED_TYPES.put( - LocalTime.class, - BridgedType.of(LocalTimeType.LOCAL_TIME_TYPE, LocalTimeTypeInfo.LOCAL_TIME)); - BRIDGED_TYPES.put( - LocalDateTime.class, - BridgedType.of( - LocalTimeType.LOCAL_DATE_TIME_TYPE, LocalTimeTypeInfo.LOCAL_DATE_TIME)); - // basic array types - BRIDGED_TYPES.put( - byte[].class, - BridgedType.of( - PrimitiveByteArrayType.INSTANCE, - PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - String[].class, - BridgedType.of( - ArrayType.STRING_ARRAY_TYPE, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - Boolean[].class, - BridgedType.of( - ArrayType.BOOLEAN_ARRAY_TYPE, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - Byte[].class, - BridgedType.of(ArrayType.BYTE_ARRAY_TYPE, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - Short[].class, - BridgedType.of( - ArrayType.SHORT_ARRAY_TYPE, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - Integer[].class, - BridgedType.of(ArrayType.INT_ARRAY_TYPE, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - Long[].class, - BridgedType.of(ArrayType.LONG_ARRAY_TYPE, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - Float[].class, - BridgedType.of( - ArrayType.FLOAT_ARRAY_TYPE, BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO)); - BRIDGED_TYPES.put( - Double[].class, - BridgedType.of( - ArrayType.DOUBLE_ARRAY_TYPE, BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO)); - } - - private TypeConverterUtils() { - throw new UnsupportedOperationException( - "TypeConverterUtils is a utility class and cannot be instantiated"); - } - - public static SeaTunnelDataType convert(TypeInformation dataType) { - BridgedType bridgedType = 
BRIDGED_TYPES.get(dataType.getTypeClass()); - if (bridgedType != null) { - return bridgedType.getSeaTunnelType(); - } - - if (dataType instanceof MapTypeInfo) { - MapTypeInfo mapTypeInfo = (MapTypeInfo) dataType; - return new MapType<>( - convert(mapTypeInfo.getKeyTypeInfo()), convert(mapTypeInfo.getValueTypeInfo())); - } - if (dataType instanceof RowTypeInfo) { - RowTypeInfo typeInformation = (RowTypeInfo) dataType; - String[] fieldNames = typeInformation.getFieldNames(); - SeaTunnelDataType[] seaTunnelDataTypes = - Arrays.stream(typeInformation.getFieldTypes()) - .map(TypeConverterUtils::convert) - .toArray(SeaTunnelDataType[]::new); - return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); - } - throw new IllegalArgumentException("Unsupported Flink's data type: " + dataType); - } - - public static TypeInformation convert(SeaTunnelDataType dataType) { - BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass()); - if (bridgedType != null) { - return bridgedType.getFlinkType(); - } - - if (dataType instanceof MapType) { - MapType mapType = (MapType) dataType; - return new MapTypeInfo<>( - convert(mapType.getKeyType()), convert(mapType.getValueType())); - } - - if (dataType instanceof ArrayType) { - ArrayType arrayType = (ArrayType) dataType; - return ObjectArrayTypeInfo.getInfoFor( - arrayType.getTypeClass(), convert(arrayType.getElementType())); - } - - if (dataType instanceof SeaTunnelRowType) { - SeaTunnelRowType rowType = (SeaTunnelRowType) dataType; - TypeInformation[] types = - Arrays.stream(rowType.getFieldTypes()) - .map(TypeConverterUtils::convert) - .toArray(TypeInformation[]::new); - return new RowTypeInfo(types, rowType.getFieldNames()); - } - throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType); - } - - public static class BridgedType { - private final SeaTunnelDataType seaTunnelType; - private final TypeInformation flinkType; - - private BridgedType(SeaTunnelDataType seaTunnelType, TypeInformation 
flinkType) { - this.seaTunnelType = seaTunnelType; - this.flinkType = flinkType; - } - - public static BridgedType of( - SeaTunnelDataType seaTunnelType, TypeInformation flinkType) { - return new BridgedType(seaTunnelType, flinkType); - } - - public TypeInformation getFlinkType() { - return flinkType; - } - - public SeaTunnelDataType getSeaTunnelType() { - return seaTunnelType; - } - } -}