diff --git a/docs/en/transform-v2/dynamic-compile.md b/docs/en/transform-v2/dynamic-compile.md index fb5500880ac..66b7ba1f834 100644 --- a/docs/en/transform-v2/dynamic-compile.md +++ b/docs/en/transform-v2/dynamic-compile.md @@ -88,7 +88,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -146,7 +146,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.catalog.*; import org.apache.seatunnel.api.table.type.*; import java.util.ArrayList; diff --git a/docs/en/transform-v2/metadata.md b/docs/en/transform-v2/metadata.md new file mode 100644 index 00000000000..23f593ee0a4 --- /dev/null +++ b/docs/en/transform-v2/metadata.md @@ -0,0 +1,85 @@ +# Metadata + +> Metadata transform plugin + +## Description +Metadata transform plugin for adding metadata fields to data + +## Available Metadata + +| Key | DataType | Description | +|:---------:|:--------:|:---------------------------------------------------------------------------------------------------| +| Database | string | Name of the table that contain the row. | +| Table | string | Name of the table that contain the row. | +| RowKind | string | The type of operation | +| EventTime | Long | The time at which the connector processed the event. | +| Delay | Long | The difference between data extraction time and database change time | +| Partition | string | Contains the partition field of the corresponding number table of the row, multiple using `,` join | + +### note + `Delay` `Partition` only worked on cdc series connectors for now , except TiDB-CDC + +## Options + +| name | type | required | default value | Description | +|:---------------:|------|----------|---------------|---------------------------------------------------------------------------| +| metadata_fields | map | yes | | A mapping metadata input fields and their corresponding output fields. | + +### metadata_fields [map] + +A mapping between metadata fields and their respective output fields. + +```hocon +metadata_fields { + Database = c_database + Table = c_table + RowKind = c_rowKind + EventTime = c_ts_ms + Delay = c_delay +} +``` + +## Examples + +```yaml + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + MySQL-CDC { + result_table_name = "customers_mysql_cdc" + server-id = 5652 + username = "root" + password = "zdyk_Dev@2024" + table-names = ["source.user"] + base-url = "jdbc:mysql://172.16.17.123:3306/source" + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Console { + source_table_name = "custom_name" + } +} + +``` + diff --git a/docs/zh/transform-v2/dynamic-compile.md b/docs/zh/transform-v2/dynamic-compile.md index c5af808d4e9..4db9b86d735 100644 --- a/docs/zh/transform-v2/dynamic-compile.md +++ b/docs/zh/transform-v2/dynamic-compile.md @@ -85,7 +85,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -143,7 +143,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.catalog.*; import org.apache.seatunnel.api.table.type.*; import java.util.ArrayList; diff --git a/docs/zh/transform-v2/metadata.md b/docs/zh/transform-v2/metadata.md new file mode 100644 index 00000000000..09a743adf0e --- /dev/null +++ b/docs/zh/transform-v2/metadata.md @@ -0,0 +1,85 @@ +# Metadata + +> Metadata transform plugin + +## Description +元数据转换插件,用于将元数据字段添加到数据中 + +## 支持的元数据 + +| Key | DataType | Description | +|:---------:|:--------:|:-----------------------:| +| Database | string | 包含该行的数据库名 | +| Table | string | 包含该行的数表名 | +| RowKind | string | 行类型 | +| EventTime | Long | | +| Delay | Long | 数据抽取时间与数据库变更时间的差 | +| Partition | string | 包含该行对应数表的分区字段,多个使用`,`连接 | + +### 注意事项 + `Delay` `Partition`目前只适用于cdc系列连接器,除外TiDB-CDC + +## 配置选项 + +| name | type | required | default value | Description | +|:---------------:|------|:--------:|:-------------:|-------------------| +| metadata_fields | map | 是 | - | 元数据字段与输入字段相应的映射关系 | + +### metadata_fields [map] + +元数据字段和相应的输出字段之间的映射关系 + +```hocon +metadata_fields { + database = c_database + table = c_table + rowKind = c_rowKind + ts_ms = c_ts_ms + delay = c_delay +} +``` + +## 示例 + +```yaml + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + MySQL-CDC { + result_table_name = "customers_mysql_cdc" + server-id = 5652 + username = "root" + password = "zdyk_Dev@2024" + table-names = ["source.user"] + base-url = "jdbc:mysql://172.16.17.123:3306/source" + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Console { + source_table_name = "custom_name" + } +} + +``` + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 82c941b70f6..c494686161e 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -154,3 +154,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2 seatunnel.transform.LLM = seatunnel-transforms-v2 seatunnel.transform.Embedding = seatunnel-transforms-v2 seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2 +seatunnel.transform.Metadata = seatunnel-transforms-v2 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java index 839d611132b..8b5b36682a8 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/CommonOptions.java @@ -30,18 +30,44 @@ public enum CommonOptions { /** * The key of {@link Column#getOptions()} to specify the column value is a json format string. */ - JSON("Json"), + JSON("Json", false), /** The key of {@link Column#getOptions()} to specify the column value is a metadata field. */ - METADATA("Metadata"), + METADATA("Metadata", false), /** * The key of {@link SeaTunnelRow#getOptions()} to store the partition value of the row value. */ - PARTITION("Partition"), - ; + PARTITION("Partition", true), + /** + * The key of {@link SeaTunnelRow#getOptions()} to store the DATABASE value of the row value. + */ + DATABASE("Database", true), + /** The key of {@link SeaTunnelRow#getOptions()} to store the TABLE value of the row value. */ + TABLE("Table", true), + /** + * The key of {@link SeaTunnelRow#getOptions()} to store the ROW_KIND value of the row value. + */ + ROW_KIND("RowKind", true), + /** + * The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME value of the row value. + */ + EVENT_TIME("EventTime", true), + /** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value of the row value. */ + DELAY("Delay", true); private final String name; + private final boolean supportMetadataTrans; - CommonOptions(String name) { + CommonOptions(String name, boolean supportMetadataTrans) { this.name = name; + this.supportMetadataTrans = supportMetadataTrans; + } + + public static CommonOptions fromName(String name) { + for (CommonOptions option : CommonOptions.values()) { + if (option.getName().equals(name)) { + return option; + } + } + throw new IllegalArgumentException("Unknown option name: " + name); } } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java new file mode 100644 index 00000000000..42ab2035768 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MetadataUtil.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import org.apache.seatunnel.api.table.catalog.TablePath; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY; +import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME; +import static org.apache.seatunnel.api.table.type.CommonOptions.PARTITION; + +public class MetadataUtil { + + public static final List METADATA_FIELDS; + + static { + METADATA_FIELDS = new ArrayList<>(); + Stream.of(CommonOptions.values()) + .filter(CommonOptions::isSupportMetadataTrans) + .map(CommonOptions::getName) + .forEach(METADATA_FIELDS::add); + } + + public static void setDelay(SeaTunnelRow row, Long delay) { + row.getOptions().put(DELAY.getName(), delay); + } + + public static void setPartition(SeaTunnelRow row, String[] partition) { + row.getOptions().put(PARTITION.getName(), partition); + } + + public static void setEventTime(SeaTunnelRow row, Long delay) { + row.getOptions().put(EVENT_TIME.getName(), delay); + } + + public static Long getDelay(SeaTunnelRowAccessor row) { + return (Long) row.getOptions().get(DELAY.getName()); + } + + public static String getDatabase(SeaTunnelRowAccessor row) { + if (row.getTableId() == null) { + return null; + } + return TablePath.of(row.getTableId()).getDatabaseName(); + } + + public static String getTable(SeaTunnelRowAccessor row) { + if (row.getTableId() == null) { + return null; + } + return TablePath.of(row.getTableId()).getTableName(); + } + + public static String getRowKind(SeaTunnelRowAccessor row) { + return row.getRowKind().shortString(); + } + + public static String getPartitionStr(SeaTunnelRowAccessor row) { + Object partition = row.getOptions().get(PARTITION.getName()); + return Objects.nonNull(partition) ? String.join(",", (String[]) partition) : null; + } + + public static String[] getPartition(SeaTunnelRowAccessor row) { + return (String[]) row.getOptions().get(PARTITION.getName()); + } + + public static Long getEventTime(SeaTunnelRowAccessor row) { + return (Long) row.getOptions().get(EVENT_TIME.getName()); + } + + public static boolean isMetadataField(String fieldName) { + return METADATA_FIELDS.contains(fieldName); + } +} diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java index b6da4eea7be..84e172f2dfd 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java @@ -34,10 +34,10 @@ public final class SeaTunnelRow implements Serializable { /** The array to store the actual internal format values. */ private final Object[] fields; - private volatile int size; - private Map options; + private volatile int size; + public SeaTunnelRow(int arity) { this.fields = new Object[arity]; } diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java new file mode 100644 index 00000000000..6bbca49cd52 --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowAccessor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import lombok.AllArgsConstructor; + +import java.util.Map; + +@AllArgsConstructor +public class SeaTunnelRowAccessor { + private final SeaTunnelRow row; + + public int getArity() { + return row.getArity(); + } + + public String getTableId() { + return row.getTableId(); + } + + public RowKind getRowKind() { + return row.getRowKind(); + } + + public Object getField(int pos) { + return row.getField(pos); + } + + public Object[] getFields() { + return row.getFields(); + } + + public Map getOptions() { + return row.getOptions(); + } +} diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java index 3245273ace2..f7d162925d3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/utils/SourceRecordUtils.java @@ -62,10 +62,9 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException { } /** - * Return the timestamp when the change event is produced in MySQL. - * - *

