Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Connector-V2] Add Kudu source and sink connector #2254

Merged
merged 23 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ seatunnel.sink.Clickhouse = connector-clickhouse
seatunnel.sink.ClickhouseFile = connector-clickhouse
seatunnel.source.Jdbc = connector-jdbc
seatunnel.sink.Jdbc = connector-jdbc
seatunnel.source.Kudu = connector-Kudu
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be seatunnel.source.Kudu = connector-kudu

seatunnel.sink.Kudu = connector-Kudu
seatunnel.sink.HdfsFile = connector-file-hadoop
seatunnel.sink.LocalFile = connector-file-local
seatunnel.source.Pulsar = connector-pulsar
Expand Down
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
<mongo-spark.version>2.2.0</mongo-spark.version>
<spark-redis.version>2.6.0</spark-redis.version>
<commons-lang3.version>3.4</commons-lang3.version>
<kudu.version>1.11.1</kudu.version>
<commons-collections4.version>4.4</commons-collections4.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<spark.scope>provided</spark.scope>
Expand Down Expand Up @@ -261,7 +262,12 @@
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>

<!--kudu -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
<!--flink-->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -1319,4 +1325,4 @@
</plugins>
</build>

</project>
</project>
30 changes: 30 additions & 0 deletions seatunnel-connectors-v2/connector-kudu/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, you forgot the license header.

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seatunnel-connectors-v2</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-kudu</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kudu.config;

import lombok.Data;
import lombok.NonNull;

import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;



@Data
public class KuduSinkConfig {

/** Config key whose value selects the {@link SaveMode}. */
private static final String KUDU_SAVE_MODE = "save_mode";
/** Config key whose value is stored in {@link #kuduMaster}. */
private static final String KUDU_MASTER = "kudu_master";
/** Config key whose value is stored in {@link #kuduTableName}. */
private static final String KUDU_TABLE_NAME = "kudu_table";

/** Save mode of the sink; initialised to APPEND and reassigned by the constructor. */
private SaveMode saveMode = SaveMode.APPEND;

/** Value read from the "kudu_master" config key. */
private String kuduMaster;

/**
 * Name of the Kudu table, read from the "kudu_table" config key.
 */
private String kuduTableName;

/**
 * Write modes understood by the Kudu sink.
 */
public enum SaveMode {
    APPEND,
    OVERWRITE;

    /**
     * Maps a configuration string to a save mode.
     *
     * @param str the configured value; may be {@code null}
     * @return {@link #OVERWRITE} only for the exact, lower-case string
     *         {@code "overwrite"}; {@link #APPEND} for every other value,
     *         including {@code null}
     */
    public static SaveMode fromStr(String str) {
        return "overwrite".equals(str) ? OVERWRITE : APPEND;
    }
}

/**
 * Builds the sink configuration from the plugin config.
 *
 * <p>Reads three keys with {@code getString}: "save_mode", "kudu_master" and
 * "kudu_table". A blank "save_mode" value yields {@link SaveMode#APPEND};
 * any other value is passed to {@link SaveMode#fromStr(String)}. The other
 * two values are stored unchanged in {@code kuduMaster} and
 * {@code kuduTableName}.
 *
 * <p>NOTE(review): none of the keys is checked with {@code hasPath} before it
 * is read, so a key missing from the config is presumably reported by
 * {@code getString} as an exception rather than falling back to a default -
 * confirm against the Config implementation.
 *
 * @param pluginConfig the plugin configuration; annotated {@code @NonNull}
 */
public KuduSinkConfig(@NonNull Config pluginConfig) {

this.saveMode = StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE)) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some problem in this line

pluginConfig.getString("xxxx") will throw NullPointException when xxxx not in config file. So we need judge if xxxx is config file by pluginConfig.hasPath("xxxx").


this.kuduMaster = pluginConfig.getString(KUDU_MASTER);
this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME);


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.kudu.config;



import java.io.Serializable;

/**
 * Holder for the configuration key names of the Kudu source.
 * The class declares only string constants and no instance state.
 */
