Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Kudu] Refactor Kudu functionality and Sink support CDC data. #5437

Merged
merged 14 commits into from
Oct 26, 2023
151 changes: 111 additions & 40 deletions docs/en/connector-v2/sink/Kudu.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,122 @@

> Kudu sink connector

## Description
## Support Those Engines

Write data to Kudu.

The tested kudu version is 1.11.1.
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that you have added the features of Source to Sink's documentation

- [x] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| kudu_master | string | yes | - |
| kudu_table | string | yes | - |
| save_mode | string | yes | - |
| common-options | | no | - |

### kudu_master [string]

`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.

### kudu_table [string]

`kudu_table` The name of kudu table..

### save_mode [string]

Storage mode, we need support `overwrite` and `append`. `append` is now supported.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.

## Example

```bash

kudu {
kudu_master = "192.168.88.110:7051"
kudu_table = "studentlyhresultflink"
save_mode="append"
}

- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Data Type Mapping

| SeaTunnel Data type | kudu Data type |
|---------------------|--------------------------|
| BOOLEAN | BOOL |
| INT | INT8<br/>INT16<br/>INT32 |
| BIGINT | INT64 |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| TIMESTAMP | UNIXTIME_MICROS |
| BYTES | BINARY |

## Sink Options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format have some error:


| Name | Type | Required | Default | Description |
|-------------------------------------------|--------|----------|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------|---|
| kudu_masters | String | Yes | - | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of kudu table. | |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Kudu normal operation time out. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Kudu admin operation time out. |
| kerberos_principal | String | No | - | Kerberos principal. |
| kerberos_keytab | String | No | - | Kerberos keytab. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. |
| save_mode | String | No | - | Storage mode, support `overwrite` and `append`. |
| session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu flush mode. Default AUTO_FLUSH_SYNC. |
| session_mutation_buffer_space | Int | No | 1024 | The max size of Kudu buffer which buffed data. |
| batch_size | Int | No | 1024 | The flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100 |
| buffer_flush_interval | Int | No | 10000 | The flush interval mills, over this time, asynchronous threads will flush data. |
| ignore_not_found | Bool | No | false | If true, ignore all not found rows. |
| ignore_not_duplicate | Bool | No | false | If true, ignore all dulicate rows. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |

## Task Example

### Simple:

> The following example refers to a FakeSource named "kudu" cdc write kudu table "kudu_sink_table"

```hocon

env {
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
result_table_name = "kudu"
schema = {
fields {
id = int
val_bool = boolean
val_int8 = tinyint
val_int16 = smallint
val_int32 = int
val_int64 = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
val_unixtime_micros = timestamp
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = INSERT
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = INSERT
fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = UPDATE_BEFORE
fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = UPDATE_AFTER
fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
},
{
kind = DELETE
fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
}
]
}
}

sink {
kudu{
source_table_name = "kudu"
kudu_masters = "kudu-master-cdc:7051"
table_name = "kudu_sink_table"
}
}
```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add examples for kerberos enable.


## Changelog
Expand Down
109 changes: 73 additions & 36 deletions docs/en/connector-v2/source/Kudu.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,95 @@

> Kudu source connector

## Description
## Support Those Engines

Used to read data from Kudu.

The tested kudu version is 1.11.1.
> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code it seems not supported stream

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [column projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|----------------|--------|----------|---------------|
| kudu_master | string | yes | - |
| kudu_table | string | yes | - |
| columnsList | string | yes | - |
| common-options | | no | - |

### kudu_master [string]

`kudu_master` The address of kudu master,such as '192.168.88.110:7051'.

### kudu_table [string]

`kudu_table` The name of kudu table..

### columnsList [string]

`columnsList` Specifies the column names of the table.
## Description

### common options
Used to read data from Kudu.

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
The tested kudu version is 1.11.1.

## Examples
## Data Type Mapping

| kudu Data type | SeaTunnel Data type |
|--------------------------|---------------------|
| BOOL | BOOLEAN |
| INT8<br/>INT16<br/>INT32 | INT |
| INT64 | BIGINT |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| UNIXTIME_MICROS | TIMESTAMP |
| BINARY | BYTES |

## Source Options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format have some error.

Uploading image.png…


| Name | Type | Required | Default | Description |
|-------------------------------------------|--------|----------|------------------------------------------------|----------------------------------------------------------------------------------------------------------|---|
| kudu_masters | String | Yes | - | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of kudu table. | |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Kudu normal operation time out. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Kudu admin operation time out. |
| kerberos_principal | String | No | - | Kerberos principal. |
| kerberos_keytab | String | No | - | Kerberos keytab. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. |
| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. |
| filter | Int | No | 1024 * 1024 | Kudu scan filter expressions,Not supported yet. |
| schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |

## Task Example

### Simple:

> The following example is for a Kudu table named "kudu_source_table", The goal is to print the data from this table on the console and write kudu table "kudu_sink_table"

```hocon
# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 2
job.mode = "BATCH"
}

source {
Kudu {
result_table_name = "studentlyh2"
kudu_master = "192.168.88.110:7051"
kudu_table = "studentlyh2"
columnsList = "id,name,age,sex"
}
# This is a example source plugin **only for test and demonstrate the feature source plugin**
kudu{
kudu_masters = "kudu-master:7051"
table_name = "kudu_source_table"
result_table_name = "kudu"
}
}

transform {
}

sink {
console {
source_table_name = "kudu"
}

kudu{
source_table_name = "kudu"
kudu_masters = "kudu-master:7051"
table_name = "kudu_sink_table"
}
```
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add kerberos enable examples.


## Changelog
Expand Down
14 changes: 14 additions & 0 deletions seatunnel-connectors-v2/connector-kudu/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,19 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
Loading
Loading