Skip to content

Commit

Permalink
[Feature][Transform-v2] Add metadata transform
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 committed Oct 25, 2024
1 parent 15431c9 commit 6a603a0
Show file tree
Hide file tree
Showing 15 changed files with 651 additions and 6 deletions.
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,4 @@ seatunnel.transform.DynamicCompile = seatunnel-transforms-v2
seatunnel.transform.LLM = seatunnel-transforms-v2
seatunnel.transform.Embedding = seatunnel-transforms-v2
seatunnel.transform.RowKindExtractor = seatunnel-transforms-v2
seatunnel.transform.Metadata = seatunnel-transforms-v2
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.table.type;

import org.apache.seatunnel.api.table.catalog.TablePath;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Static helpers for reading and writing the metadata attached to a {@link SeaTunnelRow}.
 *
 * <p>Delay, partition and event time are kept in the map returned by {@link
 * SeaTunnelRow#getMetadata()}. Database, table and row kind are derived from the row's table id
 * and row kind instead of being stored in that map.
 */
public class MetadataUtil {

    public static final String DATABASE = "database";
    public static final String TABLE = "table";
    public static final String PARTITION = "partition";
    public static final String ROW_KIND = "rowKind";
    public static final String EVENT_TIME = "ts_ms";
    public static final String DELAY = "delay";

    /** Names of every supported metadata field; read-only so callers cannot alter it. */
    public static final List<String> METADATA_FIELDS =
            Collections.unmodifiableList(
                    Arrays.asList(DELAY, PARTITION, DATABASE, TABLE, ROW_KIND, EVENT_TIME));

    /** Utility class; not meant to be instantiated. */
    private MetadataUtil() {}

    /**
     * Stores the delay value in the row's metadata under {@link #DELAY}.
     *
     * @param row the row whose metadata is updated
     * @param delay the delay value to store
     */
    public static void setDelay(SeaTunnelRow row, Long delay) {
        row.getMetadata().put(DELAY, delay);
    }

    /**
     * Stores the partition values in the row's metadata under {@link #PARTITION}.
     *
     * @param row the row whose metadata is updated
     * @param partition the partition values to store
     */
    public static void setPartition(SeaTunnelRow row, String[] partition) {
        row.getMetadata().put(PARTITION, partition);
    }

    /**
     * Stores the event time in the row's metadata under {@link #EVENT_TIME}.
     *
     * @param row the row whose metadata is updated
     * @param eventTime the event time to store
     */
    public static void setEventTime(SeaTunnelRow row, Long eventTime) {
        row.getMetadata().put(EVENT_TIME, eventTime);
    }

    /**
     * Reads the value stored under {@link #DELAY}.
     *
     * @param row the row to read from
     * @return the stored delay, or {@code null} when none was set
     */
    public static Long getDelay(SeaTunnelRow row) {
        return (Long) row.getMetadata().get(DELAY);
    }

    /**
     * Resolves the database name from the row's table id.
     *
     * @param row the row to read from
     * @return the database name of the {@link TablePath} parsed from the table id
     */
    public static String getDatabase(SeaTunnelRow row) {
        return TablePath.of(row.getTableId()).getDatabaseName();
    }

    /**
     * Resolves the table name from the row's table id.
     *
     * @param row the row to read from
     * @return the table name of the {@link TablePath} parsed from the table id
     */
    public static String getTable(SeaTunnelRow row) {
        return TablePath.of(row.getTableId()).getTableName();
    }

    /**
     * Returns the short string form of the row's kind.
     *
     * @param row the row to read from
     * @return the result of {@code shortString()} on the row's kind
     */
    public static String getRowKind(SeaTunnelRow row) {
        return row.getRowKind().shortString();
    }

    /**
     * Returns the partition values joined with commas.
     *
     * @param row the row to read from
     * @return the comma-joined partition values, or {@code null} when no partition was set
     */
    public static String getPartitionStr(SeaTunnelRow row) {
        String[] partition = getPartition(row);
        return Objects.nonNull(partition) ? String.join(",", partition) : null;
    }

    /**
     * Reads the value stored under {@link #PARTITION}.
     *
     * @param row the row to read from
     * @return the stored partition values, or {@code null} when none were set
     */
    public static String[] getPartition(SeaTunnelRow row) {
        return (String[]) row.getMetadata().get(PARTITION);
    }

    /**
     * Reads the value stored under {@link #EVENT_TIME}.
     *
     * @param row the row to read from
     * @return the stored event time, or {@code null} when none was set
     */
    public static Long getEventTime(SeaTunnelRow row) {
        return (Long) row.getMetadata().get(EVENT_TIME);
    }

    /**
     * Tells whether the given name is one of the supported metadata fields.
     *
     * @param fieldName the name to check
     * @return {@code true} when {@link #METADATA_FIELDS} contains the name
     */
    public static boolean isMetadataField(String fieldName) {
        return METADATA_FIELDS.contains(fieldName);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -33,6 +34,8 @@ public final class SeaTunnelRow implements Serializable {
/** The array to store the actual internal format values. */
private final Object[] fields;

private Map<String, Object> metadata = new HashMap<String, Object>();

private volatile int size;

public SeaTunnelRow(int arity) {
Expand Down Expand Up @@ -340,6 +343,10 @@ public int hashCode() {
return result;
}

/**
 * Returns the metadata map held by this row.
 *
 * @return the row's own map instance, not a copy
 */
public Map<String, Object> getMetadata() {
    return this.metadata;
}

@Override
public String toString() {
return "SeaTunnelRow{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
}

/**
* Return the timestamp when the change event is produced in MySQL.
*
* <p>The field `source.ts_ms` in {@link SourceRecord} data struct is the time when the change
* event is operated in MySQL.
* In the source object, ts_ms indicates the time that the change was made in the database. By
* comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can
* determine the lag between the source database update and Debezium.
*/
public static Long getMessageTimestamp(SourceRecord record) {
Schema schema = record.valueSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
import org.apache.seatunnel.api.table.type.MetadataUtil;
import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand Down Expand Up @@ -173,26 +174,39 @@ private void deserializeDataChangeRecord(SourceRecord record, Collector<SeaTunne
} else {
converters = tableRowConverters.get(DEFAULT_TABLE_NAME_KEY);
}

Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record);
Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record);
long delay = -1L;
if (fetchTimestamp != null && messageTimestamp != null) {
delay = fetchTimestamp - messageTimestamp;
}
if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema);
insert.setRowKind(RowKind.INSERT);
insert.setTableId(tableId);
MetadataUtil.setDelay(insert, delay);
MetadataUtil.setEventTime(insert, fetchTimestamp);
collector.collect(insert);
} else if (operation == Envelope.Operation.DELETE) {
SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema);
delete.setRowKind(RowKind.DELETE);
delete.setTableId(tableId);
MetadataUtil.setDelay(delete, delay);
MetadataUtil.setEventTime(delete, fetchTimestamp);
collector.collect(delete);
} else if (operation == Envelope.Operation.UPDATE) {
SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema);
before.setRowKind(RowKind.UPDATE_BEFORE);
before.setTableId(tableId);
MetadataUtil.setDelay(before, delay);
MetadataUtil.setEventTime(before, fetchTimestamp);
collector.collect(before);

SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema);
after.setRowKind(RowKind.UPDATE_AFTER);
after.setTableId(tableId);
MetadataUtil.setDelay(after, delay);
MetadataUtil.setEventTime(after, fetchTimestamp);
collector.collect(after);
} else {
log.warn("Received {} operation, skip", operation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-transforms-v2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
Expand Down Expand Up @@ -180,6 +181,35 @@ public void testMysqlCdcCheckDataE2e(TestContainer container) {
});
}