public class KuduSourceConfig implements Serializable {
/** Config key "kudu_master" (the Kudu master address). */
public static final String kuduMaster = "kudu_master";

/** Config key "kudu_table". */
public static final String tableName = "kudu_table";

/**
 * Config key "columnsList".
 * NOTE(review): presumably a comma-separated list of column names, since
 * KuduInputFormat splits its columnsList argument on ',' - confirm against the caller.
 */
public static final String columnsList = "columnsList";



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.*;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.table.type.*;
import org.apache.seatunnel.common.constants.PluginType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KuduInputFormat implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(KuduInputFormat.class);

public KuduInputFormat(String kuduMaster,String tableName,String columnsList){
this.kuduMaster=kuduMaster;
this.columnsList=Arrays.asList(columnsList.split(","));
this.tableName=tableName;
// openInputFormat();
}
/**
 * Kudu client used for all table access in this class. It is created by
 * openInputFormat() and reset to null by closeInputFormat().
 */
public KuduClient kuduClient;

/**
 * Kudu master address passed to the client builder.
 */
public String kuduMaster;
/** Column names obtained by splitting the constructor argument on ','; used as the scanner projection. */
public List<String> columnsList;
/** Table schema; assigned by getColumnsSchemas() and read by getKuduBuildSplit(). */
public Schema schema;
/** Name of the first primary-key column; assigned by getColumnsSchemas(). */
public String keyColumn;

/**
 * Name of the Kudu table to read.
 */
public String tableName;
/**
 * Opens the table named by {@code tableName} through {@code kuduClient} and
 * returns all of its column schemas.
 *
 * <p>Side effects: stores the table schema in {@code schema} and the name of
 * the first primary-key column in {@code keyColumn}.
 *
 * <p>{@code kuduClient} must already be set (see openInputFormat()).
 *
 * @return the columns of the table schema, or {@code null} when a
 *         {@code KuduException} was caught (only the stack trace is printed
 *         in that case)
 */
public List<ColumnSchema> getColumnsSchemas(){
List<ColumnSchema> columns = null;
try {
schema = kuduClient.openTable(tableName).getSchema();
keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
columns =schema.getColumns();
} catch (KuduException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the KuduException be catch, this method will return null. But I found you didn't handle the null value when you use the return value of this method.

e.printStackTrace();
}
return columns;
}

/**
 * Converts one Kudu result row into a {@link SeaTunnelRow}.
 *
 * <p>Column {@code i} of {@code rs} is read with the getter that matches field
 * type {@code i} of {@code typeInfo}, so the row's column order is assumed to
 * match the field order of {@code typeInfo}.
 *
 * @param rs       the Kudu row to read
 * @param typeInfo the row type that decides which getter is used per column
 * @return a row holding one value per field of {@code typeInfo}
 * @throws SQLException declared in the signature; nothing in this method
 *                      throws it explicitly
 * @throws IllegalStateException if a non-null column has a field type that is
 *                               not handled here
 */
public static SeaTunnelRow getSeaTunnelRowData(RowResult rs, SeaTunnelRowType typeInfo) throws SQLException {
    final SeaTunnelDataType<?>[] columnTypes = typeInfo.getFieldTypes();
    final Object[] values = new Object[columnTypes.length];

    for (int col = 0; col < columnTypes.length; col++) {
        values[col] = readColumn(rs, col, columnTypes[col]);
    }
    return new SeaTunnelRow(values);
}

/** Reads a single column of {@code rs} according to the given SeaTunnel type. */
private static Object readColumn(RowResult rs, int col, SeaTunnelDataType<?> columnType) {
    // A null cell maps to null regardless of the declared type.
    if (null == rs.getObject(col)) {
        return null;
    }
    if (BasicType.BOOLEAN_TYPE.equals(columnType)) {
        return rs.getBoolean(col);
    }
    if (BasicType.BYTE_TYPE.equals(columnType)) {
        return rs.getByte(col);
    }
    if (BasicType.SHORT_TYPE.equals(columnType)) {
        return rs.getShort(col);
    }
    if (BasicType.INT_TYPE.equals(columnType)) {
        return rs.getInt(col);
    }
    if (BasicType.LONG_TYPE.equals(columnType)) {
        return rs.getLong(col);
    }
    if (columnType instanceof DecimalType) {
        // A BigInteger is widened to a scale-0 BigDecimal; anything else is passed through.
        final Object raw = rs.getObject(col);
        if (raw instanceof BigInteger) {
            return new BigDecimal((BigInteger) raw, 0);
        }
        return raw;
    }
    if (BasicType.FLOAT_TYPE.equals(columnType)) {
        return rs.getFloat(col);
    }
    if (BasicType.DOUBLE_TYPE.equals(columnType)) {
        return rs.getDouble(col);
    }
    if (BasicType.STRING_TYPE.equals(columnType)) {
        return rs.getString(col);
    }
    throw new IllegalStateException("Unexpected value: " + columnType);
}

/**
 * Builds the SeaTunnel row type that corresponds to the given Kudu columns.
 *
 * <p>For every column, the field name is taken from {@code ColumnSchema.getName()}
 * and the field type from {@code KuduTypeMapper.mapping(columnSchemaList, i)}.
 *
 * @param columnSchemaList the Kudu column schemas; it is dereferenced inside
 *                         the try block, so a {@code null} list also ends up
 *                         as a {@code PrepareFailException}
 * @return a row type whose field names and types follow the order of
 *         {@code columnSchemaList}
 * @throws PrepareFailException if any exception occurs while mapping; the
 *                              original exception is logged at WARN level and
 *                              only its {@code toString()} is passed on
 */
public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {

ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
try {

for (int i = 0; i < columnSchemaList.size(); i++) {
fieldNames.add(columnSchemaList.get(i).getName());
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
}
} catch (Exception e) {
logger .warn("get row type info exception", e);
throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add an exception Message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString()); I don't quite understand,There's an exception message in this one

}
return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
}

