From 173779823dc856de03517d94e04230be5d58c556 Mon Sep 17 00:00:00 2001
From: 2013650523 <1067341434@qq.com>
Date: Fri, 22 Jul 2022 16:20:06 +0800
Subject: [PATCH 01/26] 0
---
.../connector-kudu/pom.xml | 30 +++
.../seatunnel/kudu/config/KuduSinkConfig.java | 66 ++++++
.../kudu/config/KuduSourceConfig.java | 34 +++
.../kudu/kuduclient/KuduInputFormat.java | 173 ++++++++++++++++
.../kudu/kuduclient/KuduOutputFormat.java | 165 +++++++++++++++
.../kudu/kuduclient/KuduTypeMapper.java | 104 ++++++++++
.../kudu/sink/KuduAggregatedCommitInfo.java | 31 +++
.../seatunnel/kudu/sink/KuduCommitInfo.java | 32 +++
.../seatunnel/kudu/sink/KuduSink.java | 93 +++++++++
.../sink/KuduSinkAggregatedCommitter.java | 53 +++++
.../seatunnel/kudu/sink/KuduSinkState.java | 29 +++
.../seatunnel/kudu/sink/KuduSinkWriter.java | 82 ++++++++
.../seatunnel/kudu/source/KuduSource.java | 193 ++++++++++++++++++
.../kudu/source/KuduSourceReader.java | 116 +++++++++++
.../kudu/source/KuduSourceSplit.java | 37 ++++
.../source/KuduSourceSplitEnumerator.java | 132 ++++++++++++
.../kudu/source/PartitionParameter.java | 32 +++
.../seatunnel/kudu/state/KuduSinkState.java | 23 +++
.../main/resources/kudu_to_kudu_flink.conf | 60 ++++++
.../main/resources/kudu_to_kudu_spark.conf | 64 ++++++
20 files changed, 1549 insertions(+)
create mode 100644 seatunnel-connectors-v2/connector-kudu/pom.xml
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduAggregatedCommitInfo.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduCommitInfo.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSink.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkAggregatedCommitter.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkState.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplit.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/PartitionParameter.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/state/KuduSinkState.java
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_flink.conf
create mode 100644 seatunnel-connectors-v2/connector-kudu/src/main/resources/kudu_to_kudu_spark.conf
diff --git a/seatunnel-connectors-v2/connector-kudu/pom.xml b/seatunnel-connectors-v2/connector-kudu/pom.xml
new file mode 100644
index 00000000000..c8d3591abb1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/pom.xml
@@ -0,0 +1,30 @@
+
+
+
+ seatunnel-connectors-v2
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ connector-kudu
+
+
+
+ org.apache.seatunnel
+ seatunnel-api
+ ${project.version}
+
+
+ org.apache.kudu
+ kudu-client
+
+
+
+ org.apache.commons
+ commons-lang3
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
new file mode 100644
index 00000000000..8f4f3ae44bf
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import lombok.Data;
+import lombok.NonNull;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+
+
+@Data
+public class KuduSinkConfig {
+
+ private static final String KUDU_SAVE_MODE = "save_mode";
+ private static final String KUDU_MASTER = "kudu_master";
+ private static final String KUDU_TABLE_NAME = "kudu_table";
+
+ private SaveMode saveMode = SaveMode.APPEND;
+
+ private String kuduMaster;
+
+ /**
+ * Specifies the name of the table
+ */
+ private String kuduTableName;
+
+ public enum SaveMode {
+ APPEND(),
+ OVERWRITE();
+
+ public static SaveMode fromStr(String str) {
+ if ("overwrite".equals(str)) {
+ return OVERWRITE;
+ } else {
+ return APPEND;
+ }
+ }
+ }
+
+ public KuduSinkConfig(@NonNull Config pluginConfig) {
+
+ this.saveMode = StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE)) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE));
+
+ this.kuduMaster = pluginConfig.getString(KUDU_MASTER);
+ this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME);
+
+
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
new file mode 100644
index 00000000000..9b148140c12
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+
+
+import java.io.Serializable;
+
+public class KuduSourceConfig implements Serializable {
+ //kudu master ip
+ public static final String kuduMaster = "kudu_master";
+
+ public static final String tableName = "kudu_table";
+
+ public static final String columnsList = "columnsList";
+
+
+
+}
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
new file mode 100644
index 00000000000..6c9a2e4629c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -0,0 +1,173 @@
+package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.client.*;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.table.type.*;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class KuduInputFormat implements Serializable {
+ private static final Logger logger = LoggerFactory.getLogger(KuduInputFormat.class);
+
+ public KuduInputFormat(String kuduMaster,String tableName,String columnsList){
+ this.kuduMaster=kuduMaster;
+ this.columnsList=Arrays.asList(columnsList.split(","));
+ this.tableName=tableName;
+ // openInputFormat();
+ }
+ /**
+ * Declare the global variable KuduClient and use it to manipulate the Kudu table
+ */
+ public KuduClient kuduClient;
+
+ /**
+ * Specify kuduMaster address
+ */
+ public String kuduMaster;
+ public List columnsList;
+ public Schema schema;
+ public String keyColumn;
+
+ /**
+ * Specifies the name of the table
+ */
+ public String tableName;
+ public List getColumnsSchemas(){
+ List columns = null;
+ try {
+ schema = kuduClient.openTable(tableName).getSchema();
+ keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
+ columns =schema.getColumns();
+ } catch (KuduException e) {
+ e.printStackTrace();
+ }
+ return columns;
+ }
+
+ public static SeaTunnelRow getSeaTunnelRowData(RowResult rs, SeaTunnelRowType typeInfo) throws SQLException {
+
+ List