Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API-Draft] [Connector] Add Clickhouse source and sink connector #2051

Merged
merged 26 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ba487e1
update sink template code
Hisoka-X Jun 15, 2022
45efe70
fix plugin discovery maybe repeat
Hisoka-X Jun 15, 2022
784ddbf
add clickhouse client sink
Hisoka-X Jun 17, 2022
1860699
add clickhouse file sink
Hisoka-X Jun 18, 2022
ba6be12
add clickhouse file sink
Hisoka-X Jun 20, 2022
0ba5ffb
Merge branch 'api-draft' into api-draft-ck-sink
Hisoka-X Jun 22, 2022
d4c9375
support clickhouse file sink
Hisoka-X Jun 22, 2022
36a9773
fix clickhouse client sink bug and fix SinkWriter should be serialize…
Hisoka-X Jun 23, 2022
1729054
fix clickhouse file sink bug and not call seatunnel writer close meth…
Hisoka-X Jun 23, 2022
f353bfe
fix check style
Hisoka-X Jun 23, 2022
65610d3
fix check style
Hisoka-X Jun 23, 2022
4ad657c
fix check style
Hisoka-X Jun 23, 2022
bd4cae8
update known-dependencies
Hisoka-X Jun 23, 2022
9e20eaa
update known-dependencies
Hisoka-X Jun 23, 2022
1d5dd02
fix clickhouse file sink chown bug
Hisoka-X Jun 24, 2022
dd10df5
Merge branch 'api-draft' into api-draft-ck-sink
Hisoka-X Jun 25, 2022
3411dca
add UTF8String support
Hisoka-X Jun 25, 2022
b7f1336
add missed clickhouse license
Hisoka-X Jun 27, 2022
fbcd212
Merge branch 'api-draft' into api-draft-ck-sink
Hisoka-X Jun 27, 2022
5c37b09
Merge branch 'api-draft' into api-draft-ck-sink
Hisoka-X Jun 27, 2022
3f964e1
Merge branch 'api-draft' into api-draft-ck-sink
Hisoka-X Jun 27, 2022
be69c59
fix clickhouse override error
Hisoka-X Jun 27, 2022
06e196a
fix clickhouse override error
Hisoka-X Jun 27, 2022
6fc2e96
add httpmime license
Hisoka-X Jun 28, 2022
6e4b891
update NOTICE
Hisoka-X Jun 28, 2022
8fea831
remove unused commons-io
Hisoka-X Jun 28, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ default List<StateT> snapshotState(long checkpointId) throws IOException {
*/
void close() throws IOException;

interface Context {
interface Context extends Serializable{

/**
* Gets the configuration with which Job was started.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ public SeaTunnelDataType<?> getFieldType(int index) {
return fieldTypes[index];
}

public int indexOf(String fieldName) {
for (int i = 0; i < fieldNames.length; i++) {
if (fieldNames[i].equals(fieldName)) {
return i;
}
}
throw new IllegalArgumentException(String.format("can't find field %s", fieldName));
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand Down
3 changes: 3 additions & 0 deletions seatunnel-connectors/plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,8 @@ seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka
seatunnel.source.Http = seatunnel-connector-seatunnel-http
seatunnel.source.Socket = seatunnel-connector-seatunnel-socket
seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive
seatunnel.source.Clickhouse = seatunnel-connector-seatunnel-clickhouse
seatunnel.sink.Clickhouse = seatunnel-connector-seatunnel-clickhouse
seatunnel.sink.ClickhouseFile = seatunnel-connector-seatunnel-clickhouse
seatunnel.source.Jdbc = seatunnel-connector-seatunnel-jdbc
seatunnel.sink.Jdbc = seatunnel-connector-seatunnel-jdbc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
import org.apache.flink.types.Row;
import ru.yandex.clickhouse.ClickHouseConnection;

import javax.annotation.Nullable;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -73,7 +71,6 @@ public Config getConfig() {
return config;
}

@Nullable
@Override
public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
ClickhouseOutputFormat clickhouseOutputFormat = new ClickhouseOutputFormat(config, shardMetadata, fields, tableSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private Map<String, ClickhouseFieldInjectFunction> initFieldInjectFunctionMap()
break;
}
}
result.put(field, function);
result.put(fieldType, function);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,35 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-scp</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- TODO add to dependency management after version unify -->
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-http-client</artifactId>
<version>0.3.2-patch9</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>

<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch9</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;

public enum ClickhouseFileCopyMethod {
SCP("scp"),
RSYNC("rsync"),
;
private final String name;

ClickhouseFileCopyMethod(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static ClickhouseFileCopyMethod from(String name) {
for (ClickhouseFileCopyMethod clickhouseFileCopyMethod : ClickhouseFileCopyMethod.values()) {
if (clickhouseFileCopyMethod.getName().equalsIgnoreCase(name)) {
return clickhouseFileCopyMethod;
}
}
throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod: " + name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,80 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;

/**
* The config of clickhouse
*/
public class Config {

public static final String NODE_ADDRESS = "node_address";
/**
* Bulk size of clickhouse jdbc
*/
public static final String BULK_SIZE = "bulk_size";

public static final String DATABASE = "database";
/**
* Clickhouse fields
*/
public static final String FIELDS = "fields";

public static final String SQL = "sql";

/**
* Clickhouse server host
*/
public static final String HOST = "host";

/**
* Clickhouse table name
*/
public static final String TABLE = "table";

/**
* Clickhouse database name
*/
public static final String DATABASE = "database";

/**
* Clickhouse server username
*/
public static final String USERNAME = "username";

/**
* Clickhouse server password
*/
public static final String PASSWORD = "password";

/**
* Split mode when table is distributed engine
*/
public static final String SPLIT_MODE = "split_mode";

/**
* When split_mode is true, the sharding_key use for split
*/
public static final String SHARDING_KEY = "sharding_key";

/**
* ClickhouseFile sink connector used clickhouse-local program's path
*/
public static final String CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path";

/**
* The method of copy Clickhouse file
*/
public static final String COPY_METHOD = "copy_method";

/**
* The size of each batch read temporary data into local file.
*/
public static final String TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line";

/**
* The password of Clickhouse server node
*/
public static final String NODE_PASS = "node_pass";

/**
* The address of Clickhouse server node
*/
public static final String NODE_ADDRESS = "node_address";

public static final String CLICKHOUSE_PREFIX = "clickhouse.";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;

import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

public class FileReaderOption implements Serializable {

private ShardMetadata shardMetadata;
private Map<String, String> tableSchema;
private List<String> fields;
private String clickhouseLocalPath;
private ClickhouseFileCopyMethod copyMethod;
private boolean nodeFreePass;
private Map<String, String> nodePassword;
private SeaTunnelRowType seaTunnelRowType;

public FileReaderOption(ShardMetadata shardMetadata, Map<String, String> tableSchema,
List<String> fields, String clickhouseLocalPath,
ClickhouseFileCopyMethod copyMethod,
Map<String, String> nodePassword) {
this.shardMetadata = shardMetadata;
this.tableSchema = tableSchema;
this.fields = fields;
this.clickhouseLocalPath = clickhouseLocalPath;
this.copyMethod = copyMethod;
this.nodePassword = nodePassword;
}

public SeaTunnelRowType getSeaTunnelRowType() {
return seaTunnelRowType;
}

public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

public boolean isNodeFreePass() {
return nodeFreePass;
}

public void setNodeFreePass(boolean nodeFreePass) {
this.nodeFreePass = nodeFreePass;
}

public String getClickhouseLocalPath() {
return clickhouseLocalPath;
}

public void setClickhouseLocalPath(String clickhouseLocalPath) {
this.clickhouseLocalPath = clickhouseLocalPath;
}

public ClickhouseFileCopyMethod getCopyMethod() {
return copyMethod;
}

public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) {
this.copyMethod = copyMethod;
}

public Map<String, String> getNodePassword() {
return nodePassword;
}

public void setNodePassword(Map<String, String> nodePassword) {
this.nodePassword = nodePassword;
}

public ShardMetadata getShardMetadata() {
return shardMetadata;
}

public void setShardMetadata(ShardMetadata shardMetadata) {
this.shardMetadata = shardMetadata;
}

public Map<String, String> getTableSchema() {
return tableSchema;
}

public void setTableSchema(Map<String, String> tableSchema) {
this.tableSchema = tableSchema;
}

public List<String> getFields() {
return fields;
}

public void setFields(List<String> fields) {
this.fields = fields;
}
}
Loading