Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Clickhouse] Support multi-table source read #7529

Open
wants to merge 39 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c6427c8
clickhouse supports multi-table reading
zhilinli123 Aug 30, 2024
c732ca5
Merge remote-tracking branch 'origin/dev' into multi-source-clickhouse
zhilinli123 Aug 30, 2024
3cf3d2c
1
zhilinli123 Aug 30, 2024
6a667c6
1
zhilinli123 Sep 2, 2024
63b6d7d
fix ci
zhilinli123 Sep 5, 2024
75a5a19
Merge branch 'apache:dev' into multi-source-clickhouse
zhilinli123 Sep 5, 2024
5bb68cc
Merge branch 'apache:dev' into multi-source-clickhouse
zhilinli123 Sep 5, 2024
aa515fc
fix ci
zhilinli123 Sep 6, 2024
6ef832a
Merge branch 'apache:dev' into multi-source-clickhouse
zhilinli123 Sep 6, 2024
7f32a8a
fix
zhilinli123 Sep 6, 2024
5fa9fbc
fix
zhilinli123 Sep 6, 2024
7ef2baa
fix
zhilinli123 Sep 7, 2024
813f763
fix
zhilinli123 Sep 9, 2024
471a473
fix
zhilinli123 Sep 9, 2024
9b4076e
fix
zhilinli123 Sep 10, 2024
d126d28
fix
zhilinli123 Sep 12, 2024
0f67c8e
Merge remote-tracking branch 'origin/dev' into multi-source-clickhouse
zhilinli123 Sep 12, 2024
c173cbe
fix
zhilinli123 Sep 12, 2024
cd7e8a1
fix
zhilinli123 Sep 13, 2024
4a3e791
Merge remote-tracking branch 'origin/dev' into multi-source-clickhouse
zhilinli123 Sep 27, 2024
818d215
Merge remote-tracking branch 'origin/dev' into multi-source-clickhouse
zhilinli123 Oct 17, 2024
6e40d89
1
zhilinli123 Oct 17, 2024
56ed9da
1
zhilinli123 Oct 17, 2024
cdd1300
Merge branch 'apache:dev' into multi-source-clickhouse
zhilinli123 Oct 24, 2024
7e6dc65
1
zhilinli123 Oct 25, 2024
1a6f302
1
zhilinli123 Oct 28, 2024
bfc6826
Merge branch 'apache:dev' into multi-source-clickhouse
zhilinli123 Oct 28, 2024
9623a6a
1
zhilinli123 Oct 28, 2024
78c48a0
Merge remote-tracking branch 'origin/multi-source-clickhouse' into mu…
zhilinli123 Oct 28, 2024
719df6a
1
zhilinli123 Oct 29, 2024
198ef0f
Merge remote-tracking branch 'origin/dev' into multi-source-clickhouse
zhilinli123 Oct 30, 2024
af1b2b2
1
zhilinli123 Oct 30, 2024
6d63449
1
zhilinli123 Oct 30, 2024
8e4b2a0
Merge remote-tracking branch 'origin/dev' into multi-source-clickhouse
zhilinli123 Oct 30, 2024
4ec9c4f
1
zhilinli123 Oct 30, 2024
ac979ef
1
zhilinli123 Oct 31, 2024
4c12b00
1
zhilinli123 Oct 31, 2024
db635ef
Merge branch 'apache:dev' into multi-source-clickhouse
zhilinli123 Nov 5, 2024
f22f187
1
zhilinli123 Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions docs/en/connector-v2/source/Clickhouse.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

# Clickhouse

> Clickhouse source connector
Expand Down Expand Up @@ -95,6 +96,49 @@ sink {
}
```

### Multi-table read

> The following example reads from multiple Clickhouse tables in a single source by listing each table and its query under `table_list`.

```bash


env {
parallelism = 1
job.mode = "BATCH"
}

source {
Clickhouse {
host = "clickhouse:8123"
database = "default"
username = "default"
password = ""
table_list = [
{
table_path = "t1"
sql = "select * from t1 where 1=1 "

},
{
table_path = "t2",
sql = "select * from t2"
}
]
}
}