/**
 * Submits the job defined in {@code /mysqlcdc_to_metadata_trans.conf} in the background, applies
 * insert/update/delete changes to the source table, then cancels the job.
 *
 * @param container the test container the job is executed in
 * @throws InterruptedException if the wait for the job to process the changes is interrupted
 * @throws IOException if cancelling the job fails
 */
@TestTemplate
public void testMysqlCdcMetadataTrans(TestContainer container)
        throws InterruptedException, IOException {
    // Clear related content to ensure that multiple operations are not affected
    clearTable(MYSQL_DATABASE, SOURCE_TABLE_1);
    clearTable(MYSQL_DATABASE, SINK_TABLE);

    String jobId = String.valueOf(JobIdGenerator.newJobId());
    AtomicReference<Container.ExecResult> execResult = new AtomicReference<>();
    CompletableFuture.runAsync(
            () -> {
                try {
                    execResult.set(
                            container.executeJob("/mysqlcdc_to_metadata_trans.conf", jobId));
                } catch (Exception e) {
                    // Pass the exception itself so the stack trace is logged as well.
                    log.error("Commit task exception :" + e.getMessage(), e);
                    throw new RuntimeException(e);
                }
            });
    // insert update delete
    upsertDeleteSourceTable(MYSQL_DATABASE, SOURCE_TABLE_1);
    TimeUnit.SECONDS.sleep(30);

    // The result is only published once executeJob returns. While the job is still running the
    // reference is null, so only check the exit code of a job that has already ended.
    Container.ExecResult jobResult = execResult.get();
    if (jobResult != null) {
        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
    }
    Container.ExecResult cancelJobResult = container.cancelJob(jobId);
    Assertions.assertEquals(0, cancelJobResult.getExitCode(), cancelJobResult.getStderr());
}

