Skip to content

Commit

Permalink
fix(clickhouse): clickhouse支持jdbc方式读
Browse files Browse the repository at this point in the history
  • Loading branch information
chaorongzhi committed Sep 2, 2024
1 parent 1a5cc46 commit f68bc4a
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 2 deletions.
7 changes: 7 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<snowflake.version>3.13.29</snowflake.version>
<vertica.version>12.0.3-0</vertica.version>
<postgis.jdbc.version>2.5.1</postgis.jdbc.version>
<clickhouse.jdbc.version>0.2.6</clickhouse.jdbc.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -143,6 +144,12 @@
<version>${vertica.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse.jdbc.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

public class DatabaseIdentifier {
public static final String DB_2 = "DB2";
public static final String DAMENG = "Dameng";
public static final String GBASE_8A = "Gbase8a";
public static final String HIVE = "HIVE";
public static final String INFORMIX = "Informix";
public static final String KINGBASE = "KingBase";
public static final String MYSQL = "MySQL";
public static final String ORACLE = "Oracle";
public static final String PHOENIX = "Phoenix";
public static final String POSTGRESQL = "Postgres";
public static final String REDSHIFT = "Redshift";
public static final String SAP_HANA = "SapHana";
public static final String SNOWFLAKE = "Snowflake";
public static final String SQLITE = "Sqlite";
public static final String SQLSERVER = "SqlServer";
public static final String TABLE_STORE = "Tablestore";
public static final String TERADATA = "Teradata";
public static final String VERTICA = "Vertica";
public static final String OCENABASE = "OceanBase";
public static final String TIDB = "TiDB";
public static final String CLICKHOUSE = "Clickhouse";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.util.Optional;

/**
* Description Copyright © 启明星辰 版权所有
*
* @author chaorongzhi
* @date 2024/1/5
*/
public class ClickhouseDialect implements JdbcDialect {
@Override
public String dialectName() {
return DatabaseIdentifier.CLICKHOUSE;
}

@Override
public JdbcRowConverter getRowConverter() {
return new ClickhouseJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new ClickhouseTypeMapper();
}

@Override
public Optional<String> getUpsertStatement(
String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;

import com.google.auto.service.AutoService;

/** Factory for {@link MysqlDialect}. */
@AutoService(JdbcDialectFactory.class)
public class ClickhouseDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:clickhouse:");
}

@Override
public JdbcDialect create() {
return new ClickhouseDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;

public class ClickhouseJdbcRowConverter extends AbstractJdbcRowConverter {
@Override
public String converterName() {
return DatabaseIdentifier.CLICKHOUSE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.clickhouse;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class ClickhouseTypeMapper implements JdbcDialectTypeMapper {

@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
throws SQLException {
String clickhouseType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (clickhouseType) {
case "UInt32":
case "UInt64":
case "Int64":
case "IntervalYear":
case "IntervalQuarter":
case "IntervalMonth":
case "IntervalWeek":
case "IntervalDay":
case "IntervalHour":
case "IntervalMinute":
case "IntervalSecond":
return BasicType.LONG_TYPE;
case "Float64":
return BasicType.DOUBLE_TYPE;
case "Float32":
return BasicType.FLOAT_TYPE;
case "Int8":
case "UInt8":
case "Int16":
case "UInt16":
case "Int32":
return BasicType.INT_TYPE;
case "Date":
return LocalTimeType.LOCAL_DATE_TYPE;
case "DateTime":
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
case "Decimal":
case "Decimal64":
case "Decimal128":
case "Decimal256":
return new DecimalType(precision, scale);
case "String":
case "Int128":
case "UInt128":
case "Int256":
case "UInt256":
case "Point":
case "Polygon":
default:
return BasicType.STRING_TYPE;
}
}
}
14 changes: 12 additions & 2 deletions seatunnel-examples/seatunnel-engine-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,23 @@
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-console</artifactId>
<artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<artifactId>connector-clickhouse</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.6</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>18.3.0.0</version>
</dependency>
</dependencies>
</project>

0 comments on commit f68bc4a

Please sign in to comment.