Merge branch '2.6-test' into business-dev
# Conflicts:
#	seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
#	seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
#	seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
#	seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
#	seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/datatype/ToRedshiftTypeConverter.java
#	seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeHandler.java
Hisoka-X committed Aug 24, 2023
2 parents 9542752 + 379f796 commit 17cf8a8
Showing 174 changed files with 11,677 additions and 697 deletions.
1 change: 1 addition & 0 deletions .github/workflows/deploy.yml
@@ -22,6 +22,7 @@ on:
  push:
    branches:
      - '**-release'
      - '**-test'
env:
  GH_USERNAME: ${{ secrets.GH_PACKAGE_USER }}
  GH_TOKEN: ${{ secrets.GH_PACKAGE_TOKEN }}
107 changes: 107 additions & 0 deletions docs/en/connector-v2/formats/debezium-json.md
@@ -0,0 +1,107 @@
# Debezium Format

Changelog-Data-Capture Format: Serialization Schema & Deserialization Schema

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a *change event stream*, and applications simply read these streams to see the change events in the same order in which they occurred.

SeaTunnel supports interpreting Debezium JSON messages as INSERT/UPDATE/DELETE messages into the SeaTunnel system. This is useful in many cases, such as:

- synchronizing incremental data from databases to other systems
- auditing logs
- real-time materialized views on databases
- temporal join changing history of a database table, and so on

SeaTunnel also supports encoding the INSERT/UPDATE/DELETE messages in SeaTunnel as Debezium JSON messages, and emitting them to storage like Kafka.

# Format Options

| option | default | required | Description |
|-----------------------------------|---------|----------|------------------------------------------------------------------------------------------------------|
| format | (none) | yes | Specify what format to use, here should be 'debezium_json'. |
| debezium-json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
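
For example, lenient parsing can be enabled on a Kafka source as in the sketch below. This is illustrative only: the option name comes from the table above, while the topic, schema fields, and the exact placement of the option inside the Kafka block are assumptions (the full consumption example appears in the next section).

```bash
source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "products_binlog"
    result_table_name = "kafka_name"
    format = debezium_json
    # skip fields/rows that fail to parse instead of failing the job (assumed placement)
    debezium-json.ignore-parse-errors = true
    schema = {
      fields {
        id = "int"
        name = "string"
      }
    }
  }
}
```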

# How to use Debezium format

## Kafka usage example

Debezium provides a unified format for changelogs. Here is a simple example of an update operation captured from a MySQL products table:

```json
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter ",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter ",
    "weight": 5.17
  },
  "source": {
    "version": "1.1.1.Final",
    "connector": "mysql",
    "name": "dbserver1",
    "ts_ms": 1589362330000,
    "snapshot": "false",
    "db": "inventory",
    "table": "products",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 2090,
    "row": 0,
    "thread": 2,
    "query": null
  },
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}
```

Note: please refer to the Debezium documentation for the meaning of each field.

The MySQL products table has 4 columns (id, name, description and weight).
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.17.
Assuming the messages have been synchronized to the Kafka topic products_binlog, we can use the following SeaTunnel config to consume this topic and interpret the change events in Debezium format.

```bash
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "products_binlog"
    result_table_name = "kafka_name"
    start_mode = earliest
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "string"
      }
    }
    format = debezium_json
  }

}

transform {
}

sink {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "consume-binlog"
    format = debezium_json
  }
}
```

16 changes: 10 additions & 6 deletions docs/en/connector-v2/sink/Kafka.md
@@ -108,8 +108,10 @@ Kafka distinguishes different transactions by different transactionId. This para

### format

Data format. The default format is json. Optional text format. The default field separator is ",".
If you customize the delimiter, add the "field_delimiter" option.
Data format. The default format is json. Optional formats are text, canal-json and debezium-json.
If you use the json or text format, the default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
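
As a hedged illustration of the text format mentioned above, a sink block might look like the following sketch; the topic name is a placeholder, and field_delimiter is only needed when overriding the default separator.

```bash
sink {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    # placeholder topic name
    topic = "sink_topic"
    format = text
    # override the default ", " separator
    field_delimiter = "|"
  }
}
```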

### field_delimiter

@@ -209,8 +211,10 @@

### next version

- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/seatunnel/pull/3711)
- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/seatunnel/pull/3742)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
- [Improve] Support extract topic from SeaTunnelRow fields [3742](https://github.com/apache/incubator-seatunnel/pull/3742)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)

11 changes: 5 additions & 6 deletions docs/en/connector-v2/sink/S3-Redshift.md
@@ -87,10 +87,8 @@ hadoop_s3_properties {

The changelog mode of the sink writer. Supported values:
`APPEND_ONLY`: Only append data to the target table.
`APPEND_ON_DUPLICATE_UPDATE`: If the primary key exists, update the data, otherwise insert the data.
`APPEND_ON_DUPLICATE_UPDATE_AUTOMATIC`: If the primary key exists, update the data, otherwise insert the data. Automatically switch copy/merge mode between snapshot sync and incremental sync.
`APPEND_ON_DUPLICATE_DELETE`: If the primary key exists, delete the data, otherwise insert the data.
`APPEND_ON_DUPLICATE_DELETE_AUTOMATIC`: If the primary key exists, delete the data, otherwise insert the data. Automatically switch copy/merge mode between snapshot sync and incremental sync.
`APPEND_ON_DUPLICATE_UPDATE`: If the primary key exists, apply the change (update/delete); otherwise insert the data.
`APPEND_ON_DUPLICATE_UPDATE_AUTOMATIC`: If the primary key exists, apply the change (update/delete); otherwise insert the data. Automatically switch copy/merge mode between snapshot sync and incremental sync.
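
As a rough sketch of selecting the mode (the `S3Redshift` block name and the minimal option set are assumptions here; see the full config example further down for the exact plugin name and the remaining required options):

```bash
sink {
  # block name is assumed; refer to the complete example below
  S3Redshift {
    # apply update/delete changes by primary key, switching copy/merge automatically
    changelog_mode = "APPEND_ON_DUPLICATE_UPDATE_AUTOMATIC"
  }
}
```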

### changelog_buffer_flush_size [int]

@@ -156,11 +154,12 @@ For append only
```

Supports writing CDC changelog events (APPEND_ON_DUPLICATE_UPDATE/APPEND_ON_DUPLICATE_UPDATE_AUTOMATIC/APPEND_ON_DUPLICATE_DELETE/APPEND_ON_DUPLICATE_DELETE_AUTOMATIC).
Supports writing CDC changelog events (APPEND_ON_DUPLICATE_UPDATE/APPEND_ON_DUPLICATE_UPDATE_AUTOMATIC).

*Using Redshift COPY SQL to import the S3 file into a tmp table, and Redshift MERGE SQL to merge the tmp table data into the target table.*
*Using Redshift COPY SQL to import the S3 file into a tmp table, and Redshift DELETE/MERGE SQL to merge the tmp table data into the target table.*
- [Redshift TEMPORARY Table](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html)
- [Redshift COPY SQL](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html)
- [Redshift DELETE USING SQL](https://docs.aws.amazon.com/redshift/latest/dg/r_DELETE.html)
- [Redshift MERGE SQL](https://docs.aws.amazon.com/redshift/latest/dg/r_MERGE.html)

Config example:
17 changes: 10 additions & 7 deletions docs/en/connector-v2/source/kafka.md
@@ -73,8 +73,10 @@ The structure of the data, including field names and field types.

## format

Data format. The default format is json. Optional text format. The default field separator is ", ".
If you customize the delimiter, add the "field_delimiter" option.
Data format. The default format is json. Optional formats are text, canal-json and debezium-json.
If you use the json or text format, the default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.
If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.
If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details.
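
As a hedged sketch of the text format on the source side (the topic, result table name, and schema fields are placeholders; field_delimiter is only needed when overriding the default separator):

```bash
source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    # placeholder topic and result table name
    topic = "example_topic"
    result_table_name = "kafka_table"
    format = text
    # override the default ", " separator
    field_delimiter = "|"
    schema = {
      fields {
        id = "int"
        name = "string"
      }
    }
  }
}
```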

## format_error_handle_way

@@ -221,9 +223,10 @@ source {

### Next Version

- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/seatunnel/pull/3125))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/seatunnel/pull/3810))
- [Feature] Kafka source supports data deserialization failure skipping([4364](https://github.com/apache/seatunnel/pull/4364))
- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
- [Improve] Support read canal format message [3950](https://github.com/apache/incubator-seatunnel/pull/3950)
- [Improve] Support read debezium format message [3981](https://github.com/apache/incubator-seatunnel/pull/3981)

2 changes: 2 additions & 0 deletions plugin-mapping.properties
@@ -115,3 +115,5 @@ seatunnel.sink.Rocketmq = connector-rocketmq
seatunnel.source.Paimon = connector-paimon
seatunnel.sink.Paimon = connector-paimon
seatunnel.source.Postgres-CDC = connector-cdc-postgres
seatunnel.source.Informix-CDC = connector-cdc-informix
seatunnel.sink.DolphinDB = connector-dolphindb
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@

<properties>
<!--todo The classification is too confusing, reclassify by type-->
<revision>2.5.2-WS-SNAPSHOT</revision>
<revision>2.6-WS-test-SNAPSHOT</revision>
<seatunnel.config.shade.version>2.1.1</seatunnel.config.shade.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
12 changes: 11 additions & 1 deletion release-note.md
@@ -3,9 +3,19 @@
## Bug fix

### Core

- [Core] [API] Fixed generic class loss for lists (#4421)
- [Core] [API] Fix parse nested row data type key changed upper (#4459)
- [Starter][Flink]Support transform-v2 for flink #3396
- [Flink] Support flink 1.14.x #3963
### Transformer
- [Spark] Support transform-v2 for spark (#3409)
- [ALL]Add FieldMapper Transform #3781
### Connectors
- [Elasticsearch] Support https protocol & compatible with opensearch
- [Hbase] Add hbase sink connector #4049
### Formats
- [Canal] Support read canal format message #3950
- [Debezium] Support read debezium format message #3981

### Connector-V2

@@ -80,6 +80,15 @@ private CatalogTableUtil(CatalogTable catalogTable) {

@Deprecated
public static CatalogTable getCatalogTable(String tableName, SeaTunnelRowType rowType) {
return getCatalogTable("schema", "default", null, tableName, rowType);
}

public static CatalogTable getCatalogTable(
String catalog,
String database,
String schema,
String tableName,
SeaTunnelRowType rowType) {
TableSchema.Builder schemaBuilder = TableSchema.builder();
for (int i = 0; i < rowType.getTotalFields(); i++) {
PhysicalColumn column =
@@ -88,7 +97,7 @@ public static CatalogTable getCatalogTable(String tableName, SeaTunnelRowType ro
schemaBuilder.column(column);
}
return CatalogTable.of(
TableIdentifier.of("schema", "default", tableName),
TableIdentifier.of(catalog, database, schema, tableName),
schemaBuilder.build(),
new HashMap<>(),
new ArrayList<>(),
@@ -246,6 +255,8 @@ public static SeaTunnelDataType<?> parseDataType(String columnStr) {
return LocalTimeType.LOCAL_TIME_TYPE;
case TIMESTAMP:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case MAP:
return parseMapType(columnStr);
default:
throw new UnsupportedOperationException(
String.format("the type[%s] is not support", columnStr));
@@ -19,14 +19,13 @@

import org.apache.seatunnel.shade.com.google.common.collect.Lists;

import com.google.common.base.Preconditions;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;

public class MapType<K, V> implements CompositeType<Map<K, V>> {

private static final List<SqlType> SUPPORTED_KEY_TYPES =
@@ -49,9 +48,9 @@ public class MapType<K, V> implements CompositeType<Map<K, V>> {
private final SeaTunnelDataType<V> valueType;

public MapType(SeaTunnelDataType<K> keyType, SeaTunnelDataType<V> valueType) {
checkNotNull(keyType, "The key type is required.");
checkNotNull(valueType, "The value type is required.");
checkArgument(
Preconditions.checkNotNull(keyType, "The key type is required.");
Preconditions.checkNotNull(valueType, "The value type is required.");
Preconditions.checkArgument(
SUPPORTED_KEY_TYPES.contains(keyType.getSqlType()),
"Unsupported key types: %s",
keyType);
