-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Connector-V2] Add Kudu source and sink connector #2254
Conversation
add kudu dependency
add kudu config
add kudu
plugin-mapping.properties
Outdated
@@ -102,6 +102,8 @@ seatunnel.sink.Clickhouse = connector-clickhouse | |||
seatunnel.sink.ClickhouseFile = connector-clickhouse | |||
seatunnel.source.Jdbc = connector-jdbc | |||
seatunnel.sink.Jdbc = connector-jdbc | |||
seatunnel.source.Kudu = connector-Kudu |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be seatunnel.source.Kudu = connector-kudu
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi, you forget the license header.
} catch (Exception e) { | ||
logger .warn("get row type info exception", e); | ||
throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to add an exception Message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString()); I don't quite understand,There's an exception message in this one
...rc/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
Show resolved
Hide resolved
Can you change this pr title to |
ok,tks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some suggestions
|
||
public KuduSinkConfig(@NonNull Config pluginConfig) { | ||
|
||
this.saveMode = StringUtils.isBlank(pluginConfig.getString(KUDU_SAVE_MODE)) ? SaveMode.APPEND : SaveMode.fromStr(pluginConfig.getString(KUDU_SAVE_MODE)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some problem in this line
pluginConfig.getString("xxxx")
will throw NullPointException when xxxx
not in config file. So we need judge if xxxx
is config file by pluginConfig.hasPath("xxxx")
.
schema = kuduClient.openTable(tableName).getSchema(); | ||
keyColumn = schema.getPrimaryKeyColumns().get(0).getName(); | ||
columns =schema.getColumns(); | ||
} catch (KuduException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the KuduException
be catch, this method will return null
. But I found you didn't handle the null
value when you use the return value of this method.
import java.util.List; | ||
import java.util.Map; | ||
|
||
public class KuduSinkAggregatedCommitter implements SinkAggregatedCommitter<KuduCommitInfo, KuduAggregatedCommitInfo> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the connector don't need store state and 2PC , You can only extends AbstractSimpleSink
and that will be simply.
Please resolve ci error. Thanks |
Hi,@CalvinKirs I have finished the modification and provided the document, please help to review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 cc @CalvinKirs @ashulin
public static String getMessage(Throwable e) { | ||
StringWriter sw = null; | ||
PrintWriter pw = null; | ||
try { | ||
sw = new StringWriter(); | ||
pw = new PrintWriter(sw); | ||
// Output the error stack information to the printWriter | ||
e.printStackTrace(pw); | ||
pw.flush(); | ||
sw.flush(); | ||
} finally { | ||
if (sw != null) { | ||
try { | ||
sw.close(); | ||
} catch (IOException e1) { | ||
e1.printStackTrace(); | ||
} | ||
} | ||
if (pw != null) { | ||
pw.close(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
used try-with-resources
to simplify this
* 0 * Update pom.xml add kudu dependency * Update plugin-mapping.properties add kudu config * Update pom.xml add kudu * add email sink connector * Delete seatunnel-connectors-v2/connector-email directory * Update plugin-mapping.properties * Update plugin-mapping.properties * Update pom.xml * Create pom.xml * [Connector-V2] Add Kudu source and sink connector * [Connector-V2] Add Kudu source and sink connector * [Connector-V2] update * [Connector-V2] solve ci error * [Connector-V2] fix problem on code review * [Connector-V2] add kudu usage document and fix problem on code review * Update ExceptionUtil.java * Update pom.xml * Update Kudu.md * Update ExceptionUtil.java Co-authored-by: Hisoka <fanjiaeminem@qq.com>
Purpose of this pull request
Check list
New License Guide