From f68bc4a98f20ecb61d09c624f90c6ec7b9825dcd Mon Sep 17 00:00:00 2001 From: chaorongzhi Date: Fri, 5 Jan 2024 17:43:28 +0800 Subject: [PATCH] =?UTF-8?q?fix(clickhouse):=20clickhouse=E6=94=AF=E6=8C=81?= =?UTF-8?q?jdbc=E6=96=B9=E5=BC=8F=E8=AF=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../connector-jdbc/pom.xml | 7 ++ .../internal/dialect/DatabaseIdentifier.java | 42 ++++++++++ .../dialect/clickhouse/ClickhouseDialect.java | 37 +++++++++ .../clickhouse/ClickhouseDialectFactory.java | 38 +++++++++ .../ClickhouseJdbcRowConverter.java | 28 +++++++ .../clickhouse/ClickhouseTypeMapper.java | 80 +++++++++++++++++++ .../seatunnel-engine-examples/pom.xml | 14 +++- 7 files changed, 244 insertions(+), 2 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialect.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialectFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseJdbcRowConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseTypeMapper.java diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index e76237e7e07..b8b30818199 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -46,6 +46,7 @@ 3.13.29 12.0.3-0 2.5.1 + 0.2.6 @@ -143,6 +144,12 @@ ${vertica.version} provided + + ru.yandex.clickhouse + clickhouse-jdbc + ${clickhouse.jdbc.version} + provided + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java new file mode 100644 index 00000000000..633e100ec46 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; + +public class DatabaseIdentifier { + public static final String DB_2 = "DB2"; + public static final String DAMENG = "Dameng"; + public static final String GBASE_8A = "Gbase8a"; + public static final String HIVE = "HIVE"; + public static final String INFORMIX = "Informix"; + public static final String KINGBASE = "KingBase"; + public static final String MYSQL = "MySQL"; + public static final String ORACLE = "Oracle"; + public static final String PHOENIX = "Phoenix"; + public static final String POSTGRESQL = "Postgres"; + public static final String REDSHIFT = "Redshift"; + public static final String SAP_HANA = "SapHana"; + public static final String SNOWFLAKE = "Snowflake"; + public static final String SQLITE = "Sqlite"; + public static final String SQLSERVER = "SqlServer"; + public static final String TABLE_STORE = "Tablestore"; + public static final String TERADATA = "Teradata"; + public static final String VERTICA = "Vertica"; + public static final String OCENABASE = "OceanBase"; + public static final String TIDB = "TiDB"; + public static final String CLICKHOUSE = "Clickhouse"; +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialect.java new file mode 100644 index 00000000000..324b8ae1f17 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialect.java @@ -0,0 +1,37 @@ +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.util.Optional; + +/** + * Description Copyright © 启明星辰 版权所有 + * + * @author chaorongzhi + * @date 2024/1/5 + */ +public class ClickhouseDialect implements JdbcDialect { + @Override + public String dialectName() { + return DatabaseIdentifier.CLICKHOUSE; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new ClickhouseJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new ClickhouseTypeMapper(); + } + + @Override + public Optional getUpsertStatement( + String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { + return Optional.empty(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialectFactory.java new file mode 100644 index 00000000000..d089900a915 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseDialectFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect; + +import com.google.auto.service.AutoService; + +/** Factory for {@link MysqlDialect}. */ +@AutoService(JdbcDialectFactory.class) +public class ClickhouseDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:clickhouse:"); + } + + @Override + public JdbcDialect create() { + return new ClickhouseDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseJdbcRowConverter.java new file mode 100644 index 00000000000..ab9a2208fd6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseJdbcRowConverter.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +public class ClickhouseJdbcRowConverter extends AbstractJdbcRowConverter { + @Override + public String converterName() { + return DatabaseIdentifier.CLICKHOUSE; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseTypeMapper.java new file mode 100644 index 00000000000..1b0257f2b1e --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/clickhouse/ClickhouseTypeMapper.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class ClickhouseTypeMapper implements JdbcDialectTypeMapper { + + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) + throws SQLException { + String clickhouseType = metadata.getColumnTypeName(colIndex); + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + switch (clickhouseType) { + case "UInt32": + case "UInt64": + case "Int64": + case "IntervalYear": + case "IntervalQuarter": + case "IntervalMonth": + case "IntervalWeek": + case "IntervalDay": + case "IntervalHour": + case "IntervalMinute": + case "IntervalSecond": + return BasicType.LONG_TYPE; + case "Float64": + return BasicType.DOUBLE_TYPE; + case "Float32": + return BasicType.FLOAT_TYPE; + case "Int8": + case "UInt8": + case "Int16": + case "UInt16": + case "Int32": + return BasicType.INT_TYPE; + case "Date": + return LocalTimeType.LOCAL_DATE_TYPE; + case "DateTime": + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case "Decimal": + case "Decimal64": + case "Decimal128": + case "Decimal256": + return new DecimalType(precision, scale); + case "String": + case "Int128": + case "UInt128": + case "Int256": + case "UInt256": + case "Point": + case "Polygon": + default: + return BasicType.STRING_TYPE; + } + } +} diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml index 33dcaed895f..8d296774396 100644 --- a/seatunnel-examples/seatunnel-engine-examples/pom.xml +++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml @@ -59,13 +59,23 @@ org.apache.seatunnel - connector-console + connector-jdbc ${project.version} org.apache.seatunnel - connector-assert + connector-clickhouse ${project.version} + + ru.yandex.clickhouse + clickhouse-jdbc + 0.2.6 + + + com.oracle.database.jdbc + ojdbc8 + 18.3.0.0 +