sink {
Clickhouse {
host = "clickhouse:8123"
database = "default"
table = "t3"
username = "default"
password = ""
}
}
```

### Tips

> 1.[SeaTunnel Deployment Document](../../start-v2/locally/deployment.md).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;

import org.apache.seatunnel.api.table.catalog.CatalogTable;

import lombok.Data;

import java.io.Serializable;

/**
 * Per-table source configuration: the query to run against Clickhouse and the catalog
 * metadata (schema, identifier) of the table it produces. One instance exists per entry
 * of {@code table_list} (or a single instance in plain {@code sql} mode).
 */
@Data
public class ClickhouseCatalogConfig implements Serializable {

    // Explicit id so serialized checkpoint state stays readable across class evolution.
    private static final long serialVersionUID = 1L;

    // Query used to read data for this table.
    private String sql;
    // Catalog metadata describing the rows the query yields.
    private CatalogTable catalogTable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

Expand All @@ -27,19 +29,6 @@

public class ClickhouseConfig {

/** Bulk size of clickhouse jdbc */
public static final Option<Integer> BULK_SIZE =
Options.key("bulk_size")
.intType()
.defaultValue(20000)
.withDescription("Bulk size of clickhouse jdbc");

public static final Option<String> SQL =
Options.key("sql")
.stringType()
.noDefaultValue()
.withDescription("Clickhouse sql used to query data");

/** Clickhouse server host */
public static final Option<String> HOST =
Options.key("host")
Expand Down Expand Up @@ -83,6 +72,39 @@ public class ClickhouseConfig {
.withDescription(
"The session time zone in database server."
+ "If not set, then ZoneId.systemDefault() is used to determine the server time zone");
/** clickhouse source sql */
public static final Option<String> SQL =
Options.key("sql")
.stringType()
.noDefaultValue()
.withDescription("Clickhouse sql used to query data");

/** clickhouse multi table */
public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription("table list config");

public static final Option<String> TABLE_PATH =
Options.key("table_path")
.stringType()
.noDefaultValue()
.withDescription("table full path");

/** Bulk size of clickhouse jdbc */
public static final Option<Integer> BULK_SIZE =
Options.key("bulk_size")
.intType()
.defaultValue(20000)
.withDescription("Bulk size of clickhouse jdbc");

/** clickhouse conf */
public static final Option<Map<String, String>> CLICKHOUSE_CONFIG =
Options.key("clickhouse.config")
.mapType()
.defaultValue(Collections.emptyMap())
.withDescription("Clickhouse custom config");

/** Split mode when table is distributed engine */
public static final Option<Boolean> SPLIT_MODE =
Expand Down Expand Up @@ -158,12 +180,6 @@ public class ClickhouseConfig {
.noDefaultValue()
.withDescription("The password of Clickhouse server node");

public static final Option<Map<String, String>> CLICKHOUSE_CONFIG =
Options.key("clickhouse.config")
.mapType()
.defaultValue(Collections.emptyMap())
.withDescription("Clickhouse custom config");

public static final Option<String> FILE_FIELDS_DELIMITER =
Options.key("file_fields_delimiter")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,70 +17,217 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;

import org.apache.commons.collections4.map.HashedMap;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseResponse;
import com.google.auto.service.AutoService;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE_LIST;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE_PATH;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;

@AutoService(SeaTunnelSource.class)
public class ClickhouseSource
implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState>,
SupportParallelism,
SupportColumnProjection {

// Clickhouse cluster nodes resolved from the `host` option in prepare().
private List<ClickHouseNode> servers;
// Legacy single-table state, populated only via the (servers, catalogTable, sql) constructor.
private CatalogTable catalogTable;
private String sql;
// Per-table configs keyed by table path; filled by prepare() for both single- and
// multi-table modes. NOTE(review): commons-collections HashedMap offers nothing over
// java.util.HashMap here — consider the JDK type.
private Map<TablePath, ClickhouseCatalogConfig> tableClickhouseCatalogConfigMap =
        new HashedMap<>();

// Constructor kept for callers that pre-build the node list and schema; prepare() is
// not invoked on this path — presumably used by the factory. TODO confirm callers.
public ClickhouseSource(List<ClickHouseNode> servers, CatalogTable catalogTable, String sql) {
    this.servers = servers;
    this.catalogTable = catalogTable;
    this.sql = sql;
}
// Table path used when the source is configured in single-table (`sql`) mode.
private static final String defaultTablePath = "default";

@Override
public String getPluginName() {
    // Identifier under which this source is registered and referenced in job configs.
    return "Clickhouse";
}

/**
 * Parses the job config, resolves the Clickhouse nodes, probes each configured query for
 * its schema, and builds the per-table {@link ClickhouseCatalogConfig} map used by the
 * enumerator. Supports both multi-table mode ({@code table_list}) and legacy
 * single-query mode ({@code sql}).
 *
 * @param config raw job configuration for this source
 * @throws ClickhouseConnectorException when a table_list entry lacks {@code sql}, or
 *     neither {@code sql} nor {@code table_list} is configured
 */
@Override
public void prepare(Config config) throws PrepareFailException {
    // Fall back to the option's default server time zone when the user did not set one.
    config =
            config.withFallback(
                    ConfigFactory.parseMap(
                            Collections.singletonMap(
                                    SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue())));

    // Optional pass-through properties for the Clickhouse client; null when absent.
    Map<String, String> customConfig =
            CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())
                    ? config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream()
                            .collect(
                                    Collectors.toMap(
                                            Map.Entry::getKey,
                                            entry -> entry.getValue().unwrapped().toString()))
                    : null;

    servers =
            ClickhouseUtil.createNodes(
                    config.getString(HOST.key()),
                    config.getString(DATABASE.key()),
                    config.getString(SERVER_TIME_ZONE.key()),
                    config.getString(USERNAME.key()),
                    config.getString(PASSWORD.key()),
                    customConfig);

    // Any node can serve the schema probe; pick one at random to spread load.
    ClickHouseNode currentServer =
            servers.get(ThreadLocalRandom.current().nextInt(servers.size()));

    ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
    String sql = readonlyConfig.getOptional(SQL).orElse(null);

    if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) {
        // Multi-table mode: each table_list entry contributes one catalog table.
        readonlyConfig.get(TABLE_LIST).stream()
                .map(ReadonlyConfig::fromMap)
                .forEach(
                        conf -> {
                            // `sql` is mandatory per entry; fail fast with a clear message
                            // instead of a bare NoSuchElementException from Optional.get().
                            String confSql =
                                    conf.getOptional(SQL)
                                            .orElseThrow(
                                                    () ->
                                                            new ClickhouseConnectorException(
                                                                    SeaTunnelAPIErrorCode
                                                                            .CONFIG_VALIDATION_FAILED,
                                                                    "Each table_list entry must contain the `sql` option"));
                            SeaTunnelRowType clickhouseRowType =
                                    getClickhouseRowType(currentServer, confSql);
                            TablePath tablePath =
                                    TablePath.of(conf.getOptional(TABLE_PATH).orElse(""));
                            CatalogTable catalogTable =
                                    createCatalogTable(clickhouseRowType, tablePath);

                            ClickhouseCatalogConfig clickhouseCatalogConfig =
                                    new ClickhouseCatalogConfig();
                            clickhouseCatalogConfig.setSql(confSql);
                            clickhouseCatalogConfig.setCatalogTable(catalogTable);
                            tableClickhouseCatalogConfigMap.put(
                                    tablePath, clickhouseCatalogConfig);
                        });
    } else {
        // Single-table mode requires the top-level `sql` option; without this guard a
        // missing value would surface later as an opaque NPE inside the probe query.
        if (sql == null) {
            throw new ClickhouseConnectorException(
                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                    "Either `sql` or `table_list` must be configured for the Clickhouse source");
        }
        SeaTunnelRowType clickhouseRowType = getClickhouseRowType(currentServer, sql);
        CatalogTable catalogTable =
                CatalogTableUtil.getCatalogTable(defaultTablePath, clickhouseRowType);

        ClickhouseCatalogConfig clickhouseCatalogConfig = new ClickhouseCatalogConfig();
        clickhouseCatalogConfig.setCatalogTable(catalogTable);
        clickhouseCatalogConfig.setSql(sql);
        tableClickhouseCatalogConfigMap.put(
                TablePath.of(defaultTablePath), clickhouseCatalogConfig);
    }
}

/**
 * Builds a catalog table for an ad-hoc query result: every field of the probed row type
 * becomes a nullable physical column under the given table path.
 */
private CatalogTable createCatalogTable(SeaTunnelRowType rowType, TablePath tablePath) {
    TableSchema.Builder builder = TableSchema.builder();
    int fieldCount = rowType.getTotalFields();
    for (int idx = 0; idx < fieldCount; idx++) {
        builder.column(
                PhysicalColumn.of(
                        rowType.getFieldName(idx),
                        rowType.getFieldType(idx),
                        0,
                        true,
                        null,
                        null));
    }
    // No partition keys, options, or comment exist for a query-derived table.
    return CatalogTable.of(
            TableIdentifier.of("", tablePath),
            builder.build(),
            Collections.emptyMap(),
            Collections.emptyList(),
            null);
}

/**
 * Probes the schema of {@code sql} by wrapping it in a LIMIT 1 subquery against the given
 * node and converting the response column metadata to a SeaTunnel row type.
 *
 * <p>NOTE(review): the wrapping assumes {@code sql} is a bare SELECT without a trailing
 * semicolon — confirm user-provided queries are normalized before reaching here.
 *
 * @param currentServer node to run the probe query against
 * @param sql user-provided query whose result schema is wanted
 * @return field names and converted SeaTunnel data types of the query result
 * @throws ClickhouseConnectorException when the probe query fails
 */
public SeaTunnelRowType getClickhouseRowType(ClickHouseNode currentServer, String sql) {
    try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol());
            ClickHouseResponse response =
                    client.connect(currentServer)
                            // This format carries column names and types in the response.
                            .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                            .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql))
                            .executeAndWait()) {

        int columnSize = response.getColumns().size();
        String[] fieldNames = new String[columnSize];
        SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType[columnSize];

        // Map each Clickhouse column to its SeaTunnel equivalent, preserving order.
        for (int i = 0; i < columnSize; i++) {
            fieldNames[i] = response.getColumns().get(i).getColumnName();
            seaTunnelDataTypes[i] = TypeConvertUtil.convert(response.getColumns().get(i));
        }

        return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
    } catch (ClickHouseException e) {
        // Surface the failure as a config-validation error with plugin context attached.
        throw new ClickhouseConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                String.format(
                        "PluginName: %s, PluginType: %s, Message: %s",
                        getPluginName(), PluginType.SOURCE, ExceptionUtils.getMessage(e)));
    }
}

@Override
public Boundedness getBoundedness() {
    // Queries are finite result sets, so this source only runs in batch mode.
    return Boundedness.BOUNDED;
}

/**
 * Returns one catalog table per configured source table (a single entry in plain
 * {@code sql} mode), as assembled by {@link #prepare}.
 */
@Override
public List<CatalogTable> getProducedCatalogTables() {
    // values() avoids the entrySet()/getValue() detour; the keys are not needed here.
    return tableClickhouseCatalogConfigMap.values().stream()
            .map(ClickhouseCatalogConfig::getCatalogTable)
            .collect(Collectors.toList());
}

/**
 * Creates a reader over the resolved node list. The per-table sql and row type now
 * travel with each split, so the reader no longer needs them up front.
 */
@Override
public SourceReader<SeaTunnelRow, ClickhouseSourceSplit> createReader(
        SourceReader.Context readerContext) throws Exception {
    return new ClickhouseSourceReader(servers, readerContext);
}

/**
 * Creates a fresh split enumerator seeded with the per-table configs built in
 * {@link #prepare}, so each split carries its own sql and schema.
 */
@Override
public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> createEnumerator(
        SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext)
        throws Exception {
    return new ClickhouseSourceSplitEnumerator(
            enumeratorContext, tableClickhouseCatalogConfigMap);
}

/**
 * Restores the split enumerator from checkpointed state, resuming split assignment
 * where the previous run left off.
 */
@Override
public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> restoreEnumerator(
        SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext,
        ClickhouseSourceState checkpointState)
        throws Exception {
    return new ClickhouseSourceSplitEnumerator(
            enumeratorContext, tableClickhouseCatalogConfigMap, checkpointState);
}
}
Loading
Loading