diff --git a/docs/en/connector-v2/sink/Kudu.md b/docs/en/connector-v2/sink/Kudu.md index 9a024ba4a41..b6e4eee24c1 100644 --- a/docs/en/connector-v2/sink/Kudu.md +++ b/docs/en/connector-v2/sink/Kudu.md @@ -2,51 +2,125 @@ > Kudu sink connector -## Description +## Support Kudu Version -Write data to Kudu. +- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0 -The tested kudu version is 1.11.1. +## Support Those Engines + +> Spark
+> Flink
+> SeaTunnel Zeta
## Key features - [ ] [exactly-once](../../concept/connector-v2-features.md) - -## Options - -| name | type | required | default value | -|----------------|--------|----------|---------------| -| kudu_master | string | yes | - | -| kudu_table | string | yes | - | -| save_mode | string | yes | - | -| common-options | | no | - | - -### kudu_master [string] - -`kudu_master` The address of kudu master,such as '192.168.88.110:7051'. - -### kudu_table [string] - -`kudu_table` The name of kudu table.. - -### save_mode [string] - -Storage mode, we need support `overwrite` and `append`. `append` is now supported. - -### common options - -Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. - -## Example - -```bash - - kudu { - kudu_master = "192.168.88.110:7051" - kudu_table = "studentlyhresultflink" - save_mode="append" - } - +- [x] [cdc](../../concept/connector-v2-features.md) + +## Data Type Mapping + +| SeaTunnel Data type | kudu Data type | +|---------------------|--------------------------| +| BOOLEAN | BOOL | +| INT | INT8
INT16
INT32 | +| BIGINT | INT64 | +| DECIMAL | DECIMAL | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| STRING | STRING | +| TIMESTAMP | UNIXTIME_MICROS | +| BYTES | BINARY | + +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------------------------------|--------|----------|------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------| +| kudu_masters | String | Yes | - | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. | +| table_name | String | Yes | - | The name of kudu table. | +| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. | +| client_default_operation_timeout_ms | Long | No | 30000 | Kudu normal operation time out. | +| client_default_admin_operation_timeout_ms | Long | No | 30000 | Kudu admin operation time out. | +| enable_kerberos | Bool | No | false | Kerberos principal enable. | +| kerberos_principal | String | No | - | Kerberos principal. Note that all zeta nodes require have this file. | +| kerberos_keytab | String | No | - | Kerberos keytab. Note that all zeta nodes require have this file. | +| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all zeta nodes require have this file. | +| save_mode | String | No | - | Storage mode, support `overwrite` and `append`. | +| session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu flush mode. Default AUTO_FLUSH_SYNC. | +| batch_size | Int | No | 1024 | The flush max size (includes all append, upsert and delete records), over this number of records, will flush data. The default value is 100 | +| buffer_flush_interval | Int | No | 10000 | The flush interval mills, over this time, asynchronous threads will flush data. | +| ignore_not_found | Bool | No | false | If true, ignore all not found rows. | +| ignore_not_duplicate | Bool | No | false | If true, ignore all dulicate rows. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | + +## Task Example + +### Simple: + +> The following example refers to a FakeSource named "kudu" cdc write kudu table "kudu_sink_table" + +```hocon + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + source { + FakeSource { + result_table_name = "kudu" + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } + } + +sink { + kudu{ + source_table_name = "kudu" + kudu_masters = "kudu-master-cdc:7051" + table_name = "kudu_sink_table" + enable_kerberos = true + kerberos_principal = "xx@xx.COM" + kerberos_keytab = "xx.keytab" + } +} ``` ## Changelog diff --git a/docs/en/connector-v2/source/Kudu.md b/docs/en/connector-v2/source/Kudu.md index 0fc39b82f79..f3953e98ae6 100644 --- a/docs/en/connector-v2/source/Kudu.md +++ b/docs/en/connector-v2/source/Kudu.md @@ -2,58 +2,105 @@ > Kudu source connector -## Description +## Support Kudu Version -Used to read data from Kudu. +- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0 -The tested kudu version is 1.11.1. +## Support Those Engines + +> Spark
+> Flink
+> SeaTunnel Zeta
## Key features - [x] [batch](../../concept/connector-v2-features.md) -- [ ] [stream](../../concept/connector-v2-features.md) - [ ] [exactly-once](../../concept/connector-v2-features.md) -- [ ] [column projection](../../concept/connector-v2-features.md) -- [ ] [parallelism](../../concept/connector-v2-features.md) +- [x] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) - [ ] [support user-defined split](../../concept/connector-v2-features.md) -## Options - -| name | type | required | default value | -|----------------|--------|----------|---------------| -| kudu_master | string | yes | - | -| kudu_table | string | yes | - | -| columnsList | string | yes | - | -| common-options | | no | - | - -### kudu_master [string] - -`kudu_master` The address of kudu master,such as '192.168.88.110:7051'. - -### kudu_table [string] - -`kudu_table` The name of kudu table.. - -### columnsList [string] - -`columnsList` Specifies the column names of the table. +## Description -### common options +Used to read data from Kudu. -Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. +The tested kudu version is 1.11.1. -## Examples +## Data Type Mapping + +| kudu Data type | SeaTunnel Data type | +|--------------------------|---------------------| +| BOOL | BOOLEAN | +| INT8
INT16
INT32 | INT | +| INT64 | BIGINT | +| DECIMAL | DECIMAL | +| FLOAT | FLOAT | +| DOUBLE | DOUBLE | +| STRING | STRING | +| UNIXTIME_MICROS | TIMESTAMP | +| BINARY | BYTES | + +## Source Options + +| Name | Type | Required | Default | Description | +|-------------------------------------------|--------|----------|------------------------------------------------|----------------------------------------------------------------------------------------------------------| +| kudu_masters | String | Yes | - | Kudu master address. Separated by ',',such as '192.168.88.110:7051'. | +| table_name | String | Yes | - | The name of kudu table. | +| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu worker count. Default value is twice the current number of cpu cores. | +| client_default_operation_timeout_ms | Long | No | 30000 | Kudu normal operation time out. | +| client_default_admin_operation_timeout_ms | Long | No | 30000 | Kudu admin operation time out. | +| enable_kerberos | Bool | No | false | Kerberos principal enable. | +| kerberos_principal | String | No | - | Kerberos principal. Note that all zeta nodes require have this file. | +| kerberos_keytab | String | No | - | Kerberos keytab. Note that all zeta nodes require have this file. | +| kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all zeta nodes require have this file. | +| scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. | +| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. | +| filter | Int | No | 1024 * 1024 | Kudu scan filter expressions,Not supported yet. | +| schema | Map | No | 1024 * 1024 | SeaTunnel Schema. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | + +## Task Example + +### Simple: + +> The following example is for a Kudu table named "kudu_source_table", The goal is to print the data from this table on the console and write kudu table "kudu_sink_table" ```hocon +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 2 + job.mode = "BATCH" +} + source { - Kudu { - result_table_name = "studentlyh2" - kudu_master = "192.168.88.110:7051" - kudu_table = "studentlyh2" - columnsList = "id,name,age,sex" - } + # This is a example source plugin **only for test and demonstrate the feature source plugin** + kudu{ + kudu_masters = "kudu-master:7051" + table_name = "kudu_source_table" + result_table_name = "kudu" + enable_kerberos = true + kerberos_principal = "xx@xx.COM" + kerberos_keytab = "xx.keytab" +} +} +transform { } + +sink { + console { + source_table_name = "kudu" + } + + kudu{ + source_table_name = "kudu" + kudu_masters = "kudu-master:7051" + table_name = "kudu_sink_table" + enable_kerberos = true + kerberos_principal = "xx@xx.COM" + kerberos_keytab = "xx.keytab" + } ``` ## Changelog diff --git a/seatunnel-connectors-v2/connector-kudu/pom.xml b/seatunnel-connectors-v2/connector-kudu/pom.xml index a4f58a3f6c5..9dcdc87853a 100644 --- a/seatunnel-connectors-v2/connector-kudu/pom.xml +++ b/seatunnel-connectors-v2/connector-kudu/pom.xml @@ -56,5 +56,19 @@ connector-common ${project.version} + + + org.apache.seatunnel + seatunnel-hadoop3-3.1.4-uber + ${project.version} + optional + provided + + + org.apache.avro + avro + + + diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java new file mode 100644 index 00000000000..0fa40d8b0e0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.catalog; + +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig; +import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil; + +import org.apache.commons.lang.StringUtils; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.shaded.com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ADMIN_OPERATION_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ENABLE_KERBEROS; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KEYTAB; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KRB5_CONF; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_PRINCIPAL; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.MASTER; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.OPERATION_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.TABLE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.WORKER_COUNT; +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; + +public class KuduCatalog implements Catalog { + + private final CommonConfig config; + + private KuduClient kuduClient; + + private final String defaultDatabase = "default_database"; + + private final String catalogName; + + public KuduCatalog(String catalogName, CommonConfig config) { + this.config = config; + this.catalogName = catalogName; + } + + @Override + public void open() throws CatalogException { + kuduClient = KuduUtil.getKuduClient(config); + } + + @Override + public void close() throws CatalogException { + try { + kuduClient.close(); + } catch (KuduException e) { + throw new CatalogException("Failed close kudu client", e); + } + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return defaultDatabase; + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return listDatabases().contains(databaseName); + } + + @Override + public List listDatabases() throws CatalogException { + return Lists.newArrayList(getDefaultDatabase()); + } + + @Override + public List listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + try { + return kuduClient.getTablesList().getTablesList(); + } catch (KuduException e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", this.catalogName), e); + } + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + checkNotNull(tablePath); + try { + return kuduClient.tableExists(tablePath.getFullName()); + } catch (KuduException e) { + throw new CatalogException(e); + } + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + checkNotNull(tablePath); + + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + + String tableName = tablePath.getFullName(); + + try { + KuduTable kuduTable = kuduClient.openTable(tableName); + TableSchema.Builder builder = TableSchema.builder(); + Schema schema = kuduTable.getSchema(); + kuduTable.getPartitionSchema(); + List columnSchemaList = schema.getColumns(); + Optional primaryKey = getPrimaryKey(schema.getPrimaryKeyColumns()); + for (int i = 0; i < columnSchemaList.size(); i++) { + SeaTunnelDataType type = KuduTypeMapper.mapping(columnSchemaList, i); + builder.column( + PhysicalColumn.of( + columnSchemaList.get(i).getName(), + type, + columnSchemaList.get(i).getTypeSize(), + columnSchemaList.get(i).isNullable(), + columnSchemaList.get(i).getDefaultValue(), + columnSchemaList.get(i).getComment())); + } + + primaryKey.ifPresent(builder::primaryKey); + + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); + + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + tableName); + } catch (Exception e) { + throw new CatalogException("An exception occurred while obtaining the table", e); + } + } + + private Map buildConnectorOptions(TablePath tablePath) { + Map options = new HashMap<>(8); + options.put("connector", "kudu"); + options.put(TABLE_NAME.key(), tablePath.getFullName()); + options.put(MASTER.key(), config.getMasters()); + options.put(WORKER_COUNT.key(), config.getWorkerCount().toString()); + options.put(OPERATION_TIMEOUT.key(), config.getOperationTimeout().toString()); + options.put(ADMIN_OPERATION_TIMEOUT.key(), config.getAdminOperationTimeout().toString()); + if (config.getEnableKerberos()) { + options.put(KERBEROS_PRINCIPAL.key(), config.getPrincipal()); + options.put(KERBEROS_KEYTAB.key(), config.getKeytab()); + if (StringUtils.isNotBlank(config.getKrb5conf())) { + options.put(KERBEROS_KRB5_CONF.key(), config.getKrb5conf()); + } + } + options.put(ENABLE_KERBEROS.key(), config.getEnableKerberos().toString()); + return options; + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + String tableName = tablePath.getFullName(); + try { + if (tableExists(tablePath)) { + kuduClient.deleteTable(tableName); + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (KuduException e) { + throw new CatalogException("Could not delete table " + tableName, e); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException(); + } + + protected Optional getPrimaryKey(List columnSchemaList) { + List pkFields = + columnSchemaList.stream().map(ColumnSchema::getName).collect(Collectors.toList()); + if (!pkFields.isEmpty()) { + String pkName = "pk_" + String.join("_", pkFields); + return Optional.of(PrimaryKey.of(pkName, pkFields)); + } + return Optional.empty(); + } +} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogFactory.java new file mode 100644 index 00000000000..6ebf783604a --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogFactory.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ADMIN_OPERATION_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ENABLE_KERBEROS; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KEYTAB; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KRB5_CONF; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_PRINCIPAL; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.MASTER; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.OPERATION_TIMEOUT; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.WORKER_COUNT; + +@AutoService(Factory.class) +public class KuduCatalogFactory implements CatalogFactory { + + public static final String IDENTIFIER = "Kudu"; + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + CommonConfig config = new CommonConfig(options); + KuduCatalog kuduCatalog = new KuduCatalog(catalogName, config); + return kuduCatalog; + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(MASTER) + .optional(WORKER_COUNT) + .optional(OPERATION_TIMEOUT) + .optional(ADMIN_OPERATION_TIMEOUT) + .optional(KERBEROS_KRB5_CONF) + .conditional(ENABLE_KERBEROS, true, KERBEROS_PRINCIPAL, KERBEROS_KEYTAB) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java new file mode 100644 index 00000000000..b01cfbdc2bc --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.apache.kudu.client.AsyncKuduClient; + +import lombok.Getter; +import lombok.ToString; + +import java.io.Serializable; + +@Getter +@ToString +public class CommonConfig implements Serializable { + + public static final Option MASTER = + Options.key("kudu_masters") + .stringType() + .noDefaultValue() + .withDescription("Kudu master address. Separated by ','"); + + public static final Option TABLE_NAME = + Options.key("table_name") + .stringType() + .noDefaultValue() + .withDescription("Kudu table name"); + + public static final Option WORKER_COUNT = + Options.key("client_worker_count") + .intType() + .defaultValue(2 * Runtime.getRuntime().availableProcessors()) + .withDescription( + "Kudu worker count. Default value is twice the current number of cpu cores"); + + public static final Option OPERATION_TIMEOUT = + Options.key("client_default_operation_timeout_ms") + .longType() + .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) + .withDescription("Kudu normal operation time out"); + + public static final Option ADMIN_OPERATION_TIMEOUT = + Options.key("client_default_admin_operation_timeout_ms") + .longType() + .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) + .withDescription("Kudu admin operation time out"); + + public static final Option ENABLE_KERBEROS = + Options.key("enable_kerberos") + .booleanType() + .defaultValue(false) + .withDescription("Kerberos principal enable."); + public static final Option KERBEROS_PRINCIPAL = + Options.key("kerberos_principal") + .stringType() + .noDefaultValue() + .withDescription( + "Kerberos principal. Note that all zeta nodes require have this file."); + + public static final Option KERBEROS_KEYTAB = + Options.key("kerberos_keytab") + .stringType() + .noDefaultValue() + .withDescription( + "Kerberos keytab. Note that all zeta nodes require have this file."); + + public static final Option KERBEROS_KRB5_CONF = + Options.key("kerberos_krb5conf") + .stringType() + .noDefaultValue() + .withDescription( + "Kerberos krb5 conf. Note that all zeta nodes require have this file."); + + protected String masters; + + protected String table; + + protected Integer workerCount; + + protected Long operationTimeout; + + protected Long adminOperationTimeout; + + protected Boolean enableKerberos; + protected String principal; + protected String keytab; + protected String krb5conf; + + public CommonConfig(ReadonlyConfig config) { + this.masters = config.get(MASTER); + this.table = config.get(TABLE_NAME); + this.workerCount = config.get(WORKER_COUNT); + this.operationTimeout = config.get(OPERATION_TIMEOUT); + this.adminOperationTimeout = config.get(ADMIN_OPERATION_TIMEOUT); + this.enableKerberos = config.get(ENABLE_KERBEROS); + this.principal = config.get(KERBEROS_PRINCIPAL); + this.keytab = config.get(KERBEROS_KEYTAB); + this.krb5conf = config.get(KERBEROS_KEYTAB); + } +} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java index fbeb2885f37..8db9e3896ed 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java @@ -17,46 +17,72 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.config; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.commons.lang3.StringUtils; +import org.apache.kudu.client.SessionConfiguration; -import lombok.Data; -import lombok.NonNull; +import lombok.Getter; +import lombok.ToString; -@Data -public class KuduSinkConfig { +import java.util.Locale; - public static final Option KUDU_MASTER = - Options.key("kudu_master") - .stringType() - .noDefaultValue() - .withDescription("kudu master address"); +@Getter +@ToString +public class KuduSinkConfig extends CommonConfig { - public static final Option KUDU_SAVE_MODE = + public static final Option SAVE_MODE = Options.key("save_mode") .enumType(SaveMode.class) - .noDefaultValue() + .defaultValue(SaveMode.APPEND) .withDescription("Storage mode,append is now supported"); - public static final Option KUDU_TABLE_NAME = - Options.key("kudu_table") + public static final Option FLUSH_MODE = + Options.key("session_flush_mode") .stringType() - .noDefaultValue() - .withDescription("kudu table name"); + .defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC.name()) + .withDescription("Kudu flush mode. Default AUTO_FLUSH_SYNC"); + + public static final Option BATCH_SIZE = + Options.key("batch_size") + .intType() + .defaultValue(1024) + .withDescription( + "the flush max size (includes all append, upsert and delete records), over this number" + + " of records, will flush data. The default value is 100."); + + public static final Option BUFFER_FLUSH_INTERVAL = + Options.key("buffer_flush_interval") + .intType() + .defaultValue(10000) + .withDescription( + "the flush interval mills, over this time, asynchronous threads will flush data. The " + + "default value is 1s."); + + public static final Option IGNORE_NOT_FOUND = + Options.key("ignore_not_found") + .booleanType() + .defaultValue(false) + .withDescription("if true, ignore all not found rows"); + + public static final Option IGNORE_DUPLICATE = + Options.key("ignore_not_duplicate") + .booleanType() + .defaultValue(false) + .withDescription("if true, ignore all dulicate rows"); private SaveMode saveMode; - private String kuduMaster; + private SessionConfiguration.FlushMode flushMode; - /** Specifies the name of the table */ - private String kuduTableName; + private int maxBufferSize; + + private int flushInterval; + + private boolean ignoreNotFound; + + private boolean ignoreDuplicate; public enum SaveMode { APPEND(), @@ -71,22 +97,25 @@ public static SaveMode fromStr(String str) { } } - public KuduSinkConfig(@NonNull Config pluginConfig) { - if (pluginConfig.hasPath(KUDU_SAVE_MODE.key()) - && pluginConfig.hasPath(KUDU_MASTER.key()) - && pluginConfig.hasPath(KUDU_TABLE_NAME.key())) { - this.saveMode = - StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE.key())) - ? SaveMode.APPEND - : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE.key())); - this.kuduMaster = pluginConfig.getString(KUDU_MASTER.key()); - this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME.key()); - } else { - throw new KuduConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - "Kudu", PluginType.SINK, "Missing Sink configuration parameters")); + public KuduSinkConfig(ReadonlyConfig config) { + super(config); + this.saveMode = config.get(SAVE_MODE); + this.flushMode = fromStrFlushMode(config.get(FLUSH_MODE)); + this.maxBufferSize = config.get(BATCH_SIZE); + this.flushInterval = config.get(BUFFER_FLUSH_INTERVAL); + this.ignoreNotFound = config.get(IGNORE_NOT_FOUND); + this.ignoreDuplicate = config.get(IGNORE_DUPLICATE); + } + + private SessionConfiguration.FlushMode fromStrFlushMode(String flushMode) { + switch (flushMode.toUpperCase(Locale.ENGLISH)) { + case "MANUAL_FLUSH": + return SessionConfiguration.FlushMode.MANUAL_FLUSH; + case "AUTO_FLUSH_BACKGROUND": + return SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND; + case "AUTO_FLUSH_SYNC": + default: + return SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC; } } } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java index 5a2c1d20532..bb73b43e88f 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java @@ -19,26 +19,47 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import java.io.Serializable; +import org.apache.kudu.client.AsyncKuduClient; -public class KuduSourceConfig implements Serializable { +import lombok.Getter; +import lombok.ToString; - public static final Option KUDU_MASTER = - Options.key("kudu_master") - .stringType() - .noDefaultValue() - .withDescription("Kudu master address"); +@Getter +@ToString +public class KuduSourceConfig extends CommonConfig { - public static final Option TABLE_NAME = - Options.key("kudu_table") - .stringType() - .noDefaultValue() - .withDescription("Kudu table name"); + public static final Option QUERY_TIMEOUT = + Options.key("scan_token_query_timeout") + .longType() + .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS) + .withDescription( + "The timeout for connecting scan token. If not set, it will be the same as operationTimeout"); + + public static final Option SCAN_BATCH_SIZE_BYTES = + Options.key("scan_token_batch_size_bytes") + .intType() + .defaultValue(1024 * 1024) + .withDescription( + "Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB"); - public static final Option COLUMNS_LIST = - Options.key("columnsList") + public static final Option FILTER = + Options.key("filter") .stringType() .noDefaultValue() - .withDescription("Specifies the column names of the table"); + .withDescription("Kudu scan filter expressions"); + + private int batchSizeBytes; + + protected Long queryTimeout; + + private String filter; + + public KuduSourceConfig(ReadonlyConfig config) { + super(config); + this.batchSizeBytes = config.get(SCAN_BATCH_SIZE_BYTES); + this.queryTimeout = config.get(QUERY_TIMEOUT); + this.filter = config.get(FILTER); + } } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java index 2457560a0f0..04aaa039954 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java @@ -23,11 +23,10 @@ public enum KuduConnectorErrorCode implements SeaTunnelErrorCode { GET_KUDUSCAN_OBJECT_FAILED("KUDU-01", "Get the Kuduscan object for each splice failed"), CLOSE_KUDU_CLIENT_FAILED("KUDU-02", "Close Kudu client failed"), DATA_TYPE_CAST_FILED("KUDU-03", "Value type does not match column type"), - KUDU_UPSERT_FAILED("KUDU-04", "Upsert data to Kudu failed"), - KUDU_INSERT_FAILED("KUDU-05", "Insert data to Kudu failed"), - INIT_KUDU_CLIENT_FAILED("KUDU-06", "Initialize the Kudu client failed"), + WRITE_DATA_FAILED("KUDU-04", "while sending value to Kudu failed"), + INIT_KUDU_CLIENT_FAILED("KUDU-05", "Initialize the Kudu client failed"), GENERATE_KUDU_PARAMETERS_FAILED( - "KUDU-07", "Generate Kudu Parameters in the preparation phase failed"); + "KUDU-06", "Generate Kudu Parameters in the preparation phase failed"); private final String code; diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java index 6dcf33eca13..006a8adb3c9 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java @@ -17,177 +17,72 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig; import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; +import org.apache.seatunnel.connectors.seatunnel.kudu.source.KuduSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil; -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.Schema; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; -import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduScanToken; import org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.RowResult; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import java.io.IOException; import java.io.Serializable; -import java.math.BigDecimal; -import java.math.BigInteger; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.seatunnel.api.table.type.SqlType.TIMESTAMP; @Slf4j public class KuduInputFormat implements Serializable { - public KuduInputFormat(String kuduMaster, String tableName, String columnsList) { - this.kuduMaster = kuduMaster; - this.columnsList = Arrays.asList(columnsList.split(",")); - this.tableName = tableName; - } + private final KuduSourceConfig kuduSourceConfig; + private final SeaTunnelRowType rowTypeInfo; /** Declare the global variable KuduClient and use it to manipulate the Kudu table */ public KuduClient kuduClient; - /** Specify kuduMaster address */ - public String kuduMaster; - - public List columnsList; - public Schema schema; - public String keyColumn; - public static final int TIMEOUTMS = 18000; - - /** Specifies the name of the table */ - public String tableName; + public KuduInputFormat( + @NonNull KuduSourceConfig kuduSourceConfig, SeaTunnelRowType rowTypeInfo) { + this.kuduSourceConfig = kuduSourceConfig; + this.rowTypeInfo = rowTypeInfo; + } - public List getColumnsSchemas() { - List columns = null; - try { - schema = kuduClient.openTable(tableName).getSchema(); - keyColumn = schema.getPrimaryKeyColumns().get(0).getName(); - columns = schema.getColumns(); - } catch (KuduException e) { - throw new KuduConnectorException( - CommonErrorCode.TABLE_SCHEMA_GET_FAILED, "get table Columns Schemas Failed"); + public void openInputFormat() { + if (kuduClient == null) { + kuduClient = KuduUtil.getKuduClient(kuduSourceConfig); } - return columns; } - public static SeaTunnelRow getSeaTunnelRowData(RowResult rs, SeaTunnelRowType typeInfo) - throws SQLException { - + public SeaTunnelRow toInternal(RowResult rs) throws SQLException { List fields = new ArrayList<>(); - SeaTunnelDataType[] seaTunnelDataTypes = typeInfo.getFieldTypes(); + SeaTunnelDataType[] seaTunnelDataTypes = rowTypeInfo.getFieldTypes(); for (int i = 0; i < seaTunnelDataTypes.length; i++) { - Object seatunnelField; - SeaTunnelDataType seaTunnelDataType = seaTunnelDataTypes[i]; - if (null == rs.getObject(i)) { - seatunnelField = null; - } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getBoolean(i); - } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getByte(i); - } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getShort(i); - } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getInt(i); - } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getLong(i); - } else if (seaTunnelDataType instanceof DecimalType) { - Object value = rs.getObject(i); - seatunnelField = - value instanceof BigInteger ? new BigDecimal((BigInteger) value, 0) : value; - } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getFloat(i); - } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getDouble(i); - } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getString(i); - } else { - throw new KuduConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "Unsupported data type: " + seaTunnelDataType); + if (seaTunnelDataTypes[i].getSqlType() == TIMESTAMP) { + Timestamp timestamp = rs.getTimestamp(i); + fields.add( + Optional.ofNullable(timestamp).map(e -> e.toLocalDateTime()).orElse(null)); + continue; } - fields.add(seatunnelField); + fields.add(rs.getObject(i)); } - return new SeaTunnelRow(fields.toArray()); } - public SeaTunnelRowType getSeaTunnelRowType(List columnSchemaList) { - - ArrayList> seaTunnelDataTypes = new ArrayList<>(); - ArrayList fieldNames = new ArrayList<>(); - try { - - for (int i = 0; i < columnSchemaList.size(); i++) { - fieldNames.add(columnSchemaList.get(i).getName()); - seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i)); - } - } catch (Exception e) { - throw new KuduConnectorException( - CommonErrorCode.TABLE_SCHEMA_GET_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - "Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e))); - } - return new SeaTunnelRowType( - fieldNames.toArray(new String[fieldNames.size()]), - seaTunnelDataTypes.toArray(new SeaTunnelDataType[seaTunnelDataTypes.size()])); - } - - public void openInputFormat() { - KuduClient.KuduClientBuilder kuduClientBuilder = - new KuduClient.KuduClientBuilder(kuduMaster); - kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS); - - kuduClient = kuduClientBuilder.build(); - - log.info("The Kudu client is successfully initialized", kuduMaster, kuduClient); - } - - /** - * @param lowerBound The beginning of each slice - * @param upperBound End of each slice - * @return Get the kuduScanner object for each slice - */ - public KuduScanner getKuduBuildSplit(int lowerBound, int upperBound) { - KuduScanner kuduScanner = null; - try { - KuduScanner.KuduScannerBuilder kuduScannerBuilder = - kuduClient.newScannerBuilder(kuduClient.openTable(tableName)); - - kuduScannerBuilder.setProjectedColumnNames(columnsList); - - KuduPredicate lowerPred = - KuduPredicate.newComparisonPredicate( - schema.getColumn("" + keyColumn), - KuduPredicate.ComparisonOp.GREATER_EQUAL, - lowerBound); - - KuduPredicate upperPred = - KuduPredicate.newComparisonPredicate( - schema.getColumn("" + keyColumn), - KuduPredicate.ComparisonOp.LESS, - upperBound); - - kuduScanner = - kuduScannerBuilder.addPredicate(lowerPred).addPredicate(upperPred).build(); - } catch (KuduException e) { - throw new KuduConnectorException(KuduConnectorErrorCode.GET_KUDUSCAN_OBJECT_FAILED, e); - } - return kuduScanner; - } - public void closeInputFormat() { if (kuduClient != null) { try { @@ -200,4 +95,18 @@ public void closeInputFormat() { } } } + + public Set createInputSplits() throws IOException { + List scanTokens = + KuduUtil.getKuduScanToken(kuduSourceConfig, rowTypeInfo.getFieldNames()); + Set allSplit = new HashSet<>(scanTokens.size()); + for (int i = 0; i < scanTokens.size(); i++) { + allSplit.add(new KuduSourceSplit(i, scanTokens.get(i).serialize())); + } + return allSplit; + } + + public KuduScanner scanner(byte[] token) throws IOException { + return KuduScanToken.deserializeIntoScanner(token, kuduClient); + } } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java index dfdb10af6db..01667c6e7db 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java @@ -17,181 +17,137 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient; +import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig; import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; +import org.apache.seatunnel.connectors.seatunnel.kudu.serialize.KuduRowSerializer; +import org.apache.seatunnel.connectors.seatunnel.kudu.serialize.SeaTunnelRowSerializer; +import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil; -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.Schema; -import org.apache.kudu.client.Insert; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.PartialRow; -import org.apache.kudu.client.SessionConfiguration; -import org.apache.kudu.client.Upsert; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.OperationResponse; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import java.io.IOException; import java.io.Serializable; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.sql.Timestamp; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** A Kudu outputFormat */ @Slf4j public class KuduOutputFormat implements Serializable { - public static final long TIMEOUTMS = 18000; - public static final long SESSIONTIMEOUTMS = 100000; - - private final String kuduMaster; private final String kuduTableName; private final KuduSinkConfig.SaveMode saveMode; + private final KuduSinkConfig kuduSinkConfig; private KuduClient kuduClient; private KuduSession kuduSession; private KuduTable kuduTable; - public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) { - this.kuduMaster = kuduSinkConfig.getKuduMaster(); - this.kuduTableName = kuduSinkConfig.getKuduTableName(); - this.saveMode = kuduSinkConfig.getSaveMode(); - init(); - } + private SeaTunnelRowSerializer seaTunnelRowSerializer; - private void transform(PartialRow row, SeaTunnelRow element, Schema schema) { - int columnCount = schema.getColumnCount(); - for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) { - ColumnSchema col = schema.getColumnByIndex(columnIndex); - try { - switch (col.getType()) { - case BOOL: - row.addBoolean(columnIndex, (Boolean) element.getField(columnIndex)); - break; - case INT8: - row.addByte(columnIndex, (Byte) element.getField(columnIndex)); - break; - case INT16: - row.addShort(columnIndex, (Short) element.getField(columnIndex)); - break; - case INT32: - row.addInt(columnIndex, (Integer) element.getField(columnIndex)); - break; - case INT64: - row.addLong(columnIndex, (Long) element.getField(columnIndex)); - break; - case UNIXTIME_MICROS: - if (element.getField(columnIndex) instanceof Timestamp) { - row.addTimestamp( - columnIndex, (Timestamp) element.getField(columnIndex)); - } else { - row.addLong(columnIndex, (Long) element.getField(columnIndex)); - } - break; - case FLOAT: - row.addFloat(columnIndex, (Float) element.getField(columnIndex)); - break; - case DOUBLE: - row.addDouble(columnIndex, (Double) element.getField(columnIndex)); - break; - case STRING: - row.addString(columnIndex, element.getField(columnIndex).toString()); - break; - case BINARY: - if (element.getField(columnIndex) instanceof byte[]) { - row.addBinary(columnIndex, (byte[]) element.getField(columnIndex)); - } else { - row.addBinary(columnIndex, (ByteBuffer) element.getField(columnIndex)); - } - break; - case DECIMAL: - row.addDecimal(columnIndex, (BigDecimal) element.getField(columnIndex)); - break; - default: - throw new KuduConnectorException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - "Unsupported column type: " + col.getType()); - } - } catch (ClassCastException e) { - throw new KuduConnectorException( - KuduConnectorErrorCode.DATA_TYPE_CAST_FILED, - "Value type does not match column type " - + col.getType() - + " for column " - + col.getName()); - } - } + private SeaTunnelRowType seaTunnelRowType; + + private transient AtomicInteger numPendingRequests; + + public KuduOutputFormat( + @NonNull KuduSinkConfig kuduSinkConfig, SeaTunnelRowType seaTunnelRowType) { + this.kuduTableName = kuduSinkConfig.getTable(); + this.saveMode = kuduSinkConfig.getSaveMode(); + this.kuduSinkConfig = kuduSinkConfig; + this.seaTunnelRowType = seaTunnelRowType; + this.numPendingRequests = new AtomicInteger(0); + openOutputFormat(); } - private void upsert(SeaTunnelRow element) { - Upsert upsert = kuduTable.newUpsert(); - Schema schema = kuduTable.getSchema(); - PartialRow row = upsert.getRow(); - transform(row, element, schema); + private void openOutputFormat() { + this.kuduClient = KuduUtil.getKuduClient(kuduSinkConfig); + this.kuduSession = getSession(); try { - kuduSession.apply(upsert); + kuduTable = kuduClient.openTable(kuduTableName); } catch (KuduException e) { - throw new KuduConnectorException(KuduConnectorErrorCode.KUDU_UPSERT_FAILED, e); + throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e); } + log.info( + "The Kudu client for Master: {} is initialized successfully.", + kuduSinkConfig.getMasters()); + + seaTunnelRowSerializer = new KuduRowSerializer(kuduTable, saveMode, seaTunnelRowType); + } + + private KuduSession getSession() { + KuduSession session = kuduClient.newSession(); + session.setTimeoutMillis(kuduSinkConfig.getOperationTimeout()); + session.setFlushMode(kuduSinkConfig.getFlushMode()); + session.setFlushInterval(kuduSinkConfig.getFlushInterval()); + session.setMutationBufferSpace(kuduSinkConfig.getMaxBufferSize()); + session.setIgnoreAllNotFoundRows(kuduSinkConfig.isIgnoreNotFound()); + session.setIgnoreAllDuplicateRows(kuduSinkConfig.isIgnoreDuplicate()); + return session; } - private void insert(SeaTunnelRow element) { - Insert insert = kuduTable.newInsert(); - Schema schema = kuduTable.getSchema(); - PartialRow row = insert.getRow(); - transform(row, element, schema); + public void closeOutputFormat() throws IOException { try { - kuduSession.apply(insert); - } catch (KuduException e) { - throw new KuduConnectorException(KuduConnectorErrorCode.KUDU_INSERT_FAILED, e); + flush(); + } finally { + try { + if (kuduSession != null) { + kuduSession.close(); + } + } catch (Exception e) { + log.error("Error while closing session.", e); + } + try { + if (kuduClient != null) { + kuduClient.close(); + } + } catch (Exception e) { + log.error("Error while closing client.", e); + } } } - public void write(SeaTunnelRow element) { - switch (saveMode) { - case APPEND: - insert(element); - break; - case OVERWRITE: - upsert(element); - break; - default: - throw new KuduConnectorException( - CommonErrorCode.FLUSH_DATA_FAILED, - String.format("Unsupported saveMode: %s.", saveMode.name())); + public void flush() throws KuduException { + kuduSession.flush(); + checkAsyncErrors(); + } + + private void checkAsyncErrors() { + if (kuduSession.countPendingErrors() == 0) { + return; } + String errorMessage = + Arrays.stream(kuduSession.getPendingErrors().getRowErrors()) + .map(error -> error.toString() + System.lineSeparator()) + .collect(Collectors.joining()); + throw new KuduConnectorException(KuduConnectorErrorCode.WRITE_DATA_FAILED, errorMessage); } - private void init() { - KuduClient.KuduClientBuilder kuduClientBuilder = - new KuduClient.KuduClientBuilder(kuduMaster); - kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS); - this.kuduClient = kuduClientBuilder.build(); - this.kuduSession = kuduClient.newSession(); - this.kuduSession.setTimeoutMillis(SESSIONTIMEOUTMS); - this.kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); - try { - kuduTable = kuduClient.openTable(kuduTableName); - } catch (KuduException e) { - throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e); + private void checkErrors(OperationResponse response) throws IOException { + if (response != null && response.hasRowError()) { + throw new KuduConnectorException( + KuduConnectorErrorCode.WRITE_DATA_FAILED, response.getRowError().toString()); } - log.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster); } - public void closeOutputFormat() { - if (kuduClient != null) { - try { - kuduClient.close(); - kuduSession.close(); - } catch (KuduException ignored) { - log.warn("Failed to close Kudu Client.", ignored); - } finally { - kuduClient = null; - kuduSession = null; - } + public void write(SeaTunnelRow row) throws IOException { + checkAsyncErrors(); + if (row.getRowKind() == RowKind.UPDATE_BEFORE) return; + Operation operation = seaTunnelRowSerializer.serializeRow(row); + checkErrors(kuduSession.apply(operation)); + if (kuduSinkConfig.getMaxBufferSize() > 0 + && numPendingRequests.incrementAndGet() >= kuduSinkConfig.getMaxBufferSize()) { + flush(); } } } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java index 7a1d0def67f..198e133f2c1 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java @@ -26,6 +26,8 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnTypeAttributes; +import org.apache.kudu.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,66 +37,37 @@ public class KuduTypeMapper { - private static final Logger LOG = LoggerFactory.getLogger(KuduTypeMapper.class); - - // ============================data types===================== - - private static final String KUDU_UNKNOWN = "UNKNOWN"; - private static final String KUDU_BIT = "BOOL"; - - // -------------------------number---------------------------- - private static final String KUDU_TINYINT = "INT8"; - private static final String KUDU_MEDIUMINT = "INT32"; - private static final String KUDU_INT = "INT16"; - private static final String KUDU_BIGINT = "INT64"; - - private static final String KUDU_FLOAT = "FLOAT"; - - private static final String KUDU_DOUBLE = "DOUBLE"; - private static final String KUDU_DECIMAL = "DECIMAL32"; - - // -------------------------string---------------------------- - - private static final String KUDU_VARCHAR = "STRING"; - - // ------------------------------time------------------------- - - private static final String KUDU_UNIXTIME_MICROS = "UNIXTIME_MICROS"; - - // ------------------------------blob------------------------- - - private static final String KUDU_BINARY = "BINARY"; - private static final int PRECISION = 20; + private static final Logger log = LoggerFactory.getLogger(KuduTypeMapper.class); public static SeaTunnelDataType mapping(List columnSchemaList, int colIndex) throws SQLException { - String kuduType = columnSchemaList.get(colIndex).getType().getName().toUpperCase(); + Type kuduType = columnSchemaList.get(colIndex).getType(); switch (kuduType) { - case KUDU_BIT: + case BOOL: return BasicType.BOOLEAN_TYPE; - case KUDU_TINYINT: - case KUDU_MEDIUMINT: - case KUDU_INT: + case INT8: + return BasicType.BYTE_TYPE; + case INT16: + return BasicType.SHORT_TYPE; + case INT32: return BasicType.INT_TYPE; - case KUDU_BIGINT: + case INT64: return BasicType.LONG_TYPE; - case KUDU_DECIMAL: - return new DecimalType(PRECISION, 0); - case KUDU_FLOAT: + case DECIMAL: + ColumnTypeAttributes typeAttributes = + columnSchemaList.get(colIndex).getTypeAttributes(); + return new DecimalType(typeAttributes.getPrecision(), typeAttributes.getScale()); + case FLOAT: return BasicType.FLOAT_TYPE; - case KUDU_DOUBLE: + case DOUBLE: return BasicType.DOUBLE_TYPE; - case KUDU_VARCHAR: + case STRING: return BasicType.STRING_TYPE; - case KUDU_UNIXTIME_MICROS: + case UNIXTIME_MICROS: return LocalTimeType.LOCAL_DATE_TIME_TYPE; - case KUDU_BINARY: + case BINARY: return PrimitiveByteArrayType.INSTANCE; - - // Doesn't support yet - - case KUDU_UNKNOWN: default: throw new KuduConnectorException( CommonErrorCode.UNSUPPORTED_DATA_TYPE, diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/serialize/KuduRowSerializer.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/serialize/KuduRowSerializer.java new file mode 100644 index 00000000000..d468c580b09 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/serialize/KuduRowSerializer.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; + +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; + +import java.time.LocalDateTime; + +public class KuduRowSerializer implements SeaTunnelRowSerializer { + + private KuduTable kuduTable; + private KuduSinkConfig.SaveMode saveMode; + + private SeaTunnelRowType seaTunnelRowType; + + public KuduRowSerializer( + KuduTable kuduTable, + KuduSinkConfig.SaveMode saveMode, + SeaTunnelRowType seaTunnelRowType) { + this.kuduTable = kuduTable; + this.saveMode = saveMode; + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public Operation serializeRow(SeaTunnelRow row) { + Operation operation; + switch (row.getRowKind()) { + case INSERT: + if (saveMode == KuduSinkConfig.SaveMode.OVERWRITE) { + operation = kuduTable.newUpsert(); + break; + } + operation = kuduTable.newInsert(); + break; + case UPDATE_AFTER: + operation = kuduTable.newUpsert(); + break; + case DELETE: + operation = kuduTable.newDelete(); + break; + default: + throw new KuduConnectorException( + CommonErrorCode.UNSUPPORTED_OPERATION, + "Unsupported write row kind: " + row.getRowKind()); + } + transform(operation, row); + return operation; + } + + private void transform(Operation operation, SeaTunnelRow element) { + PartialRow row = operation.getRow(); + for (int columnIndex = 0; columnIndex < seaTunnelRowType.getTotalFields(); columnIndex++) { + SeaTunnelDataType type = seaTunnelRowType.getFieldType(columnIndex); + try { + switch (type.getSqlType()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case FLOAT: + case DOUBLE: + case STRING: + case DECIMAL: + case BYTES: + row.addObject( + seaTunnelRowType.getFieldName(columnIndex), + element.getField(columnIndex)); + break; + case TIMESTAMP: + Object fieldValue = element.getField(columnIndex); + if (fieldValue == null) { + row.addObject(seaTunnelRowType.getFieldName(columnIndex), null); + } else { + LocalDateTime localDateTime = (LocalDateTime) fieldValue; + row.addObject( + seaTunnelRowType.getFieldName(columnIndex), + java.sql.Timestamp.valueOf(localDateTime)); + } + break; + default: + throw new KuduConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unsupported column type: " + type.getSqlType()); + } + } catch (ClassCastException e) { + throw new KuduConnectorException( + KuduConnectorErrorCode.DATA_TYPE_CAST_FILED, + "Value type does not match column type " + + type.getSqlType() + + " for column " + + seaTunnelRowType.getFieldName(columnIndex)); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/serialize/SeaTunnelRowSerializer.java new file mode 100644 index 00000000000..cbb7e92ec21 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/serialize/SeaTunnelRowSerializer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.apache.kudu.client.Operation; + +public interface SeaTunnelRowSerializer { + + Operation serializeRow(SeaTunnelRow row); +} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java index e6ec92b9302..5ea299ac716 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java @@ -20,13 +20,22 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; +import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState; import com.google.auto.service.AutoService; @@ -37,9 +46,11 @@ * {@link AbstractSimpleSink}. */ @AutoService(SeaTunnelSink.class) -public class KuduSink extends AbstractSimpleSink { +public class KuduSink + implements SeaTunnelSink< + SeaTunnelRow, KuduSinkState, KuduCommitInfo, KuduAggregatedCommitInfo> { - private Config config; + private KuduSinkConfig kuduSinkConfig; private SeaTunnelRowType seaTunnelRowType; @Override @@ -59,12 +70,22 @@ public SeaTunnelDataType getConsumedType() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { - this.config = pluginConfig; + ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig); + ConfigValidator.of(config).validate(new KuduSinkFactory().optionRule()); + try { + kuduSinkConfig = new KuduSinkConfig(config); + } catch (Exception e) { + throw new KuduConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, ExceptionUtils.getMessage(e))); + } } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) - throws IOException { - return new KuduSinkWriter(seaTunnelRowType, config); + public SinkWriter createWriter( + SinkWriter.Context context) throws IOException { + return new KuduSinkWriter(seaTunnelRowType, kuduSinkConfig); } } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java index 4a66e5a8cc2..c78e7859a7b 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java @@ -24,6 +24,11 @@ import com.google.auto.service.AutoService; +import java.util.Arrays; + +import static org.apache.kudu.client.SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND; +import static org.apache.kudu.client.SessionConfiguration.FlushMode.MANUAL_FLUSH; + @AutoService(Factory.class) public class KuduSinkFactory implements TableSinkFactory { @Override @@ -34,10 +39,29 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required( - KuduSinkConfig.KUDU_MASTER, - KuduSinkConfig.KUDU_SAVE_MODE, - KuduSinkConfig.KUDU_TABLE_NAME) + .required(KuduSinkConfig.MASTER, KuduSinkConfig.TABLE_NAME) + .optional(KuduSinkConfig.WORKER_COUNT) + .optional(KuduSinkConfig.OPERATION_TIMEOUT) + .optional(KuduSinkConfig.ADMIN_OPERATION_TIMEOUT) + .optional(KuduSinkConfig.SAVE_MODE) + .optional(KuduSinkConfig.FLUSH_MODE) + .optional(KuduSinkConfig.IGNORE_NOT_FOUND) + .optional(KuduSinkConfig.IGNORE_DUPLICATE) + .optional(KuduSinkConfig.ENABLE_KERBEROS) + .optional(KuduSinkConfig.KERBEROS_KRB5_CONF) + .conditional( + KuduSinkConfig.FLUSH_MODE, + Arrays.asList(AUTO_FLUSH_BACKGROUND.name(), MANUAL_FLUSH.name()), + KuduSinkConfig.BATCH_SIZE) + .conditional( + KuduSinkConfig.FLUSH_MODE, + AUTO_FLUSH_BACKGROUND.name(), + KuduSinkConfig.BUFFER_FLUSH_INTERVAL) + .conditional( + KuduSinkConfig.ENABLE_KERBEROS, + true, + KuduSinkConfig.KERBEROS_PRINCIPAL, + KuduSinkConfig.KERBEROS_KEYTAB) .build(); } } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java index 33f2487a796..acf5b4ca2e3 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java @@ -17,34 +17,30 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig; import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduOutputFormat; +import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSinkState; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.Optional; @Slf4j -public class KuduSinkWriter extends AbstractSinkWriter { +public class KuduSinkWriter implements SinkWriter { private SeaTunnelRowType seaTunnelRowType; - private Config pluginConfig; private KuduOutputFormat fileWriter; - private KuduSinkConfig kuduSinkConfig; public KuduSinkWriter( - @NonNull SeaTunnelRowType seaTunnelRowType, @NonNull Config pluginConfig) { + @NonNull SeaTunnelRowType seaTunnelRowType, @NonNull KuduSinkConfig kuduSinkConfig) { this.seaTunnelRowType = seaTunnelRowType; - this.pluginConfig = pluginConfig; - - kuduSinkConfig = new KuduSinkConfig(this.pluginConfig); - fileWriter = new KuduOutputFormat(kuduSinkConfig); + fileWriter = new KuduOutputFormat(kuduSinkConfig, seaTunnelRowType); } @Override @@ -52,6 +48,15 @@ public void write(SeaTunnelRow element) throws IOException { fileWriter.write(element); } + @Override + public Optional prepareCommit() throws IOException { + fileWriter.flush(); + return Optional.empty(); + } + + @Override + public void abortPrepare() {} + @Override public void close() throws IOException { fileWriter.closeOutputFormat(); diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java index 391bf44535b..3748de2522d 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java @@ -20,18 +20,18 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.serialization.DefaultSerializer; -import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.ConfigValidator; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.ExceptionUtils; @@ -41,13 +41,10 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat; import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper; import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState; +import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil; import org.apache.kudu.ColumnSchema; import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduException; -import org.apache.kudu.client.KuduScanner; -import org.apache.kudu.client.RowResult; -import org.apache.kudu.client.RowResultIterator; import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; @@ -60,10 +57,10 @@ public class KuduSource implements SeaTunnelSource, SupportParallelism { + private SeaTunnelRowType rowTypeInfo; private KuduInputFormat kuduInputFormat; - private PartitionParameter partitionParameter; - public static final int TIMEOUTMS = 18000; + private KuduSourceConfig kuduSourceConfig; @Override public Boundedness getBoundedness() { @@ -81,28 +78,18 @@ public SourceReader createReader( return new KuduSourceReader(kuduInputFormat, readerContext); } - @Override - public Serializer getSplitSerializer() { - return SeaTunnelSource.super.getSplitSerializer(); - } - @Override public SourceSplitEnumerator createEnumerator( SourceSplitEnumerator.Context enumeratorContext) { - return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter); + return new KuduSourceSplitEnumerator(enumeratorContext, kuduSourceConfig, kuduInputFormat); } @Override public SourceSplitEnumerator restoreEnumerator( SourceSplitEnumerator.Context enumeratorContext, KuduSourceState checkpointState) { - // todo: - return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter); - } - - @Override - public Serializer getEnumeratorStateSerializer() { - return new DefaultSerializer<>(); + return new KuduSourceSplitEnumerator( + enumeratorContext, kuduSourceConfig, kuduInputFormat, checkpointState); } @Override @@ -111,89 +98,34 @@ public String getPluginName() { } @Override - public void prepare(Config config) { - String kudumaster = ""; - String tableName = ""; - String columnslist = ""; - CheckResult checkResult = - CheckConfigUtil.checkAllExists( - config, - KuduSourceConfig.KUDU_MASTER.key(), - KuduSourceConfig.TABLE_NAME.key(), - KuduSourceConfig.COLUMNS_LIST.key()); - if (checkResult.isSuccess()) { - kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key()); - tableName = config.getString(KuduSourceConfig.TABLE_NAME.key()); - columnslist = config.getString(KuduSourceConfig.COLUMNS_LIST.key()); - kuduInputFormat = new KuduInputFormat(kudumaster, tableName, columnslist); - } else { + public void prepare(Config pluginConfig) { + ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig); + ConfigValidator.of(config).validate(new KuduSourceFactory().optionRule()); + try { + kuduSourceConfig = new KuduSourceConfig(config); + } catch (Exception e) { throw new KuduConnectorException( SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format( "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, checkResult.getMsg())); - } - try { - KuduClient.KuduClientBuilder kuduClientBuilder = - new KuduClient.KuduClientBuilder(kudumaster); - kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS); - - KuduClient kuduClient = kuduClientBuilder.build(); - partitionParameter = initPartitionParameter(kuduClient, tableName); - SeaTunnelRowType seaTunnelRowType = - getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns()); - rowTypeInfo = seaTunnelRowType; - } catch (KuduException e) { - throw new KuduConnectorException( - KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, e); + getPluginName(), PluginType.SINK, ExceptionUtils.getMessage(e))); } - } - private PartitionParameter initPartitionParameter(KuduClient kuduClient, String tableName) { - String keyColumn = null; - int maxKey = 0; - int minKey = 0; - boolean flag = true; - try { - KuduScanner.KuduScannerBuilder kuduScannerBuilder = - kuduClient.newScannerBuilder(kuduClient.openTable(tableName)); - ArrayList columnsList = new ArrayList(); - keyColumn = - kuduClient - .openTable(tableName) - .getSchema() - .getPrimaryKeyColumns() - .get(0) - .getName(); - columnsList.add("" + keyColumn); - kuduScannerBuilder.setProjectedColumnNames(columnsList); - KuduScanner kuduScanner = kuduScannerBuilder.build(); - while (kuduScanner.hasMoreRows()) { - RowResultIterator rowResults = kuduScanner.nextRows(); - while (rowResults.hasNext()) { - RowResult row = rowResults.next(); - int id = row.getInt("" + keyColumn); - if (flag) { - maxKey = id; - minKey = id; - flag = false; - } else { - if (id >= maxKey) { - maxKey = id; - } - if (id <= minKey) { - minKey = id; - } - } - } + if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) { + rowTypeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType(); + } else { + try (KuduClient kuduClient = KuduUtil.getKuduClient(kuduSourceConfig)) { + rowTypeInfo = + getSeaTunnelRowType( + kuduClient + .openTable(kuduSourceConfig.getTable()) + .getSchema() + .getColumns()); + } catch (Exception e) { + throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e); } - } catch (KuduException e) { - throw new KuduConnectorException( - KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, - "Failed to generate upper and lower limits for each partition"); } - return new PartitionParameter( - keyColumn, Long.parseLong(minKey + ""), Long.parseLong(maxKey + "")); + kuduInputFormat = new KuduInputFormat(kuduSourceConfig, rowTypeInfo); } public SeaTunnelRowType getSeaTunnelRowType(List columnSchemaList) { diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java index 15a69eabe91..5bb035633d2 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java @@ -19,13 +19,15 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig; import com.google.auto.service.AutoService; -import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.COLUMNS_LIST; -import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.KUDU_MASTER; +import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.MASTER; import static org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig.TABLE_NAME; @AutoService(Factory.class) @@ -38,7 +40,23 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().required(KUDU_MASTER, TABLE_NAME, COLUMNS_LIST).build(); + return OptionRule.builder() + .required(MASTER, TABLE_NAME) + .optional(TableSchemaOptions.SCHEMA) + .optional(KuduSourceConfig.WORKER_COUNT) + .optional(KuduSourceConfig.OPERATION_TIMEOUT) + .optional(KuduSourceConfig.ADMIN_OPERATION_TIMEOUT) + .optional(KuduSourceConfig.QUERY_TIMEOUT) + .optional(KuduSourceConfig.SCAN_BATCH_SIZE_BYTES) + .optional(KuduSourceConfig.FILTER) + .optional(KuduSinkConfig.ENABLE_KERBEROS) + .optional(KuduSinkConfig.KERBEROS_KRB5_CONF) + .conditional( + KuduSinkConfig.ENABLE_KERBEROS, + true, + KuduSinkConfig.KERBEROS_PRINCIPAL, + KuduSinkConfig.KERBEROS_KEYTAB) + .build(); } @Override diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java index b52664c3e2e..bece0bc2336 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java @@ -17,20 +17,18 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.source; -import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat; -import org.apache.kudu.ColumnSchema; import org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.RowResult; import org.apache.kudu.client.RowResultIterator; import lombok.extern.slf4j.Slf4j; -import java.util.Collections; +import java.util.ArrayList; import java.util.Deque; import java.util.LinkedList; import java.util.List; @@ -62,33 +60,31 @@ public void close() { @Override public void pollNext(Collector output) throws Exception { - KuduSourceSplit split = splits.poll(); - Object[] parameterValues = split.parameterValues; - int lowerBound = Integer.parseInt(parameterValues[0].toString()); - int upperBound = Integer.parseInt(parameterValues[1].toString()); - List columnSchemaList = kuduInputFormat.getColumnsSchemas(); - KuduScanner kuduScanner = kuduInputFormat.getKuduBuildSplit(lowerBound, upperBound); - // - while (kuduScanner.hasMoreRows()) { - RowResultIterator rowResults = kuduScanner.nextRows(); - while (rowResults.hasNext()) { - RowResult rowResult = rowResults.next(); - SeaTunnelRow seaTunnelRow = - KuduInputFormat.getSeaTunnelRowData( - rowResult, kuduInputFormat.getSeaTunnelRowType(columnSchemaList)); - output.collect(seaTunnelRow); + synchronized (output.getCheckpointLock()) { + KuduSourceSplit split = splits.poll(); + if (null != split) { + KuduScanner kuduScanner = kuduInputFormat.scanner(split.getToken()); + while (kuduScanner.hasMoreRows()) { + RowResultIterator rowResults = kuduScanner.nextRows(); + while (rowResults.hasNext()) { + RowResult rowResult = rowResults.next(); + SeaTunnelRow seaTunnelRow = kuduInputFormat.toInternal(rowResult); + output.collect(seaTunnelRow); + } + } + } else if (noMoreSplit && splits.isEmpty()) { + // signal to the source that we have reached the end of the data. + log.info("Closed the bounded kudu source"); + context.signalNoMoreElement(); + } else { + Thread.sleep(1000L); } } - if (Boundedness.BOUNDED.equals(context.getBoundedness())) { - // signal to the source that we have reached the end of the data. - log.info("Closed the bounded fake source"); - context.signalNoMoreElement(); - } } @Override public List snapshotState(long checkpointId) { - return Collections.emptyList(); + return new ArrayList<>(splits); } @Override diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java index 68fd6fcdca5..04f4707892a 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java @@ -27,10 +27,10 @@ public class KuduSourceSplit implements SourceSplit { private static final long serialVersionUID = -1L; - - Object[] parameterValues; public final Integer splitId; + private final byte[] token; + @Override public String splitId() { return splitId.toString(); diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java index b0f84468792..1e229ac9600 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java @@ -18,109 +18,166 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.source; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; +import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat; import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; +import java.util.Set; public class KuduSourceSplitEnumerator implements SourceSplitEnumerator { + private static final Logger log = LoggerFactory.getLogger(KuduSourceSplitEnumerator.class); private final SourceSplitEnumerator.Context enumeratorContext; - private PartitionParameter partitionParameter; - List allSplit = new ArrayList<>(); - private Long maxVal; - private Long minVal; - private Long batchSize; - private Integer batchNum; + private KuduSourceState checkpointState; + private KuduSourceConfig kuduSourceConfig; + private final Map> pendingSplits; + + private final KuduInputFormat kuduInputFormat; + + private final Object stateLock = new Object(); + private volatile boolean shouldEnumerate; + + public KuduSourceSplitEnumerator( + Context enumeratorContext, + KuduSourceConfig kuduSourceConfig, + KuduInputFormat kuduInputFormat) { + this(enumeratorContext, kuduSourceConfig, kuduInputFormat, null); + } public KuduSourceSplitEnumerator( SourceSplitEnumerator.Context enumeratorContext, - PartitionParameter partitionParameter) { + KuduSourceConfig kuduSourceConfig, + KuduInputFormat kuduInputFormat, + KuduSourceState checkpointState) { this.enumeratorContext = enumeratorContext; - this.partitionParameter = partitionParameter; + this.kuduSourceConfig = kuduSourceConfig; + this.pendingSplits = new HashMap<>(); + this.kuduInputFormat = kuduInputFormat; + this.shouldEnumerate = checkpointState == null; + if (checkpointState != null) { + this.shouldEnumerate = checkpointState.isShouldEnumerate(); + this.pendingSplits.putAll(checkpointState.getPendingSplits()); + } } @Override - public void open() {} + public void open() { + kuduInputFormat.openInputFormat(); + } @Override - public void run() {} + public void run() throws IOException { + Set readers = enumeratorContext.registeredReaders(); + if (shouldEnumerate) { + Set newSplits = discoverySplits(); + + synchronized (stateLock) { + addPendingSplit(newSplits); + shouldEnumerate = false; + } - @Override - public void close() throws IOException {} + assignSplit(readers); + } - @Override - public void addSplitsBack(List splits, int subtaskId) {} + log.debug( + "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(enumeratorContext::signalNoMoreSplits); + } - @Override - public int currentUnassignedSplitSize() { - return 0; + private Set discoverySplits() throws IOException { + return kuduInputFormat.createInputSplits(); } @Override - public void handleSplitRequest(int subtaskId) {} + public void close() throws IOException { + kuduInputFormat.closeInputFormat(); + } @Override - public void registerReader(int subtaskId) { - int parallelism = enumeratorContext.currentParallelism(); - if (allSplit.isEmpty()) { - if (null != partitionParameter) { - Serializable[][] parameterValues = - getParameterValues( - partitionParameter.minValue, - partitionParameter.maxValue, - parallelism); - for (int i = 0; i < parameterValues.length; i++) { - allSplit.add(new KuduSourceSplit(parameterValues[i], i)); + public void addSplitsBack(List splits, int subtaskId) { + log.debug("Add back splits {} to KuduSourceSplitEnumerator.", splits); + synchronized (stateLock) { + if (!splits.isEmpty()) { + addPendingSplit(splits); + assignSplit(Collections.singletonList(subtaskId)); + } + } + } + + private void assignSplit(Collection readers) { + log.debug("Assign pendingSplits to readers {}", readers); + + for (int reader : readers) { + List assignmentForReader = pendingSplits.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + log.info("Assign splits {} to reader {}", assignmentForReader, reader); + try { + enumeratorContext.assignSplit(reader, assignmentForReader); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignmentForReader, + reader, + e); + pendingSplits.put(reader, assignmentForReader); } - } else { - allSplit.add(new KuduSourceSplit(null, 0)); } } - // Filter the split that the current task needs to run - List splits = - allSplit.stream() - .filter(p -> p.splitId % parallelism == subtaskId) - .collect(Collectors.toList()); - enumeratorContext.assignSplit(subtaskId, splits); - enumeratorContext.signalNoMoreSplits(subtaskId); } - private Serializable[][] getParameterValues(Long minVal, Long maxVal, int parallelism) { - this.maxVal = maxVal; - this.minVal = minVal; - long maxElemCount = (maxVal - minVal) + 1; - batchNum = parallelism; - getBatchSizeAndBatchNum(parallelism); - long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum; - - Serializable[][] parameters = new Serializable[batchNum][2]; - long start = minVal; - for (int i = 0; i < batchNum; i++) { - long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0); - parameters[i] = new Long[] {start, end}; - start = end + 1; + private void addPendingSplit(Collection splits) { + int readerCount = enumeratorContext.currentParallelism(); + for (KuduSourceSplit split : splits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + log.info("Assigning {} to {} reader.", split, ownerReader); + pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split); } - return parameters; } - private void getBatchSizeAndBatchNum(int parallelism) { - batchNum = parallelism; - long maxElemCount = (maxVal - minVal) + 1; - if (batchNum > maxElemCount) { - batchNum = (int) maxElemCount; + private int getSplitOwner(String splitId, int numReaders) { + return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplits.size(); + } + + @Override + public void handleSplitRequest(int subtaskId) { + throw new KuduConnectorException( + CommonErrorCode.UNSUPPORTED_OPERATION, + String.format("Unsupported handleSplitRequest: %d", subtaskId)); + } + + @Override + public void registerReader(int subtaskId) { + log.debug("Register reader {} to KuduSourceSplitEnumerator.", subtaskId); + synchronized (stateLock) { + if (!pendingSplits.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); + } } - this.batchNum = batchNum; - this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue(); } @Override public KuduSourceState snapshotState(long checkpointId) throws Exception { - return null; + synchronized (stateLock) { + return new KuduSourceState(shouldEnumerate, new HashMap<>(pendingSplits)); + } } @Override diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduAggregatedCommitInfo.java similarity index 75% rename from seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java rename to seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduAggregatedCommitInfo.java index e791164667c..6abdbf81505 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduAggregatedCommitInfo.java @@ -15,18 +15,8 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.kudu.source; - -import lombok.AllArgsConstructor; -import lombok.Data; +package org.apache.seatunnel.connectors.seatunnel.kudu.state; import java.io.Serializable; -@Data -@AllArgsConstructor -public class PartitionParameter implements Serializable { - - String partitionColumnName; - Long minValue; - Long maxValue; -} +public class KuduAggregatedCommitInfo implements Serializable {} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduCommitInfo.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduCommitInfo.java new file mode 100644 index 00000000000..695fb2efc99 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduCommitInfo.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.state; + +import java.io.Serializable; + +public class KuduCommitInfo implements Serializable {} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSinkState.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSinkState.java new file mode 100644 index 00000000000..e229df86d7f --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSinkState.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.state; + +import java.io.Serializable; + +public class KuduSinkState implements Serializable {} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java index ba5ca8424e5..b5e6edba2a7 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSourceState.java @@ -17,6 +17,18 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.state; +import org.apache.seatunnel.connectors.seatunnel.kudu.source.KuduSourceSplit; + +import lombok.AllArgsConstructor; +import lombok.Getter; + import java.io.Serializable; +import java.util.List; +import java.util.Map; -public class KuduSourceState implements Serializable {} +@AllArgsConstructor +@Getter +public class KuduSourceState implements Serializable { + private boolean shouldEnumerate; + private Map> pendingSplits; +} diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java new file mode 100644 index 00000000000..1006129dd82 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/util/KuduUtil.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kudu.util; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig; +import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.apache.kudu.Schema; +import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduScanToken; +import org.apache.kudu.client.KuduTable; + +import lombok.extern.slf4j.Slf4j; +import sun.security.krb5.Config; +import sun.security.krb5.KrbException; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.List; + +@Slf4j +public class KuduUtil { + + private static final String ERROR_MESSAGE = + "principal and keytab can not be null current principal %s keytab %s"; + + public static final String KRB5_CONF_KEY = "java.security.krb5.conf"; + + public static final String HADOOP_AUTH_KEY = "hadoop.security.authentication"; + + public static final String KRB = "kerberos"; + + public static KuduClient getKuduClient(CommonConfig config) { + try { + if (config.getEnableKerberos()) { + synchronized (UserGroupInformation.class) { + UserGroupInformation ugi = loginAndReturnUgi(config); + return ugi.doAs( + (PrivilegedExceptionAction) + () -> getKuduClientInternal(config)); + } + } + return getKuduClientInternal(config); + + } catch (IOException | InterruptedException e) { + throw new KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e); + } + } + + private static UserGroupInformation loginAndReturnUgi(CommonConfig config) throws IOException { + if (StringUtils.isBlank(config.getPrincipal()) || StringUtils.isBlank(config.getKeytab())) { + throw new KuduConnectorException( + CommonErrorCode.ILLEGAL_ARGUMENT, + String.format(ERROR_MESSAGE, config.getPrincipal(), config.getKeytab())); + } + if (StringUtils.isNotBlank(config.getKrb5conf())) { + reloadKrb5conf(config.getKrb5conf()); + } + Configuration conf = new Configuration(); + conf.set(HADOOP_AUTH_KEY, KRB); + UserGroupInformation.setConfiguration(conf); + log.info( + "Start Kerberos authentication using principal {} and keytab {}", + config.getPrincipal(), + config.getKeytab()); + return UserGroupInformation.loginUserFromKeytabAndReturnUGI( + config.getPrincipal(), config.getKeytab()); + } + + private static void reloadKrb5conf(String krb5conf) { + System.setProperty(KRB5_CONF_KEY, krb5conf); + try { + Config.refresh(); + KerberosName.resetDefaultRealm(); + } catch (KrbException e) { + log.warn( + "resetting default realm failed, current default realm will still be used.", e); + } + } + + private static KuduClient getKuduClientInternal(CommonConfig config) { + return new AsyncKuduClient.AsyncKuduClientBuilder( + Arrays.asList(config.getMasters().split(","))) + .workerCount(config.getWorkerCount()) + .defaultAdminOperationTimeoutMs(config.getAdminOperationTimeout()) + .defaultOperationTimeoutMs(config.getOperationTimeout()) + .build() + .syncClient(); + } + + public static List getKuduScanToken( + KuduSourceConfig kuduSourceConfig, String[] columnName) throws IOException { + try (KuduClient client = KuduUtil.getKuduClient(kuduSourceConfig)) { + KuduTable kuduTable = client.openTable(kuduSourceConfig.getTable()); + List columnNameList = Arrays.asList(columnName); + + KuduScanToken.KuduScanTokenBuilder builder = + client.newScanTokenBuilder(kuduTable) + .batchSizeBytes(kuduSourceConfig.getBatchSizeBytes()) + .setTimeout(kuduSourceConfig.getQueryTimeout()) + .setProjectedColumnNames(columnNameList); + + addPredicates(builder, kuduSourceConfig.getFilter(), kuduTable.getSchema()); + + return builder.build(); + } catch (Exception e) { + throw new IOException("Get ScanToken error", e); + } + } + + private static void addPredicates( + KuduScanToken.KuduScanTokenBuilder kuduScanTokenBuilder, String filter, Schema schema) { + // todo Support for where condition + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/pom.xml new file mode 100644 index 00000000000..f841efec03a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/pom.xml @@ -0,0 +1,49 @@ + + + + 4.0.0 + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + + connector-kudu-e2e + SeaTunnel : E2E : Connector V2 : Kudu + + + + org.apache.seatunnel + connector-kudu + ${project.version} + test + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + + org.testcontainers + toxiproxy + ${testcontainer.version} + test + + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java new file mode 100644 index 00000000000..dd45471060f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduCDCSinkIT.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.kudu; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnTypeAttributes; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.ImmutableList; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}, + disabledReason = "Currently SPARK do not support cdc") +public class KuduCDCSinkIT extends TestSuiteBase implements TestResource { + + private static final String IMAGE = "apache/kudu:1.15.0"; + private static final Integer KUDU_MASTER_PORT = 7051; + private static final Integer KUDU_TSERVER_PORT = 7053; + private GenericContainer master; + private GenericContainer tServers; + private KuduClient kuduClient; + + private static final String TOXIPROXY_IMAGE = "ghcr.io/shopify/toxiproxy:2.4.0"; + private static final String TOXIPROXY_NETWORK_ALIAS = "toxiproxy"; + private ToxiproxyContainer toxiProxy; + private String KUDU_SINK_TABLE = "kudu_sink_table"; + + @BeforeAll + @Override + public void startUp() throws Exception { + + String hostIP = getHostIPAddress(); + + this.master = + new GenericContainer<>(IMAGE) + .withExposedPorts(KUDU_MASTER_PORT) + .withCommand("master") + .withEnv("MASTER_ARGS", "--default_num_replicas=1") + .withNetwork(NETWORK) + .withNetworkAliases("kudu-master-cdc") + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE))); + + toxiProxy = + new ToxiproxyContainer(TOXIPROXY_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(TOXIPROXY_NETWORK_ALIAS); + toxiProxy.start(); + + String instanceName = "kudu-tserver-cdc"; + + ToxiproxyContainer.ContainerProxy proxy = + toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT); + + this.tServers = + new GenericContainer<>(IMAGE) + .withExposedPorts(KUDU_TSERVER_PORT) + .withCommand("tserver") + .withEnv("KUDU_MASTERS", "kudu-master-cdc:" + KUDU_MASTER_PORT) + .withNetwork(NETWORK) + .withNetworkAliases(instanceName) + .dependsOn(master) + .withEnv( + "TSERVER_ARGS", + format( + "--fs_wal_dir=/var/lib/kudu/tserver --logtostderr --use_hybrid_clock=false --rpc_bind_addresses=%s:%s --rpc_advertised_addresses=%s:%s", + instanceName, + KUDU_TSERVER_PORT, + hostIP, + proxy.getProxyPort())) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE))); + + Startables.deepStart(Stream.of(master)).join(); + Startables.deepStart(Stream.of(tServers)).join(); + + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::getKuduClient); + } + + private void initializeKuduTable() throws KuduException { + + List columns = new ArrayList(); + + columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_bool", Type.BOOL).nullable(true).build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int8", Type.INT8).nullable(true).build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_decimal", Type.DECIMAL) + .nullable(true) + .typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder() + .precision(20) + .scale(5) + .build()) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING) + .nullable(true) + .build()); + // spark + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", Type.UNIXTIME_MICROS) + .nullable(true) + .build()); + + Schema schema = new Schema(columns); + + ImmutableList hashKeys = ImmutableList.of("id"); + CreateTableOptions tableOptions = new CreateTableOptions(); + + tableOptions.addHashPartitions(hashKeys, 2); + tableOptions.setNumReplicas(1); + + kuduClient.createTable(KUDU_SINK_TABLE, schema, tableOptions); + } + + private void getKuduClient() { + kuduClient = + new AsyncKuduClient.AsyncKuduClientBuilder( + Arrays.asList( + "127.0.0.1" + ":" + master.getMappedPort(KUDU_MASTER_PORT))) + .defaultAdminOperationTimeoutMs(120000) + .defaultOperationTimeoutMs(120000) + .build() + .syncClient(); + } + + @TestTemplate + public void testKudu(TestContainer container) throws IOException, InterruptedException { + this.initializeKuduTable(); + Container.ExecResult execResult = container.executeJob("/write-cdc-changelog-to-kudu.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + Stream.>of( + Arrays.asList( + "3", + "true", + "1", + "2", + "3", + "4", + "4.3", + "5.3", + "6.30000", + "NEW", + "2020-02-02 02:02:02.0"), + Arrays.asList( + "1", + "true", + "2", + "2", + "3", + "4", + "4.3", + "5.3", + "6.30000", + "NEW", + "2020-02-02 02:02:02.0")) + .collect(Collectors.toList()), + readData(KUDU_SINK_TABLE)); + }); + + kuduClient.deleteTable(KUDU_SINK_TABLE); + } + + public List> readData(String tableName) throws KuduException { + List> result = new ArrayList<>(); + KuduTable kuduTable = kuduClient.openTable(tableName); + KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build(); + while (scanner.hasMoreRows()) { + RowResultIterator rowResults = scanner.nextRows(); + List row = new ArrayList<>(); + while (rowResults.hasNext()) { + RowResult rowResult = rowResults.next(); + for (int i = 0; i < rowResult.getSchema().getColumns().size(); i++) { + row.add(rowResult.getObject(i).toString()); + } + } + result.add(row); + } + return result; + } + + @Override + public void tearDown() throws Exception { + if (kuduClient != null) { + kuduClient.close(); + } + + if (master != null) { + master.close(); + } + + if (tServers != null) { + tServers.close(); + } + } + + private static String getHostIPAddress() { + try { + Enumeration networkInterfaceEnumeration = + NetworkInterface.getNetworkInterfaces(); + while (networkInterfaceEnumeration.hasMoreElements()) { + for (InterfaceAddress interfaceAddress : + networkInterfaceEnumeration.nextElement().getInterfaceAddresses()) { + if (interfaceAddress.getAddress().isSiteLocalAddress() + && interfaceAddress.getAddress() instanceof Inet4Address) { + return interfaceAddress.getAddress().getHostAddress(); + } + } + } + } catch (SocketException e) { + throw new RuntimeException(e); + } + throw new IllegalStateException( + "Could not find site local ipv4 address, failed to launch kudu"); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java new file mode 100644 index 00000000000..ae26aff7969 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.kudu; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnTypeAttributes; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.AsyncKuduClient; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.OperationResponse; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.ToxiproxyContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.ImmutableList; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.math.BigDecimal; +import java.net.Inet4Address; +import java.net.InterfaceAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.awaitility.Awaitility.await; + +@Slf4j +@DisabledOnContainer( + value = {}, + disabledReason = "Override TestSuiteBase @DisabledOnContainer") +public class KuduIT extends TestSuiteBase implements TestResource { + + private static final String IMAGE = "apache/kudu:1.15.0"; + private static final Integer KUDU_MASTER_PORT = 7051; + private static final Integer KUDU_TSERVER_PORT = 7050; + private GenericContainer master; + private GenericContainer tServers; + private KuduClient kuduClient; + + private static final String TOXIPROXY_IMAGE = "ghcr.io/shopify/toxiproxy:2.4.0"; + private static final String TOXIPROXY_NETWORK_ALIAS = "toxiproxy"; + private ToxiproxyContainer toxiProxy; + + private String KUDU_SOURCE_TABLE = "kudu_source_table"; + private String KUDU_SINK_TABLE = "kudu_sink_table"; + + @BeforeAll + @Override + public void startUp() throws Exception { + + String hostIP = getHostIPAddress(); + + this.master = + new GenericContainer<>(IMAGE) + .withExposedPorts(KUDU_MASTER_PORT) + .withCommand("master") + .withEnv("MASTER_ARGS", "--default_num_replicas=1") + .withNetwork(NETWORK) + .withNetworkAliases("kudu-master") + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE))); + + toxiProxy = + new ToxiproxyContainer(TOXIPROXY_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(TOXIPROXY_NETWORK_ALIAS); + toxiProxy.start(); + + String instanceName = "kudu-tserver"; + + ToxiproxyContainer.ContainerProxy proxy = + toxiProxy.getProxy(instanceName, KUDU_TSERVER_PORT); + + this.tServers = + new GenericContainer<>(IMAGE) + .withExposedPorts(KUDU_TSERVER_PORT) + .withCommand("tserver") + .withEnv("KUDU_MASTERS", "kudu-master:" + KUDU_MASTER_PORT) + .withNetwork(NETWORK) + .withNetworkAliases(instanceName) + .dependsOn(master) + .withEnv( + "TSERVER_ARGS", + format( + "--fs_wal_dir=/var/lib/kudu/tserver --logtostderr --use_hybrid_clock=false --rpc_bind_addresses=%s:%s --rpc_advertised_addresses=%s:%s", + instanceName, + KUDU_TSERVER_PORT, + hostIP, + proxy.getProxyPort())) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE))); + + Startables.deepStart(Stream.of(master)).join(); + Startables.deepStart(Stream.of(tServers)).join(); + + Awaitility.given() + .ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::getKuduClient); + } + + private void batchInsertData() throws KuduException { + KuduTable table = kuduClient.openTable(KUDU_SOURCE_TABLE); + KuduSession kuduSession = kuduClient.newSession(); + for (int i = 0; i < 100; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addObject("id", i); + row.addObject("val_bool", true); + row.addObject("val_int8", (byte) 1); + row.addObject("val_int16", (short) 300); + row.addObject("val_int32", 30000); + row.addObject("val_int64", 30000000L); + row.addObject("val_float", 1.0f); + row.addObject("val_double", 2.0d); + row.addObject("val_decimal", new BigDecimal("1.1212")); + row.addObject("val_string", "test"); + row.addObject("val_unixtime_micros", new java.sql.Timestamp(1693477266998L)); + row.addObject("val_binary", "NEW".getBytes()); + OperationResponse response = kuduSession.apply(insert); + } + } + + private void initializeKuduTable() throws KuduException { + + List columns = new ArrayList(); + + columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_bool", Type.BOOL).nullable(true).build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int8", Type.INT8).nullable(true).build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int16", Type.INT16) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int32", Type.INT32) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_int64", Type.INT64) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_float", Type.FLOAT) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_double", Type.DOUBLE) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_decimal", Type.DECIMAL) + .nullable(true) + .typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder() + .precision(20) + .scale(5) + .build()) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING) + .nullable(true) + .build()); + // spark + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_unixtime_micros", Type.UNIXTIME_MICROS) + .nullable(true) + .build()); + columns.add( + new ColumnSchema.ColumnSchemaBuilder("val_binary", Type.BINARY) + .nullable(true) + .build()); + + Schema schema = new Schema(columns); + + ImmutableList hashKeys = ImmutableList.of("id"); + CreateTableOptions tableOptions = new CreateTableOptions(); + + tableOptions.addHashPartitions(hashKeys, 2); + tableOptions.setNumReplicas(1); + kuduClient.createTable(KUDU_SOURCE_TABLE, schema, tableOptions); + kuduClient.createTable(KUDU_SINK_TABLE, schema, tableOptions); + } + + private void getKuduClient() { + kuduClient = + new AsyncKuduClient.AsyncKuduClientBuilder( + Arrays.asList( + "127.0.0.1" + ":" + master.getMappedPort(KUDU_MASTER_PORT))) + .defaultAdminOperationTimeoutMs(120000) + .defaultOperationTimeoutMs(120000) + .build() + .syncClient(); + } + + @TestTemplate + public void testKudu(TestContainer container) throws IOException, InterruptedException { + initializeKuduTable(); + batchInsertData(); + Container.ExecResult execResult = container.executeJob("/kudu_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + readData(KUDU_SINK_TABLE), readData(KUDU_SOURCE_TABLE)); + }); + kuduClient.deleteTable(KUDU_SOURCE_TABLE); + kuduClient.deleteTable(KUDU_SINK_TABLE); + } + + public List readData(String tableName) throws KuduException { + List result = new ArrayList<>(); + KuduTable kuduTable = kuduClient.openTable(tableName); + KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable).build(); + while (scanner.hasMoreRows()) { + RowResultIterator rowResults = scanner.nextRows(); + while (rowResults.hasNext()) { + RowResult rowResult = rowResults.next(); + result.add(rowResult.rowToString()); + } + } + return result; + } + + @Override + public void tearDown() throws Exception { + if (kuduClient != null) { + kuduClient.close(); + } + + if (master != null) { + master.close(); + } + + if (tServers != null) { + tServers.close(); + } + } + + private static String getHostIPAddress() { + try { + Enumeration networkInterfaceEnumeration = + NetworkInterface.getNetworkInterfaces(); + while (networkInterfaceEnumeration.hasMoreElements()) { + for (InterfaceAddress interfaceAddress : + networkInterfaceEnumeration.nextElement().getInterfaceAddresses()) { + if (interfaceAddress.getAddress().isSiteLocalAddress() + && interfaceAddress.getAddress() instanceof Inet4Address) { + return interfaceAddress.getAddress().getHostAddress(); + } + } + } + } catch (SocketException e) { + throw new RuntimeException(e); + } + throw new IllegalStateException( + "Could not find site local ipv4 address, failed to launch kudu"); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf new file mode 100644 index 00000000000..872dd41af04 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_console.conf @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + execution.checkpoint.interval = 5000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + kudu{ + kudu_masters = "kudu-master:7051" + table_name = "kudu_source_table" + result_table_name = "kudu" +} +} + +transform { +} + +sink { + console { + source_table_name = "kudu" + } + + kudu{ + source_table_name = "kudu" + kudu_masters = "kudu-master:7051" + table_name = "kudu_sink_table" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf new file mode 100644 index 00000000000..6ad7d6fd426 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/write-cdc-changelog-to-kudu.conf @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + + source { + FakeSource { + schema = { + fields { + id = int + val_bool = boolean + val_int8 = tinyint + val_int16 = smallint + val_int32 = int + val_int64 = bigint + val_float = float + val_double = double + val_decimal = "decimal(16, 1)" + val_string = string + val_unixtime_micros = timestamp + } + } + rows = [ + { + kind = INSERT + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = INSERT + fields = [3, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_BEFORE + fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = UPDATE_AFTER + fields = [1, true, 2, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + }, + { + kind = DELETE + fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"] + } + ] + } + } + +sink { + kudu{ + kudu_masters = "kudu-master-cdc:7051" + table_name = "kudu_sink_table" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 8644b551b2f..fbbf3785282 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -61,6 +61,7 @@ connector-file-ftp-e2e connector-pulsar-e2e connector-paimon-e2e + connector-kudu-e2e