The field `source.ts_ms` in {@link SourceRecord} data struct is the time when the change - * event is operated in MySQL. + * In the source object, ts_ms indicates the time that the change was made in the database. By + * comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can + * determine the lag between the source database update and Debezium. */ public static Long getMessageTimestamp(SourceRecord record) { Schema schema = record.valueSchema(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java index d09e7b77b5c..56093fd9377 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; +import org.apache.seatunnel.api.table.type.MetadataUtil; import org.apache.seatunnel.api.table.type.MultipleRowType; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -173,26 +174,39 @@ private void deserializeDataChangeRecord(SourceRecord record, Collector ou log.debug("Ignore newly added table {}", tableId); return; } - + Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record); + Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record); + long delay = -1L; + if (fetchTimestamp != null && messageTimestamp != null) { + delay = fetchTimestamp - messageTimestamp; + } switch (op) { case INSERT: SeaTunnelRow insert = extractRowData(tableRowConverter, fullDocument); insert.setRowKind(RowKind.INSERT); insert.setTableId(tableId); + MetadataUtil.setDelay(insert, delay); + MetadataUtil.setEventTime(insert, fetchTimestamp); emit(record, insert, out); break; case DELETE: SeaTunnelRow delete = extractRowData(tableRowConverter, documentKey); delete.setRowKind(RowKind.DELETE); delete.setTableId(tableId); + MetadataUtil.setDelay(delete, delay); + MetadataUtil.setEventTime(delete, fetchTimestamp); emit(record, delete, out); break; case UPDATE: @@ -132,12 +143,16 @@ public void deserialize(@Nonnull SourceRecord record, Collector ou SeaTunnelRow updateAfter = extractRowData(tableRowConverter, fullDocument); updateAfter.setRowKind(RowKind.UPDATE_AFTER); updateAfter.setTableId(tableId); + MetadataUtil.setDelay(updateAfter, delay); + MetadataUtil.setEventTime(updateAfter, fetchTimestamp); emit(record, updateAfter, out); break; case REPLACE: SeaTunnelRow replaceAfter = extractRowData(tableRowConverter, fullDocument); replaceAfter.setRowKind(RowKind.UPDATE_AFTER); replaceAfter.setTableId(tableId); + MetadataUtil.setDelay(replaceAfter, delay); + MetadataUtil.setEventTime(replaceAfter, fetchTimestamp); emit(record, replaceAfter, out); break; case INVALIDATE: diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml index a8814c11ee6..e985193498d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/pom.xml @@ -67,5 +67,19 @@ ${mysql.version} test + + + org.apache.seatunnel + seatunnel-transforms-v2 + ${project.version} + test + + + + org.apache.seatunnel + connector-assert + ${project.version} + test + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index 37897313543..9d8366f06bc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.bson.Document; import org.junit.jupiter.api.AfterAll; @@ -45,6 +46,7 @@ import com.mongodb.client.model.Sorts; import lombok.extern.slf4j.Slf4j; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -148,7 +150,8 @@ public void startUp() { } @TestTemplate - public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) { + public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) + throws InterruptedException { CompletableFuture.supplyAsync( () -> { try { @@ -225,6 +228,45 @@ public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container) { }); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") + public void testMongodbCdcMetadataTrans(TestContainer container) throws InterruptedException { + cleanSourceTable(); + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/mongodbcdc_metadata_trans.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(); + } + return null; + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + upsertDeleteSourceTable(); + TimeUnit.SECONDS.sleep(20); + await().atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + String jobStatus = container.getJobStatus(String.valueOf(jobId)); + Assertions.assertEquals("RUNNING", jobStatus); + }); + + try { + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + private Connection getJdbcConnection() throws SQLException { return DriverManager.getConnection( MYSQL_CONTAINER.getJdbcUrl(), diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf new file mode 100644 index 00000000000..bc6475359c4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_metadata_trans.conf @@ -0,0 +1,104 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + MongoDB-CDC { + hosts = "mongo0:27017" + database = ["inventory"] + collection = ["inventory.products"] + username = superuser + password = superpw + schema = { + fields { + "_id": string, + "name": string, + "description": string, + "weight": string + } + } + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Assert { + source_table_name = "trans_result" + rules { + field_rules = [ + { + field_name = database + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = table + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = rowKind + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = ts_ms + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = delay + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml index 539fce1890b..8cb38aaffa1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml @@ -54,6 +54,20 @@ test + + org.apache.seatunnel + seatunnel-transforms-v2 + ${project.version} + test + + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + org.testcontainers mysql diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index 28919074315..6f6467257f9 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -180,6 +180,45 @@ public void testMysqlCdcCheckDataE2e(TestContainer container) { }); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") + public void testMysqlCdcMetadataTrans(TestContainer container) throws InterruptedException { + // Clear related content to ensure that multiple operations are not affected + clearTable(MYSQL_DATABASE, SOURCE_TABLE_1); + clearTable(MYSQL_DATABASE, SINK_TABLE); + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.runAsync( + () -> { + try { + container.executeJob( + "/mysqlcdc_to_metadata_trans.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1); + TimeUnit.SECONDS.sleep(10); + await().atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + String jobStatus = container.getJobStatus(String.valueOf(jobId)); + Assertions.assertEquals("RUNNING", jobStatus); + }); + try { + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @TestTemplate public void testMysqlCdcCheckDataWithDisableExactlyonce(TestContainer container) { // Clear related content to ensure that multiple operations are not affected diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf new file mode 100644 index 00000000000..8787c8987d1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_metadata_trans.conf @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 +} + +source { + MySQL-CDC { + result_table_name = "customers_mysql_cdc" + server-id = 5652 + username = "st_user_source" + password = "mysqlpw" + table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] + base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Assert { + source_table_name = "trans_result" + rules { + field_rules = [ + { + field_name = database + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = table + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = rowKind + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = ts_ms + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = delay + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml index f95e5cdb1a0..b855c0d6d5c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/pom.xml @@ -57,6 +57,20 @@ test + + org.apache.seatunnel + seatunnel-transforms-v2 + ${project.version} + test + + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + org.testcontainers postgresql diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java index ed3fdd74b40..35be6c0d126 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/OpengaussCDCIT.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.apache.seatunnel.e2e.common.util.JobIdGenerator; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -199,6 +200,47 @@ public void testOpengaussCdcCheckDataE2e(TestContainer container) { } } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") + public void testOpengaussCdcMeatadataTrans(TestContainer container) + throws InterruptedException, IOException { + try { + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/opengausscdc_to_meatadata_trans.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + upsertDeleteSourceTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1); + + TimeUnit.SECONDS.sleep(20); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + String jobStatus = container.getJobStatus(String.valueOf(jobId)); + Assertions.assertEquals("RUNNING", jobStatus); + }); + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } finally { + clearTable(OPENGAUSS_SCHEMA, SOURCE_TABLE_1); + clearTable(OPENGAUSS_SCHEMA, SINK_TABLE_1); + } + } + @TestTemplate @DisabledOnContainer( value = {}, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf new file mode 100644 index 00000000000..9de18e661d9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-opengauss-e2e/src/test/resources/opengausscdc_to_meatadata_trans.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + Opengauss-CDC { + result_table_name = "customers_opengauss_cdc" + username = "gaussdb" + password = "openGauss@123" + database-names = ["opengauss_cdc"] + schema-names = ["inventory"] + table-names = ["opengauss_cdc.inventory.opengauss_cdc_table_1"] + base-url = "jdbc:postgresql://opengauss_cdc_e2e:5432/opengauss_cdc?loggerLevel=OFF" + decoding.plugin.name = "pgoutput" + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Assert { + source_table_name = "trans_result" + rules { + field_rules = [ + { + field_name = database + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = table + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = rowKind + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = ts_ms + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = delay + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml index d3ee2be7415..603e6f7f418 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/pom.xml @@ -44,6 +44,21 @@ ${project.version} test + + + org.apache.seatunnel + seatunnel-transforms-v2 + ${project.version} + test + + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + org.testcontainers jdbc diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java index 86282b7ff03..ce9fbad7bea 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java @@ -299,6 +299,48 @@ public void testOracleCdcCheckDataWithCustomPrimaryKey(TestContainer container) }); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") + public void testOracleCdcMetadataTrans(TestContainer container) throws Exception { + + clearTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY); + clearTable(DATABASE, SINK_TABLE1); + + insertSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY); + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/oraclecdc_to_metadata_trans.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + updateSourceTable(DATABASE, SOURCE_TABLE_NO_PRIMARY_KEY); + TimeUnit.SECONDS.sleep(20); + await().atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + String jobStatus = container.getJobStatus(String.valueOf(jobId)); + Assertions.assertEquals("RUNNING", jobStatus); + }); + try { + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @TestTemplate @DisabledOnContainer( value = {}, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf new file mode 100644 index 00000000000..6a24214f8c6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_metadata_trans.conf @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Oracle-CDC { + result_table_name = "customers" + username = "system" + password = "top_secret" + database-names = ["ORCLCDB"] + schema-names = ["DEBEZIUM"] + base-url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB" + source.reader.close.timeout = 120000 + connection.pool.size = 1 + debezium { + # log.mining.strategy = "online_catalog" + # log.mining.continuous.mine = true + database.oracle.jdbc.timezoneAsRegion = "false" + } + + table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY"] + table-names-config = [ + { + table = "ORCLCDB.DEBEZIUM.FULL_TYPES_NO_PRIMARY_KEY" + primaryKeys = ["ID"] + } + ] + + exactly_once = true + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Assert { + source_table_name = "trans_result" + rules { + field_rules = [ + { + field_name = database + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = table + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = rowKind + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = ts_ms + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = delay + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml index 0e789782956..bb152c27955 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml @@ -53,6 +53,20 @@ test + + org.apache.seatunnel + seatunnel-transforms-v2 + ${project.version} + test + + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + org.testcontainers postgresql diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index 3abca057fb2..acb9a2a41cd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -184,6 +184,49 @@ public void testMPostgresCdcCheckDataE2e(TestContainer container) { } } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") + public void testMPostgresCdcMetadataTrans(TestContainer container) throws InterruptedException { + + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.runAsync( + () -> { + try { + container.executeJob( + "/postgrescdc_to_postgres.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_1); + + TimeUnit.SECONDS.sleep(20); + await().atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + String jobStatus = container.getJobStatus(String.valueOf(jobId)); + Assertions.assertEquals("RUNNING", jobStatus); + }); + + try { + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } finally { + // Clear related content to ensure that multiple operations are not affected + clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_1); + clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_1); + } + } + @TestTemplate @DisabledOnContainer( value = {}, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf new file mode 100644 index 00000000000..d0337069b04 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_metadata_trans.conf @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + Postgres-CDC { + result_table_name = "customers_postgres_cdc" + username = "postgres" + password = "postgres" + database-names = ["postgres_cdc"] + schema-names = ["inventory"] + table-names = ["postgres_cdc.inventory.postgres_cdc_table_1"] + base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + decoding.plugin.name = "decoderbufs" + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Assert { + source_table_name = "trans_result" + rules { + field_rules = [ + { + field_name = database + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = table + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = rowKind + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = ts_ms + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = delay + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml index 0c5a0fa8e07..59a673fe860 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/pom.xml @@ -51,6 +51,21 @@ ${project.version} test + + + org.apache.seatunnel + seatunnel-transforms-v2 + ${project.version} + test + + + + org.apache.seatunnel + connector-assert + ${project.version} + test + + org.testcontainers jdbc diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java index eb891be7714..1b699d5805f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; +import org.apache.seatunnel.e2e.common.util.JobIdGenerator; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; @@ -307,6 +308,44 @@ public void testCDCWithCustomPrimaryKey(TestContainer container) { }); } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "This case requires obtaining the task health status and manually canceling the canceled task, which is currently only supported by the zeta engine.") + public void testSqlServerCDCMetadataTrans(TestContainer container) throws InterruptedException { + initializeSqlServerTable("column_type_test"); + + Long jobId = JobIdGenerator.newJobId(); + CompletableFuture.runAsync( + () -> { + try { + container.executeJob( + "/sqlservercdc_to_metadata_trans.conf", String.valueOf(jobId)); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + }); + TimeUnit.SECONDS.sleep(10); + // insert update delete + updateSourceTable(SOURCE_TABLE_CUSTOM_PRIMARY_KEY); + TimeUnit.SECONDS.sleep(20); + await().atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> { + String jobStatus = container.getJobStatus(String.valueOf(jobId)); + Assertions.assertEquals("RUNNING", jobStatus); + }); + try { + Container.ExecResult cancelJobResult = container.cancelJob(String.valueOf(jobId)); + Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr()); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + /** * Executes a JDBC statement using the default jdbc config without autocommitting the * connection. diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf new file mode 100644 index 00000000000..49272fc5cf8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_metadata_trans.conf @@ -0,0 +1,110 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + SqlServer-CDC { + result_table_name = "customers" + username = "sa" + password = "Password!" + database-names = ["column_type_test"] + table-names = ["column_type_test.dbo.full_types_custom_primary_key"] + base-url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test" + + exactly_once = true + table-names-config = [ + { + table = "column_type_test.dbo.full_types_custom_primary_key" + primaryKeys = ["id"] + } + ] + } +} + +transform { + Metadata { + metadata_fields { + Database = database + Table = table + RowKind = rowKind + EventTime = ts_ms + Delay = delay + } + result_table_name = "trans_result" + } +} + +sink { + Assert { + source_table_name = "trans_result" + rules { + field_rules = [ + { + field_name = database + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = table + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = rowKind + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = ts_ms + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = delay + field_type = long + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java index 72584158f64..b55cbae6c7d 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java @@ -68,6 +68,10 @@ default Container.ExecResult cancelJob(String jobId) throws IOException, Interru throw new UnsupportedOperationException("Not implemented"); } + default String getJobStatus(String jobId) { + throw new UnsupportedOperationException("Not implemented"); + } + String getServerLogs(); void copyFileToContainer(String path, String targetPath); diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java index e33d89cc0ad..03a0f87be8d 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java @@ -17,7 +17,10 @@ package org.apache.seatunnel.e2e.common.container.seatunnel; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode; + import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.e2e.common.container.AbstractTestContainer; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; import org.apache.seatunnel.e2e.common.container.TestContainer; @@ -25,6 +28,7 @@ import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.apache.commons.compress.utils.Lists; +import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -96,7 +100,7 @@ private GenericContainer createSeaTunnelServer() throws IOException, Interrup "seatunnel-engine:" + JDK_DOCKER_IMAGE))) .waitingFor(Wait.forLogMessage(".*received new worker register:.*", 1)); copySeaTunnelStarterToContainer(server); - server.setPortBindings(Collections.singletonList("5801:5801")); + server.setPortBindings(Arrays.asList("5801:5801", "8080:8080")); server.withCopyFileToContainer( MountableFile.forHostPath( PROJECT_ROOT_PATH @@ -490,6 +494,24 @@ public Container.ExecResult cancelJob(String jobId) throws IOException, Interrup return cancelJob(server, jobId); } + @Override + public String getJobStatus(String jobId) { + HttpGet get = new HttpGet("http://" + server.getHost() + ":8080/job-info/" + jobId); + try (CloseableHttpClient client = HttpClients.createDefault()) { + CloseableHttpResponse response = client.execute(get); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + String jobStatus = EntityUtils.toString(response.getEntity()); + ObjectNode jsonNodes = JsonUtils.parseObject(jobStatus); + if (jsonNodes.has("jobStatus")) { + return jsonNodes.get("jobStatus").asText(); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + } + @Override public String getServerLogs() { return server.getLogs(); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml index 80b928fcdcd..2c3d39d0be8 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml @@ -33,10 +33,10 @@ seatunnel: plugin-config: namespace: /tmp/seatunnel/checkpoint_snapshot/ http: - enable-http: false + enable-http: true port: 8080 telemetry: metric: enabled: false logs: - scheduled-deletion-enable: true \ No newline at end of file + scheduled-deletion-enable: true diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf index e91765fbf3c..960df97405b 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf @@ -46,7 +46,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.catalog.*; import org.apache.seatunnel.api.table.type.*; import java.util.ArrayList; @@ -84,7 +84,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -154,4 +154,4 @@ sink { } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf index 8689404a17e..5d226764c1c 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf @@ -43,7 +43,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -77,7 +77,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -154,4 +154,4 @@ sink { ] } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf index 9e59a5e5350..64272fef83a 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf @@ -46,7 +46,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.catalog.*; import org.apache.seatunnel.api.table.type.*; import java.util.ArrayList; @@ -84,7 +84,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.catalog.*; import org.apache.seatunnel.api.table.type.*; import java.util.ArrayList; @@ -156,4 +156,4 @@ sink { } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf index 7958b880765..661f5562eec 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf @@ -43,7 +43,7 @@ transform { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -108,4 +108,4 @@ sink { ] } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf index 904066d69bc..6aa16b64b99 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_http_compile_transform.conf @@ -44,7 +44,7 @@ transform { source_code=""" import cn.hutool.http.HttpUtil; import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -112,4 +112,4 @@ Assert { } } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf index b65877d465c..1f732bb3068 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf @@ -43,7 +43,7 @@ DynamicCompile { compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.catalog.*; import org.apache.seatunnel.api.table.type.*; import java.util.ArrayList; @@ -112,4 +112,4 @@ sink { ] } } -} \ No newline at end of file +} diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile index 9bb6a8fcdfe..079e78d6772 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile @@ -15,7 +15,7 @@ * limitations under the License. */ import org.apache.seatunnel.api.table.catalog.Column -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor import org.apache.seatunnel.api.table.catalog.CatalogTable import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.*; @@ -39,4 +39,4 @@ class demo { fieldValues[0]="AA" return fieldValues; } -}; \ No newline at end of file +}; diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile index 7d1947c077e..0fe36b01c06 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile @@ -18,7 +18,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import java.util.ArrayList; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java index 51e11724123..7f6ac1fba71 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java index 5b97f341686..39dd9517486 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java @@ -23,6 +23,7 @@ import lombok.AllArgsConstructor; @AllArgsConstructor +@Deprecated public class SeaTunnelRowAccessor { private final SeaTunnelRow row; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java index 13d25989aca..394242a41be 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SingleFieldOutputTransform.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java index 1718030db68..75712b5fc87 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java @@ -24,10 +24,10 @@ import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.exception.TransformCommonError; import java.lang.reflect.Array; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java index ea55569420d..bfae2b8d2a1 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java @@ -20,10 +20,10 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.dynamiccompile.parse.AbstractParse; import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse; import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java index b35df6a448b..51e4b8265fb 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java @@ -25,6 +25,8 @@ import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND; import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_FIELDS_NOT_FOUND; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_MAPPING_FIELD_EXISTS; /** The common error of SeaTunnel transform. Please refer {@link CommonError} */ public class TransformCommonError { @@ -43,4 +45,18 @@ public static TransformException cannotFindInputFieldsError( params.put("transform", transform); return new TransformException(INPUT_FIELDS_NOT_FOUND, params); } + + public static TransformException cannotFindMetadataFieldError(String transform, String field) { + Map params = new HashMap<>(); + params.put("field", field); + params.put("transform", transform); + return new TransformException(METADATA_FIELDS_NOT_FOUND, params); + } + + public static TransformException metadataMappingFieldExists(String transform, String field) { + Map params = new HashMap<>(); + params.put("field", field); + params.put("transform", transform); + return new TransformException(METADATA_MAPPING_FIELD_EXISTS, params); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java index dc5008ec040..b0d72d7cf19 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java @@ -25,7 +25,13 @@ public enum TransformCommonErrorCode implements SeaTunnelErrorCode { "The input field '' of '' transform not found in upstream schema"), INPUT_FIELDS_NOT_FOUND( "TRANSFORM_COMMON-02", - "The input fields '' of '' transform not found in upstream schema"); + "The input fields '' of '' transform not found in upstream schema"), + METADATA_FIELDS_NOT_FOUND( + "TRANSFORM_COMMON-03", + "The metadata fields '' of '' transform not found "), + METADATA_MAPPING_FIELD_EXISTS( + "TRANSFORM_COMMON-04", + "The metadata mapping field '' of '' transform already exists in upstream schema"); private final String code; private final String description; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java index 3e14e8488d3..7978aa22601 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java @@ -23,12 +23,12 @@ import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.format.json.JsonToRowConverters; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.exception.ErrorDataTransformException; import org.apache.seatunnel.transform.exception.TransformCommonError; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java new file mode 100644 index 00000000000..2a1679b500d --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransform.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.CommonOptions; +import org.apache.seatunnel.api.table.type.MetadataUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; +import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; +import org.apache.seatunnel.transform.exception.TransformCommonError; + +import com.google.common.annotations.VisibleForTesting; +import lombok.NonNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.seatunnel.api.table.type.MetadataUtil.isMetadataField; + +public class MetadataTransform extends MultipleFieldOutputTransform { + + private List fieldNames; + private Map metadataFieldMapping; + + public MetadataTransform(ReadonlyConfig config, @NonNull CatalogTable inputCatalogTable) { + super(inputCatalogTable); + initOutputFields(inputCatalogTable, config.get(MetadataTransformConfig.METADATA_FIELDS)); + } + + private void initOutputFields(CatalogTable inputCatalogTable, Map fields) { + List sourceTableFiledNames = + Arrays.asList(inputCatalogTable.getTableSchema().getFieldNames()); + List fieldNames = new ArrayList<>(); + for (Map.Entry field : fields.entrySet()) { + String srcField = field.getKey(); + if (!isMetadataField(srcField)) { + throw TransformCommonError.cannotFindMetadataFieldError(getPluginName(), srcField); + } + String targetField = field.getValue(); + if (sourceTableFiledNames.contains(targetField)) { + throw TransformCommonError.metadataMappingFieldExists(getPluginName(), srcField); + } + fieldNames.add(field.getKey()); + } + this.fieldNames = fieldNames; + this.metadataFieldMapping = fields; + } + + @Override + public String getPluginName() { + return MetadataTransformConfig.PLUGIN_NAME; + } + + @Override + protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] value = new Object[fieldNames.size()]; + for (Map.Entry mapping : metadataFieldMapping.entrySet()) { + String metadataFieldName = mapping.getKey(); + String mappingFieldName = mapping.getValue(); + int i = fieldNames.indexOf(metadataFieldName); + Object fieldValue = null; + switch (CommonOptions.fromName(metadataFieldName)) { + case DATABASE: + fieldValue = MetadataUtil.getDatabase(inputRow); + break; + case TABLE: + fieldValue = MetadataUtil.getTable(inputRow); + break; + case ROW_KIND: + fieldValue = MetadataUtil.getRowKind(inputRow); + break; + case DELAY: + fieldValue = MetadataUtil.getDelay(inputRow); + break; + case EVENT_TIME: + fieldValue = MetadataUtil.getEventTime(inputRow); + break; + case PARTITION: + fieldValue = MetadataUtil.getPartitionStr(inputRow); + break; + default: + throw TransformCommonError.cannotFindMetadataFieldError( + getPluginName(), mappingFieldName); + } + value[i] = fieldValue; + } + return value; + } + + @Override + protected Column[] getOutputColumns() { + Column[] columns = new Column[fieldNames.size()]; + for (Map.Entry mapping : metadataFieldMapping.entrySet()) { + String metadataFieldName = mapping.getKey(); + String mappingFieldName = mapping.getValue(); + int i = fieldNames.indexOf(metadataFieldName); + Column column; + switch (CommonOptions.fromName(metadataFieldName)) { + case DATABASE: + case TABLE: + case ROW_KIND: + case PARTITION: + column = + PhysicalColumn.of( + mappingFieldName, + BasicType.STRING_TYPE, + (Long) null, + null, + true, + null, + null); + break; + case DELAY: + case EVENT_TIME: + column = + PhysicalColumn.of( + mappingFieldName, + BasicType.LONG_TYPE, + (Long) null, + null, + true, + null, + null); + break; + default: + throw TransformCommonError.cannotFindMetadataFieldError( + getPluginName(), mappingFieldName); + } + columns[i] = column; + } + return columns; + } + + @VisibleForTesting + public void initRowContainerGenerator() { + transformTableSchema(); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java new file mode 100644 index 00000000000..fe9971d3a13 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig; + +import java.util.Map; + +public class MetadataTransformConfig extends ModelTransformConfig { + + public static final String PLUGIN_NAME = "Metadata"; + + public static final Option> METADATA_FIELDS = + Options.key("metadata_fields") + .mapType() + .noDefaultValue() + .withDescription( + "Specify the metadata field relationship between input and output"); +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java new file mode 100644 index 00000000000..ebbacb5cd08 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/metadata/MetadataTransformFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.connector.TableTransform; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class MetadataTransformFactory implements TableTransformFactory { + @Override + public String factoryIdentifier() { + return MetadataTransformConfig.PLUGIN_NAME; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder().required(MetadataTransformConfig.METADATA_FIELDS).build(); + } + + @Override + public TableTransform createTransform(TableTransformFactoryContext context) { + return () -> new MetadataTransform(context.getOptions(), context.getCatalogTables().get(0)); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java index 9e77043f0a7..ce6d864da6f 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/embadding/EmbeddingTransform.java @@ -21,10 +21,10 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.table.type.VectorType; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.exception.TransformCommonError; import org.apache.seatunnel.transform.nlpmodel.ModelProvider; import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java index 069945951bc..c99b03776e9 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/nlpmodel/llm/LLMTransform.java @@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.common.SingleFieldOutputTransform; import org.apache.seatunnel.transform.nlpmodel.ModelProvider; import org.apache.seatunnel.transform.nlpmodel.ModelTransformConfig; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java index 5c5451fce74..b2c9fa44ce5 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java @@ -20,8 +20,8 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.common.SingleFieldOutputTransform; import org.apache.seatunnel.transform.exception.TransformCommonError; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java index ae7b1fcaa18..354e5a3dd5d 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/rowkind/RowKindExtractorTransform.java @@ -24,7 +24,7 @@ import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.common.SingleFieldOutputTransform; import com.google.common.annotations.VisibleForTesting; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java index c1ead2dd0b5..922cdbd97c5 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java @@ -21,9 +21,9 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; -import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.exception.TransformCommonError; import lombok.NonNull; diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java new file mode 100644 index 00000000000..a3ddf1ced44 --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/metadata/MetadataTransformTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.metadata; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.MetadataUtil; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class MetadataTransformTest { + + static CatalogTable catalogTable; + + static Object[] values; + + static SeaTunnelRow inputRow; + + static Long eventTime; + + @BeforeAll + static void setUp() { + catalogTable = + CatalogTable.of( + TableIdentifier.of("catalog", TablePath.DEFAULT), + TableSchema.builder() + .column( + PhysicalColumn.of( + "key1", + BasicType.STRING_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key2", + BasicType.INT_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key3", + BasicType.LONG_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key4", + BasicType.DOUBLE_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .column( + PhysicalColumn.of( + "key5", + BasicType.FLOAT_TYPE, + 1L, + Boolean.FALSE, + null, + null)) + .build(), + new HashMap<>(), + new ArrayList<>(), + "comment"); + values = new Object[] {"value1", 1, 896657703886127105L, 3.1415916, 3.14}; + inputRow = new SeaTunnelRow(values); + inputRow.setTableId(TablePath.DEFAULT.getFullName()); + eventTime = LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli(); + MetadataUtil.setDelay(inputRow, 150L); + MetadataUtil.setEventTime(inputRow, eventTime); + MetadataUtil.setPartition(inputRow, Arrays.asList("key1", "key2").toArray(new String[0])); + } + + @Test + void testMetadataTransform() { + Map metadataMapping = new HashMap<>(); + metadataMapping.put("Database", "database"); + metadataMapping.put("Table", "table"); + metadataMapping.put("Partition", "partition"); + metadataMapping.put("RowKind", "rowKind"); + metadataMapping.put("EventTime", "ts_ms"); + metadataMapping.put("Delay", "delay"); + Map config = new HashMap<>(); + config.put("metadata_fields", metadataMapping); + MetadataTransform transform = + new MetadataTransform(ReadonlyConfig.fromMap(config), catalogTable); + transform.initRowContainerGenerator(); + SeaTunnelRow outputRow = transform.map(inputRow); + Assertions.assertEquals(values.length + 6, outputRow.getArity()); + Assertions.assertEquals( + "SeaTunnelRow{tableId=default.default.default, kind=+I, fields=[value1, 1, 896657703886127105, 3.1415916, 3.14, key1,key2, default, " + + eventTime + + ", +I, default, 150]}", + outputRow.toString()); + } +}