diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java index 79c0c18706f..a86f132c33d 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java @@ -44,7 +44,6 @@ import java.io.Serializable; import java.net.URL; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -307,16 +306,15 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) { return sinkOptionRule; } - public static SeaTunnelTransform createAndPrepareTransform( - CatalogTable catalogTable, + public static SeaTunnelTransform createAndPrepareMultiTableTransform( + List catalogTables, ReadonlyConfig options, ClassLoader classLoader, String factoryIdentifier) { final TableTransformFactory factory = discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier); TableTransformFactoryContext context = - new TableTransformFactoryContext( - Collections.singletonList(catalogTable), options, classLoader); + new TableTransformFactoryContext(catalogTables, options, classLoader); ConfigValidator.of(context.getOptions()).validate(factory.optionRule()); return factory.createTransform(context).createTransform(); } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java index a64e1b7c7d5..103d0686416 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java @@ -20,9 +20,11 @@ import org.apache.seatunnel.api.common.PluginIdentifierInterface; import org.apache.seatunnel.api.source.SeaTunnelJobAware; import 
org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import java.io.Serializable; +import java.util.List; public interface SeaTunnelTransform extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware { @@ -53,6 +55,12 @@ default void setTypeInfo(SeaTunnelDataType inputDataType) { */ T map(T row); + List getProducedCatalogTables(); + + default SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) { + return schemaChangeEvent; + } + /** call it when Transformer completed */ default void close() {} } diff --git a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java index 805bf9da6b8..9bfd49965f0 100644 --- a/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java +++ b/seatunnel-e2e/seatunnel-core-e2e/seatunnel-starter-e2e/src/test/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelConnectorTest.java @@ -88,11 +88,11 @@ public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResourc }; // Match paimon source and paimon sink - private static final Pattern pattern1 = + private static final Pattern PATTERN1 = Pattern.compile( "(Paimon (source|sink))(.*?)(?=(Paimon (source|sink)|$))", Pattern.DOTALL); // Match required options and optional options - private static final Pattern pattern2 = + private static final Pattern PATTERN2 = Pattern.compile("Required Options:(.*?)(?:Optional Options: (.*?))?$", Pattern.DOTALL); @Override @@ -132,7 +132,7 @@ public void testExecCheck(TestContainer container) throws Exception { } private void checkStdOutForOptionRule(String stdout) { - Matcher 
matcher1 = pattern1.matcher(stdout.trim()); + Matcher matcher1 = PATTERN1.matcher(stdout.trim()); String paimonSourceContent = StringUtils.EMPTY; String paimonSinkContent = StringUtils.EMPTY; Assertions.assertTrue(matcher1.groupCount() >= 3); @@ -153,7 +153,7 @@ private void checkStdOutForOptionRule(String stdout) { private void checkStdOutForOptionRuleOfSinglePluginTypeWithTransform( String stdout, Factory factory) { - Matcher matcher2 = pattern2.matcher(stdout.trim()); + Matcher matcher2 = PATTERN2.matcher(stdout.trim()); Assertions.assertTrue(matcher2.find()); Assertions.assertTrue(matcher2.groupCount() >= 2); OptionRule optionRule = factory.optionRule(); @@ -169,11 +169,11 @@ private void checkStdOutForOptionRuleOfSinglePluginTypeWithTransform( optionRule.getOptionalOptions().size(), StringUtils.isBlank(optionalOptions) ? 0 - : optionalOptions.split(StringUtils.LF).length); + : optionalOptions.trim().split(StringUtils.LF).length); } private void checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(String stdout) { - Matcher matcher1 = pattern1.matcher(stdout.trim()); + Matcher matcher1 = PATTERN1.matcher(stdout.trim()); Assertions.assertTrue(matcher1.find()); Assertions.assertTrue(matcher1.groupCount() >= 3); String paimonPluginContent = matcher1.group(3).trim(); @@ -187,7 +187,7 @@ private void checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(String stdo } private void checkOptionRuleOfSinglePluginType(Factory factory, String optionRules) { - Matcher matcher2 = pattern2.matcher(optionRules); + Matcher matcher2 = PATTERN2.matcher(optionRules); Assertions.assertTrue(matcher2.find()); Assertions.assertTrue(matcher2.groupCount() >= 2); String requiredOptions = matcher2.group(1).trim(); @@ -205,7 +205,7 @@ private void checkOptionRuleOfSinglePluginType(Factory factory, String optionRul optionRule.getOptionalOptions().size(), StringUtils.isBlank(optionalOptions) ? 
0 - : optionalOptions.split(StringUtils.LF).length); + : optionalOptions.trim().split(StringUtils.LF).length); } private void checkResultForCase1(Container.ExecResult execResult) { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java index b81d1fc6664..9217df50fdb 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java @@ -32,4 +32,11 @@ public void testCopy(TestContainer container) throws IOException, InterruptedExc Container.ExecResult execResult = container.executeJob("/copy_transform.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } + + @TestTemplate + public void testCopyMultiTable(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/copy_transform_multi_table.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform_multi_table.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform_multi_table.conf new file mode 100644 index 00000000000..b937b0a8cbe --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/resources/copy_transform_multi_table.conf @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + c_row = { + c_row = { + c_int = int + } + } + } + } + } +} + +transform { + Copy { + source_table_name = "fake" + result_table_name = "fake1" + src_field = "name" + dest_field = "name1" + } + Copy { + source_table_name = "fake1" + result_table_name = "fake2" + fields { + id_1 = "id" + name2 = "name" + name3 = "name" + c_row_1 = "c_row" + } + } +} + +sink { + Assert { + source_table_name = "fake2" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = id + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = id_1 + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + { + field_name = name1 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + { + field_name = name2 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + { + field_name = name3 +
field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index cb7f118a6e4..7b0537c7106 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -423,13 +423,6 @@ private void parseTransform( inputIds.stream() .map(tableWithActionMap::get) .filter(Objects::nonNull) - .peek( - input -> { - if (input.size() > 1) { - throw new JobDefineCheckException( - "Adding transform to multi-table source is not supported."); - } - }) .flatMap(Collection::stream) .collect(Collectors.toList()); if (inputs.isEmpty()) { @@ -450,14 +443,19 @@ private void parseTransform( inputs.stream() .map(Tuple2::_2) .collect(Collectors.toCollection(LinkedHashSet::new)); + + LinkedHashSet catalogTables = + inputs.stream() + .map(Tuple2::_1) + .collect(Collectors.toCollection(LinkedHashSet::new)); checkProducedTypeEquals(inputActions); int spareParallelism = inputs.get(0)._2().getParallelism(); int parallelism = readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism); - CatalogTable catalogTable = inputs.get(0)._1(); SeaTunnelTransform transform = - FactoryUtil.createAndPrepareTransform( - catalogTable, readonlyConfig, classLoader, factoryId); + FactoryUtil.createAndPrepareMultiTableTransform( + new ArrayList<>(catalogTables), readonlyConfig, classLoader, factoryId); + transform.setJobContext(jobConfig.getJobContext()); long id = idGenerator.getNextId(); String actionName = JobConfigParser.createTransformActionName(index, factoryId); @@ 
-471,10 +469,15 @@ private void parseTransform( jarUrls, new HashSet<>()); transformAction.setParallelism(parallelism); - tableWithActionMap.put( - tableId, - Collections.singletonList( - new Tuple2<>(transform.getProducedCatalogTable(), transformAction))); + + List> actions = new ArrayList<>(); + List producedCatalogTables = transform.getProducedCatalogTables(); + + for (CatalogTable catalogTable : producedCatalogTables) { + actions.add(new Tuple2<>(catalogTable, transformAction)); + } + + tableWithActionMap.put(tableId, actions); } public static SeaTunnelDataType getProducedType(Action action) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java index 0447513b5ff..b437174b9be 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.engine.server.task.flow; +import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.transform.Collector; import org.apache.seatunnel.api.transform.SeaTunnelTransform; @@ -84,6 +85,20 @@ public void received(Record record) { // ack after #addState runningTask.ack(barrier); collector.collect(record); + } else if (record.getData() instanceof SchemaChangeEvent) { + if (prepareClose) { + return; + } + SchemaChangeEvent event = (SchemaChangeEvent) record.getData(); + for (SeaTunnelTransform t : transform) { + event = t.mapSchemaChangeEvent(event); + if (event == null) { + break; + } + } + if (event != null) { + collector.collect(new Record<>(event)); + } } else { if 
(prepareClose) { return; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java index a0fa464af7a..8ba2dd44ee9 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java @@ -27,6 +27,9 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import java.util.Collections; +import java.util.List; + @Slf4j public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransform { protected final ErrorHandleWay rowErrorHandleWay; @@ -35,7 +38,7 @@ public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransf protected volatile CatalogTable outputCatalogTable; public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) { - this(inputCatalogTable, CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue()); + this(inputCatalogTable, TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue()); } public AbstractCatalogSupportTransform( @@ -85,6 +88,11 @@ public CatalogTable getProducedCatalogTable() { return outputCatalogTable; } + @Override + public List getProducedCatalogTables() { + return Collections.singletonList(getProducedCatalogTable()); + } + private CatalogTable transformCatalogTable() { TableIdentifier tableIdentifier = transformTableIdentifier(); TableSchema tableSchema = transformTableSchema(); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogSupportTransform.java new file mode 100644 index 00000000000..281b5516d29 --- /dev/null +++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractMultiCatalogSupportTransform.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.common; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public abstract class AbstractMultiCatalogSupportTransform + implements SeaTunnelTransform { + + protected List inputCatalogTables; + + protected List outputCatalogTables; + + protected Map> transformMap; + + public AbstractMultiCatalogSupportTransform( + List inputCatalogTables, ReadonlyConfig config) { + this.inputCatalogTables = inputCatalogTables; + this.transformMap = new HashMap<>(); + Pattern tableMatchRegex = + 
Pattern.compile(config.get(TransformCommonOptions.TABLE_MATCH_REGEX)); + Map singleTableConfig = + config.get(TransformCommonOptions.MULTI_TABLES).stream() + .map(ReadonlyConfig::fromMap) + .filter(c -> c.get(TransformCommonOptions.TABLE_PATH) != null) + .collect( + Collectors.toMap( + c -> c.get(TransformCommonOptions.TABLE_PATH), + Function.identity())); + + inputCatalogTables.forEach( + inputCatalogTable -> { + String tableId = inputCatalogTable.getTableId().toTablePath().toString(); + ReadonlyConfig tableConfig; + if (singleTableConfig.containsKey(tableId)) { + tableConfig = singleTableConfig.get(tableId); + } else if (tableMatchRegex.matcher(tableId).matches()) { + tableConfig = config; + } else { + tableConfig = null; + } + if (tableConfig != null) { + transformMap.put(tableId, buildTransform(inputCatalogTable, tableConfig)); + } else { + transformMap.put(tableId, new IdentityTransform(inputCatalogTable)); + } + }); + + this.outputCatalogTables = + inputCatalogTables.stream() + .map( + inputCatalogTable -> { + String tableName = + inputCatalogTable.getTableId().toTablePath().toString(); + return transformMap.get(tableName).getProducedCatalogTable(); + }) + .collect(Collectors.toList()); + } + + @Override + public SeaTunnelRow map(SeaTunnelRow row) { + if (transformMap.size() == 1) { + return transformMap.values().iterator().next().map(row); + } + return transformMap.get(row.getTableId()).map(row); + } + + protected abstract SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config); + + @Override + public List getProducedCatalogTables() { + return outputCatalogTables; + } + + @Override + public CatalogTable getProducedCatalogTable() { + return outputCatalogTables.get(0); + } + + @Override + public void setTypeInfo(SeaTunnelDataType inputDataType) {} + + public static class IdentityTransform implements SeaTunnelTransform { + private final CatalogTable catalogTable; + + @Override + public String getPluginName() { + return 
"Identity"; + } + + public IdentityTransform(CatalogTable catalogTable) { + this.catalogTable = catalogTable; + } + + @Override + public SeaTunnelRow map(SeaTunnelRow row) { + return row; + } + + @Override + public List getProducedCatalogTables() { + return Collections.singletonList(catalogTable); + } + + @Override + public CatalogTable getProducedCatalogTable() { + return catalogTable; + } + + @Override + public void setTypeInfo(SeaTunnelDataType inputDataType) {} + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/TransformCommonOptions.java similarity index 66% rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/TransformCommonOptions.java index eb4b0f4a712..f2340086d62 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/TransformCommonOptions.java @@ -17,14 +17,37 @@ package org.apache.seatunnel.transform.common; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class TransformCommonOptions { + + public static final Option>> MULTI_TABLES = + Options.key("table_transform") + .type(new TypeReference>>() {}) + .defaultValue(Collections.emptyList()) + .withDescription("The table transform config"); -public final class CommonOptions { + public static final Option TABLE_PATH = + Options.key("table_path") + .stringType() + .noDefaultValue() + .withDescription("The table path of catalog table"); + + public 
static final Option TABLE_MATCH_REGEX = + Options.key("table_match_regex") + .stringType() + .defaultValue(".*") + .withDescription("The regex to match the table path"); - public static Option ROW_ERROR_HANDLE_WAY_OPTION = + public static final Option ROW_ERROR_HANDLE_WAY_OPTION = Options.key("row_error_handle_way") .singleChoice( ErrorHandleWay.class, @@ -34,8 +57,7 @@ public final class CommonOptions { "The processing method of data format error. The default value is fail, and the optional value is (fail, skip). " + "When fail is selected, data format error will block and an exception will be thrown. " + "When skip is selected, data format error will skip this line data."); - - public static Option COLUMN_ERROR_HANDLE_WAY_OPTION = + public static final Option COLUMN_ERROR_HANDLE_WAY_OPTION = Options.key("column_error_handle_way") .enumType(ErrorHandleWay.class) .noDefaultValue() diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java new file mode 100644 index 00000000000..b3c0827e811 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.copy; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class CopyFieldMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public CopyFieldMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return CopyFieldTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new CopyFieldTransform(CopyTransformConfig.of(config), inputCatalogTable); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java index d8d8a97962e..428623ff2e1 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.transform.copy; import org.apache.seatunnel.api.configuration.util.OptionRule; -import 
org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -38,13 +38,15 @@ public OptionRule optionRule() { return OptionRule.builder() .bundled(CopyTransformConfig.SRC_FIELD, CopyTransformConfig.DEST_FIELD) .bundled(CopyTransformConfig.FIELDS) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CopyTransformConfig copyTransformConfig = CopyTransformConfig.of(context.getOptions()); - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new CopyFieldTransform(copyTransformConfig, catalogTable); + return () -> + new CopyFieldMultiCatalogTransform( + context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java new file mode 100644 index 00000000000..80b8a985cae --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.dynamiccompile; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class DynamicCompileMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public DynamicCompileMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new DynamicCompileTransform(config, inputCatalogTable); + } + + @Override + public String getPluginName() { + return DynamicCompileTransform.PLUGIN_NAME; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java index 195102c4d91..7df0f88ca3f 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java +++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.transform.dynamiccompile; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -47,12 +47,15 @@ public OptionRule optionRule() { DynamicCompileTransformConfig.COMPILE_PATTERN, CompilePattern.ABSOLUTE_PATH, DynamicCompileTransformConfig.ABSOLUTE_PATH) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new DynamicCompileTransform(context.getOptions(), catalogTable); + return () -> + new DynamicCompileMultiCatalogTransform( + context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java index b35df6a448b..fc57882d702 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java @@ -17,18 +17,28 @@ package org.apache.seatunnel.transform.exception; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; +import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.seatunnel.common.exception.CommonError; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.GET_CATALOG_TABLES_WITH_NOT_EXIST_FIELDS_AND_TABLES_ERROR; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.GET_CATALOG_TABLE_WITH_NOT_EXIST_FIELDS_ERROR; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.GET_CATALOG_TABLE_WITH_NOT_EXIST_TABLES_ERROR; import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND; import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_TABLE_FIELD_NOT_FOUND; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_TABLE_NOT_FOUND; /** The common error of SeaTunnel transform. 
Please refer {@link CommonError} */ public class TransformCommonError { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static TransformException cannotFindInputFieldError(String transform, String field) { Map params = new HashMap<>(); params.put("field", field); @@ -43,4 +53,58 @@ public static TransformException cannotFindInputFieldsError( params.put("transform", transform); return new TransformException(INPUT_FIELDS_NOT_FOUND, params); } + + public static TransformException cannotFindInputTableFieldError( + String transform, String table, String field) { + Map params = new HashMap<>(); + params.put("table", table); + params.put("field", field); + params.put("transform", transform); + return new TransformException(INPUT_TABLE_FIELD_NOT_FOUND, params); + } + + public static TransformException cannotFindInputTableError(String transform, String table) { + Map params = new HashMap<>(); + params.put("table", table); + params.put("transform", transform); + return new TransformException(INPUT_TABLE_NOT_FOUND, params); + } + + public static TransformException getCatalogTableWithNotExistFields( + String transform, Map> fields) { + Map params = new HashMap<>(); + params.put("transform", transform); + try { + params.put("tableNotExistedFields", OBJECT_MAPPER.writeValueAsString(fields)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + return new TransformException(GET_CATALOG_TABLE_WITH_NOT_EXIST_FIELDS_ERROR, params); + } + + public static TransformException getCatalogTableWithNotExistTables( + String transform, List tables) { + Map params = new HashMap<>(); + params.put("transform", transform); + try { + params.put("tables", OBJECT_MAPPER.writeValueAsString(tables)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + return new TransformException(GET_CATALOG_TABLE_WITH_NOT_EXIST_TABLES_ERROR, params); + } + + public static TransformException 
getCatalogTableWithNotExistFieldsAndTables( + String transform, List tables, Map> fields) { + Map params = new HashMap<>(); + params.put("transform", transform); + try { + params.put("tableNotExistedFields", OBJECT_MAPPER.writeValueAsString(fields)); + params.put("tables", OBJECT_MAPPER.writeValueAsString(tables)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + return new TransformException( + GET_CATALOG_TABLES_WITH_NOT_EXIST_FIELDS_AND_TABLES_ERROR, params); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java index dc5008ec040..bd3d88a54b2 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java @@ -25,7 +25,21 @@ public enum TransformCommonErrorCode implements SeaTunnelErrorCode { "The input field '' of '' transform not found in upstream schema"), INPUT_FIELDS_NOT_FOUND( "TRANSFORM_COMMON-02", - "The input fields '' of '' transform not found in upstream schema"); + "The input fields '' of '' transform not found in upstream schema"), + INPUT_TABLE_NOT_FOUND( + "TRANSFORM_COMMON-03", + "The input table '' of '' transform not found in upstream schema"), + INPUT_TABLE_FIELD_NOT_FOUND( + "TRANSFORM_COMMON-04", + "The input field ''of table '
' of '' transform not found in upstream schema"), + GET_CATALOG_TABLE_WITH_NOT_EXIST_FIELDS_ERROR( + "TRANSFORM_COMMON-05", + "The '' upstream schema not exist fields: ''"), + GET_CATALOG_TABLE_WITH_NOT_EXIST_TABLES_ERROR( + "TRANSFORM_COMMON-06", "The '' upstream schema not exist tables ''"), + GET_CATALOG_TABLES_WITH_NOT_EXIST_FIELDS_AND_TABLES_ERROR( + "TRANSFORM_COMMON-07", + "The '' upstream schema not exist table '',upstream schema not exist fields: ''"); private final String code; private final String description; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformExceptionUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformExceptionUtil.java new file mode 100644 index 00000000000..4cd8fd4a255 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformExceptionUtil.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +public class TransformExceptionUtil { + + public static void withErrorCheck( + String transformName, Iterator keys, Consumer execute) { + Map> notExistedColumn = new LinkedHashMap<>(); + Set notExistedTable = new LinkedHashSet<>(); + while (keys.hasNext()) { + try { + execute.accept(keys.next()); + } catch (SeaTunnelRuntimeException e) { + if (e.getSeaTunnelErrorCode() + .equals(TransformCommonErrorCode.INPUT_TABLE_FIELD_NOT_FOUND)) { + String field = e.getParams().get("field"); + String table = e.getParams().get("table"); + if (!notExistedColumn.containsKey(table)) { + notExistedColumn.put(table, new ArrayList<>()); + } + notExistedColumn.get(table).add(field); + } else if (e.getSeaTunnelErrorCode() + .equals(TransformCommonErrorCode.INPUT_TABLE_NOT_FOUND)) { + notExistedTable.add(e.getParams().get("table")); + } else { + throw e; + } + } + } + if (!notExistedTable.isEmpty() && !notExistedColumn.isEmpty()) { + throw TransformCommonError.getCatalogTableWithNotExistFieldsAndTables( + transformName, new ArrayList<>(notExistedTable), notExistedColumn); + } else if (!notExistedColumn.isEmpty()) { + throw TransformCommonError.getCatalogTableWithNotExistFields( + transformName, notExistedColumn); + } else if (!notExistedTable.isEmpty()) { + throw TransformCommonError.getCatalogTableWithNotExistTables( + transformName, new ArrayList<>(notExistedTable)); + } + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java new file mode 100644 index 00000000000..10ae1c62259 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.fieldmapper; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class FieldMapperMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public FieldMapperMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return FieldMapperTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new FieldMapperTransform(FieldMapperTransformConfig.of(config), inputCatalogTable); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java index b7382175ba4..ab494b50711 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java @@ -19,11 +19,11 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import 
org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -36,15 +36,16 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().required(FieldMapperTransformConfig.FIELD_MAPPER).build(); + return OptionRule.builder() + .required(FieldMapperTransformConfig.FIELD_MAPPER) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) + .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); ReadonlyConfig options = context.getOptions(); - FieldMapperTransformConfig fieldMapperTransformConfig = - FieldMapperTransformConfig.of(options); - return () -> new FieldMapperTransform(fieldMapperTransformConfig, catalogTable); + return () -> new FieldMapperMultiCatalogTransform(context.getCatalogTables(), options); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java new file mode 100644 index 00000000000..a9a152b54ee --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FieldFieldMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.filter; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class FieldFieldMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public FieldFieldMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return FilterFieldTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new FilterFieldTransform(config, inputCatalogTable); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java index ebffe554dde..31795e1870a 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformConfig.java @@ -29,6 +29,7 @@ @Getter @Setter public class FilterFieldTransformConfig implements Serializable { + public static final Option> INCLUDE_FIELDS = 
Options.key("include_fields") .listType() diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java index e8a63275fa3..ef0d0545037 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.transform.filter; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -41,12 +41,15 @@ public OptionRule optionRule() { .exclusive( FilterFieldTransformConfig.INCLUDE_FIELDS, FilterFieldTransformConfig.EXCLUDE_FIELDS) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new FilterFieldTransform(context.getOptions(), catalogTable); + return () -> + new FieldFieldMultiCatalogTransform( + context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java new file mode 100644 
index 00000000000..a3c3725fa3e --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FieldRowKindMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.filterrowkind; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class FieldRowKindMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public FieldRowKindMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return FilterRowKindTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new FilterRowKindTransform(config, inputCatalogTable); + } +} diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java index 2191e30bc52..5d29325794c 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.transform.filterrowkind; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -39,12 +39,15 @@ public OptionRule optionRule() { .exclusive( FilterRowKinkTransformConfig.EXCLUDE_KINDS, FilterRowKinkTransformConfig.INCLUDE_KINDS) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new FilterRowKindTransform(context.getOptions(), catalogTable); + return () -> + new FieldRowKindMultiCatalogTransform( + context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java new file mode 100644 index 
00000000000..91825a3873e --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathMultiCatalogTransform.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.jsonpath; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class JsonPathMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + public JsonPathMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new JsonPathTransform(JsonPathTransformConfig.of(config), inputCatalogTable); + } + + @Override + public String getPluginName() { + return "JsonPath"; + } +} diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java index 51a0d6ac34a..ac5458fe65e 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java @@ -23,8 +23,8 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.transform.common.CommonOptions; import org.apache.seatunnel.transform.common.ErrorHandleWay; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import org.apache.seatunnel.transform.exception.TransformException; import org.apache.commons.lang3.StringUtils; @@ -92,7 +92,8 @@ public static JsonPathTransformConfig of(ReadonlyConfig config) { throw new TransformException( COLUMNS_MUST_NOT_EMPTY, COLUMNS_MUST_NOT_EMPTY.getErrorMessage()); } - ErrorHandleWay rowErrorHandleWay = config.get(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION); + ErrorHandleWay rowErrorHandleWay = + config.get(TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION); List> columns = config.get(COLUMNS); List configs = new ArrayList<>(columns.size()); for (Map map : columns) { @@ -102,7 +103,10 @@ public static JsonPathTransformConfig of(ReadonlyConfig config) { String destField = map.get(DEST_FIELD.key()); String type = map.getOrDefault(DEST_TYPE.key(), DEST_TYPE.defaultValue()); ErrorHandleWay columnErrorHandleWay = - Optional.ofNullable(map.get(CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key())) + Optional.ofNullable( + map.get( + TransformCommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION + .key())) .map(ErrorHandleWay::valueOf) .orElse(null); diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java index def17f2564d..037a823fa82 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java @@ -17,14 +17,12 @@ package org.apache.seatunnel.transform.jsonpath; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; -import org.apache.seatunnel.transform.common.CommonOptions; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -39,15 +37,15 @@ public String factoryIdentifier() { public OptionRule optionRule() { return OptionRule.builder() .required(JsonPathTransformConfig.COLUMNS) - .optional(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) + .optional(TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); - ReadonlyConfig options = context.getOptions(); - JsonPathTransformConfig jsonPathTransformConfig = JsonPathTransformConfig.of(options); - return () -> new JsonPathTransform(jsonPathTransformConfig, catalogTable); + return () -> + new 
JsonPathMultiCatalogTransform(context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingMultiCatalogTransform.java new file mode 100644 index 00000000000..105a1500952 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingMultiCatalogTransform.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.nlpmodel.embadding; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class EmbeddingMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + public EmbeddingMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new EmbeddingTransform(config, inputCatalogTable); + } + + @Override + public String getPluginName() { + return "Embedding"; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransformFactory.java index db3464e9291..a7b182bf281 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransformFactory.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import org.apache.seatunnel.transform.nlpmodel.ModelProvider; import org.apache.seatunnel.transform.nlpmodel.llm.LLMTransformConfig; @@ -61,12 +62,15 @@ public OptionRule optionRule() { LLMTransformConfig.MODEL_PROVIDER, ModelProvider.CUSTOM, 
LLMTransformConfig.CustomRequestConfig.CUSTOM_CONFIG) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { return () -> - new EmbeddingTransform(context.getOptions(), context.getCatalogTables().get(0)); + new EmbeddingMultiCatalogTransform( + context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java new file mode 100644 index 00000000000..5684ba1e693 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMMultiCatalogTransform.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.nlpmodel.llm; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class LLMMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + public LLMMultiCatalogTransform(List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new LLMTransform(config, inputCatalogTable); + } + + @Override + public String getPluginName() { + return "LLM"; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransformFactory.java index 834c0b4d174..ee500321a5c 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransformFactory.java @@ -20,11 +20,11 @@ import org.apache.seatunnel.shade.com.google.common.collect.Lists; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import org.apache.seatunnel.transform.nlpmodel.ModelProvider; import 
com.google.auto.service.AutoService; @@ -64,12 +64,13 @@ public OptionRule optionRule() { LLMTransformConfig.MODEL_PROVIDER, ModelProvider.CUSTOM, LLMTransformConfig.CustomRequestConfig.CUSTOM_CONFIG) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new LLMTransform(context.getOptions(), catalogTable); + return () -> new LLMMultiCatalogTransform(context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java new file mode 100644 index 00000000000..266c8accec4 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.replace; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class ReplaceMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public ReplaceMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return "Replace"; + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new ReplaceTransform(config, inputCatalogTable); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java index 97630080e23..0479cb8b457 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformConfig.java @@ -17,11 +17,21 @@ package org.apache.seatunnel.transform.replace; +import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonAlias; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; import java.io.Serializable; +import java.util.List; +@Getter +@Setter public class ReplaceTransformConfig implements Serializable { public static final Option KEY_REPLACE_FIELD = @@ -53,4 +63,47 @@ public class 
ReplaceTransformConfig implements Serializable { .booleanType() .noDefaultValue() .withDescription("Replace the first match string"); + + public static final Option<List<TableTransforms>> MULTI_TABLES = + Options.key("table_transform") + .listType(TableTransforms.class) + .noDefaultValue() + .withDescription("Per-table replace options, one entry per table_path"); + + private String replaceField; + private String pattern; + private String replacement; + private Boolean isRegex; + private Boolean replaceFirst; + + @Data + public static class TableTransforms implements Serializable { + @JsonAlias("table_path") + private String tablePath; + + @JsonAlias("replace_field") + private String replaceField; + + @JsonAlias("pattern") + private String pattern; + + @JsonAlias("replacement") + private String replacement; + + @JsonAlias("is_regex") + private Boolean isRegex; + + @JsonAlias("replace_first") + private Boolean replaceFirst; + } + + public static ReplaceTransformConfig of(ReadonlyConfig config) { + ReplaceTransformConfig replaceTransformConfig = new ReplaceTransformConfig(); + replaceTransformConfig.setReplaceField(config.get(KEY_REPLACE_FIELD)); + replaceTransformConfig.setPattern(config.get(KEY_PATTERN)); + replaceTransformConfig.setReplacement(config.get(KEY_REPLACEMENT)); + replaceTransformConfig.setIsRegex(config.get(KEY_IS_REGEX)); + replaceTransformConfig.setReplaceFirst(config.get(KEY_REPLACE_FIRST)); + return replaceTransformConfig; + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java index c0bed8977da..141511a15f2 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.transform.replace; import
org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -45,12 +45,14 @@ public OptionRule optionRule() { ReplaceTransformConfig.KEY_IS_REGEX, true, ReplaceTransformConfig.KEY_REPLACE_FIRST) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new ReplaceTransform(context.getOptions(), catalogTable); + return () -> + new ReplaceMultiCatalogTransform(context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java new file mode 100644 index 00000000000..e501031066f --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitMultiCatalogTransform.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.split; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class SplitMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public SplitMultiCatalogTransform( + List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return SplitTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new SplitTransform(SplitTransformConfig.of(config), inputCatalogTable); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java index c1ead2dd0b5..34078bbf40d 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java @@ -31,6 +31,7 @@ import java.util.Arrays; public class SplitTransform extends MultipleFieldOutputTransform { + public static final String PLUGIN_NAME = "Split"; private final SplitTransformConfig
splitTransformConfig; private final int splitFieldIndex; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java index 32d660860b9..b8e0c039594 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.transform.split; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -40,13 +40,14 @@ public OptionRule optionRule() { SplitTransformConfig.KEY_SEPARATOR, SplitTransformConfig.KEY_SPLIT_FIELD, SplitTransformConfig.KEY_OUTPUT_FIELDS) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - SplitTransformConfig splitTransformConfig = SplitTransformConfig.of(context.getOptions()); - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new SplitTransform(splitTransformConfig, catalogTable); + return () -> + new SplitMultiCatalogTransform(context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogTransform.java 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogTransform.java new file mode 100644 index 00000000000..5d093d8fb0e --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLMultiCatalogTransform.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.transform.sql; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.transform.SeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractMultiCatalogSupportTransform; + +import java.util.List; + +public class SQLMultiCatalogTransform extends AbstractMultiCatalogSupportTransform { + + public SQLMultiCatalogTransform(List inputCatalogTables, ReadonlyConfig config) { + super(inputCatalogTables, config); + } + + @Override + public String getPluginName() { + return SQLTransform.PLUGIN_NAME; + } + + @Override + protected SeaTunnelTransform buildTransform( + CatalogTable inputCatalogTable, ReadonlyConfig config) { + return new SQLTransform(config, inputCatalogTable); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java index 5c4abf53c07..973ce5cae48 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.transform.sql; import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import com.google.auto.service.AutoService; @@ -37,12 +37,15 @@ public String factoryIdentifier() { @Override public OptionRule 
optionRule() { - return OptionRule.builder().required(KEY_QUERY).build(); + return OptionRule.builder() + .required(KEY_QUERY) + .optional(TransformCommonOptions.MULTI_TABLES) + .optional(TransformCommonOptions.TABLE_MATCH_REGEX) + .build(); } @Override public TableTransform createTransform(TableTransformFactoryContext context) { - CatalogTable catalogTable = context.getCatalogTables().get(0); - return () -> new SQLTransform(context.getOptions(), catalogTable); + return () -> new SQLMultiCatalogTransform(context.getCatalogTables(), context.getOptions()); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java index 993b4e0a3c2..ba1797ccdc2 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java @@ -24,9 +24,6 @@ import org.apache.seatunnel.transform.exception.TransformException; import org.apache.seatunnel.transform.sql.SQLEngine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.expression.Expression; import net.sf.jsqlparser.parser.CCJSqlParserUtil; @@ -49,7 +46,6 @@ import java.util.stream.Collectors; public class ZetaSQLEngine implements SQLEngine { - private static final Logger log = LoggerFactory.getLogger(ZetaSQLEngine.class); public static final String ESCAPE_IDENTIFIER = "`"; private String inputTableName; @@ -122,15 +118,11 @@ private void validateSQL(Statement statement) { if (table.getAlias() != null) { throw new IllegalArgumentException("Unsupported table alias name syntax"); } - String tableName = table.getName(); - if (!inputTableName.equalsIgnoreCase(tableName) - && !tableName.equalsIgnoreCase(catalogTableName)) { - log.warn( - "SQL table name {} is not equal 
to input table name {} or catalog table name {}", - tableName, - inputTableName, - catalogTableName); - } + // String tableName = table.getName(); + // if (!inputTableName.equalsIgnoreCase(tableName)) { + // throw new IllegalArgumentException( + // String.format("Table name: %s not found", tableName)); + // } } else { throw new IllegalArgumentException("Unsupported sub table syntax"); } @@ -195,6 +187,7 @@ public SeaTunnelRowType typeMapping(List inputColumnsMapping) { } else if (selectItem instanceof SelectExpressionItem) { SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem; Expression expression = expressionItem.getExpression(); + if (expressionItem.getAlias() != null) { String aliasName = expressionItem.getAlias().getName(); if (aliasName.startsWith(ESCAPE_IDENTIFIER) diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java index 51c0c0ac306..525efd87d56 100644 --- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java @@ -25,8 +25,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.transform.common.CommonOptions; import org.apache.seatunnel.transform.common.ErrorHandleWay; +import org.apache.seatunnel.transform.common.TransformCommonOptions; import org.apache.seatunnel.transform.exception.ErrorDataTransformException; import org.apache.seatunnel.transform.jsonpath.JsonPathTransform; import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig; @@ -101,7 +101,7 @@ public void testErrorHandleWay() { "$.f1", JsonPathTransformConfig.DEST_FIELD.key(), "f1", - 
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), + TransformCommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name()))); config = ReadonlyConfig.fromMap(configMap); transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table); @@ -121,7 +121,7 @@ public void testErrorHandleWay() { "$.f1", JsonPathTransformConfig.DEST_FIELD.key(), "f1", - CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), + TransformCommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name()))); config = ReadonlyConfig.fromMap(configMap); transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table); @@ -140,7 +140,7 @@ public void testErrorHandleWay() { "$.f1", JsonPathTransformConfig.DEST_FIELD.key(), "f1", - CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), + TransformCommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP_ROW.name()))); config = ReadonlyConfig.fromMap(configMap); transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table); @@ -148,7 +148,9 @@ public void testErrorHandleWay() { outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"})); Assertions.assertNull(outputRow); - configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name()); + configMap.put( + TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), + ErrorHandleWay.SKIP.name()); configMap.put( JsonPathTransformConfig.COLUMNS.key(), Arrays.asList( @@ -162,7 +164,9 @@ public void testErrorHandleWay() { outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"})); Assertions.assertNull(outputRow); - configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name()); + configMap.put( + TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), + ErrorHandleWay.SKIP.name()); configMap.put( JsonPathTransformConfig.COLUMNS.key(), Arrays.asList( @@ -173,7 +177,7 @@ public void testErrorHandleWay() { "$.f1", 
JsonPathTransformConfig.DEST_FIELD.key(), "f1", - CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), + TransformCommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name()))); config = ReadonlyConfig.fromMap(configMap); transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table); @@ -185,7 +189,9 @@ public void testErrorHandleWay() { // ignore } - configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name()); + configMap.put( + TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), + ErrorHandleWay.FAIL.name()); configMap.put( JsonPathTransformConfig.COLUMNS.key(), Arrays.asList( @@ -196,7 +202,7 @@ public void testErrorHandleWay() { "$.f1", JsonPathTransformConfig.DEST_FIELD.key(), "f1", - CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), + TransformCommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name()))); config = ReadonlyConfig.fromMap(configMap); transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table); @@ -205,7 +211,9 @@ public void testErrorHandleWay() { Assertions.assertNotNull(outputRow); Assertions.assertNull(outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1"))); - configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name()); + configMap.put( + TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), + ErrorHandleWay.FAIL.name()); configMap.put( JsonPathTransformConfig.COLUMNS.key(), Arrays.asList( @@ -216,7 +224,7 @@ public void testErrorHandleWay() { "$.f1", JsonPathTransformConfig.DEST_FIELD.key(), "f1", - CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), + TransformCommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP_ROW.name()))); config = ReadonlyConfig.fromMap(configMap); transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);