[Feature][Transform-v2] Add metadata transform #7899

Open
wants to merge 3 commits into
base: dev
4 changes: 2 additions & 2 deletions docs/en/transform-v2/dynamic-compile.md
@@ -88,7 +88,7 @@ transform {
 compile_pattern="SOURCE_CODE"
 source_code="""
 import org.apache.seatunnel.api.table.catalog.Column
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
 import org.apache.seatunnel.api.table.catalog.CatalogTable
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.*;
@@ -146,7 +146,7 @@ transform {
 compile_pattern="SOURCE_CODE"
 source_code="""
 import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.api.table.catalog.*;
 import org.apache.seatunnel.api.table.type.*;
 import java.util.ArrayList;
85 changes: 85 additions & 0 deletions docs/en/transform-v2/metadata.md
@@ -0,0 +1,85 @@
# Metadata

> Metadata transform plugin

## Description
A transform plugin that adds metadata fields to each row.

## Available Metadata

|    Key    | DataType | Description                                                                                   |
|:---------:|:--------:|:----------------------------------------------------------------------------------------------|
| Database  | string   | Name of the database that contains the row.                                                   |
| Table     | string   | Name of the table that contains the row.                                                      |
| RowKind   | string   | The type of operation (row kind) of the row.                                                  |
| EventTime | Long     | The time at which the connector processed the event.                                          |
| Delay     | Long     | The difference between the data extraction time and the database change time.                 |
| Partition | string   | The partition fields of the table that contains the row; multiple fields are joined with `,`. |

### Note
`Delay` and `Partition` currently work only with the CDC series connectors, excluding TiDB-CDC.

## Options

| name | type | required | default value | Description |
|:---------------:|------|----------|---------------|---------------------------------------------------------------------------|
| metadata_fields | map | yes | | A mapping between metadata fields and their corresponding output fields. |

### metadata_fields [map]

A mapping between metadata fields and their respective output fields.

```hocon
metadata_fields {
Database = c_database
Table = c_table
RowKind = c_rowKind
EventTime = c_ts_ms
Delay = c_delay
}
```
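
As a rough illustration of what the mapping expresses, the Java sketch below projects a row's metadata onto the configured output field names. It is not the plugin's implementation: the class `MetadataMappingSketch`, the method `project`, and the sample values are hypothetical, and row metadata is modelled as a plain map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; not the Metadata transform itself.
final class MetadataMappingSketch {

    /** Returns outputField -> metadata value, in the order the mapping was declared. */
    static Map<String, Object> project(
            Map<String, String> metadataFields, Map<String, Object> rowMetadata) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : metadataFields.entrySet()) {
            String metadataKey = entry.getKey();
            // Assumption: an unknown key is rejected rather than silently skipped.
            if (!rowMetadata.containsKey(metadataKey)) {
                throw new IllegalArgumentException("Unknown metadata key: " + metadataKey);
            }
            out.put(entry.getValue(), rowMetadata.get(metadataKey));
        }
        return out;
    }

    public static void main(String[] args) {
        // Sample metadata for one row (made-up values).
        Map<String, Object> rowMetadata = new LinkedHashMap<>();
        rowMetadata.put("Database", "source");
        rowMetadata.put("Table", "user");
        rowMetadata.put("Delay", 150L);

        // Same shape as the metadata_fields option: metadata key -> output field.
        Map<String, String> metadataFields = new LinkedHashMap<>();
        metadataFields.put("Database", "c_database");
        metadataFields.put("Table", "c_table");

        System.out.println(project(metadataFields, rowMetadata));
        // prints {c_database=source, c_table=user}
    }
}
```

Only the keys listed in the mapping produce output fields; metadata that is available but not mapped (here `Delay`) is left out.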

## Examples

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    result_table_name = "customers_mysql_cdc"
    server-id = 5652
    username = "root"
    password = "zdyk_Dev@2024"
    table-names = ["source.user"]
    base-url = "jdbc:mysql://172.16.17.123:3306/source"
  }
}

transform {
  Metadata {
    metadata_fields {
      Database = database
      Table = table
      RowKind = rowKind
      EventTime = ts_ms
      Delay = delay
    }
    result_table_name = "trans_result"
  }
}

sink {
  Console {
    source_table_name = "trans_result"
  }
}
```

4 changes: 2 additions & 2 deletions docs/zh/transform-v2/dynamic-compile.md
@@ -85,7 +85,7 @@ transform {
 compile_pattern="SOURCE_CODE"
 source_code="""
 import org.apache.seatunnel.api.table.catalog.Column
-import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor
 import org.apache.seatunnel.api.table.catalog.CatalogTable
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.*;
Expand Down Expand Up @@ -143,7 +143,7 @@ transform {
compile_pattern="SOURCE_CODE"
source_code="""
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
import org.apache.seatunnel.api.table.catalog.*;
import org.apache.seatunnel.api.table.type.*;
import java.util.ArrayList;
Expand Down
85 changes: 85 additions & 0 deletions docs/zh/transform-v2/metadata.md
@@ -0,0 +1,85 @@
# Metadata

> Metadata transform plugin

## Description
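Metadata transform plugin for adding metadata fields to data.

## Available Metadata

|    Key    | DataType | Description                                                                                   |
|:---------:|:--------:|:----------------------------------------------------------------------------------------------|
| Database  | string   | Name of the database that contains the row.                                                   |
| Table     | string   | Name of the table that contains the row.                                                      |
| RowKind   | string   | The row kind.                                                                                 |
| EventTime | Long     | The time at which the connector processed the event.                                          |
| Delay     | Long     | The difference between the data extraction time and the database change time.                 |
| Partition | string   | The partition fields of the table that contains the row; multiple fields are joined with `,`. |

### Note
`Delay` and `Partition` currently apply only to the CDC series connectors, excluding TiDB-CDC.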

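## Options

|      name       | type | required | default value | Description                                                              |
|:---------------:|------|:--------:|:-------------:|--------------------------------------------------------------------------|
| metadata_fields | map  |   yes    |       -       | A mapping between metadata fields and their corresponding output fields. |

### metadata_fields [map]

A mapping between metadata fields and their corresponding output fields.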

```hocon
metadata_fields {
Database = c_database
Table = c_table
RowKind = c_rowKind
EventTime = c_ts_ms
Delay = c_delay
}
```

## Examples

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    result_table_name = "customers_mysql_cdc"
    server-id = 5652
    username = "root"
    password = "zdyk_Dev@2024"
    table-names = ["source.user"]
    base-url = "jdbc:mysql://172.16.17.123:3306/source"
  }
}

transform {
  Metadata {
    metadata_fields {
      Database = database
      Table = table
      RowKind = rowKind
      EventTime = ts_ms
      Delay = delay
    }
    result_table_name = "trans_result"
  }
}

sink {
  Console {
    source_table_name = "trans_result"
  }
}
```

1 change: 1 addition & 0 deletions plugin-mapping.properties
@@ -154,3 +154,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
 seatunnel.transform.LLM = seatunnel-transforms-v2
 seatunnel.transform.Embedding = seatunnel-transforms-v2
 seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
+seatunnel.transform.Metadata = seatunnel-transforms-v2
Original file line number Diff line number Diff line change
@@ -30,18 +30,44 @@ public enum CommonOptions {
     /**
      * The key of {@link Column#getOptions()} to specify the column value is a json format string.
      */
-    JSON("Json"),
+    JSON("Json", false),
     /** The key of {@link Column#getOptions()} to specify the column value is a metadata field. */
-    METADATA("Metadata"),
+    METADATA("Metadata", false),
     /**
      * The key of {@link SeaTunnelRow#getOptions()} to store the partition value of the row value.
      */
-    PARTITION("Partition"),
-    ;
+    PARTITION("Partition", true),
+    /**
+     * The key of {@link SeaTunnelRow#getOptions()} to store the DATABASE value of the row value.
+     */
+    DATABASE("Database", true),
+    /** The key of {@link SeaTunnelRow#getOptions()} to store the TABLE value of the row value. */
+    TABLE("Table", true),
+    /**
+     * The key of {@link SeaTunnelRow#getOptions()} to store the ROW_KIND value of the row value.
+     */
+    ROW_KIND("RowKind", true),
+    /**
+     * The key of {@link SeaTunnelRow#getOptions()} to store the EVENT_TIME value of the row value.
+     */
+    EVENT_TIME("EventTime", true),
+    /** The key of {@link SeaTunnelRow#getOptions()} to store the DELAY value of the row value. */
+    DELAY("Delay", true);

     private final String name;
+    private final boolean supportMetadataTrans;

-    CommonOptions(String name) {
+    CommonOptions(String name, boolean supportMetadataTrans) {
         this.name = name;
+        this.supportMetadataTrans = supportMetadataTrans;
     }
+
+    public static CommonOptions fromName(String name) {
+        for (CommonOptions option : CommonOptions.values()) {
+            if (option.getName().equals(name)) {
+                return option;
+            }
+        }
+        throw new IllegalArgumentException("Unknown option name: " + name);
+    }
 }
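
Because `fromName` compares names with `equals`, the lookup is case-sensitive, and only options flagged `supportMetadataTrans` are exposed as metadata fields. The self-contained sketch below mirrors that behaviour with a stand-in enum; `OptionSketch`, `CommonOptionsSketch`, and `supportedNames` are hypothetical names, and the accessors are written by hand since the hunk does not show how the real enum generates them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical driver class; not part of the PR.
final class CommonOptionsSketch {
    public static void main(String[] args) {
        System.out.println(OptionSketch.supportedNames());
        // prints [Partition, Database, Table, RowKind, EventTime, Delay]
        System.out.println(OptionSketch.fromName("Delay"));
        // prints DELAY
        try {
            OptionSketch.fromName("delay"); // wrong case: no constant matches
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
            // prints Unknown option name: delay
        }
    }
}

// Stand-in for the enum in the hunk above, with the same names and flags.
enum OptionSketch {
    JSON("Json", false),
    METADATA("Metadata", false),
    PARTITION("Partition", true),
    DATABASE("Database", true),
    TABLE("Table", true),
    ROW_KIND("RowKind", true),
    EVENT_TIME("EventTime", true),
    DELAY("Delay", true);

    private final String name;
    private final boolean supportMetadataTrans;

    OptionSketch(String name, boolean supportMetadataTrans) {
        this.name = name;
        this.supportMetadataTrans = supportMetadataTrans;
    }

    String getName() {
        return name;
    }

    boolean isSupportMetadataTrans() {
        return supportMetadataTrans;
    }

    /** Case-sensitive lookup by display name, as in the hunk's fromName. */
    static OptionSketch fromName(String name) {
        for (OptionSketch option : values()) {
            if (option.getName().equals(name)) {
                return option;
            }
        }
        throw new IllegalArgumentException("Unknown option name: " + name);
    }

    /** Names of the options usable as metadata fields, in declaration order. */
    static List<String> supportedNames() {
        return Arrays.stream(values())
                .filter(OptionSketch::isSupportMetadataTrans)
                .map(OptionSketch::getName)
                .collect(Collectors.toList());
    }
}
```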
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

import org.apache.seatunnel.api.table.catalog.TablePath;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import static org.apache.seatunnel.api.table.type.CommonOptions.DELAY;
import static org.apache.seatunnel.api.table.type.CommonOptions.EVENT_TIME;
import static org.apache.seatunnel.api.table.type.CommonOptions.PARTITION;

/** Helpers for reading and writing row-level metadata stored in the row options. */
public class MetadataUtil {

    /** Names of all options that can be used as metadata fields. */
    public static final List<String> METADATA_FIELDS;

    static {
        METADATA_FIELDS = new ArrayList<>();
        Stream.of(CommonOptions.values())
                .filter(CommonOptions::isSupportMetadataTrans)
                .map(CommonOptions::getName)
                .forEach(METADATA_FIELDS::add);
    }

    public static void setDelay(SeaTunnelRow row, Long delay) {
        row.getOptions().put(DELAY.getName(), delay);
    }

    public static void setPartition(SeaTunnelRow row, String[] partition) {
        row.getOptions().put(PARTITION.getName(), partition);
    }

    public static void setEventTime(SeaTunnelRow row, Long eventTime) {
        row.getOptions().put(EVENT_TIME.getName(), eventTime);
    }

    public static Long getDelay(SeaTunnelRowAccessor row) {
        return (Long) row.getOptions().get(DELAY.getName());
    }

    /** Returns the database name parsed from the table id, or null if the row has no table id. */
    public static String getDatabase(SeaTunnelRowAccessor row) {
        if (row.getTableId() == null) {
            return null;
        }
        return TablePath.of(row.getTableId()).getDatabaseName();
    }

    /** Returns the table name parsed from the table id, or null if the row has no table id. */
    public static String getTable(SeaTunnelRowAccessor row) {
        if (row.getTableId() == null) {
            return null;
        }
        return TablePath.of(row.getTableId()).getTableName();
    }

    public static String getRowKind(SeaTunnelRowAccessor row) {
        return row.getRowKind().shortString();
    }

    /** Returns the partition fields joined with ",", or null if no partition is set. */
    public static String getPartitionStr(SeaTunnelRowAccessor row) {
        Object partition = row.getOptions().get(PARTITION.getName());
        return Objects.nonNull(partition) ? String.join(",", (String[]) partition) : null;
    }

    public static String[] getPartition(SeaTunnelRowAccessor row) {
        return (String[]) row.getOptions().get(PARTITION.getName());
    }

    public static Long getEventTime(SeaTunnelRowAccessor row) {
        return (Long) row.getOptions().get(EVENT_TIME.getName());
    }

    public static boolean isMetadataField(String fieldName) {
        return METADATA_FIELDS.contains(fieldName);
    }
}
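
The getters above read untyped values out of the row's options map and cast them back to the type the setter stored. A minimal sketch of that pattern, using a plain `Map<String, Object>` in place of `SeaTunnelRow#getOptions()` (the class `RowOptionsSketch` and the sample values are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a plain map plays the role of the row's options.
final class RowOptionsSketch {

    /** Joins the stored partition fields with ",", or returns null when none are set. */
    static String partitionStr(Map<String, Object> options) {
        Object partition = options.get("Partition");
        return partition != null ? String.join(",", (String[]) partition) : null;
    }

    /** Reads the stored delay; the cast relies on the writer having stored a Long. */
    static Long delay(Map<String, Object> options) {
        return (Long) options.get("Delay");
    }

    public static void main(String[] args) {
        Map<String, Object> options = new HashMap<>();
        options.put("Partition", new String[] {"dt", "region"});
        options.put("Delay", 150L);

        System.out.println(partitionStr(options)); // prints dt,region
        System.out.println(delay(options)); // prints 150
        System.out.println(partitionStr(new HashMap<>())); // prints null
    }
}
```

Since the map is untyped, a value stored under the wrong type surfaces only as a `ClassCastException` at read time, which is one reason to keep the reads and writes paired in a single utility class.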
Original file line number Diff line number Diff line change
@@ -34,10 +34,10 @@ public final class SeaTunnelRow implements Serializable {
     /** The array to store the actual internal format values. */
     private final Object[] fields;

-    private volatile int size;
-
     private Map<String, Object> options;
+
+    private volatile int size;

     public SeaTunnelRow(int arity) {
         this.fields = new Object[arity];
     }