@TestTemplate
public void testMysqlCdcCheckDataWithDisableExactlyonce(TestContainer container) {
// Clear related content to ensure that multiple operations are not affected
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
read_limit.bytes_per_second = 7000000
read_limit.rows_per_second = 400
}

source {
MySQL-CDC {
result_table_name = "customers_mysql_cdc"
server-id = 5652
username = "st_user_source"
password = "mysqlpw"
table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
}
}

transform {
Metadata {
metadata_fields {
database = database
table = table
partition = partition
rowKind = rowKind
ts_ms = ts_ms
delay = delay
}
result_table_name = "trans_result"
}
}

sink {
Assert {
source_table_name = "trans_result"
rules {
field_rules = [
{
field_name = database
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}, {
field_name = table
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}, {
field_name = partition
field_type = string
}, {
field_name = rowKind
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}, {
field_name = ts_ms
field_type = long
field_value = [
{
rule_type = NOT_NULL
}
]
}, {
field_name = delay
field_type = long
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ public Object getField(int pos) {
/**
 * Returns the field values of the wrapped row.
 *
 * @return whatever {@code getFields()} of the wrapped row returns
 */
public Object[] getFields() {
    return this.row.getFields();
}

/**
 * Returns the row wrapped by this accessor.
 *
 * @return the wrapped {@link SeaTunnelRow} instance itself
 */
public SeaTunnelRow getRow() {
    return this.row;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_FIELDS_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_MAPPING_FIELD_EXISTS;

/** The common error of SeaTunnel transform. Please refer {@link CommonError} */
public class TransformCommonError {
Expand All @@ -43,4 +45,18 @@ public static TransformException cannotFindInputFieldsError(
params.put("transform", transform);
return new TransformException(INPUT_FIELDS_NOT_FOUND, params);
}

/**
 * Builds the exception carrying the {@code METADATA_FIELDS_NOT_FOUND} error code.
 *
 * @param transform the transform name, passed as the {@code transform} parameter
 * @param field the field name, passed as the {@code field} parameter
 * @return a new {@link TransformException} with both parameters attached
 */
public static TransformException cannotFindMetadataFieldError(String transform, String field) {
    final Map<String, String> errorParams = new HashMap<>();
    errorParams.put("field", field);
    errorParams.put("transform", transform);

    final TransformException error =
            new TransformException(METADATA_FIELDS_NOT_FOUND, errorParams);
    return error;
}

/**
 * Builds the exception carrying the {@code METADATA_MAPPING_FIELD_EXISTS} error code.
 *
 * @param transform the transform name, passed as the {@code transform} parameter
 * @param field the field name, passed as the {@code field} parameter
 * @return a new {@link TransformException} with both parameters attached
 */
public static TransformException metadataMappingFieldExists(String transform, String field) {
    final Map<String, String> errorParams = new HashMap<>();
    errorParams.put("field", field);
    errorParams.put("transform", transform);

    final TransformException error =
            new TransformException(METADATA_MAPPING_FIELD_EXISTS, errorParams);
    return error;
}
}
Loading

0 comments on commit 6a603a0

Please sign in to comment.