/**
 * Creates the Kudu client for {@code kuduMaster} and stores it in
 * {@code kuduClient}.
 *
 * <p>The client is built with a default operation timeout of 1,800,000 ms
 * (30 minutes). Call {@link #closeInputFormat()} to release it.
 */
public void openInputFormat() {
    KuduClient.KuduClientBuilder kuduClientBuilder =
            new KuduClient.KuduClientBuilder(kuduMaster);
    kuduClientBuilder.defaultOperationTimeoutMs(1800000);

    kuduClient = kuduClientBuilder.build();

    // The original message had no "{}" placeholders, so both arguments were dropped.
    logger.info("The Kudu client is successfully initialized, kuduMaster: {}, kuduClient: {}",
            kuduMaster, kuduClient);
}


/**
 * Builds a scanner over {@code tableName} that covers one split of the key range.
 *
 * <p>The scanner projects the columns in {@code columnsList} and is limited to
 * rows whose {@code keyColumn} value lies in {@code [lowerBound, upperBound)}.
 * {@code schema} and {@code keyColumn} are assigned by
 * {@link #getColumnsSchemas()}, and {@code kuduClient} by
 * {@link #openInputFormat()}; both must have been called first.
 *
 * @param lowerBound inclusive start of the split (GREATER_EQUAL predicate)
 * @param upperBound exclusive end of the split (LESS predicate)
 * @return the scanner for the split, or {@code null} when a
 *         {@code KuduException} occurred (the exception is logged at WARN level)
 */
public KuduScanner getKuduBuildSplit(int lowerBound, int upperBound) {
    KuduScanner kuduScanner = null;
    try {
        KuduScanner.KuduScannerBuilder kuduScannerBuilder =
                kuduClient.newScannerBuilder(kuduClient.openTable(tableName));

        kuduScannerBuilder.setProjectedColumnNames(columnsList);

        // Both predicates filter on the same key column, so look it up once.
        ColumnSchema keyColumnSchema = schema.getColumn(keyColumn);

        KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
                keyColumnSchema,
                KuduPredicate.ComparisonOp.GREATER_EQUAL,
                lowerBound);

        KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
                keyColumnSchema,
                KuduPredicate.ComparisonOp.LESS,
                upperBound);

        kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
                .addPredicate(upperPred).build();
    } catch (KuduException e) {
        // The logger already records the stack trace; no printStackTrace() to stderr.
        logger.warn("get the Kuduscan object for each splice exception", e);
    }
    return kuduScanner;
}

/**
 * Closes the Kudu client if one is open and clears the {@code kuduClient}
 * reference.
 *
 * <p>A {@code KuduException} raised by {@code close()} is logged at WARN level
 * and not rethrown; the reference is reset to {@code null} either way. Does
 * nothing when no client is set.
 */
public void closeInputFormat() {
    if (kuduClient == null) {
        return;
    }
    try {
        kuduClient.close();
    } catch (KuduException e) {
        logger.warn("Kudu Client close failed.", e);
    } finally {
        kuduClient = null;
    }
}
}
Loading