diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java index 56f97bac931..268d3d40e6a 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkWriter.java @@ -74,7 +74,7 @@ default List snapshotState(long checkpointId) throws IOException { */ void close() throws IOException; - interface Context { + interface Context extends Serializable{ /** * Gets the configuration with which Job was started. diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java index 848cdf43c37..45164c2463b 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java @@ -76,6 +76,15 @@ public SeaTunnelDataType getFieldType(int index) { return fieldTypes[index]; } + public int indexOf(String fieldName) { + for (int i = 0; i < fieldNames.length; i++) { + if (fieldNames[i].equals(fieldName)) { + return i; + } + } + throw new IllegalArgumentException(String.format("can't find field %s", fieldName)); + } + @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/seatunnel-connectors/plugin-mapping.properties b/seatunnel-connectors/plugin-mapping.properties index d063545948f..b14f2f035e9 100644 --- a/seatunnel-connectors/plugin-mapping.properties +++ b/seatunnel-connectors/plugin-mapping.properties @@ -92,5 +92,8 @@ seatunnel.sink.Kafka = seatunnel-connector-seatunnel-kafka seatunnel.source.Http = seatunnel-connector-seatunnel-http seatunnel.source.Socket = seatunnel-connector-seatunnel-socket seatunnel.sink.Hive = seatunnel-connector-seatunnel-hive +seatunnel.source.Clickhouse = seatunnel-connector-seatunnel-clickhouse +seatunnel.sink.Clickhouse = seatunnel-connector-seatunnel-clickhouse +seatunnel.sink.ClickhouseFile = seatunnel-connector-seatunnel-clickhouse seatunnel.source.Jdbc = seatunnel-connector-seatunnel-jdbc seatunnel.sink.Jdbc = seatunnel-connector-seatunnel-jdbc diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java index 620e921e11f..36ab1a3d59f 100644 --- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java +++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseBatchSink.java @@ -46,8 +46,6 @@ import org.apache.flink.types.Row; import ru.yandex.clickhouse.ClickHouseConnection; -import javax.annotation.Nullable; - import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -73,7 +71,6 @@ public Config getConfig() { return config; } - @Nullable @Override public void outputBatch(FlinkEnvironment env, DataSet dataSet) { ClickhouseOutputFormat clickhouseOutputFormat = new ClickhouseOutputFormat(config, shardMetadata, fields, tableSchema); diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java index cb392b9a0b2..efd4eb7d504 100644 --- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java +++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.java @@ -224,7 +224,7 @@ private Map initFieldInjectFunctionMap() break; } } - result.put(field, function); + result.put(fieldType, function); } return result; } diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml index e3fb148aa00..38c2f6db4d9 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/pom.xml @@ -36,12 +36,35 @@ ${project.version} + + org.apache.sshd + sshd-scp + + + + org.apache.commons + commons-lang3 + + com.clickhouse clickhouse-http-client 0.3.2-patch9 + + + commons-io + commons-io + 2.11.0 + + + + com.clickhouse + clickhouse-jdbc + 0.3.2-patch9 + + \ No newline at end of file diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java new file mode 100644 index 00000000000..cec1f48bb73 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; + +public enum ClickhouseFileCopyMethod { + SCP("scp"), + RSYNC("rsync"), + ; + private final String name; + + ClickhouseFileCopyMethod(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static ClickhouseFileCopyMethod from(String name) { + for (ClickhouseFileCopyMethod clickhouseFileCopyMethod : ClickhouseFileCopyMethod.values()) { + if (clickhouseFileCopyMethod.getName().equalsIgnoreCase(name)) { + return clickhouseFileCopyMethod; + } + } + throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod: " + name); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java index 65b7af7c6d3..6563274ba1c 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/Config.java @@ -17,19 +17,80 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; -/** - * The config of clickhouse - */ public class Config { - public static final String NODE_ADDRESS = "node_address"; + /** + * Bulk size of clickhouse jdbc + */ + public static final String BULK_SIZE = "bulk_size"; - public static final String DATABASE = "database"; + /** + * Clickhouse fields + */ + public static final String FIELDS = "fields"; public static final String SQL = "sql"; + /** + * Clickhouse server host + */ + public static final String HOST = "host"; + + /** + * Clickhouse table name + */ + public static final String TABLE = "table"; + + /** + * Clickhouse database name + */ + public static final String DATABASE = "database"; + + /** + * Clickhouse server username + */ public static final String USERNAME = "username"; + /** + * Clickhouse server password + */ public static final String PASSWORD = "password"; + /** + * Split mode when table is distributed engine + */ + public static final String SPLIT_MODE = "split_mode"; + + /** + * When split_mode is true, the sharding_key use for split + */ + public static final String SHARDING_KEY = "sharding_key"; + + /** + * ClickhouseFile sink connector used clickhouse-local program's path + */ + public static final String CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path"; + + /** + * The method of copy Clickhouse file + */ + public static final String COPY_METHOD = "copy_method"; + + /** + * The size of each batch read temporary data into local file. + */ + public static final String TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line"; + + /** + * The password of Clickhouse server node + */ + public static final String NODE_PASS = "node_pass"; + + /** + * The address of Clickhouse server node + */ + public static final String NODE_ADDRESS = "node_address"; + + public static final String CLICKHOUSE_PREFIX = "clickhouse."; + } diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java new file mode 100644 index 00000000000..72081078973 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; + +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public class FileReaderOption implements Serializable { + + private ShardMetadata shardMetadata; + private Map tableSchema; + private List fields; + private String clickhouseLocalPath; + private ClickhouseFileCopyMethod copyMethod; + private boolean nodeFreePass; + private Map nodePassword; + private SeaTunnelRowType seaTunnelRowType; + + public FileReaderOption(ShardMetadata shardMetadata, Map tableSchema, + List fields, String clickhouseLocalPath, + ClickhouseFileCopyMethod copyMethod, + Map nodePassword) { + this.shardMetadata = shardMetadata; + this.tableSchema = tableSchema; + this.fields = fields; + this.clickhouseLocalPath = clickhouseLocalPath; + this.copyMethod = copyMethod; + this.nodePassword = nodePassword; + } + + public SeaTunnelRowType getSeaTunnelRowType() { + return seaTunnelRowType; + } + + public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + public boolean isNodeFreePass() { + return nodeFreePass; + } + + public void setNodeFreePass(boolean nodeFreePass) { + this.nodeFreePass = nodeFreePass; + } + + public String getClickhouseLocalPath() { + return clickhouseLocalPath; + } + + public void setClickhouseLocalPath(String clickhouseLocalPath) { + this.clickhouseLocalPath = clickhouseLocalPath; + } + + public ClickhouseFileCopyMethod getCopyMethod() { + return copyMethod; + } + + public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) { + this.copyMethod = copyMethod; + } + + public Map getNodePassword() { + return nodePassword; + } + + public void setNodePassword(Map nodePassword) { + this.nodePassword = nodePassword; + } + + public ShardMetadata getShardMetadata() { + return shardMetadata; + } + + public void setShardMetadata(ShardMetadata shardMetadata) { + this.shardMetadata = shardMetadata; + } + + public Map getTableSchema() { + return tableSchema; + } + + public void setTableSchema(Map tableSchema) { + this.tableSchema = tableSchema; + } + + public List getFields() { + return fields; + } + + public void setFields(List fields) { + this.fields = fields; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java new file mode 100644 index 00000000000..084f54bcc98 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; + +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class ReaderOption implements Serializable { + + private ShardMetadata shardMetadata; + private List fields; + + private Map tableSchema; + private SeaTunnelRowType seaTunnelRowType; + private Properties properties; + private int bulkSize; + + public ReaderOption(ShardMetadata shardMetadata, + Properties properties, List fields, Map tableSchema, int bulkSize) { + this.shardMetadata = shardMetadata; + this.properties = properties; + this.fields = fields; + this.tableSchema = tableSchema; + this.bulkSize = bulkSize; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public ShardMetadata getShardMetadata() { + return shardMetadata; + } + + public void setShardMetadata(ShardMetadata shardMetadata) { + this.shardMetadata = shardMetadata; + } + + public SeaTunnelRowType getSeaTunnelRowType() { + return seaTunnelRowType; + } + + public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + public Map getTableSchema() { + return tableSchema; + } + + public void setTableSchema(Map tableSchema) { + this.tableSchema = tableSchema; + } + + public List getFields() { + return fields; + } + + public void setFields(List fields) { + this.fields = fields; + } + + public int getBulkSize() { + return bulkSize; + } + + public void setBulkSize(int bulkSize) { + this.bulkSize = bulkSize; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java new file mode 100644 index 00000000000..2fe72632540 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard; + +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseNode; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.Objects; + +public class Shard implements Serializable { + private static final long serialVersionUID = -1L; + + private final int shardNum; + private final int replicaNum; + + private final ClickHouseNode node; + + // cache the hash code + private int hashCode = -1; + + public Shard(int shardNum, + int shardWeight, + int replicaNum, + String hostname, + String hostAddress, + int port, + String database, + String username, + String password) { + this.shardNum = shardNum; + this.replicaNum = replicaNum; + this.node = ClickHouseNode.builder().host(hostname).address(InetSocketAddress.createUnresolved(hostAddress, + port)).database(database).weight(shardWeight).credentials(ClickHouseCredentials.fromUserAndPassword(username, password)).build(); + } + + public Shard(int shardNum, int replicaNum, ClickHouseNode node) { + this.shardNum = shardNum; + this.replicaNum = replicaNum; + this.node = node; + } + + public int getShardNum() { + return shardNum; + } + + public int getReplicaNum() { + return replicaNum; + } + + public ClickHouseNode getNode() { + return node; + } + + public String getJdbcUrl() { + return "jdbc:clickhouse://" + node.getAddress().getHostName() + + ":" + node.getAddress().getPort() + "/" + node.getDatabase().get(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Shard shard = (Shard) o; + return shardNum == shard.shardNum + && replicaNum == shard.replicaNum + && hashCode == shard.hashCode + && Objects.equals(node, shard.node); + } + + @Override + public int hashCode() { + if (hashCode == -1) { + hashCode = Objects.hash(shardNum, replicaNum, node, hashCode); + } + return hashCode; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java new file mode 100644 index 00000000000..3c01922f1be --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/ShardMetadata.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard; + +import java.io.Serializable; +import java.util.Objects; + +public class ShardMetadata implements Serializable { + + private static final long serialVersionUID = -1L; + + private String shardKey; + private String shardKeyType; + private String database; + private String table; + private boolean splitMode; + private Shard defaultShard; + private String username; + private String password; + + public ShardMetadata(String shardKey, + String shardKeyType, + String database, + String table, + boolean splitMode, + Shard defaultShard, + String username, + String password) { + this.shardKey = shardKey; + this.shardKeyType = shardKeyType; + this.database = database; + this.table = table; + this.splitMode = splitMode; + this.defaultShard = defaultShard; + this.username = username; + this.password = password; + } + + public String getShardKey() { + return shardKey; + } + + public void setShardKey(String shardKey) { + this.shardKey = shardKey; + } + + public String getShardKeyType() { + return shardKeyType; + } + + public void setShardKeyType(String shardKeyType) { + this.shardKeyType = shardKeyType; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public boolean getSplitMode() { + return splitMode; + } + + public void setSplitMode(boolean splitMode) { + this.splitMode = splitMode; + } + + public Shard getDefaultShard() { + return defaultShard; + } + + public void setDefaultShard(Shard defaultShard) { + this.defaultShard = defaultShard; + } + + public boolean isSplitMode() { + return splitMode; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ShardMetadata that = (ShardMetadata) o; + return splitMode == that.splitMode + && Objects.equals(shardKey, that.shardKey) + && Objects.equals(shardKeyType, that.shardKeyType) + && Objects.equals(database, that.database) + && Objects.equals(table, that.table) + && Objects.equals(defaultShard, that.defaultShard) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(shardKey, shardKeyType, database, table, splitMode, defaultShard, username, password); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java new file mode 100644 index 00000000000..6a15d591977 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/DistributedEngine.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink; + +import java.io.Serializable; + +public class DistributedEngine implements Serializable { + + private static final long serialVersionUID = -1L; + private String clusterName; + private String database; + private String table; + + public DistributedEngine(String clusterName, String database, String table) { + this.clusterName = clusterName; + this.database = database; + this.table = table; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java new file mode 100644 index 00000000000..ae525acee8f --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseBatchStatement.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder; + +import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl; + +import java.sql.PreparedStatement; + +public class ClickhouseBatchStatement { + + private final ClickHouseConnectionImpl clickHouseConnection; + private final PreparedStatement preparedStatement; + private final IntHolder intHolder; + + public ClickhouseBatchStatement(ClickHouseConnectionImpl clickHouseConnection, + PreparedStatement preparedStatement, + IntHolder intHolder) { + this.clickHouseConnection = clickHouseConnection; + this.preparedStatement = preparedStatement; + this.intHolder = intHolder; + } + + public ClickHouseConnectionImpl getClickHouseConnection() { + return clickHouseConnection; + } + + public PreparedStatement getPreparedStatement() { + return preparedStatement; + } + + public IntHolder getIntHolder() { + return intHolder; + } + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java new file mode 100644 index 00000000000..3a3aa082c6b --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseRecord; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +@SuppressWarnings("magicnumber") +public class ClickhouseProxy { + + private final ClickHouseRequest clickhouseRequest; + private final ClickHouseClient client; + + private final Map shardToDataSource = new ConcurrentHashMap<>(16); + + public ClickhouseProxy(ClickHouseNode node) { + this.client = ClickHouseClient.newInstance(node.getProtocol()); + this.clickhouseRequest = + client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes); + + } + + public ClickHouseRequest getClickhouseConnection() { + return this.clickhouseRequest; + } + + public ClickHouseRequest getClickhouseConnection(Shard shard) { + ClickHouseClient c = shardToDataSource + .computeIfAbsent(shard, s -> ClickHouseClient.newInstance(s.getNode().getProtocol())); + return c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes); + } + + public DistributedEngine getClickhouseDistributedTable(String database, String table) { + ClickHouseRequest request = getClickhouseConnection(); + return getClickhouseDistributedTable(request, database, table); + } + + public DistributedEngine getClickhouseDistributedTable(ClickHouseRequest connection, String database, + String table) { + String sql = String.format("select engine_full from system.tables where database = '%s' and name = '%s' and engine = 'Distributed'", database, table); + try (ClickHouseResponse response = connection.query(sql).executeAndWait()) { + List records = response.stream().collect(Collectors.toList()); + if (!records.isEmpty()) { + ClickHouseRecord record = records.get(0); + // engineFull field will be like : Distributed(cluster, database, table[, sharding_key[, policy_name]]) + String engineFull = record.getValue(0).asString(); + List infos = Arrays.stream(engineFull.substring(12).split(",")) + .map(s -> s.replace("'", "").trim()).collect(Collectors.toList()); + return new DistributedEngine(infos.get(0), infos.get(1), infos.get(2).replace("\\)", "").trim()); + } + throw new RuntimeException("Cannot get distributed table from clickhouse, resultSet is empty"); + } catch (ClickHouseException e) { + throw new RuntimeException("Cannot get distributed table from clickhouse", e); + } + } + + /** + * Get ClickHouse table schema, the key is fileName, value is value type. + * + * @param table table name. + * @return schema map. + */ + public Map getClickhouseTableSchema(String table) { + ClickHouseRequest request = getClickhouseConnection(); + return getClickhouseTableSchema(request, table); + } + + public Map getClickhouseTableSchema(ClickHouseRequest request, String table) { + String sql = "desc " + table; + Map schema = new LinkedHashMap<>(); + try (ClickHouseResponse response = request.query(sql).executeAndWait()) { + response.records().forEach(r -> schema.put(r.getValue(0).asString(), r.getValue(1).asString())); + } catch (ClickHouseException e) { + throw new RuntimeException("Cannot get table schema from clickhouse", e); + } + return schema; + } + + /** + * Get the shard of the given cluster. + * + * @param connection clickhouse connection. + * @param clusterName cluster name. + * @param database database of the shard. + * @param port port of the shard. + * @return shard list. + */ + public List getClusterShardList(ClickHouseRequest connection, String clusterName, + String database, int port, String username, String password) { + String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'"; + List shardList = new ArrayList<>(); + try (ClickHouseResponse response = connection.query(sql).executeAndWait()) { + response.records().forEach(r -> { + shardList.add(new Shard( + r.getValue(0).asInteger(), + r.getValue(1).asInteger(), + r.getValue(2).asInteger(), + r.getValue(3).asString(), + r.getValue(4).asString(), + port, database, username, password)); + }); + return shardList; + } catch (ClickHouseException e) { + throw new RuntimeException("Cannot get cluster shard list from clickhouse", e); + } + } + + /** + * Get ClickHouse table info. + * + * @param database database of the table. + * @param table table name of the table. + * @return clickhouse table info. + */ + public ClickhouseTable getClickhouseTable(String database, String table) { + String sql = String.format("select engine,create_table_query,engine_full,data_paths from system.tables where database = '%s' and name = '%s'", database, table); + try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) { + List records = response.stream().collect(Collectors.toList()); + if (records.isEmpty()) { + throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty"); + } + ClickHouseRecord record = records.get(0); + String engine = record.getValue(0).asString(); + String createTableDDL = record.getValue(1).asString(); + String engineFull = record.getValue(2).asString(); + List dataPaths = record.getValue(3).asTuple().stream().map(Object::toString).collect(Collectors.toList()); + DistributedEngine distributedEngine = null; + if ("Distributed".equals(engine)) { + distributedEngine = getClickhouseDistributedTable(clickhouseRequest, database, table); + String localTableSQL = String.format("select engine,create_table_query from system.tables where database = '%s' and name = '%s'", + distributedEngine.getDatabase(), distributedEngine.getTable()); + try (ClickHouseResponse rs = clickhouseRequest.query(localTableSQL).executeAndWait()) { + List localTableRecords = rs.stream().collect(Collectors.toList()); + if (localTableRecords.isEmpty()) { + throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty"); + } + String localEngine = localTableRecords.get(0).getValue(0).asString(); + String createLocalTableDDL = localTableRecords.get(0).getValue(1).asString(); + createTableDDL = localizationEngine(localEngine, createLocalTableDDL); + } + } + return new ClickhouseTable( + database, + table, + distributedEngine, + engine, + createTableDDL, + engineFull, + dataPaths, + getClickhouseTableSchema(clickhouseRequest, table)); + } catch (ClickHouseException e) { + throw new RuntimeException("Cannot get clickhouse table", e); + } + + } + + /** + * Localization the engine in clickhouse local table's createTableDDL to support specific engine. + * For example: change ReplicatedMergeTree to MergeTree. + * + * @param engine original engine of clickhouse local table + * @param ddl createTableDDL of clickhouse local table + * @return createTableDDL of clickhouse local table which can support specific engine + * TODO: support more engine + */ + public String localizationEngine(String engine, String ddl) { + if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) { + return ddl.replaceAll("ReplicatedMergeTree(\\([^\\)]*\\))", "MergeTree()"); + } else { + return ddl; + } + } + + public void close() { + if (this.client != null) { + this.client.close(); + } + shardToDataSource.values().forEach(ClickHouseClient::close); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java new file mode 100644 index 00000000000..295547c74d6 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; + +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.BULK_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_PREFIX; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SPLIT_MODE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelContext; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.config.TypesafeConfigUtils; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import com.clickhouse.client.ClickHouseNode; +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +@AutoService(SeaTunnelSink.class) +public class ClickhouseSink implements SeaTunnelSink { + + private SeaTunnelContext seaTunnelContext; + private ReaderOption option; + + @Override + public String getPluginName() { + return "Clickhouse"; + } + + @SuppressWarnings("checkstyle:MagicNumber") + @Override + public void prepare(Config config) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, DATABASE, TABLE, USERNAME, PASSWORD); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); + } + Map defaultConfig = ImmutableMap.builder() + .put(BULK_SIZE, 20_000) + .put(SPLIT_MODE, false) + .build(); + + config = config.withFallback(ConfigFactory.parseMap(defaultConfig)); + + List nodes = ClickhouseUtil.createNodes(config.getString(HOST), + config.getString(DATABASE), config.getString(USERNAME), config.getString(PASSWORD)); + + Properties clickhouseProperties = new Properties(); + if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX)) { + TypesafeConfigUtils.extractSubConfig(config, CLICKHOUSE_PREFIX, false).entrySet().forEach(e -> { + clickhouseProperties.put(e.getKey(), String.valueOf(e.getValue().unwrapped())); + }); + } + clickhouseProperties.put("user", config.getString(USERNAME)); + clickhouseProperties.put("password", config.getString(PASSWORD)); + + ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0)); + Map tableSchema = proxy.getClickhouseTableSchema(config.getString(TABLE)); + String shardKey = null; + String shardKeyType = null; + if (config.getBoolean(SPLIT_MODE)) { + ClickhouseTable table = proxy.getClickhouseTable(config.getString(DATABASE), + config.getString(TABLE)); + if (!"Distributed".equals(table.getEngine())) { + throw new IllegalArgumentException("split mode only support table which engine is " + + "'Distributed' engine at now"); + } + if (config.hasPath(SHARDING_KEY)) { + shardKey = config.getString(SHARDING_KEY); + shardKeyType = tableSchema.get(shardKey); + } + } + ShardMetadata metadata = new ShardMetadata( + shardKey, + shardKeyType, + config.getString(DATABASE), + config.getString(TABLE), + config.getBoolean(SPLIT_MODE), + new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), config.getString(PASSWORD)); + List fields = new ArrayList<>(); + if (config.hasPath(FIELDS)) { + fields.addAll(config.getStringList(FIELDS)); + // check if the fields exist in schema + for (String field : fields) { + if (!tableSchema.containsKey(field)) { + throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE)); + } + } + } else { + fields.addAll(tableSchema.keySet()); + } + proxy.close(); + this.option = new ReaderOption(metadata, clickhouseProperties, fields, tableSchema, config.getInt(BULK_SIZE)); + } + + @Override + public SinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new ClickhouseSinkWriter(option, context); + } + + @Override + public SinkWriter restoreWriter(SinkWriter.Context context, List states) throws IOException { + return SeaTunnelSink.super.restoreWriter(context, states); + } + + @Override + public Optional> getWriterStateSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.option.setSeaTunnelRowType(seaTunnelRowType); + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.option.getSeaTunnelRowType(); + } + + @Override + public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) { + this.seaTunnelContext = seaTunnelContext; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java new file mode 100644 index 00000000000..604f2c609ca --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ClickhouseFieldInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DateTimeInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.DoubleInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.FloatInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.IntInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.LongInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder; + +import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ClickhouseSinkWriter implements SinkWriter { + + private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSinkWriter.class); + + private final SinkWriter.Context context; + private final ReaderOption option; + private final ShardRouter shardRouter; + private final transient ClickhouseProxy proxy; + private final String prepareSql; + private final Map statementMap; + private final Map fieldInjectFunctionMap; + private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION = new StringInjectFunction(); + + private static final Pattern NULLABLE = Pattern.compile("Nullable\\((.*)\\)"); + private static final Pattern LOW_CARDINALITY = Pattern.compile("LowCardinality\\((.*)\\)"); + + ClickhouseSinkWriter(ReaderOption option, SinkWriter.Context context) { + this.option = option; + this.context = context; + + this.proxy = new ClickhouseProxy(option.getShardMetadata().getDefaultShard().getNode()); + this.fieldInjectFunctionMap = initFieldInjectFunctionMap(); + this.shardRouter = new ShardRouter(proxy, option.getShardMetadata()); + this.prepareSql = initPrepareSQL(); + this.statementMap = initStatementMap(); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + + Object shardKey = null; + if (StringUtils.isNotEmpty(this.option.getShardMetadata().getShardKey())) { + int i = this.option.getSeaTunnelRowType().indexOf(this.option.getShardMetadata().getShardKey()); + shardKey = element.getField(i); + } + ClickhouseBatchStatement statement = statementMap.get(shardRouter.getShard(shardKey)); + PreparedStatement clickHouseStatement = statement.getPreparedStatement(); + IntHolder sizeHolder = statement.getIntHolder(); + // add into batch + addIntoBatch(element, clickHouseStatement); + sizeHolder.setValue(sizeHolder.getValue() + 1); + // flush batch + if (sizeHolder.getValue() >= option.getBulkSize()) { + flush(clickHouseStatement); + sizeHolder.setValue(0); + } + } + + @Override + public Optional prepareCommit() throws IOException { + return Optional.empty(); + } + + @Override + public void abortPrepare() { + + } + + @Override + public void close() throws IOException { + this.proxy.close(); + for (ClickhouseBatchStatement batchStatement : statementMap.values()) { + try (ClickHouseConnectionImpl needClosedConnection = batchStatement.getClickHouseConnection(); + PreparedStatement needClosedStatement = batchStatement.getPreparedStatement()) { + IntHolder intHolder = batchStatement.getIntHolder(); + if (intHolder.getValue() > 0) { + flush(needClosedStatement); + intHolder.setValue(0); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to close prepared statement.", e); + } + } + } + + private void addIntoBatch(SeaTunnelRow row, PreparedStatement clickHouseStatement) { + try { + for (int i = 0; i < option.getFields().size(); i++) { + String fieldName = option.getFields().get(i); + Object fieldValue = row.getField(option.getSeaTunnelRowType().indexOf(fieldName)); + if (fieldValue == null) { + // field does not exist in row + // todo: do we need to transform to default value of each type + clickHouseStatement.setObject(i + 1, null); + continue; + } + String fieldType = option.getTableSchema().get(fieldName); + fieldInjectFunctionMap + .getOrDefault(fieldType, DEFAULT_INJECT_FUNCTION) + .injectFields(clickHouseStatement, i + 1, fieldValue); + } + clickHouseStatement.addBatch(); + } catch (SQLException e) { + throw new RuntimeException("Add row data into batch error", e); + } + } + + private void flush(PreparedStatement clickHouseStatement) { + try { + clickHouseStatement.executeBatch(); + } catch (Exception e) { + throw new RuntimeException("Clickhouse execute batch statement error", e); + } + } + + private Map initStatementMap() { + Map result = new HashMap<>(Common.COLLECTION_SIZE); + shardRouter.getShards().forEach((weight, s) -> { + try { + ClickHouseConnectionImpl clickhouseConnection = new ClickHouseConnectionImpl(s.getJdbcUrl(), + this.option.getProperties()); + PreparedStatement preparedStatement = clickhouseConnection.prepareStatement(prepareSql); + IntHolder intHolder = new IntHolder(); + ClickhouseBatchStatement batchStatement = + new ClickhouseBatchStatement(clickhouseConnection, preparedStatement, intHolder); + result.put(s, batchStatement); + } catch (SQLException e) { + throw new RuntimeException("Clickhouse prepare statement error: " + e.getMessage(), e); + } + }); + return result; + } + + private String initPrepareSQL() { + String[] placeholder = new String[option.getFields().size()]; + Arrays.fill(placeholder, "?"); + + return String.format("INSERT INTO %s (%s) VALUES (%s)", + shardRouter.getShardTable(), + String.join(",", option.getFields()), + String.join(",", placeholder)); + } + + private Map initFieldInjectFunctionMap() { + Map result = new HashMap<>(Common.COLLECTION_SIZE); + List clickhouseFieldInjectFunctions = Lists.newArrayList( + new ArrayInjectFunction(), + new BigDecimalInjectFunction(), + new DateInjectFunction(), + new DateTimeInjectFunction(), + new DoubleInjectFunction(), + new FloatInjectFunction(), + new IntInjectFunction(), + new LongInjectFunction(), + new StringInjectFunction() + ); + ClickhouseFieldInjectFunction defaultFunction = new StringInjectFunction(); + // get field type + for (String field : this.option.getFields()) { + ClickhouseFieldInjectFunction function = defaultFunction; + String fieldType = this.option.getTableSchema().get(field); + for (ClickhouseFieldInjectFunction clickhouseFieldInjectFunction : clickhouseFieldInjectFunctions) { + if (clickhouseFieldInjectFunction.isCurrentFieldType(unwrapCommonPrefix(fieldType))) { + function = clickhouseFieldInjectFunction; + break; + } + } + result.put(fieldType, function); + } + return result; + } + + private String unwrapCommonPrefix(String fieldType) { + Matcher nullMatcher = NULLABLE.matcher(fieldType); + Matcher lowMatcher = LOW_CARDINALITY.matcher(fieldType); + if (nullMatcher.matches()) { + return nullMatcher.group(1); + } else if (lowMatcher.matches()) { + return lowMatcher.group(1); + } else { + return fieldType; + } + } + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java new file mode 100644 index 00000000000..4471f8157d6 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine; + +import com.clickhouse.client.ClickHouseRequest; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; +import org.apache.commons.lang3.StringUtils; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.TreeMap; +import java.util.concurrent.ThreadLocalRandom; + +public class ShardRouter implements Serializable { + + private static final long serialVersionUID = -1L; + + private String shardTable; + private final String table; + private int shardWeightCount; + private final TreeMap shards; + private final String shardKey; + private final String shardKeyType; + private final boolean splitMode; + + private static final XXHash64 HASH_INSTANCE = XXHashFactory.fastestInstance().hash64(); + private final ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current(); + + public ShardRouter(ClickhouseProxy proxy, ShardMetadata shardMetadata) { + this.shards = new TreeMap<>(); + this.shardKey = shardMetadata.getShardKey(); + this.shardKeyType = shardMetadata.getShardKeyType(); + this.splitMode = shardMetadata.getSplitMode(); + this.table = shardMetadata.getTable(); + if (StringUtils.isNotEmpty(shardKey) && StringUtils.isEmpty(shardKeyType)) { + throw new IllegalArgumentException("Shard key " + shardKey + " not found in table " + table); + } + ClickHouseRequest connection = proxy.getClickhouseConnection(); + if (splitMode) { + DistributedEngine localTable = proxy.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(), table); + this.shardTable = localTable.getTable(); + List shardList = proxy.getClusterShardList(connection, localTable.getClusterName(), + localTable.getDatabase(), shardMetadata.getDefaultShard().getNode().getPort(), + shardMetadata.getUsername(), shardMetadata.getPassword()); + int weight = 0; + for (Shard shard : shardList) { + shards.put(weight, shard); + weight += shard.getNode().getWeight(); + } + shardWeightCount = weight; + } else { + shards.put(0, shardMetadata.getDefaultShard()); + } + } + + public String getShardTable() { + return splitMode ? shardTable : table; + } + + public Shard getShard(Object shardValue) { + if (!splitMode) { + return shards.firstEntry().getValue(); + } + if (StringUtils.isEmpty(shardKey) || shardValue == null) { + return shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount + 1)).getValue(); + } + int offset = (int) (HASH_INSTANCE.hash(ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)), + 0) & Long.MAX_VALUE % shardWeightCount); + return shards.lowerEntry(offset + 1).getValue(); + } + + public TreeMap getShards() { + return shards; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java new file mode 100644 index 00000000000..05c5112920a --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; + +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.CLICKHOUSE_LOCAL_PATH; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.COPY_METHOD; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_PASS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SHARDING_KEY; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.TABLE; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelContext; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import com.clickhouse.client.ClickHouseNode; +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@AutoService(SeaTunnelSink.class) +public class ClickhouseFileSink implements SeaTunnelSink { + + private SeaTunnelContext seaTunnelContext; + private FileReaderOption readerOption; + + @Override + public String getPluginName() { + return "ClickhouseFile"; + } + + @Override + public void prepare(Config config) throws PrepareFailException { + CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH); + if (!checkResult.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, checkResult.getMsg()); + } + Map defaultConfigs = ImmutableMap.builder() + .put(COPY_METHOD, ClickhouseFileCopyMethod.SCP.getName()) + .build(); + + config = config.withFallback(ConfigFactory.parseMap(defaultConfigs)); + List nodes = ClickhouseUtil.createNodes(config.getString(HOST), + config.getString(DATABASE), config.getString(USERNAME), config.getString(PASSWORD)); + + ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0)); + Map tableSchema = proxy.getClickhouseTableSchema(config.getString(TABLE)); + String shardKey = null; + String shardKeyType = null; + if (config.hasPath(SHARDING_KEY)) { + shardKey = config.getString(SHARDING_KEY); + shardKeyType = tableSchema.get(shardKey); + } + ShardMetadata shardMetadata = new ShardMetadata( + shardKey, + shardKeyType, + config.getString(DATABASE), + config.getString(TABLE), + false, // we don't need to set splitMode in clickhouse file mode. + new Shard(1, 1, nodes.get(0)), config.getString(USERNAME), config.getString(PASSWORD)); + List fields; + if (config.hasPath(FIELDS)) { + fields = config.getStringList(FIELDS); + // check if the fields exist in schema + for (String field : fields) { + if (!tableSchema.containsKey(field)) { + throw new RuntimeException("Field " + field + " does not exist in table " + config.getString(TABLE)); + } + } + } else { + fields = new ArrayList<>(tableSchema.keySet()); + } + Map nodePassword = config.getObjectList(NODE_PASS).stream() + .collect(Collectors.toMap(configObject -> configObject.toConfig().getString(NODE_ADDRESS), + configObject -> configObject.toConfig().getString(PASSWORD))); + + proxy.close(); + this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH), + ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD)), nodePassword); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.readerOption.setSeaTunnelRowType(seaTunnelRowType); + } + + @Override + public SeaTunnelDataType getConsumedType() { + return this.readerOption.getSeaTunnelRowType(); + } + + @Override + public SinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new ClickhouseFileSinkWriter(readerOption, context); + } + + @Override + public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) { + this.seaTunnelContext = seaTunnelContext; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java new file mode 100644 index 00000000000..c6b6bfa1fae --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; + +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class ClickhouseFileSinkWriter implements SinkWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class); + private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file"; + private static final int UUID_LENGTH = 10; + private final FileReaderOption readerOption; + private final ShardRouter shardRouter; + private final ClickhouseProxy proxy; + private final ClickhouseTable clickhouseTable; + private final Map> shardLocalDataPaths; + private final Map> rowCache; + + public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) { + this.readerOption = readerOption; + proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode()); + shardRouter = new ShardRouter(proxy, this.readerOption.getShardMetadata()); + clickhouseTable = proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(), + this.readerOption.getShardMetadata().getTable()); + rowCache = new HashMap<>(Common.COLLECTION_SIZE); + + nodePasswordCheck(); + + // find file local save path of each node + shardLocalDataPaths = shardRouter.getShards().values().stream() + .collect(Collectors.toMap(Function.identity(), shard -> { + ClickhouseTable shardTable = proxy.getClickhouseTable(shard.getNode().getDatabase().get(), + clickhouseTable.getLocalTableName()); + return shardTable.getDataPaths(); + })); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + Shard shard = shardRouter.getShard(element); + rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(element); + } + + private void nodePasswordCheck() { + if (!this.readerOption.isNodeFreePass()) { + shardRouter.getShards().values().forEach(shard -> { + if (!this.readerOption.getNodePassword().containsKey(shard.getNode().getAddress().getHostName()) + && !this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) { + throw new RuntimeException("Cannot find password of shard " + shard.getNode().getAddress().getHostName()); + } + }); + } + } + + @Override + public Optional prepareCommit() throws IOException { + return Optional.empty(); + } + + @Override + public void abortPrepare() { + + } + + @Override + public void close() throws IOException { + rowCache.forEach(this::flush); + } + + private void flush(Shard shard, List rows) { + try { + // generate clickhouse local file + // TODO generate file by sub rows to save memory + List clickhouseLocalFiles = generateClickhouseLocalFiles(rows); + // move file to server + attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles); + // clear local file + clearLocalFileDirectory(clickhouseLocalFiles); + } catch (Exception e) { + throw new RuntimeException("Flush data into clickhouse file error", e); + } + } + + private List generateClickhouseLocalFiles(List rows) throws IOException, + InterruptedException { + if (rows.isEmpty()) { + return Collections.emptyList(); + } + String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_"); + String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid); + FileUtils.forceMkdir(new File(clickhouseLocalFile)); + String clickhouseLocalFileTmpFile = clickhouseLocalFile + "/local_data.log"; + try (FileChannel fileChannel = FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE, + StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) { + String data = rows.stream() + .map(row -> this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString()) + .collect(Collectors.joining("\t"))) + .collect(Collectors.joining("\n")); + MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), + data.getBytes(StandardCharsets.UTF_8).length); + buffer.put(data.getBytes(StandardCharsets.UTF_8)); + } + + List localPaths = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" ")) + .collect(Collectors.toList()); + List command = new ArrayList<>(localPaths); + if (localPaths.size() == 1) { + command.add("local"); + } + command.add("--file"); + command.add(clickhouseLocalFileTmpFile); + command.add("-S"); + command.add("\"" + this.readerOption.getFields().stream().map(field -> field + " " + readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) + "\""); + command.add("-N"); + command.add("\"" + "temp_table" + uuid + "\""); + command.add("-q"); + command.add(String.format( + "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"", + clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""), + clickhouseTable.getLocalTableName(), + readerOption.getTableSchema().keySet().stream().map(s -> { + if (readerOption.getFields().contains(s)) { + return s; + } else { + return "NULL"; + } + }).collect(Collectors.joining(",")), + uuid)); + command.add("--path"); + command.add("\"" + clickhouseLocalFile + "\""); + LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command)); + ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command)); + Process start = processBuilder.start(); + // we just wait for the process to finish + try (InputStream inputStream = start.getInputStream(); + InputStreamReader inputStreamReader = new InputStreamReader(inputStream); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { + String line; + while ((line = bufferedReader.readLine()) != null) { + LOGGER.info(line); + } + } + start.waitFor(); + File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName()); + if (!file.exists()) { + throw new RuntimeException("clickhouse local file not exists"); + } + File[] files = file.listFiles(); + if (files == null) { + throw new RuntimeException("clickhouse local file not exists"); + } + return Arrays.stream(files) + .filter(File::isDirectory) + .filter(f -> !"detached".equals(f.getName())) + .map(File::getAbsolutePath).collect(Collectors.toList()); + } + + private void attachClickhouseLocalFileToServer(Shard shard, List clickhouseLocalFiles) throws ClickHouseException { + if (ClickhouseFileCopyMethod.SCP.equals(this.readerOption.getCopyMethod())) { + String hostAddress = shard.getNode().getAddress().getHostName(); + String password = readerOption.getNodePassword().getOrDefault(hostAddress, null); + FileTransfer fileTransfer = new ScpFileTransfer(hostAddress, password); + fileTransfer.init(); + fileTransfer.transferAndChown(clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/"); + fileTransfer.close(); + } else { + throw new RuntimeException("unsupported clickhouse file copy method " + readerOption.getCopyMethod()); + } + + ClickHouseRequest request = proxy.getClickhouseConnection(shard); + for (String clickhouseLocalFile : clickhouseLocalFiles) { + ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'", + clickhouseTable.getLocalTableName(), + clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1))).executeAndWait(); + response.close(); + } + } + + private void clearLocalFileDirectory(List clickhouseLocalFiles) { + String clickhouseLocalFile = clickhouseLocalFiles.get(0); + String localFileDir = clickhouseLocalFile.substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1); + try { + File file = new File(localFileDir); + if (file.exists()) { + FileUtils.deleteDirectory(new File(localFileDir)); + } + } catch (IOException e) { + throw new RuntimeException("Unable to delete directory " + localFileDir, e); + } + } + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java new file mode 100644 index 00000000000..a6c8e0fe015 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseTable.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine; + +import java.util.List; +import java.util.Map; + +public class ClickhouseTable { + + private String database; + private String tableName; + private String engine; + private String engineFull; + private String createTableDDL; + private List dataPaths; + private final DistributedEngine distributedEngine; + private Map tableSchema; + + public ClickhouseTable(String database, + String tableName, + DistributedEngine distributedEngine, + String engine, + String createTableDDL, + String engineFull, + List dataPaths, + Map tableSchema) { + this.database = database; + this.tableName = tableName; + this.distributedEngine = distributedEngine; + this.engine = engine; + this.engineFull = engineFull; + this.createTableDDL = createTableDDL; + this.dataPaths = dataPaths; + this.tableSchema = tableSchema; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getEngine() { + return engine; + } + + public void setEngine(String engine) { + this.engine = engine; + } + + public String getEngineFull() { + return engineFull; + } + + public void setEngineFull(String engineFull) { + this.engineFull = engineFull; + } + + public String getCreateTableDDL() { + return createTableDDL; + } + + public void setCreateTableDDL(String createTableDDL) { + this.createTableDDL = createTableDDL; + } + + public List getDataPaths() { + return dataPaths; + } + + public void setDataPaths(List dataPaths) { + this.dataPaths = dataPaths; + } + + public Map getTableSchema() { + return tableSchema; + } + + public void setTableSchema(Map tableSchema) { + this.tableSchema = tableSchema; + } + + public String getLocalTableName() { + if (distributedEngine != null) { + return distributedEngine.getTable(); + } else { + return tableName; + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java new file mode 100644 index 00000000000..ca581f0ea0e --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransfer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; + +import java.util.List; + +public interface FileTransfer { + + void init(); + + void transferAndChown(String sourcePath, String targetPath); + + void transferAndChown(List sourcePath, String targetPath); + + void close(); +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java new file mode 100644 index 00000000000..6fa83794ce7 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; + +import org.apache.commons.lang3.StringUtils; +import org.apache.sshd.client.SshClient; +import org.apache.sshd.client.session.ClientSession; +import org.apache.sshd.scp.client.ScpClient; +import org.apache.sshd.scp.client.ScpClientCreator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ScpFileTransfer implements FileTransfer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class); + + private static final int SCP_PORT = 22; + + private final String host; + private final String password; + + private ScpClient scpClient; + private ClientSession clientSession; + private SshClient sshClient; + + public ScpFileTransfer(String host, String password) { + this.host = host; + this.password = password; + } + + @Override + public void init() { + try { + sshClient = SshClient.setUpDefaultClient(); + sshClient.start(); + clientSession = sshClient.connect("root", host, SCP_PORT).verify().getSession(); + if (password != null) { + clientSession.addPasswordIdentity(password); + } + // TODO support add publicKey to identity + if (!clientSession.auth().verify().isSuccess()) { + throw new IOException("ssh host " + host + "authentication failed"); + } + scpClient = ScpClientCreator.instance().createScpClient(clientSession); + } catch (IOException e) { + throw new RuntimeException("Failed to connect to host: " + host + " by user: root on port 22", e); + } + } + + @Override + public void transferAndChown(String sourcePath, String targetPath) { + try { + scpClient.upload( + sourcePath, + targetPath, + ScpClient.Option.Recursive, + ScpClient.Option.TargetIsDirectory, + ScpClient.Option.PreserveAttributes); + } catch (IOException e) { + throw new RuntimeException("Scp failed to transfer file: " + sourcePath + " to: " + targetPath, e); + } + // remote exec command to change file owner. Only file owner equal with server's clickhouse user can + // make ATTACH command work. + List command = new ArrayList<>(); + command.add("ls"); + command.add("-l"); + command.add(targetPath.substring(0, + StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/"); + command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath); + try { + String finalCommand = String.join(" ", command); + LOGGER.info("execute remote command: " + finalCommand); + clientSession.executeRemoteCommand(finalCommand); + } catch (IOException e) { + // always return error cause xargs return shell command result + } + } + + @Override + public void transferAndChown(List sourcePaths, String targetPath) { + if (sourcePaths == null) { + throw new IllegalArgumentException("sourcePath is null"); + } + sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath, targetPath)); + } + + @Override + public void close() { + if (clientSession != null && clientSession.isOpen()) { + try { + clientSession.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close ssh session", e); + } + } + if (sshClient != null && sshClient.isOpen()) { + sshClient.stop(); + try { + sshClient.close(); + } catch (IOException e) { + throw new RuntimeException("Failed to close ssh client", e); + } + } + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java new file mode 100644 index 00000000000..c564e5501d1 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.regex.Pattern; + +public class ArrayInjectFunction implements ClickhouseFieldInjectFunction { + + private static final Pattern PATTERN = Pattern.compile("(Array.*)"); + + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + statement.setArray(index, (java.sql.Array) value); + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return PATTERN.matcher(fieldType).matches(); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java new file mode 100644 index 00000000000..25c73ab3f36 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/BigDecimalInjectFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.regex.Pattern; + +public class BigDecimalInjectFunction implements ClickhouseFieldInjectFunction { + + private static final Pattern PATTERN = Pattern.compile("(Decimal.*)"); + + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + statement.setBigDecimal(index, (java.math.BigDecimal) value); + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return PATTERN.matcher(fieldType).matches(); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java new file mode 100644 index 00000000000..3e27a634396 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ClickhouseFieldInjectFunction.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +/** + * Injects a field into a ClickHouse statement, used to transform a java type into a ClickHouse type. + */ +public interface ClickhouseFieldInjectFunction extends Serializable { + + /** + * Inject the value into the statement. + * + * @param statement statement to inject into + * @param value value to inject + * @param index index in the statement + */ + void injectFields(PreparedStatement statement, int index, Object value) throws SQLException; + + /** + * If the fieldType need to be injected by the current function. + * + * @param fieldType field type to inject + * @return true if the fieldType need to be injected by the current function + */ + boolean isCurrentFieldType(String fieldType); + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java new file mode 100644 index 00000000000..7a0b0b64826 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateInjectFunction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class DateInjectFunction implements ClickhouseFieldInjectFunction { + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + if (value instanceof Date) { + statement.setDate(index, (Date) value); + } else { + statement.setDate(index, Date.valueOf(value.toString())); + } + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return "Date".equals(fieldType); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java new file mode 100644 index 00000000000..b85c56afbda --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DateTimeInjectFunction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Timestamp; + +public class DateTimeInjectFunction implements ClickhouseFieldInjectFunction { + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + if (value instanceof Timestamp) { + statement.setTimestamp(index, (Timestamp) value); + } else { + statement.setTimestamp(index, Timestamp.valueOf(value.toString())); + } + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return "DateTime".equals(fieldType); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java new file mode 100644 index 00000000000..c416d110cbb --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/DoubleInjectFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class DoubleInjectFunction implements ClickhouseFieldInjectFunction { + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + if (value instanceof BigDecimal) { + statement.setDouble(index, ((BigDecimal) value).doubleValue()); + } else { + statement.setDouble(index, (Double) value); + } + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return "UInt32".equals(fieldType) + || "UInt64".equals(fieldType) + || "Int64".equals(fieldType) + || "Float64".equals(fieldType); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java new file mode 100644 index 00000000000..84464808b76 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/FloatInjectFunction.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.math.BigDecimal; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class FloatInjectFunction implements ClickhouseFieldInjectFunction { + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + if (value instanceof BigDecimal) { + statement.setFloat(index, ((BigDecimal) value).floatValue()); + } else { + statement.setFloat(index, (Float) value); + } + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return "Float32".equals(fieldType); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java new file mode 100644 index 00000000000..f6e8c27dc87 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/IntInjectFunction.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class IntInjectFunction implements ClickhouseFieldInjectFunction { + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + if (value instanceof Byte) { + statement.setByte(index, (Byte) value); + + } else if (value instanceof Short) { + statement.setShort(index, (Short) value); + + } else { + statement.setInt(index, (Integer) value); + + } + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return "Int8".equals(fieldType) + || "UInt8".equals(fieldType) + || "Int16".equals(fieldType) + || "UInt16".equals(fieldType) + || "Int32".equals(fieldType); + } + +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java new file mode 100644 index 00000000000..ccd3e60b8ea --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/LongInjectFunction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class LongInjectFunction implements ClickhouseFieldInjectFunction { + + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + statement.setLong(index, (Long) value); + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return "UInt32".equals(fieldType) + || "UInt64".equals(fieldType) + || "Int64".equals(fieldType); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java new file mode 100644 index 00000000000..4894774dc85 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class StringInjectFunction implements ClickhouseFieldInjectFunction { + + @Override + public void injectFields(PreparedStatement statement, int index, Object value) throws SQLException { + statement.setString(index, value.toString()); + } + + @Override + public boolean isCurrentFieldType(String fieldType) { + return "String".equals(fieldType); + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index c5166d8b710..a94344e72cd 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -18,7 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.NODE_ADDRESS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.SQL; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Config.USERNAME; @@ -37,22 +37,19 @@ import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; import org.apache.seatunnel.shade.com.typesafe.config.Config; import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseCredentials; import com.clickhouse.client.ClickHouseException; import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseNode; -import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseResponse; import com.google.auto.service.AutoService; -import java.util.Arrays; import java.util.List; -import java.util.stream.Collectors; @AutoService(SeaTunnelSource.class) public class ClickhouseSource implements SeaTunnelSource { @@ -68,17 +65,12 @@ public String getPluginName() { @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = CheckConfigUtil.checkAllExists(config, NODE_ADDRESS, DATABASE, SQL, USERNAME, PASSWORD); + CheckResult result = CheckConfigUtil.checkAllExists(config, HOST, DATABASE, SQL, USERNAME, PASSWORD); if (!result.isSuccess()) { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } - servers = Arrays.stream(config.getString(NODE_ADDRESS).split(",")).map(address -> { - String[] nodeAndPort = address.split(":", 2); - return ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP, - Integer.parseInt(nodeAndPort[1])).database(config.getString(DATABASE)) - .credentials(ClickHouseCredentials.fromUserAndPassword(config.getString(USERNAME), - config.getString(PASSWORD))).build(); - }).collect(Collectors.toList()); + servers = ClickhouseUtil.createNodes(config.getString(HOST), config.getString(DATABASE), + config.getString(USERNAME), config.getString(PASSWORD)); sql = config.getString(SQL); try (ClickHouseClient client = ClickHouseClient.newInstance(servers.get(0).getProtocol()); diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index ce07b3412bd..66d0621df89 100644 --- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -33,6 +33,7 @@ public class ClickhouseSourceSplitEnumerator implements private final Set readers; private volatile int assigned = -1; + // TODO support read distributed engine use multi split ClickhouseSourceSplitEnumerator(Context enumeratorContext) { this.context = enumeratorContext; this.readers = new HashSet<>(); diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java new file mode 100644 index 00000000000..2de15ac9ca9 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKAggCommitInfo.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; + +import java.io.Serializable; + +public class CKAggCommitInfo implements Serializable { +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java new file mode 100644 index 00000000000..99464801ddd --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKCommitInfo.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; + +import java.io.Serializable; + +public class CKCommitInfo implements Serializable { +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java new file mode 100644 index 00000000000..28d9dc2ed4f --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSinkState.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; + +import java.io.Serializable; + +public class ClickhouseSinkState implements Serializable { +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java new file mode 100644 index 00000000000..02e7be5966d --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/tool/IntHolder.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.tool; + +import java.io.Serializable; + +public class IntHolder implements Serializable { + + private static final long serialVersionUID = -1L; + + private int value; + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } +} diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java new file mode 100644 index 00000000000..38c835831c7 --- /dev/null +++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.util; + +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class ClickhouseUtil { + + public static List createNodes(String nodeAddress, String database, String username, + String password) { + return Arrays.stream(nodeAddress.split(",")).map(address -> { + String[] nodeAndPort = address.split(":", 2); + return ClickHouseNode.builder().host(nodeAndPort[0]).port(ClickHouseProtocol.HTTP, + Integer.parseInt(nodeAndPort[1])).database(database) + .credentials(ClickHouseCredentials.fromUserAndPassword(username, password)).build(); + }).collect(Collectors.toList()); + } + +} diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE index 091b8ca9a19..dd1a4432c22 100644 --- a/seatunnel-dist/release-docs/LICENSE +++ b/seatunnel-dist/release-docs/LICENSE @@ -354,7 +354,6 @@ The text of each license is the standard Apache 2.0 license. (Apache License, Version 2.0) Apache Commons Email (org.apache.commons:commons-email:1.5 - http://commons.apache.org/proper/commons-email/) (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.11.0 - https://commons.apache.org/proper/commons-io/) (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.5 - http://commons.apache.org/proper/commons-io/) - (Apache License, Version 2.0) Apache Commons IO (commons-io:commons-io:2.8.0 - https://commons.apache.org/proper/commons-io/) (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.4 - http://commons.apache.org/proper/commons-lang/) (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.5 - http://commons.apache.org/proper/commons-lang/) (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.6 - http://commons.apache.org/proper/commons-lang/) @@ -421,6 +420,7 @@ The text of each license is the standard Apache 2.0 license. (Apache License, Version 2.0) Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.6 - http://hc.apache.org/httpcomponents-client) (Apache License, Version 2.0) Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.9 - http://hc.apache.org/httpcomponents-client) (Apache License, Version 2.0) Apache HttpClient Mime (org.apache.httpcomponents:httpmime:4.5.2 - http://hc.apache.org/httpcomponents-client) + (Apache License, Version 2.0) Apache HttpClient Mime (org.apache.httpcomponents:httpmime:4.5.13 - http://hc.apache.org/httpcomponents-client) (Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.10 - http://hc.apache.org/httpcomponents-core-ga) (Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.11 - http://hc.apache.org/httpcomponents-core-ga) (Apache License, Version 2.0) Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.12 - http://hc.apache.org/httpcomponents-core-ga) @@ -760,6 +760,7 @@ The text of each license is the standard Apache 2.0 license. (The Apache Software License, Version 2.0) Google HTTP Client Library for Java (com.google.http-client:google-http-client:1.26.0 - https://github.com/googleapis/google-http-java-client/google-http-client) (The Apache Software License, Version 2.0) Google OAuth Client Library for Java (com.google.oauth-client:google-oauth-client:1.26.0 - https://github.com/googleapis/google-oauth-java-client/google-oauth-client) (The Apache Software License, Version 2.0) Gson (com.google.code.gson:gson:2.2.4 - http://code.google.com/p/google-gson/) + (The Apache Software License, Version 2.0) Gson (com.google.code.gson:gson:2.9.0 - http://code.google.com/p/google-gson/) (The Apache Software License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:19.0 - https://github.com/google/guava/guava) (The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.1 - http://labs.carrotsearch.com/hppc.html/hppc) (The Apache Software License, Version 2.0) HPPC Collections (com.carrotsearch:hppc:0.7.2 - http://labs.carrotsearch.com/hppc.html/hppc) @@ -833,7 +834,9 @@ The text of each license is the standard Apache 2.0 license. (The Apache Software License, Version 2.0) aggs-matrix-stats (org.elasticsearch.plugin:aggs-matrix-stats-client:7.5.1 - https://github.com/elastic/elasticsearch) (The Apache Software License, Version 2.0) cli (org.elasticsearch:elasticsearch-cli:6.3.1 - https://github.com/elastic/elasticsearch) (The Apache Software License, Version 2.0) clickhouse-client (com.clickhouse:clickhouse-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc) + (The Apache Software License, Version 2.0) clickhouse-grpc-client (com.clickhouse:clickhouse-grpc-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc) (The Apache Software License, Version 2.0) clickhouse-http-client (com.clickhouse:clickhouse-http-client:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc) + (The Apache Software License, Version 2.0) clickhouse-jdbc (com.clickhouse:clickhouse-jdbc:0.3.2-patch9 - https://github.com/ClickHouse/clickhouse-jdbc) (The Apache Software License, Version 2.0) clickhouse-jdbc (ru.yandex.clickhouse:clickhouse-jdbc:0.2 - https://github.com/yandex/clickhouse-jdbc) (The Apache Software License, Version 2.0) elasticsearch-cli (org.elasticsearch:elasticsearch-cli:7.5.1 - https://github.com/elastic/elasticsearch) (The Apache Software License, Version 2.0) elasticsearch-core (org.elasticsearch:elasticsearch-core:6.3.1 - https://github.com/elastic/elasticsearch) diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE index 4f0186eb76e..fe3c949f619 100644 --- a/seatunnel-dist/release-docs/NOTICE +++ b/seatunnel-dist/release-docs/NOTICE @@ -4377,4 +4377,16 @@ Copyright 2017-2021 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). +========================================================================= + +Apache HttpClient Mime NOTICE + +========================================================================= + +Apache HttpClient Mime +Copyright 1999-2020 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + ========================================================================= \ No newline at end of file diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java index cc8427d9d40..edbd788129d 100644 --- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java +++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java @@ -62,14 +62,14 @@ public List getPluginJarPaths(List pluginIdentifiers) { return pluginIdentifiers.stream() .map(this::getPluginJarPath) .filter(Optional::isPresent) - .map(Optional::get) + .map(Optional::get).distinct() .collect(Collectors.toList()); } @Override public List getAllPlugins(List pluginIdentifiers) { return pluginIdentifiers.stream() - .map(this::getPluginInstance) + .map(this::getPluginInstance).distinct() .collect(Collectors.toList()); } diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java index 09ab6d239a4..bcd3d78877d 100644 --- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowConverter.java @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.MutableShort; import org.apache.spark.sql.catalyst.expressions.MutableValue; import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow; +import org.apache.spark.unsafe.types.UTF8String; import java.io.IOException; import java.sql.Date; @@ -77,6 +78,8 @@ private static Object convert(Object field, SeaTunnelDataType dataType) { return Timestamp.valueOf((LocalDateTime) field); case MAP: return convertMap((Map) field, (MapType) dataType, InternalRowConverter::convert); + case STRING: + return UTF8String.fromString((String) field); default: return field; } diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java index ec31649e7aa..6674db54468 100644 --- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java +++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkDataWriter.java @@ -78,6 +78,7 @@ public WriterCommitMessage commit() throws IOException { } SparkWriterCommitMessage sparkWriterCommitMessage = new SparkWriterCommitMessage<>(latestCommitInfoT); cleanCommitInfo(); + sinkWriter.close(); return sparkWriterCommitMessage; } diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index c3327b4287a..5ad2f9439f5 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -65,8 +65,11 @@ chill-java-0.9.3.jar chill_2.11-0.9.3.jar classmate-1.1.0.jar clickhouse-client-0.3.2-patch9.jar +clickhouse-grpc-client-0.3.2-patch9-netty.jar +clickhouse-http-client-0.3.2-patch9-shaded.jar clickhouse-http-client-0.3.2-patch9.jar clickhouse-jdbc-0.2.jar +clickhouse-jdbc-0.3.2-patch9.jar commons-beanutils-1.9.3.jar commons-cli-1.2.jar commons-cli-1.3.1.jar @@ -186,6 +189,7 @@ google-http-client-1.26.0.jar google-http-client-jackson2-1.26.0.jar google-oauth-client-1.26.0.jar gson-2.2.4.jar +gson-2.9.0.jar guava-19.0.jar guice-3.0.jar guice-4.1.0.jar @@ -264,6 +268,7 @@ httpasyncclient-4.1.4.jar httpclient-4.5.13.jar httpcore-4.4.4.jar httpcore-nio-4.4.4.jar +httpmime-4.5.13.jar httpmime-4.5.2.jar hudi-spark-bundle_2.11-0.10.0.jar i18n-util-1.0.4.jar