[Feature][Transform-V2] Support transform with multi-table
Hisoka-X committed Oct 25, 2024
1 parent 10c37ac commit 405ed76
Showing 41 changed files with 1,152 additions and 102 deletions.
@@ -44,7 +44,6 @@

import java.io.Serializable;
import java.net.URL;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -307,16 +306,15 @@ public static OptionRule sinkFullOptionRule(@NonNull TableSinkFactory factory) {
return sinkOptionRule;
}

public static SeaTunnelTransform<?> createAndPrepareTransform(
CatalogTable catalogTable,
public static SeaTunnelTransform<?> createAndPrepareMultiTableTransform(
List<CatalogTable> catalogTables,
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier) {
final TableTransformFactory factory =
discoverFactory(classLoader, TableTransformFactory.class, factoryIdentifier);
TableTransformFactoryContext context =
new TableTransformFactoryContext(
Collections.singletonList(catalogTable), options, classLoader);
new TableTransformFactoryContext(catalogTables, options, classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
}
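
For context, a minimal caller sketch of the renamed helper, not part of this commit: the wrapper class, the "Copy" factory identifier, and the package paths not visible in this diff (FactoryUtil, ReadonlyConfig) are assumptions for illustration.

```java
import java.util.List;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

public class MultiTableTransformExample {

    // Hypothetical helper: prepares one transform over several upstream tables at once,
    // rather than the old single-CatalogTable overload.
    static SeaTunnelTransform<?> buildCopyTransform(
            List<CatalogTable> upstreamTables, ReadonlyConfig options) {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // "Copy" stands in for whatever transform factory identifier the job uses.
        return FactoryUtil.createAndPrepareMultiTableTransform(
                upstreamTables, options, classLoader, "Copy");
    }
}
```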
@@ -20,9 +20,11 @@
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import java.io.Serializable;
import java.util.List;

public interface SeaTunnelTransform<T>
extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware {
@@ -53,6 +55,12 @@ default void setTypeInfo(SeaTunnelDataType<T> inputDataType) {
*/
T map(T row);

List<CatalogTable> getProducedCatalogTables();

default SchemaChangeEvent mapSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) {
return schemaChangeEvent;
}

/** call it when Transformer completed */
default void close() {}
}
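
To make the new contract concrete, a hedged usage sketch that is not part of this commit: a hypothetical driver asks a transform for its produced tables and forwards a schema change through it; per the engine-side flow later in this diff, a null return from mapSchemaChangeEvent means the event is dropped downstream.

```java
import java.util.List;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

public class TransformContractSketch {

    // Hypothetical driver: list what the transform produces, then map a schema change.
    static SchemaChangeEvent describeAndMap(
            SeaTunnelTransform<?> transform, SchemaChangeEvent incoming) {
        List<CatalogTable> produced = transform.getProducedCatalogTables();
        produced.forEach(table -> System.out.println("produces " + table.getTableId()));
        // The default implementation returns the event unchanged; a transform may rewrite
        // it or return null to swallow it entirely.
        return transform.mapSchemaChangeEvent(incoming);
    }
}
```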
@@ -88,11 +88,11 @@ public class SeaTunnelConnectorTest extends TestSuiteBase implements TestResourc
};

// Match paimon source and paimon sink
private static final Pattern pattern1 =
private static final Pattern PATTERN1 =
Pattern.compile(
"(Paimon (source|sink))(.*?)(?=(Paimon (source|sink)|$))", Pattern.DOTALL);
// Match required options and optional options
private static final Pattern pattern2 =
private static final Pattern PATTERN2 =
Pattern.compile("Required Options:(.*?)(?:Optional Options: (.*?))?$", Pattern.DOTALL);

@Override
@@ -132,7 +132,7 @@ public void testExecCheck(TestContainer container) throws Exception {
}

private void checkStdOutForOptionRule(String stdout) {
Matcher matcher1 = pattern1.matcher(stdout.trim());
Matcher matcher1 = PATTERN1.matcher(stdout.trim());
String paimonSourceContent = StringUtils.EMPTY;
String paimonSinkContent = StringUtils.EMPTY;
Assertions.assertTrue(matcher1.groupCount() >= 3);
@@ -153,7 +153,7 @@ private void checkStdOutForOptionRule(String stdout) {

private void checkStdOutForOptionRuleOfSinglePluginTypeWithTransform(
String stdout, Factory factory) {
Matcher matcher2 = pattern2.matcher(stdout.trim());
Matcher matcher2 = PATTERN2.matcher(stdout.trim());
Assertions.assertTrue(matcher2.find());
Assertions.assertTrue(matcher2.groupCount() >= 2);
OptionRule optionRule = factory.optionRule();
@@ -169,11 +169,11 @@ private void checkStdOutForOptionRuleOfSinglePluginTypeWithTransform(
optionRule.getOptionalOptions().size(),
StringUtils.isBlank(optionalOptions)
? 0
: optionalOptions.split(StringUtils.LF).length);
: optionalOptions.trim().split(StringUtils.LF).length);
}

private void checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(String stdout) {
Matcher matcher1 = pattern1.matcher(stdout.trim());
Matcher matcher1 = PATTERN1.matcher(stdout.trim());
Assertions.assertTrue(matcher1.find());
Assertions.assertTrue(matcher1.groupCount() >= 3);
String paimonPluginContent = matcher1.group(3).trim();
@@ -187,7 +187,7 @@ private void checkStdOutForOptionRuleOfSinglePluginTypeWithConnector(String stdo
}

private void checkOptionRuleOfSinglePluginType(Factory factory, String optionRules) {
Matcher matcher2 = pattern2.matcher(optionRules);
Matcher matcher2 = PATTERN2.matcher(optionRules);
Assertions.assertTrue(matcher2.find());
Assertions.assertTrue(matcher2.groupCount() >= 2);
String requiredOptions = matcher2.group(1).trim();
@@ -205,7 +205,7 @@ private void checkOptionRuleOfSinglePluginType(Factory factory, String optionRul
optionRule.getOptionalOptions().size(),
StringUtils.isBlank(optionalOptions)
? 0
: optionalOptions.split(StringUtils.LF).length);
: optionalOptions.trim().split(StringUtils.LF).length);
}
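
The two trim() additions above guard against a leading line break in the captured group inflating the expected option count; a standalone illustration with made-up option names:

```java
public class SplitCountExample {
    public static void main(String[] args) {
        // The regex group may capture a leading newline before the first option name.
        String optionalOptions = "\nformat\ncompress";
        System.out.println(optionalOptions.split("\n").length);        // 3: leading empty token is counted
        System.out.println(optionalOptions.trim().split("\n").length); // 2: matches the two real options
    }
}
```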

private void checkResultForCase1(Container.ExecResult execResult) {
@@ -32,4 +32,11 @@ public void testCopy(TestContainer container) throws IOException, InterruptedExc
Container.ExecResult execResult = container.executeJob("/copy_transform.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@TestTemplate
public void testCopyMultiTable(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/copy_transform_multi_table.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
}
@@ -0,0 +1,131 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in SeaTunnel config
######

env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
c_row = {
c_row = {
c_int = int
}
}
}
}
}
}

transform {
Copy {
source_table_name = "fake"
result_table_name = "fake1"
src_field = "name"
dest_field = "name1"
}
Copy {
source_table_name = "fake1"
result_table_name = "fake2"
fields {
id_1 = "id"
name2 = "name"
name3 = "name"
c_row_1 = "c_row"
}
}
}

sink {
Assert {
source_table_name = "fake2"
rules =
{
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
],
field_rules = [
{
field_name = id
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = id_1
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
{
field_name = name1
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
{
field_name = name2
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
{
field_name = name3
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}
@@ -423,13 +423,6 @@ private void parseTransform(
inputIds.stream()
.map(tableWithActionMap::get)
.filter(Objects::nonNull)
.peek(
input -> {
if (input.size() > 1) {
throw new JobDefineCheckException(
"Adding transform to multi-table source is not supported.");
}
})
.flatMap(Collection::stream)
.collect(Collectors.toList());
if (inputs.isEmpty()) {
@@ -450,14 +443,19 @@ private void parseTransform(
inputs.stream()
.map(Tuple2::_2)
.collect(Collectors.toCollection(LinkedHashSet::new));

LinkedHashSet<CatalogTable> catalogTables =
inputs.stream()
.map(Tuple2::_1)
.collect(Collectors.toCollection(LinkedHashSet::new));
checkProducedTypeEquals(inputActions);
int spareParallelism = inputs.get(0)._2().getParallelism();
int parallelism =
readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
CatalogTable catalogTable = inputs.get(0)._1();
SeaTunnelTransform<?> transform =
FactoryUtil.createAndPrepareTransform(
catalogTable, readonlyConfig, classLoader, factoryId);
FactoryUtil.createAndPrepareMultiTableTransform(
new ArrayList<>(catalogTables), readonlyConfig, classLoader, factoryId);

transform.setJobContext(jobConfig.getJobContext());
long id = idGenerator.getNextId();
String actionName = JobConfigParser.createTransformActionName(index, factoryId);
@@ -471,10 +469,15 @@ private void parseTransform(
jarUrls,
new HashSet<>());
transformAction.setParallelism(parallelism);
tableWithActionMap.put(
tableId,
Collections.singletonList(
new Tuple2<>(transform.getProducedCatalogTable(), transformAction)));

List<Tuple2<CatalogTable, Action>> actions = new ArrayList<>();
List<CatalogTable> producedCatalogTables = transform.getProducedCatalogTables();

for (CatalogTable catalogTable : producedCatalogTables) {
actions.add(new Tuple2<>(catalogTable, transformAction));
}

tableWithActionMap.put(tableId, actions);
}
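
A simplified sketch of the bookkeeping the rewritten block performs, with plain strings standing in for the real Action/Tuple2 types used by the parser: every table the transform produces is registered against the same transform action, so any downstream plugin can consume any of the produced tables.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;

public class TransformFanOutSketch {

    // Simplified stand-in for tableWithActionMap: one transform action, many produced tables.
    static Map<String, List<Map.Entry<CatalogTable, String>>> register(
            String tableId, SeaTunnelTransform<?> transform, String transformActionName) {
        List<Map.Entry<CatalogTable, String>> actions = new ArrayList<>();
        for (CatalogTable produced : transform.getProducedCatalogTables()) {
            actions.add(new SimpleEntry<>(produced, transformActionName));
        }
        Map<String, List<Map.Entry<CatalogTable, String>>> tableWithActionMap = new HashMap<>();
        tableWithActionMap.put(tableId, actions);
        return tableWithActionMap;
    }
}
```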

public static SeaTunnelDataType<?> getProducedType(Action action) {
@@ -17,6 +17,7 @@

package org.apache.seatunnel.engine.server.task.flow;

import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
@@ -84,6 +85,20 @@ public void received(Record<?> record) {
// ack after #addState
runningTask.ack(barrier);
collector.collect(record);
} else if (record.getData() instanceof SchemaChangeEvent) {
if (prepareClose) {
return;
}
SchemaChangeEvent event = (SchemaChangeEvent) record.getData();
for (SeaTunnelTransform<T> t : transform) {
event = t.mapSchemaChangeEvent(event);
if (event == null) {
break;
}
}
if (event != null) {
collector.collect(new Record<>(event));
}
} else {
if (prepareClose) {
return;
@@ -27,6 +27,9 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.List;

@Slf4j
public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransform<SeaTunnelRow> {
protected final ErrorHandleWay rowErrorHandleWay;
@@ -35,7 +38,7 @@ public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransf
protected volatile CatalogTable outputCatalogTable;

public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
this(inputCatalogTable, CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
this(inputCatalogTable, TransformCommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
}

public AbstractCatalogSupportTransform(
@@ -85,6 +88,11 @@ public CatalogTable getProducedCatalogTable() {
return outputCatalogTable;
}

@Override
public List<CatalogTable> getProducedCatalogTables() {
return Collections.singletonList(getProducedCatalogTable());
}
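
One practical consequence, sketched under the assumption that AbstractCatalogSupportTransform lives in the usual transforms-v2 common package: existing single-table transforms built on this base class satisfy the new multi-table contract unchanged, because the list form simply wraps the single produced table.

```java
import java.util.List;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;

public class SingleTableAdapterSketch {

    // The default adapter always yields exactly one table: the same one as before.
    static boolean producesExactlyItsSingleTable(AbstractCatalogSupportTransform transform) {
        List<CatalogTable> produced = transform.getProducedCatalogTables();
        return produced.size() == 1
                && produced.get(0) == transform.getProducedCatalogTable();
    }
}
```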

private CatalogTable transformCatalogTable() {
TableIdentifier tableIdentifier = transformTableIdentifier();
TableSchema tableSchema = transformTableSchema();