From 6a603a0a861a24cef819d5719e6e4e8002b13d49 Mon Sep 17 00:00:00 2001 From: zhangdonghao Date: Thu, 24 Oct 2024 18:01:31 +0800 Subject: [PATCH] [Feature][Transform-v2] Add metadata transform --- plugin-mapping.properties | 1 + .../api/table/type/MetadataUtil.java | 84 ++++++++++ .../api/table/type/SeaTunnelRow.java | 7 + .../cdc/base/utils/SourceRecordUtils.java | 7 +- ...SeaTunnelRowDebeziumDeserializeSchema.java | 16 +- .../connector-cdc-mysql-e2e/pom.xml | 14 ++ .../seatunnel/cdc/mysql/MysqlCDCIT.java | 30 ++++ .../resources/mysqlcdc_to_metadata_trans.conf | 102 ++++++++++++ .../common/SeaTunnelRowAccessor.java | 4 + .../exception/TransformCommonError.java | 16 ++ .../exception/TransformCommonErrorCode.java | 8 +- .../transform/metadata/MetadataTransform.java | 156 ++++++++++++++++++ .../metadata/MetadataTransformConfig.java | 36 ++++ .../metadata/MetadataTransformFactory.java | 44 +++++ .../metadata/MetadataTransformTest.java | 132 +++++++++++++++ 15 files changed, 651 insertions(+), 6 deletions(-) create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf create mode 100644 seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java create mode 100644 seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java create mode 100644 seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java create mode 100644 seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e314ef86613..aca1c6c7731 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -152,3 +152,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2 seatunnel.transform.LLM = seatunnel-transforms-v2 seatunnel.transform.Embedding = seatunnel-transforms-v2 seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2 +seatunnel.transform.Metadata = seatunnel-transforms-v2 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java new file mode 100644 index 00000000000..4324e35e979 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class MetadataUtil { + + public static final String DATABASE = "database"; + public static final String TABLE = "table"; + public static final String PARTITION = "partition"; + public static final String ROW_KIND = "rowKind"; + public static final String EVENT_TIME = "ts_ms"; + public static final String DELAY = "delay"; + + public static final List METADATA_FIELDS = + Arrays.asList(DELAY, PARTITION, DATABASE, TABLE, ROW_KIND, EVENT_TIME); + + public static void setDelay(SeaTunnelRow row, Long delay) { + row.getMetadata().put(DELAY, delay); + } + + public static void setPartition(SeaTunnelRow row, String[] partition) { + row.getMetadata().put(PARTITION, partition); + } + + public static void setEventTime(SeaTunnelRow row, Long delay) { + row.getMetadata().put(EVENT_TIME, delay); + } + + public static Long getDelay(SeaTunnelRow row) { + return (Long) row.getMetadata().get(DELAY); + } + + public static String getDatabase(SeaTunnelRow row) { + TablePath tablePath = TablePath.of(row.getTableId()); + return tablePath.getDatabaseName(); + } + + public static String getTable(SeaTunnelRow row) { + TablePath tablePath = TablePath.of(row.getTableId()); + return tablePath.getTableName(); + } + + public static String getRowKind(SeaTunnelRow row) { + return row.getRowKind().shortString(); + } + + public static String getPartitionStr(SeaTunnelRow row) { + Object partition = row.getMetadata().get(PARTITION); + return Objects.nonNull(partition) ? String.join(",", (String[]) partition) : null; + } + + public static String[] getPartition(SeaTunnelRow row) { + return (String[]) row.getMetadata().get(PARTITION); + } + + public static Long getEventTime(SeaTunnelRow row) { + return (Long) row.getMetadata().get(EVENT_TIME); + } + + public static boolean isMetadataField(String fieldName) { + return METADATA_FIELDS.contains(fieldName); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index 10a5b33a935..1bf5c5a6eed 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -33,6 +34,8 @@ public final class SeaTunnelRow implements Serializable { /** The array to store the actual internal format values. */ private final Object[] fields; + private Map metadata = new HashMap(); + private volatile int size; public SeaTunnelRow(int arity) { @@ -340,6 +343,10 @@ public int hashCode() { return result; } + public Map getMetadata() { + return metadata; + } + @Override public String toString() { return "SeaTunnelRow{" diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java index 3245273ace2..f7d162925d3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java @@ -62,10 +62,9 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException { } /** - * Return the timestamp when the change event is produced in MySQL. - * - *

The field `source.ts_ms` in {@link SourceRecord} data struct is the time when the change - * event is operated in MySQL. + * In the source object, ts_ms indicates the time that the change was made in the database. By + * comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can + * determine the lag between the source database update and Debezium. */ public static Long getMessageTimestamp(SourceRecord record) { Schema schema = record.valueSchema(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java index d09e7b77b5c..56093fd9377 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; +import org.apache.seatunnel.api.table.type.MetadataUtil; import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -173,26 +174,39 @@ private void deserializeDataChangeRecord(SourceRecord record, Collectortest + + org.apache.seatunnel + seatunnel-transforms-v2 + ${project.version} + test + + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + org.testcontainers mysql diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index bf7e8d8fe7c..29e2b655e7f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import static org.awaitility.Awaitility.await; @@ -180,6 +181,35 @@ public void testMysqlCdcCheckDataE2e(TestContainer container) { }); } + @TestTemplate + public void testMysqlCdcMetadataTrans(TestContainer container) + throws InterruptedException, IOException { + // Clear related content to ensure that multiple operations are not affected + clearTable(MYSQL_DATABASE, SOURCE_TABLE_1); + clearTable(MYSQL_DATABASE, SINK_TABLE); + + Long jobId = JobIdGenerator.newJobId(); + AtomicReference execResult = new AtomicReference<>(); + CompletableFuture.supplyAsync( + () -> { + try { + execResult.set( + container.executeJob( + "/mysqlcdc_to_metadata_trans.conf", String.valueOf(jobId))); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + // insert update delete + upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1); + TimeUnit.SECONDS.sleep(30); + Assertions.assertEquals(0, execResult.get().getExitCode(), execResult.get().getStderr()); + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } + @TestTemplate public void testMysqlCdcCheckDataWithDisableExactlyonce(TestContainer container) { // Clear related content to ensure that multiple operations are not affected diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf new file mode 100644 index 00000000000..0db16b77f62 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + MySQL-CDC { + result_table_name = "customers_mysql_cdc" + server-id = 5652 + username = "st_user_source" + password = "mysqlpw" + table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + } +} + +transform { + Metadata { + metadata_fields { + database = database + table = table + partition = partition + rowKind = rowKind + ts_ms = ts_ms + delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Assert { + source_table_name = "trans_result" + rules { + field_rules = [ + { + field_name = database + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = table + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = partition + field_type = string + }, { + field_name = rowKind + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = ts_ms + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = delay + field_type = long + } + ] + } + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java index 5b97f341686..dc624fa57aa 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java @@ -45,4 +45,8 @@ public Object getField(int pos) { public Object[] getFields() { return row.getFields(); } + + public SeaTunnelRow getRow() { + return row; + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java index b35df6a448b..51e4b8265fb 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java @@ -25,6 +25,8 @@ import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND; import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_FIELDS_NOT_FOUND; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_MAPPING_FIELD_EXISTS; /** The common error of SeaTunnel transform. Please refer {@link CommonError} */ public class TransformCommonError { @@ -43,4 +45,18 @@ public static TransformException cannotFindInputFieldsError( params.put("transform", transform); return new TransformException(INPUT_FIELDS_NOT_FOUND, params); } + + public static TransformException cannotFindMetadataFieldError(String transform, String field) { + Map params = new HashMap<>(); + params.put("field", field); + params.put("transform", transform); + return new TransformException(METADATA_FIELDS_NOT_FOUND, params); + } + + public static TransformException metadataMappingFieldExists(String transform, String field) { + Map params = new HashMap<>(); + params.put("field", field); + params.put("transform", transform); + return new TransformException(METADATA_MAPPING_FIELD_EXISTS, params); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java index dc5008ec040..b0d72d7cf19 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java @@ -25,7 +25,13 @@ public enum TransformCommonErrorCode implements SeaTunnelErrorCode { "The input field '' of '' transform not found in upstream schema"), INPUT_FIELDS_NOT_FOUND( "TRANSFORM_COMMON-02", - "The input fields '' of '' transform not found in upstream schema"); + "The input fields '' of '' transform not found in upstream schema"), + METADATA_FIELDS_NOT_FOUND( + "TRANSFORM_COMMON-03", + "The metadata fields '' of '' transform not found "), + METADATA_MAPPING_FIELD_EXISTS( + "TRANSFORM_COMMON-04", + "The metadata mapping field '' of '' transform already exists in upstream schema"); private final String code; private final String description; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java new file mode 100644 index 00000000000..c89713b3646 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.MetadataUtil; +import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; +import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.transform.exception.TransformCommonError; + +import com.google.common.annotations.VisibleForTesting; +import lombok.NonNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class MetadataTransform extends MultipleFieldOutputTransform { + + private List fieldNames; + private Map metadataFieldMapping; + + public MetadataTransform(ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) { + super(inputCatalogTable); + initOutputFields(inputCatalogTable, config.get(MetadataTransformConfig.METADATA_FIELDS)); + } + + private void initOutputFields(CatalogTable inputCatalogTable, Map fields) { + List sourceTableFiledNames = + Arrays.asList(inputCatalogTable.getTableSchema().getFieldNames()); + List fieldNames = new ArrayList<>(); + for (Map.Entry field : fields.entrySet()) { + String srcField = field.getKey(); + if (!MetadataUtil.isMetadataField(srcField)) { + throw TransformCommonError.cannotFindMetadataFieldError(getPluginName(), srcField); + } + String targetField = field.getValue(); + if (sourceTableFiledNames.contains(targetField)) { + throw TransformCommonError.metadataMappingFieldExists(getPluginName(), srcField); + } + fieldNames.add(field.getKey()); + } + this.fieldNames = fieldNames; + this.metadataFieldMapping = fields; + } + + @Override + public String getPluginName() { + return MetadataTransformConfig.PLUGIN_NAME; + } + + @Override + protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] value = new Object[fieldNames.size()]; + for (Map.Entry mapping : metadataFieldMapping.entrySet()) { + String metadataFieldName = mapping.getKey(); + String mappingFieldName = mapping.getValue(); + int i = fieldNames.indexOf(mappingFieldName); + Object fieldValue = null; + switch (metadataFieldName) { + case MetadataUtil.DATABASE: + fieldValue = MetadataUtil.getDatabase(inputRow.getRow()); + break; + case MetadataUtil.TABLE: + fieldValue = MetadataUtil.getTable(inputRow.getRow()); + break; + case MetadataUtil.ROW_KIND: + fieldValue = MetadataUtil.getRowKind(inputRow.getRow()); + break; + case MetadataUtil.DELAY: + fieldValue = MetadataUtil.getDelay(inputRow.getRow()); + break; + case MetadataUtil.EVENT_TIME: + fieldValue = MetadataUtil.getEventTime(inputRow.getRow()); + break; + case MetadataUtil.PARTITION: + fieldValue = MetadataUtil.getPartitionStr(inputRow.getRow()); + break; + default: + throw TransformCommonError.cannotFindMetadataFieldError( + getPluginName(), mappingFieldName); + } + value[i] = fieldValue; + } + return value; + } + + @Override + protected Column[] getOutputColumns() { + Column[] columns = new Column[fieldNames.size()]; + for (Map.Entry mapping : metadataFieldMapping.entrySet()) { + String metadataFieldName = mapping.getKey(); + String mappingFieldName = mapping.getValue(); + int i = fieldNames.indexOf(mappingFieldName); + Column column; + switch (metadataFieldName) { + case MetadataUtil.DATABASE: + case MetadataUtil.TABLE: + case MetadataUtil.ROW_KIND: + case MetadataUtil.PARTITION: + column = + PhysicalColumn.of( + mappingFieldName, + BasicType.STRING_TYPE, + (Long) null, + null, + true, + null, + null); + break; + case MetadataUtil.DELAY: + case MetadataUtil.EVENT_TIME: + column = + PhysicalColumn.of( + mappingFieldName, + BasicType.LONG_TYPE, + (Long) null, + null, + true, + null, + null); + break; + default: + throw TransformCommonError.cannotFindMetadataFieldError( + getPluginName(), mappingFieldName); + } + columns[i] = column; + } + return columns; + } + + @VisibleForTesting + public void initRowContainerGenerator() { + transformTableSchema(); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java new file mode 100644 index 00000000000..fe9971d3a13 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig; + +import java.util.Map; + +public class MetadataTransformConfig extends ModelTransformConfig { + + public static final String PLUGIN_NAME = "Metadata"; + + public static final Option> METADATA_FIELDS = + Options.key("metadata_fields") + .mapType() + .noDefaultValue() + .withDescription( + "Specify the metadata field relationship between input and output"); +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java new file mode 100644 index 00000000000..ebbacb5cd08 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableTransform; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class MetadataTransformFactory implements TableTransformFactory { + @Override + public String factoryIdentifier() { + return MetadataTransformConfig.PLUGIN_NAME; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(MetadataTransformConfig.METADATA_FIELDS).build(); + } + + @Override + public TableTransform createTransform(TableTransformFactoryContext context) { + return () -> new MetadataTransform(context.getOptions(), context.getCatalogTables().get(0)); + } +} diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java new file mode 100644 index 00000000000..267bb0a03dc --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.MetadataUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class MetadataTransformTest { + + static CatalogTable catalogTable; + + static Object[] values; + + static SeaTunnelRow inputRow; + + static Long eventTime; + + @BeforeAll + static void setUp() { + catalogTable = + CatalogTable.of( + TableIdentifier.of("catalog", TablePath.DEFAULT), + TableSchema.builder() + .column( + PhysicalColumn.of( + "key1", + BasicType.STRING_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key2", + BasicType.INT_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key3", + BasicType.LONG_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key4", + BasicType.DOUBLE_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key5", + BasicType.FLOAT_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .build(), + new HashMap<>(), + new ArrayList<>(), + "comment"); + values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916, 3.14}; + inputRow = new SeaTunnelRow(values); + inputRow.setTableId(TablePath.DEFAULT.getFullName()); + eventTime = LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli(); + MetadataUtil.setDelay(inputRow, 150L); + MetadataUtil.setEventTime(inputRow, eventTime); + MetadataUtil.setPartition(inputRow, Arrays.asList("key1", "key2").toArray(new String[0])); + } + + @Test + void testMetadataTransform() { + Map metadataMapping = new HashMap<>(); + metadataMapping.put("database", "database"); + metadataMapping.put("table", "table"); + metadataMapping.put("partition", "partition"); + metadataMapping.put("rowKind", "rowKind"); + metadataMapping.put("ts_ms", "ts_ms"); + metadataMapping.put("delay", "delay"); + Map config = new HashMap<>(); + config.put("metadata_fields", metadataMapping); + MetadataTransform transform = + new MetadataTransform(ReadonlyConfig.fromMap(config), catalogTable); + transform.initRowContainerGenerator(); + SeaTunnelRow outputRow = transform.map(inputRow); + Assertions.assertEquals(values.length + 6, outputRow.getArity()); + Assertions.assertEquals( + "SeaTunnelRow{tableId=default.default.default, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, default, key1,key2, 150, default, +I, " + + eventTime + + "]}", + outputRow.toString()); + } +}