From c6427c8134d7cdb33198558a5f6cb44f60fdec68 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 30 Aug 2024 11:29:11 +0800 Subject: [PATCH 01/26] clickhouse supports multi-table reading --- docs/en/connector-v2/source/Clickhouse.md | 45 +++++++ .../config/ClickhouseCatalogConfig.java | 31 +++++ .../clickhouse/config/ClickhouseConfig.java | 54 +++++--- .../clickhouse/source/ClickhouseSource.java | 125 +++++++++++------- .../source/ClickhouseSourceFactory.java | 4 +- .../source/ClickhouseSourceReader.java | 79 +++++++---- .../source/ClickhouseSourceSplit.java | 6 + .../ClickhouseSourceSplitEnumerator.java | 53 ++++++-- .../ClickhouseSinkCDCChangelogIT.java | 57 ++++++-- .../resources/multi_source_clickhouse.conf | 57 ++++++++ 10 files changed, 392 insertions(+), 119 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseCatalogConfig.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf diff --git a/docs/en/connector-v2/source/Clickhouse.md b/docs/en/connector-v2/source/Clickhouse.md index 6fe0a5bb56b..09b4856f9b3 100644 --- a/docs/en/connector-v2/source/Clickhouse.md +++ b/docs/en/connector-v2/source/Clickhouse.md @@ -95,6 +95,51 @@ sink { } ``` +### Multi Table read + +> This is a multi-table read case + +```bash + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + username = "default" + password = "" + server_time_zone = "UTC" + result_table_name = "test" + clickhouse.config = { + "socket_timeout": "300000" + } + table_list = [ + { + table_path = "t1" + sql = "select * from t1 where 1=1 " + + }, + { + table_path = "t2", + sql = "select * from t2" + } + ] + } +} + +sink { + Assert { + rules { + table-names = ["t1", "t2"] + } + } +} +``` + ### Tips > 1.[SeaTunnel Deployment Document](../../start-v2/locally/deployment.md). diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseCatalogConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseCatalogConfig.java new file mode 100644 index 00000000000..a7c24fb0f2a --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseCatalogConfig.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; + +import lombok.Data; + +import java.io.Serializable; + +@Data +public class ClickhouseCatalogConfig implements Serializable { + + private String sql; + private CatalogTable catalogTable; +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java index bb0417b1712..e6413593ef7 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.config; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; @@ -27,19 +29,6 @@ public class ClickhouseConfig { - /** Bulk size of clickhouse jdbc */ - public static final Option BULK_SIZE = - Options.key("bulk_size") - .intType() - .defaultValue(20000) - .withDescription("Bulk size of clickhouse jdbc"); - - public static final Option SQL = - Options.key("sql") - .stringType() - .noDefaultValue() - .withDescription("Clickhouse sql used to query data"); - /** Clickhouse server host */ public static final Option HOST = Options.key("host") @@ -83,6 +72,39 @@ public class ClickhouseConfig { .withDescription( "The session time zone in database server." + "If not set, then ZoneId.systemDefault() is used to determine the server time zone"); + /** clickhouse source sql */ + public static final Option SQL = + Options.key("sql") + .stringType() + .noDefaultValue() + .withDescription("Clickhouse sql used to query data"); + + /** clickhouse multi table */ + public static final Option>> TABLE_LIST = + Options.key("table_list") + .type(new TypeReference>>() {}) + .noDefaultValue() + .withDescription("table list config"); + + public static final Option TABLE_PATH = + Options.key("table_path") + .stringType() + .noDefaultValue() + .withDescription("table full path"); + + /** Bulk size of clickhouse jdbc */ + public static final Option BULK_SIZE = + Options.key("bulk_size") + .intType() + .defaultValue(20000) + .withDescription("Bulk size of clickhouse jdbc"); + + /** clickhouse conf */ + public static final Option> CLICKHOUSE_CONFIG = + Options.key("clickhouse.config") + .mapType() + .defaultValue(Collections.emptyMap()) + .withDescription("Clickhouse custom config"); /** Split mode when table is distributed engine */ public static final Option SPLIT_MODE = @@ -158,12 +180,6 @@ public class ClickhouseConfig { .noDefaultValue() .withDescription("The password of Clickhouse server node"); - public static final Option> CLICKHOUSE_CONFIG = - Options.key("clickhouse.config") - .mapType() - .defaultValue(Collections.emptyMap()) - .withDescription("Clickhouse custom config"); - public static final Option FILE_FIELDS_DELIMITER = Options.key("file_fields_delimiter") .stringType() diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index a79d2df8de6..1a7499bc0f2 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -22,31 +22,37 @@ import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.source.SupportColumnProjection; import org.apache.seatunnel.api.source.SupportParallelism; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; +import org.apache.commons.collections4.map.HashedMap; + import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseException; import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseResponse; import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -58,6 +64,8 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE_LIST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE_PATH; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; @AutoService(SeaTunnelSource.class) @@ -68,7 +76,10 @@ public class ClickhouseSource private List servers; private SeaTunnelRowType rowTypeInfo; - private String sql; + private Map tableClickhouseCatalogConfigMap = + new HashedMap<>(); + + private final String defaultTablePath = "default"; @Override public String getPluginName() { @@ -77,39 +88,20 @@ public String getPluginName() { @Override public void prepare(Config config) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists( - config, - HOST.key(), - DATABASE.key(), - SQL.key(), - USERNAME.key(), - PASSWORD.key()); - if (!result.isSuccess()) { - throw new ClickhouseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - Map defaultConfig = - ImmutableMap.builder() - .put(SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue()) - .build(); - - config = config.withFallback(ConfigFactory.parseMap(defaultConfig)); - - Map customConfig = null; - - if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) { - customConfig = - config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entrySet -> - entrySet.getValue().unwrapped().toString())); - } + config = + config.withFallback( + ConfigFactory.parseMap( + Collections.singletonMap( + SERVER_TIME_ZONE.key(), SERVER_TIME_ZONE.defaultValue()))); + + Map customConfig = + CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key()) + ? config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().unwrapped().toString())) + : null; servers = ClickhouseUtil.createNodes( @@ -120,14 +112,50 @@ public void prepare(Config config) throws PrepareFailException { config.getString(PASSWORD.key()), customConfig); - sql = config.getString(SQL.key()); ClickHouseNode currentServer = servers.get(ThreadLocalRandom.current().nextInt(servers.size())); + + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config); + if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) { + readonlyConfig.get(TABLE_LIST).stream() + .map(ReadonlyConfig::fromMap) + .forEach( + conf -> { + SeaTunnelRowType clickhouseRowType = + getClickhouseRowType( + currentServer, conf.getOptional(SQL).get()); + TablePath tablePath = + TablePath.of(conf.getOptional(TABLE_PATH).get()); + + CatalogTable catalogTable = + CatalogTableUtil.getCatalogTable( + tablePath.getTableName(), clickhouseRowType); + ClickhouseCatalogConfig clickhouseCatalogConfig = + new ClickhouseCatalogConfig(); + clickhouseCatalogConfig.setSql(conf.getOptional(SQL).get()); + clickhouseCatalogConfig.setCatalogTable(catalogTable); + tableClickhouseCatalogConfigMap.put( + tablePath, clickhouseCatalogConfig); + }); + } else { + SeaTunnelRowType clickhouseRowType = + getClickhouseRowType(currentServer, readonlyConfig.getOptional(SQL).get()); + CatalogTable catalogTable = + CatalogTableUtil.getCatalogTable(defaultTablePath, clickhouseRowType); + ClickhouseCatalogConfig clickhouseCatalogConfig = new ClickhouseCatalogConfig(); + clickhouseCatalogConfig.setCatalogTable(catalogTable); + clickhouseCatalogConfig.setSql(readonlyConfig.getOptional(SQL).get()); + tableClickhouseCatalogConfigMap.put( + TablePath.of(defaultTablePath), clickhouseCatalogConfig); + } + } + + public SeaTunnelRowType getClickhouseRowType(ClickHouseNode currentServer, String sql) { try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); ClickHouseResponse response = client.connect(currentServer) .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(modifySQLToLimit1(config.getString(SQL.key()))) + .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql)) .executeAndWait()) { int columnSize = response.getColumns().size(); @@ -139,8 +167,7 @@ public void prepare(Config config) throws PrepareFailException { seaTunnelDataTypes[i] = TypeConvertUtil.convert(response.getColumns().get(i)); } - this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); - + return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); } catch (ClickHouseException e) { throw new ClickhouseConnectorException( SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, @@ -150,31 +177,30 @@ public void prepare(Config config) throws PrepareFailException { } } - private String modifySQLToLimit1(String sql) { - return String.format("SELECT * FROM (%s) s LIMIT 1", sql); - } - @Override public Boundedness getBoundedness() { return Boundedness.BOUNDED; } @Override - public SeaTunnelRowType getProducedType() { - return this.rowTypeInfo; + public List getProducedCatalogTables() { + return tableClickhouseCatalogConfigMap.entrySet().stream() + .map(conf -> conf.getValue().getCatalogTable()) + .collect(Collectors.toList()); } @Override public SourceReader createReader( SourceReader.Context readerContext) throws Exception { - return new ClickhouseSourceReader(servers, readerContext, this.rowTypeInfo, sql); + return new ClickhouseSourceReader(servers, readerContext, tableClickhouseCatalogConfigMap); } @Override public SourceSplitEnumerator createEnumerator( SourceSplitEnumerator.Context enumeratorContext) throws Exception { - return new ClickhouseSourceSplitEnumerator(enumeratorContext); + return new ClickhouseSourceSplitEnumerator( + enumeratorContext, tableClickhouseCatalogConfigMap); } @Override @@ -182,6 +208,7 @@ public SourceSplitEnumerator resto SourceSplitEnumerator.Context enumeratorContext, ClickhouseSourceState checkpointState) throws Exception { - return new ClickhouseSourceSplitEnumerator(enumeratorContext); + return new ClickhouseSourceSplitEnumerator( + enumeratorContext, tableClickhouseCatalogConfigMap); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java index 4adea4b80ce..dcacd5fbd94 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java @@ -29,6 +29,7 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE_LIST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; @AutoService(Factory.class) @@ -41,7 +42,8 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(HOST, DATABASE, SQL, USERNAME, PASSWORD) + .required(HOST, DATABASE, USERNAME, PASSWORD) + .exclusive(SQL, TABLE_LIST) .optional(CLICKHOUSE_CONFIG) .build(); } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index 591334d9722..93609d0ab83 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -19,10 +19,15 @@ import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseNode; @@ -30,32 +35,33 @@ import com.clickhouse.client.ClickHouseResponse; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; +import java.util.Deque; +import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; public class ClickhouseSourceReader implements SourceReader { + private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSourceReader.class); + private final List servers; private ClickHouseClient client; - private final SeaTunnelRowType rowTypeInfo; private final SourceReader.Context readerContext; private ClickHouseRequest request; - private final String sql; - private final List splits; + private Map tableClickhouseCatalogConfigMap; + + private Deque splits = new LinkedList<>(); ClickhouseSourceReader( List servers, SourceReader.Context readerContext, - SeaTunnelRowType rowTypeInfo, - String sql) { + Map tableClickhouseCatalogConfigMap) { this.servers = servers; this.readerContext = readerContext; - this.rowTypeInfo = rowTypeInfo; - this.sql = sql; - this.splits = new ArrayList<>(); + this.tableClickhouseCatalogConfigMap = tableClickhouseCatalogConfigMap; } @Override @@ -75,28 +81,43 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { - if (!splits.isEmpty()) { - try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { - response.stream() - .forEach( - record -> { - Object[] values = - new Object[this.rowTypeInfo.getFieldNames().length]; - for (int i = 0; i < record.size(); i++) { - if (record.getValue(i).isNullOrEmpty()) { - values[i] = null; - } else { - values[i] = - TypeConvertUtil.valueUnwrap( - this.rowTypeInfo.getFieldType(i), - record.getValue(i)); + synchronized (output.getCheckpointLock()) { + ClickhouseSourceSplit split = splits.poll(); + if (split != null) { + TablePath tablePath = split.getTablePath(); + SeaTunnelRowType seaTunnelRowType = + tableClickhouseCatalogConfigMap + .get(tablePath) + .getCatalogTable() + .getSeaTunnelRowType(); + String sql = tableClickhouseCatalogConfigMap.get(tablePath).getSql(); + try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { + response.stream() + .forEach( + record -> { + Object[] values = + new Object[seaTunnelRowType.getFieldNames().length]; + for (int i = 0; i < record.size(); i++) { + if (record.getValue(i).isNullOrEmpty()) { + values[i] = null; + } else { + values[i] = + TypeConvertUtil.valueUnwrap( + seaTunnelRowType.getFieldType(i), + record.getValue(i)); + } } - } - output.collect(new SeaTunnelRow(values)); - }); + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(values); + seaTunnelRow.setTableId(String.valueOf(tablePath)); + output.collect(seaTunnelRow); + }); + } + } else if (splits.isEmpty()) { + // signal to the source that we have reached the end of the data. + readerContext.signalNoMoreElement(); + } else { + Thread.sleep(1000L); } - this.readerContext.signalNoMoreElement(); - this.splits.clear(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java index e70d2cd44a6..995b6b944e5 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java @@ -18,10 +18,16 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.catalog.TablePath; +import lombok.Data; + +@Data public class ClickhouseSourceSplit implements SourceSplit { @Override public String splitId() { return null; } + + private TablePath tablePath; } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index c0eb4b6c706..40997956a83 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -18,25 +18,39 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; public class ClickhouseSourceSplitEnumerator implements SourceSplitEnumerator { private final Context context; private final Set readers; - private volatile int assigned = -1; + private volatile Map tablePathAssigned = new HashMap<>(); + + private Map tableClickhouseCatalogConfigMap; // TODO support read distributed engine use multi split - ClickhouseSourceSplitEnumerator(Context enumeratorContext) { + ClickhouseSourceSplitEnumerator( + Context enumeratorContext, + Map tableClickhouseCatalogConfigMap) { this.context = enumeratorContext; this.readers = new HashSet<>(); + this.tableClickhouseCatalogConfigMap = tableClickhouseCatalogConfigMap; + for (TablePath tablePath : tableClickhouseCatalogConfigMap.keySet()) { + tablePathAssigned.put(tablePath, -1); + } } @Override @@ -53,19 +67,28 @@ public void addSplitsBack(List splits, int subtaskId) { if (splits.isEmpty()) { return; } - if (subtaskId == assigned) { - Optional otherReader = readers.stream().filter(r -> r != subtaskId).findAny(); - if (otherReader.isPresent()) { - context.assignSplit(otherReader.get(), splits); - } else { - assigned = -1; + for (TablePath tablePath : tableClickhouseCatalogConfigMap.keySet()) { + if (tablePathAssigned.get(tablePath) == subtaskId) { + Optional otherReader = + readers.stream().filter(r -> r != subtaskId).findAny(); + if (otherReader.isPresent()) { + context.assignSplit(otherReader.get(), splits); + } else { + tablePathAssigned.put(tablePath, -1); + } } } } @Override public int currentUnassignedSplitSize() { - return assigned < 0 ? 0 : 1; + return tablePathAssigned.values().stream() + .filter(value -> value < 0) + .collect(Collectors.toList()) + .size() + > 0 + ? 0 + : 1; } @Override @@ -74,10 +97,16 @@ public void handleSplitRequest(int subtaskId) {} @Override public void registerReader(int subtaskId) { readers.add(subtaskId); - if (assigned < 0) { - assigned = subtaskId; - context.assignSplit(subtaskId, new ClickhouseSourceSplit()); + List clickhouseSourceSplits = new ArrayList<>(); + for (TablePath tablePath : tablePathAssigned.keySet()) { + if (tablePathAssigned.get(tablePath) < 0) { + tablePathAssigned.put(tablePath, subtaskId); + ClickhouseSourceSplit clickhouseSourceSplit = new ClickhouseSourceSplit(); + clickhouseSourceSplit.setTablePath(tablePath); + clickhouseSourceSplits.add(clickhouseSourceSplit); + } } + context.assignSplit(subtaskId, clickhouseSourceSplits); } @Override diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 4c2b9cedb18..20d362e9d57 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -150,6 +150,13 @@ public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer dropSinkTable(); } + @TestTemplate + public void testClickhouseSourceMultiTable(TestContainer container) throws Exception { + initializeClickhouseMergeTreeTable(); + Container.ExecResult execResult = container.executeJob("/multi_source_clickhouse.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + private void initConnection() throws Exception { final Properties info = new Properties(); info.put("user", this.container.getUsername()); @@ -162,15 +169,47 @@ private void initConnection() throws Exception { private void initializeClickhouseMergeTreeTable() { try { Statement statement = this.connection.createStatement(); - String sql = - String.format( - "create table if not exists %s.%s(\n" - + " `pk_id` Int64,\n" - + " `name` String,\n" - + " `score` Int32\n" - + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", - DATABASE, SINK_TABLE); - statement.execute(sql); + List initSqlList = + Arrays.asList( + String.format( + "create table if not exists %s.%s(\n" + + " `pk_id` Int64,\n" + + " `name` String,\n" + + " `score` Int32\n" + + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", + DATABASE, SINK_TABLE), + "create table if not exists default.t1\n" + + "(\n" + + " `course_id` UInt32,\n" + + " `course_name` String,\n" + + " `instructor` String\n" + + ")\n" + + "ENGINE = MergeTree\n" + + "ORDER BY course_id\n" + + "SETTINGS index_granularity = 8192", + "create table if not exists default.t2\n" + + "(\n" + + " `course_id` UInt32,\n" + + " `course_name` String,\n" + + " `instructor` String\n" + + ")\n" + + "ENGINE = MergeTree\n" + + "ORDER BY course_id\n" + + "SETTINGS index_granularity = 8192", + "INSERT INTO default.t1 (course_id, course_name, instructor)\n" + + "VALUES \n" + + "(1, 'Math', 'Mr. Smith'),\n" + + "(2, 'History', 'Ms. Johnson'),\n" + + "(3, 'Science', 'Dr. Brown')\n", + "INSERT INTO default.t2 (course_id, course_name, instructor)\n" + + "VALUES \n" + + "(1, 'Math', 'Mr. Smith'),\n" + + "(2, 'History', 'Ms. Johnson'),\n" + + "(3, 'Science', 'Dr. Brown')\n"); + for (String sql : initSqlList) { + statement.execute(sql); + } + } catch (SQLException e) { throw new RuntimeException("Initializing Clickhouse table failed!", e); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf new file mode 100644 index 00000000000..9beacaa33b6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Clickhouse { + host = "clickhouse:8123" + database = "default" + username = "default" + password = "" + server_time_zone = "UTC" + result_table_name = "test" + clickhouse.config = { + "socket_timeout": "300000" + } + table_list = [ + { + table_path = "t1" + sql = "select * from t1 where 1=1 " + + }, + { + table_path = "t2", + sql = "select * from t2" + } + ] + } +} + +sink { + Assert { + rules { + table-names = ["t1", "t2"] + } + } +} From 3cf3d2c7f47b05b2802d691a9706c63aa0f15410 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 30 Aug 2024 13:28:55 +0800 Subject: [PATCH 02/26] 1 --- .../seatunnel/clickhouse/source/ClickhouseSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index 4f76d71a241..5e57c107d50 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -37,8 +37,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; From 6a667c6cdb42a1ebc1df3404e90b89db79a09c91 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 2 Sep 2024 15:35:53 +0800 Subject: [PATCH 03/26] 1 --- .../clickhouse/source/ClickhouseSource.java | 39 ++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index 5e57c107d50..6932454923e 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -31,7 +31,10 @@ import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -117,40 +120,56 @@ public void prepare(Config config) throws PrepareFailException { servers.get(ThreadLocalRandom.current().nextInt(servers.size())); ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config); + String sql = readonlyConfig.getOptional(SQL).orElse(null); + if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) { readonlyConfig.get(TABLE_LIST).stream() .map(ReadonlyConfig::fromMap) .forEach( conf -> { + String confSql = conf.getOptional(SQL).get(); SeaTunnelRowType clickhouseRowType = - getClickhouseRowType( - currentServer, conf.getOptional(SQL).get()); + getClickhouseRowType(currentServer, confSql); TablePath tablePath = - TablePath.of(conf.getOptional(TABLE_PATH).get()); - + TablePath.of(conf.getOptional(TABLE_PATH).orElse("")); CatalogTable catalogTable = - CatalogTableUtil.getCatalogTable( - tablePath.getTableName(), clickhouseRowType); + createCatalogTable(clickhouseRowType, tablePath); + ClickhouseCatalogConfig clickhouseCatalogConfig = new ClickhouseCatalogConfig(); - clickhouseCatalogConfig.setSql(conf.getOptional(SQL).get()); + clickhouseCatalogConfig.setSql(confSql); clickhouseCatalogConfig.setCatalogTable(catalogTable); tableClickhouseCatalogConfigMap.put( tablePath, clickhouseCatalogConfig); }); } else { - SeaTunnelRowType clickhouseRowType = - getClickhouseRowType(currentServer, readonlyConfig.getOptional(SQL).get()); + SeaTunnelRowType clickhouseRowType = getClickhouseRowType(currentServer, sql); CatalogTable catalogTable = CatalogTableUtil.getCatalogTable(defaultTablePath, clickhouseRowType); + ClickhouseCatalogConfig clickhouseCatalogConfig = new ClickhouseCatalogConfig(); clickhouseCatalogConfig.setCatalogTable(catalogTable); - clickhouseCatalogConfig.setSql(readonlyConfig.getOptional(SQL).get()); + clickhouseCatalogConfig.setSql(sql); tableClickhouseCatalogConfigMap.put( TablePath.of(defaultTablePath), clickhouseCatalogConfig); } } + private CatalogTable createCatalogTable(SeaTunnelRowType rowType, TablePath tablePath) { + TableSchema.Builder schemaBuilder = TableSchema.builder(); + for (int i = 0; i < rowType.getTotalFields(); i++) { + schemaBuilder.column( + PhysicalColumn.of( + rowType.getFieldName(i), rowType.getFieldType(i), 0, true, null, null)); + } + return CatalogTable.of( + TableIdentifier.of("", tablePath), + schemaBuilder.build(), + Collections.emptyMap(), + Collections.emptyList(), + null); + } + public SeaTunnelRowType getClickhouseRowType(ClickHouseNode currentServer, String sql) { try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); ClickHouseResponse response = From 63b6d7d8aa592ae9a5097e6ba38a3945cfddeb36 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 5 Sep 2024 16:50:56 +0800 Subject: [PATCH 04/26] fix ci --- .../seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 7c5fbf4d139..aee379c40c2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -19,7 +19,9 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; @@ -145,6 +147,10 @@ public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer } @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "The multi-catalog does not currently support the Spark Flink engine") public void testClickhouseSourceMultiTable(TestContainer container) throws Exception { initializeClickhouseMergeTreeTable(); Container.ExecResult execResult = container.executeJob("/multi_source_clickhouse.conf"); From aa515fc87c90074b3a261b0717ceb28e461fdd73 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 6 Sep 2024 19:11:30 +0800 Subject: [PATCH 05/26] fix ci --- .../source/ClickhouseSourceReader.java | 30 ++-- .../source/ClickhouseSourceSplit.java | 2 + .../ClickhouseSourceSplitEnumerator.java | 158 +++++++++++++----- .../state/ClickhouseSourceState.java | 15 +- 4 files changed, 148 insertions(+), 57 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index 93609d0ab83..89a819640d6 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -35,11 +35,10 @@ import com.clickhouse.client.ClickHouseResponse; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; import java.util.Deque; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Random; public class ClickhouseSourceReader implements SourceReader { @@ -51,17 +50,13 @@ public class ClickhouseSourceReader implements SourceReader request; - private Map tableClickhouseCatalogConfigMap; - private Deque splits = new LinkedList<>(); - ClickhouseSourceReader( - List servers, - SourceReader.Context readerContext, - Map tableClickhouseCatalogConfigMap) { + boolean noMoreSplit; + + ClickhouseSourceReader(List servers, SourceReader.Context readerContext) { this.servers = servers; this.readerContext = readerContext; - this.tableClickhouseCatalogConfigMap = tableClickhouseCatalogConfigMap; } @Override @@ -85,12 +80,11 @@ public void pollNext(Collector output) throws Exception { ClickhouseSourceSplit split = splits.poll(); if (split != null) { TablePath tablePath = split.getTablePath(); + ClickhouseCatalogConfig clickhouseCatalogConfig = + split.getClickhouseCatalogConfig(); + String sql = clickhouseCatalogConfig.getSql(); SeaTunnelRowType seaTunnelRowType = - tableClickhouseCatalogConfigMap - .get(tablePath) - .getCatalogTable() - .getSeaTunnelRowType(); - String sql = tableClickhouseCatalogConfigMap.get(tablePath).getSql(); + clickhouseCatalogConfig.getCatalogTable().getSeaTunnelRowType(); try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { response.stream() .forEach( @@ -112,7 +106,7 @@ record -> { output.collect(seaTunnelRow); }); } - } else if (splits.isEmpty()) { + } else if (splits.isEmpty() && noMoreSplit) { // signal to the source that we have reached the end of the data. readerContext.signalNoMoreElement(); } else { @@ -123,7 +117,7 @@ record -> { @Override public List snapshotState(long checkpointId) throws Exception { - return Collections.emptyList(); + return new ArrayList<>(splits); } @Override @@ -132,7 +126,9 @@ public void addSplits(List splits) { } @Override - public void handleNoMoreSplits() {} + public void handleNoMoreSplits() { + noMoreSplit = true; + } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java index 995b6b944e5..28e0bc72d26 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import lombok.Data; @@ -30,4 +31,5 @@ public String splitId() { } private TablePath tablePath; + private ClickhouseCatalogConfig clickhouseCatalogConfig; } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index 40997956a83..1fdd5daf89b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -22,34 +22,51 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; +import java.util.concurrent.ConcurrentLinkedQueue; public class ClickhouseSourceSplitEnumerator implements SourceSplitEnumerator { - private final Context context; - private final Set readers; - private volatile Map tablePathAssigned = new HashMap<>(); + private static final Logger log = + LoggerFactory.getLogger(ClickhouseSourceSplitEnumerator.class); - private Map tableClickhouseCatalogConfigMap; + private final Context context; + private final Map> pendingSplits; + private final ConcurrentLinkedQueue pendingTables; + private final Object stateLock = new Object(); + private final Map tableClickhouseCatalogConfigMap; - // TODO support read distributed engine use multi split - ClickhouseSourceSplitEnumerator( + public ClickhouseSourceSplitEnumerator( Context enumeratorContext, Map tableClickhouseCatalogConfigMap) { + this(enumeratorContext, tableClickhouseCatalogConfigMap, null); + } + + public ClickhouseSourceSplitEnumerator( + Context enumeratorContext, + Map tableClickhouseCatalogConfigMap, + ClickhouseSourceState checkpointState) { this.context = enumeratorContext; - this.readers = new HashSet<>(); this.tableClickhouseCatalogConfigMap = tableClickhouseCatalogConfigMap; - for (TablePath tablePath : tableClickhouseCatalogConfigMap.keySet()) { - tablePathAssigned.put(tablePath, -1); + if (checkpointState == null) { + this.pendingTables = + new ConcurrentLinkedQueue<>(tableClickhouseCatalogConfigMap.keySet()); + this.pendingSplits = new HashMap<>(); + } else { + this.pendingTables = new ConcurrentLinkedQueue<>(checkpointState.getPendingTables()); + this.pendingSplits = new HashMap<>(checkpointState.getPendingSplits()); } } @@ -57,38 +74,103 @@ public class ClickhouseSourceSplitEnumerator public void open() {} @Override - public void run() throws Exception {} + public void run() throws Exception { + Set readers = context.registeredReaders(); + while (!pendingTables.isEmpty()) { + synchronized (stateLock) { + TablePath tablePath = pendingTables.poll(); + log.info("Splitting table {}.", tablePath); + + Collection splits = + discoverySplits(tableClickhouseCatalogConfigMap.get(tablePath)); + log.info("Split table {} into {} splits.", tablePath, splits.size()); + + addPendingSplit(splits); + } + + synchronized (stateLock) { + assignSplit(readers); + } + } + } @Override public void close() throws IOException {} + private Collection discoverySplits( + ClickhouseCatalogConfig clickhouseCatalogConfig) { + // todo Multiple splits are returned while waiting to support slice reading + ClickhouseSourceSplit clickhouseSourceSplit = new ClickhouseSourceSplit(); + clickhouseSourceSplit.setTablePath( + clickhouseCatalogConfig.getCatalogTable().getTablePath()); + clickhouseSourceSplit.setClickhouseCatalogConfig(clickhouseCatalogConfig); + HashSet splitSet = new HashSet<>(); + splitSet.add(clickhouseSourceSplit); + return splitSet; + } + + private void addPendingSplit(Collection splits) { + int readerCount = context.currentParallelism(); + for (ClickhouseSourceSplit split : splits) { + // Since there's only one split for each tablePath, we'll assign the Task to the + // tablePath for now + int ownerReader = getSplitOwner(split.getTablePath().toString(), readerCount); + log.info("Assigning {} to {} reader.", split, ownerReader); + pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split); + } + } + + private void assignSplit(Collection readers) { + log.debug("Assign pendingSplits to readers {}", readers); + + for (int reader : readers) { + List assignmentForReader = pendingSplits.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + log.info("Assign splits {} to reader {}", assignmentForReader, reader); + try { + context.assignSplit(reader, assignmentForReader); + context.signalNoMoreSplits(reader); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignmentForReader, + reader, + e); + pendingSplits.put(reader, assignmentForReader); + } + } + } + } + + private int getSplitOwner(String splitId, int numReaders) { + return (splitId.hashCode() & Integer.MAX_VALUE) % numReaders; + } + @Override public void addSplitsBack(List splits, int subtaskId) { - if (splits.isEmpty()) { - return; - } - for (TablePath tablePath : tableClickhouseCatalogConfigMap.keySet()) { - if (tablePathAssigned.get(tablePath) == subtaskId) { - Optional otherReader = - readers.stream().filter(r -> r != subtaskId).findAny(); - if (otherReader.isPresent()) { - context.assignSplit(otherReader.get(), splits); + if (!splits.isEmpty()) { + synchronized (stateLock) { + addPendingSplit(splits, subtaskId); + if (context.registeredReaders().contains(subtaskId)) { + assignSplit(Collections.singletonList(subtaskId)); } else { - tablePathAssigned.put(tablePath, -1); + log.warn( + "Reader {} is not registered. Pending splits {} are not assigned.", + subtaskId, + splits); } } } + log.info("Add back splits {} to ClickhouseSourceSplitEnumerator.", splits.size()); + } + + private void addPendingSplit(Collection splits, int ownerReader) { + pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits); } @Override public int currentUnassignedSplitSize() { - return tablePathAssigned.values().stream() - .filter(value -> value < 0) - .collect(Collectors.toList()) - .size() - > 0 - ? 0 - : 1; + return pendingSplits.size(); } @Override @@ -96,22 +178,20 @@ public void handleSplitRequest(int subtaskId) {} @Override public void registerReader(int subtaskId) { - readers.add(subtaskId); - List clickhouseSourceSplits = new ArrayList<>(); - for (TablePath tablePath : tablePathAssigned.keySet()) { - if (tablePathAssigned.get(tablePath) < 0) { - tablePathAssigned.put(tablePath, subtaskId); - ClickhouseSourceSplit clickhouseSourceSplit = new ClickhouseSourceSplit(); - clickhouseSourceSplit.setTablePath(tablePath); - clickhouseSourceSplits.add(clickhouseSourceSplit); + log.debug("Register reader {} to ClickhouseSourceSplitEnumerator.", subtaskId); + synchronized (stateLock) { + if (!pendingSplits.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); } } - context.assignSplit(subtaskId, clickhouseSourceSplits); } @Override public ClickhouseSourceState snapshotState(long checkpointId) throws Exception { - return null; + synchronized (stateLock) { + return new ClickhouseSourceState( + new ArrayList(pendingTables), new HashMap<>(pendingSplits)); + } } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java index 3050436e7ee..09144a68ba1 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/ClickhouseSourceState.java @@ -17,6 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceSplit; + +import lombok.AllArgsConstructor; +import lombok.Getter; + import java.io.Serializable; +import java.util.List; +import java.util.Map; -public class ClickhouseSourceState implements Serializable {} +@AllArgsConstructor +@Getter +public class ClickhouseSourceState implements Serializable { + private List pendingTables; + private Map> pendingSplits; +} From 7f32a8ab80161eb6ed7e1451befc5c4b86a7d478 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 6 Sep 2024 19:39:29 +0800 Subject: [PATCH 06/26] fix --- .../clickhouse/source/ClickhouseSource.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index 6932454923e..eecee077b26 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -75,8 +75,8 @@ @AutoService(SeaTunnelSource.class) public class ClickhouseSource implements SeaTunnelSource, - SupportParallelism, - SupportColumnProjection { + SupportParallelism, + SupportColumnProjection { private List servers; private SeaTunnelRowType rowTypeInfo; @@ -101,10 +101,10 @@ public void prepare(Config config) throws PrepareFailException { Map customConfig = CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key()) ? config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().unwrapped().toString())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().unwrapped().toString())) : null; servers = @@ -172,11 +172,11 @@ private CatalogTable createCatalogTable(SeaTunnelRowType rowType, TablePath tabl public SeaTunnelRowType getClickhouseRowType(ClickHouseNode currentServer, String sql) { try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); - ClickHouseResponse response = - client.connect(currentServer) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql)) - .executeAndWait()) { + ClickHouseResponse response = + client.connect(currentServer) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql)) + .executeAndWait()) { int columnSize = response.getColumns().size(); String[] fieldNames = new String[columnSize]; @@ -212,7 +212,7 @@ public List getProducedCatalogTables() { @Override public SourceReader createReader( SourceReader.Context readerContext) throws Exception { - return new ClickhouseSourceReader(servers, readerContext, tableClickhouseCatalogConfigMap); + return new ClickhouseSourceReader(servers, readerContext); } @Override @@ -229,6 +229,6 @@ public SourceSplitEnumerator resto ClickhouseSourceState checkpointState) throws Exception { return new ClickhouseSourceSplitEnumerator( - enumeratorContext, tableClickhouseCatalogConfigMap); + enumeratorContext, tableClickhouseCatalogConfigMap, checkpointState); } } From 5fa9fbc8510d5031eedd82e0c70030666745fac6 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 6 Sep 2024 19:40:15 +0800 Subject: [PATCH 07/26] fix --- .../clickhouse/source/ClickhouseSource.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index eecee077b26..6f405d40863 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -75,8 +75,8 @@ @AutoService(SeaTunnelSource.class) public class ClickhouseSource implements SeaTunnelSource, - SupportParallelism, - SupportColumnProjection { + SupportParallelism, + SupportColumnProjection { private List servers; private SeaTunnelRowType rowTypeInfo; @@ -101,10 +101,10 @@ public void prepare(Config config) throws PrepareFailException { Map customConfig = CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key()) ? config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().unwrapped().toString())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().unwrapped().toString())) : null; servers = @@ -172,11 +172,11 @@ private CatalogTable createCatalogTable(SeaTunnelRowType rowType, TablePath tabl public SeaTunnelRowType getClickhouseRowType(ClickHouseNode currentServer, String sql) { try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); - ClickHouseResponse response = - client.connect(currentServer) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql)) - .executeAndWait()) { + ClickHouseResponse response = + client.connect(currentServer) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql)) + .executeAndWait()) { int columnSize = response.getColumns().size(); String[] fieldNames = new String[columnSize]; From 7ef2baa281218b25af2c313552c5e8abad3bc7a9 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Sat, 7 Sep 2024 18:25:50 +0800 Subject: [PATCH 08/26] fix --- .../ClickhouseSinkCDCChangelogIT.java | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index aee379c40c2..13568e10950 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -178,34 +178,34 @@ private void initializeClickhouseMergeTreeTable() { + " `score` Int32\n" + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", DATABASE, SINK_TABLE), - "create table if not exists default.t1\n" + "CREATE TABLE IF NOT EXISTS default.t1\n" + "(\n" - + " `course_id` UInt32,\n" - + " `course_name` String,\n" - + " `instructor` String\n" - + ")\n" - + "ENGINE = MergeTree\n" - + "ORDER BY course_id\n" - + "SETTINGS index_granularity = 8192", - "create table if not exists default.t2\n" + + " student_id UInt32,\n" + + " student_name String,\n" + + " age UInt8,\n" + + " grade String\n" + + ") \n" + + "ENGINE = MergeTree()\n" + + "ORDER BY student_id\n", + "TRUNCATE table default.t1", + "INSERT INTO default.t1 (student_id, student_name, age, grade) VALUES\n" + + "(1, 'Alice', 20, 'A'),\n" + + "(2, 'Bob', 22, 'B'),\n" + + "(3, 'Charlie', 21, 'A')", + "CREATE TABLE IF NOT EXISTS default.t2\n" + "(\n" - + " `course_id` UInt32,\n" - + " `course_name` String,\n" - + " `instructor` String\n" - + ")\n" - + "ENGINE = MergeTree\n" - + "ORDER BY course_id\n" - + "SETTINGS index_granularity = 8192", - "INSERT INTO default.t1 (course_id, course_name, instructor)\n" - + "VALUES \n" - + "(1, 'Math', 'Mr. Smith'),\n" - + "(2, 'History', 'Ms. Johnson'),\n" - + "(3, 'Science', 'Dr. Brown')\n", - "INSERT INTO default.t2 (course_id, course_name, instructor)\n" - + "VALUES \n" - + "(1, 'Math', 'Mr. Smith'),\n" - + "(2, 'History', 'Ms. Johnson'),\n" - + "(3, 'Science', 'Dr. Brown')\n"); + + " student_id UInt32,\n" + + " student_name String,\n" + + " age UInt8,\n" + + " grade String\n" + + ") \n" + + "ENGINE = MergeTree()\n" + + "ORDER BY student_id\n", + "TRUNCATE table default.t2", + "INSERT INTO default.t2 (student_id, student_name, age, grade) VALUES\n" + + "(1, 'Alice', 20, 'A'),\n" + + "(2, 'Bob', 22, 'B'),\n" + + "(3, 'Charlie', 21, 'A')"); for (String sql : initSqlList) { statement.execute(sql); } From 813f763a7bd681653ce1e0bf27eef6c1c93b0379 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 9 Sep 2024 14:13:02 +0800 Subject: [PATCH 09/26] fix --- .../src/test/resources/multi_source_clickhouse.conf | 5 ----- 1 file changed, 5 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf index 9beacaa33b6..f3f825f6b80 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf @@ -29,11 +29,6 @@ source { database = "default" username = "default" password = "" - server_time_zone = "UTC" - result_table_name = "test" - clickhouse.config = { - "socket_timeout": "300000" - } table_list = [ { table_path = "t1" From 471a473d2107483639f1c61cb598ba5ec2ac8d80 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 9 Sep 2024 19:42:08 +0800 Subject: [PATCH 10/26] fix --- .../clickhouse/ClickhouseSinkCDCChangelogIT.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 13568e10950..8ab0763d9cc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -28,6 +28,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.ClickHouseContainer; import org.testcontainers.containers.Container; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -52,6 +54,9 @@ @Slf4j public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { + + private static final Logger LOG = LoggerFactory.getLogger(ClickhouseSinkCDCChangelogIT.class); + private static final String CLICKHOUSE_DOCKER_IMAGE = "clickhouse/clickhouse-server:23.3.13.6"; private static final String HOST = "clickhouse"; private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; @@ -155,6 +160,11 @@ public void testClickhouseSourceMultiTable(TestContainer container) throws Excep initializeClickhouseMergeTreeTable(); Container.ExecResult execResult = container.executeJob("/multi_source_clickhouse.conf"); Assertions.assertEquals(0, execResult.getExitCode()); + LOG.info( + "testClickhouseSourceMultiTable Command executed with exit code: " + + execResult.getExitCode()); + LOG.info("testClickhouseSourceMultiTable Stdout: " + execResult.getStdout()); + LOG.info("testClickhouseSourceMultiTable Stderr: " + execResult.getStderr()); } private void initConnection() throws Exception { From 9b4076ea87cde3ad01ab30a09d1ac7fda0ad41e0 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Tue, 10 Sep 2024 19:10:35 +0800 Subject: [PATCH 11/26] fix --- .../clickhouse/ClickhouseSinkCDCChangelogIT.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 8ab0763d9cc..755d12cf338 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -157,9 +157,16 @@ public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "The multi-catalog does not currently support the Spark Flink engine") public void testClickhouseSourceMultiTable(TestContainer container) throws Exception { + Container.ExecResult execResult = null; initializeClickhouseMergeTreeTable(); - Container.ExecResult execResult = container.executeJob("/multi_source_clickhouse.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); + try { + LOG.info("Step 2: Executing job with /multi_source_clickhouse.conf"); + execResult = container.executeJob("/multi_source_clickhouse.conf"); + } catch (NullPointerException e) { + LOG.error("NullPointerException in container.executeJob", e); + throw e; + } + Assertions.assertEquals(1, execResult); LOG.info( "testClickhouseSourceMultiTable Command executed with exit code: " + execResult.getExitCode()); From d126d284fbf77c33a5c7b97e8e3e2f7e82babbf4 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 12 Sep 2024 11:25:07 +0800 Subject: [PATCH 12/26] fix --- .../seatunnel/clickhouse/ClickhouseIT.java | 29 +++++-- .../ClickhouseSinkCDCChangelogIT.java | 80 +++---------------- .../test/resources/init/clickhouse_init.conf | 15 ++++ 3 files changed, 47 insertions(+), 77 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 66ee281740c..126ec1b4049 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -27,7 +27,9 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.awaitility.Awaitility; @@ -81,6 +83,8 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource { private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; private static final String INIT_CLICKHOUSE_PATH = "/init/clickhouse_init.conf"; private static final String CLICKHOUSE_JOB_CONFIG = "/clickhouse_to_clickhouse.conf"; + private static final String CLICKHOUSE_MULTI_LIST_TABLE_CONFIG = + "/multi_source_clickhouse.conf"; private static final String DATABASE = "default"; private static final String SOURCE_TABLE = "source_table"; private static final String SINK_TABLE = "sink_table"; @@ -92,13 +96,22 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource { private ClickHouseContainer container; private Connection connection; + // @TestTemplate + // public void testClickhouse(TestContainer container) throws Exception { + // Container.ExecResult execResult = container.executeJob(CLICKHOUSE_JOB_CONFIG); + // Assertions.assertEquals(0, execResult.getExitCode()); + // assertHasData(SINK_TABLE); + // compareResult(); + // clearSinkTable(); + // } @TestTemplate - public void testClickhouse(TestContainer container) throws Exception { - Container.ExecResult execResult = container.executeJob(CLICKHOUSE_JOB_CONFIG); + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "The multi-catalog does not currently support the Spark Flink engine") + public void testClickhouseMultiSource(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob(CLICKHOUSE_MULTI_LIST_TABLE_CONFIG); Assertions.assertEquals(0, execResult.getExitCode()); - assertHasData(SINK_TABLE); - compareResult(); - clearSinkTable(); } @BeforeAll @@ -125,7 +138,11 @@ public void startUp() throws Exception { private void initializeClickhouseTable() { try { Statement statement = this.connection.createStatement(); - statement.execute(CONFIG.getString(SOURCE_TABLE)); + for (String sourceSql : CONFIG.getString(SOURCE_TABLE).split(";")) { + if (!sourceSql.trim().isEmpty() && sourceSql != null) { + statement.execute(sourceSql); + } + } statement.execute(CONFIG.getString(SINK_TABLE)); } catch (SQLException e) { throw new RuntimeException("Initializing Clickhouse table failed!", e); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 755d12cf338..5d9c1c848b2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -19,17 +19,13 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testcontainers.containers.ClickHouseContainer; import org.testcontainers.containers.Container; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -54,9 +50,6 @@ @Slf4j public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { - - private static final Logger LOG = LoggerFactory.getLogger(ClickhouseSinkCDCChangelogIT.class); - private static final String CLICKHOUSE_DOCKER_IMAGE = "clickhouse/clickhouse-server:23.3.13.6"; private static final String HOST = "clickhouse"; private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; @@ -151,29 +144,6 @@ public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer dropSinkTable(); } - @TestTemplate - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, - disabledReason = "The multi-catalog does not currently support the Spark Flink engine") - public void testClickhouseSourceMultiTable(TestContainer container) throws Exception { - Container.ExecResult execResult = null; - initializeClickhouseMergeTreeTable(); - try { - LOG.info("Step 2: Executing job with /multi_source_clickhouse.conf"); - execResult = container.executeJob("/multi_source_clickhouse.conf"); - } catch (NullPointerException e) { - LOG.error("NullPointerException in container.executeJob", e); - throw e; - } - Assertions.assertEquals(1, execResult); - LOG.info( - "testClickhouseSourceMultiTable Command executed with exit code: " - + execResult.getExitCode()); - LOG.info("testClickhouseSourceMultiTable Stdout: " + execResult.getStdout()); - LOG.info("testClickhouseSourceMultiTable Stderr: " + execResult.getStderr()); - } - private void initConnection() throws Exception { final Properties info = new Properties(); info.put("user", this.container.getUsername()); @@ -186,47 +156,15 @@ private void initConnection() throws Exception { private void initializeClickhouseMergeTreeTable() { try { Statement statement = this.connection.createStatement(); - List initSqlList = - Arrays.asList( - String.format( - "create table if not exists %s.%s(\n" - + " `pk_id` Int64,\n" - + " `name` String,\n" - + " `score` Int32\n" - + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", - DATABASE, SINK_TABLE), - "CREATE TABLE IF NOT EXISTS default.t1\n" - + "(\n" - + " student_id UInt32,\n" - + " student_name String,\n" - + " age UInt8,\n" - + " grade String\n" - + ") \n" - + "ENGINE = MergeTree()\n" - + "ORDER BY student_id\n", - "TRUNCATE table default.t1", - "INSERT INTO default.t1 (student_id, student_name, age, grade) VALUES\n" - + "(1, 'Alice', 20, 'A'),\n" - + "(2, 'Bob', 22, 'B'),\n" - + "(3, 'Charlie', 21, 'A')", - "CREATE TABLE IF NOT EXISTS default.t2\n" - + "(\n" - + " student_id UInt32,\n" - + " student_name String,\n" - + " age UInt8,\n" - + " grade String\n" - + ") \n" - + "ENGINE = MergeTree()\n" - + "ORDER BY student_id\n", - "TRUNCATE table default.t2", - "INSERT INTO default.t2 (student_id, student_name, age, grade) VALUES\n" - + "(1, 'Alice', 20, 'A'),\n" - + "(2, 'Bob', 22, 'B'),\n" - + "(3, 'Charlie', 21, 'A')"); - for (String sql : initSqlList) { - statement.execute(sql); - } - + String sql = + String.format( + "create table if not exists %s.%s(\n" + + " `pk_id` Int64,\n" + + " `name` String,\n" + + " `score` Int32\n" + + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", + DATABASE, SINK_TABLE); + statement.execute(sql); } catch (SQLException e) { throw new RuntimeException("Initializing Clickhouse table failed!", e); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf index 78f2daa1d79..889609e57d7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf @@ -52,6 +52,21 @@ create table if not exists `default`.source_table( `c_point` Point, `c_ring` Ring )engine=Memory; +create table if not exists `default`.t1 ( + `id` Int64, + field2 String +) ENGINE = Memory; + +INSERT INTO `default`.t1 VALUES (1, 'value1'), (2, 'value2'); + +create table if not exists `default`.t2 ( + `id` Int64, + field2 String +) ENGINE = Memory; + + +INSERT INTO `default`.t2 VALUES (1, 'value1'), (2, 'value2'); + """ sink_table = """ From c173cbe1e4702cbc4c3798d69b1417d2268009be Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 12 Sep 2024 19:19:32 +0800 Subject: [PATCH 13/26] fix --- .../seatunnel/clickhouse/ClickhouseIT.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 126ec1b4049..aecf30121e6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -96,14 +96,15 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource { private ClickHouseContainer container; private Connection connection; - // @TestTemplate - // public void testClickhouse(TestContainer container) throws Exception { - // Container.ExecResult execResult = container.executeJob(CLICKHOUSE_JOB_CONFIG); - // Assertions.assertEquals(0, execResult.getExitCode()); - // assertHasData(SINK_TABLE); - // compareResult(); - // clearSinkTable(); - // } + @TestTemplate + public void testClickhouse(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob(CLICKHOUSE_JOB_CONFIG); + Assertions.assertEquals(0, execResult.getExitCode()); + assertHasData(SINK_TABLE); + compareResult(); + clearSinkTable(); + } + @TestTemplate @DisabledOnContainer( value = {}, From cd7e8a1b929c9fc461e5201ab775c004e1ed1314 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 13 Sep 2024 17:35:49 +0800 Subject: [PATCH 14/26] fix --- .../seatunnel/clickhouse/source/ClickhouseSourceReader.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index 89a819640d6..26769069579 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -102,8 +102,10 @@ record -> { } } SeaTunnelRow seaTunnelRow = new SeaTunnelRow(values); - seaTunnelRow.setTableId(String.valueOf(tablePath)); - output.collect(seaTunnelRow); + if (seaTunnelRow != null) { + seaTunnelRow.setTableId(String.valueOf(tablePath)); + output.collect(seaTunnelRow); + } }); } } else if (splits.isEmpty() && noMoreSplit) { From 6e40d8957a23cf9bb7c2adc3c81b28215424f2ef Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 17 Oct 2024 15:45:23 +0800 Subject: [PATCH 15/26] 1 --- .../ClickhouseSinkCDCChangelogIT.java | 450 +++++++++--------- 1 file changed, 225 insertions(+), 225 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 5d9c1c848b2..29b77bd684b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -1,225 +1,225 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.clickhouse; - -import org.apache.seatunnel.e2e.common.TestResource; -import org.apache.seatunnel.e2e.common.TestSuiteBase; -import org.apache.seatunnel.e2e.common.container.TestContainer; - -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestTemplate; -import org.testcontainers.containers.ClickHouseContainer; -import org.testcontainers.containers.Container; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.utility.DockerLoggerFactory; - -import lombok.extern.slf4j.Slf4j; - -import java.sql.Connection; -import java.sql.Driver; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -@Slf4j -public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { - private static final String CLICKHOUSE_DOCKER_IMAGE = "clickhouse/clickhouse-server:23.3.13.6"; - private static final String HOST = "clickhouse"; - private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; - private static final String DATABASE = "default"; - private static final String SINK_TABLE = "sink_table"; - private ClickHouseContainer container; - private Connection connection; - - @BeforeAll - @Override - public void startUp() throws Exception { - this.container = - new ClickHouseContainer(CLICKHOUSE_DOCKER_IMAGE) - .withNetwork(NETWORK) - .withNetworkAliases(HOST) - .withExposedPorts(8123) - .withLogConsumer( - new Slf4jLogConsumer( - DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE))); - Startables.deepStart(Stream.of(this.container)).join(); - log.info("Clickhouse container started"); - Awaitility.given() - .ignoreExceptions() - .await() - .atMost(360L, TimeUnit.SECONDS) - .untilAsserted(this::initConnection); - } - - @AfterAll - @Override - public void tearDown() throws Exception { - if (this.connection != null) { - this.connection.close(); - } - if (this.container != null) { - this.container.stop(); - } - } - - @TestTemplate - public void testClickhouseMergeTreeTable(TestContainer container) throws Exception { - initializeClickhouseMergeTreeTable(); - - Container.ExecResult execResult = - container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - - checkSinkTableRows(); - dropSinkTable(); - } - - @TestTemplate - public void testClickhouseMergeTreeTableWithEnableDelete(TestContainer container) - throws Exception { - initializeClickhouseMergeTreeTable(); - - Container.ExecResult execResult = - container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - - Awaitility.given() - .ignoreExceptions() - .await() - .atLeast(100L, TimeUnit.MILLISECONDS) - .atMost(20L, TimeUnit.SECONDS) - .untilAsserted(this::checkSinkTableRows); - dropSinkTable(); - } - - @TestTemplate - public void testClickhouseReplacingMergeTreeTable(TestContainer container) throws Exception { - initializeClickhouseReplacingMergeTreeTable(); - - Container.ExecResult execResult = - container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - - checkSinkTableRows(); - dropSinkTable(); - } - - @TestTemplate - public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer container) - throws Exception { - initializeClickhouseReplacingMergeTreeTable(); - - Container.ExecResult execResult = - container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); - - checkSinkTableRows(); - dropSinkTable(); - } - - private void initConnection() throws Exception { - final Properties info = new Properties(); - info.put("user", this.container.getUsername()); - info.put("password", this.container.getPassword()); - this.connection = - ((Driver) Class.forName(DRIVER_CLASS).newInstance()) - .connect(this.container.getJdbcUrl(), info); - } - - private void initializeClickhouseMergeTreeTable() { - try { - Statement statement = this.connection.createStatement(); - String sql = - String.format( - "create table if not exists %s.%s(\n" - + " `pk_id` Int64,\n" - + " `name` String,\n" - + " `score` Int32\n" - + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", - DATABASE, SINK_TABLE); - statement.execute(sql); - } catch (SQLException e) { - throw new RuntimeException("Initializing Clickhouse table failed!", e); - } - } - - private void initializeClickhouseReplacingMergeTreeTable() { - try { - Statement statement = this.connection.createStatement(); - String sql = - String.format( - "create table if not exists %s.%s(\n" - + " `pk_id` Int64,\n" - + " `name` String,\n" - + " `score` Int32\n" - + ")engine=ReplacingMergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", - DATABASE, SINK_TABLE); - statement.execute(sql); - } catch (SQLException e) { - throw new RuntimeException("Initializing Clickhouse table failed!", e); - } - } - - private void checkSinkTableRows() throws SQLException { - Set> actual = new HashSet<>(); - try (Statement statement = connection.createStatement(); - ResultSet resultSet = - statement.executeQuery( - String.format("select * from %s.%s", DATABASE, SINK_TABLE))) { - while (resultSet.next()) { - List row = - Arrays.asList( - resultSet.getLong("pk_id"), - resultSet.getString("name"), - resultSet.getInt("score")); - actual.add(row); - } - } - Set> expected = - Stream.>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100)) - .collect(Collectors.toSet()); - if (!Arrays.equals(actual.toArray(), expected.toArray())) { - throw new IllegalStateException( - String.format( - "Actual results %s not equal expected results %s", - Arrays.toString(actual.toArray()), - Arrays.toString(expected.toArray()))); - } - } - - private void dropSinkTable() { - try (Statement statement = connection.createStatement()) { - statement.execute( - String.format("drop table if exists %s.%s sync", DATABASE, SINK_TABLE)); - } catch (SQLException e) { - throw new RuntimeException("Test clickhouse server image error", e); - } - } -} +///* +// * Licensed to the Apache Software Foundation (ASF) under one or more +// * contributor license agreements. See the NOTICE file distributed with +// * this work for additional information regarding copyright ownership. +// * The ASF licenses this file to You under the Apache License, Version 2.0 +// * (the "License"); you may not use this file except in compliance with +// * the License. You may obtain a copy of the License at +// * +// * http://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. +// */ +// +//package org.apache.seatunnel.connectors.seatunnel.clickhouse; +// +//import org.apache.seatunnel.e2e.common.TestResource; +//import org.apache.seatunnel.e2e.common.TestSuiteBase; +//import org.apache.seatunnel.e2e.common.container.TestContainer; +// +//import org.awaitility.Awaitility; +//import org.junit.jupiter.api.AfterAll; +//import org.junit.jupiter.api.Assertions; +//import org.junit.jupiter.api.BeforeAll; +//import org.junit.jupiter.api.TestTemplate; +//import org.testcontainers.containers.ClickHouseContainer; +//import org.testcontainers.containers.Container; +//import org.testcontainers.containers.output.Slf4jLogConsumer; +//import org.testcontainers.lifecycle.Startables; +//import org.testcontainers.utility.DockerLoggerFactory; +// +//import lombok.extern.slf4j.Slf4j; +// +//import java.sql.Connection; +//import java.sql.Driver; +//import java.sql.ResultSet; +//import java.sql.SQLException; +//import java.sql.Statement; +//import java.util.Arrays; +//import java.util.HashSet; +//import java.util.List; +//import java.util.Properties; +//import java.util.Set; +//import java.util.concurrent.TimeUnit; +//import java.util.stream.Collectors; +//import java.util.stream.Stream; +// +//@Slf4j +//public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { +// private static final String CLICKHOUSE_DOCKER_IMAGE = "clickhouse/clickhouse-server:23.3.13.6"; +// private static final String HOST = "clickhouse"; +// private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; +// private static final String DATABASE = "default"; +// private static final String SINK_TABLE = "sink_table"; +// private ClickHouseContainer container; +// private Connection connection; +// +// @BeforeAll +// @Override +// public void startUp() throws Exception { +// this.container = +// new ClickHouseContainer(CLICKHOUSE_DOCKER_IMAGE) +// .withNetwork(NETWORK) +// .withNetworkAliases(HOST) +// .withExposedPorts(8123) +// .withLogConsumer( +// new Slf4jLogConsumer( +// DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE))); +// Startables.deepStart(Stream.of(this.container)).join(); +// log.info("Clickhouse container started"); +// Awaitility.given() +// .ignoreExceptions() +// .await() +// .atMost(360L, TimeUnit.SECONDS) +// .untilAsserted(this::initConnection); +// } +// +// @AfterAll +// @Override +// public void tearDown() throws Exception { +// if (this.connection != null) { +// this.connection.close(); +// } +// if (this.container != null) { +// this.container.stop(); +// } +// } +// +// @TestTemplate +// public void testClickhouseMergeTreeTable(TestContainer container) throws Exception { +// initializeClickhouseMergeTreeTable(); +// +// Container.ExecResult execResult = +// container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); +// Assertions.assertEquals(0, execResult.getExitCode()); +// +// checkSinkTableRows(); +// dropSinkTable(); +// } +// +// @TestTemplate +// public void testClickhouseMergeTreeTableWithEnableDelete(TestContainer container) +// throws Exception { +// initializeClickhouseMergeTreeTable(); +// +// Container.ExecResult execResult = +// container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); +// Assertions.assertEquals(0, execResult.getExitCode()); +// +// Awaitility.given() +// .ignoreExceptions() +// .await() +// .atLeast(100L, TimeUnit.MILLISECONDS) +// .atMost(20L, TimeUnit.SECONDS) +// .untilAsserted(this::checkSinkTableRows); +// dropSinkTable(); +// } +// +// @TestTemplate +// public void testClickhouseReplacingMergeTreeTable(TestContainer container) throws Exception { +// initializeClickhouseReplacingMergeTreeTable(); +// +// Container.ExecResult execResult = +// container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); +// Assertions.assertEquals(0, execResult.getExitCode()); +// +// checkSinkTableRows(); +// dropSinkTable(); +// } +// +// @TestTemplate +// public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer container) +// throws Exception { +// initializeClickhouseReplacingMergeTreeTable(); +// +// Container.ExecResult execResult = +// container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); +// Assertions.assertEquals(0, execResult.getExitCode()); +// +// checkSinkTableRows(); +// dropSinkTable(); +// } +// +// private void initConnection() throws Exception { +// final Properties info = new Properties(); +// info.put("user", this.container.getUsername()); +// info.put("password", this.container.getPassword()); +// this.connection = +// ((Driver) Class.forName(DRIVER_CLASS).newInstance()) +// .connect(this.container.getJdbcUrl(), info); +// } +// +// private void initializeClickhouseMergeTreeTable() { +// try { +// Statement statement = this.connection.createStatement(); +// String sql = +// String.format( +// "create table if not exists %s.%s(\n" +// + " `pk_id` Int64,\n" +// + " `name` String,\n" +// + " `score` Int32\n" +// + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", +// DATABASE, SINK_TABLE); +// statement.execute(sql); +// } catch (SQLException e) { +// throw new RuntimeException("Initializing Clickhouse table failed!", e); +// } +// } +// +// private void initializeClickhouseReplacingMergeTreeTable() { +// try { +// Statement statement = this.connection.createStatement(); +// String sql = +// String.format( +// "create table if not exists %s.%s(\n" +// + " `pk_id` Int64,\n" +// + " `name` String,\n" +// + " `score` Int32\n" +// + ")engine=ReplacingMergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", +// DATABASE, SINK_TABLE); +// statement.execute(sql); +// } catch (SQLException e) { +// throw new RuntimeException("Initializing Clickhouse table failed!", e); +// } +// } +// +// private void checkSinkTableRows() throws SQLException { +// Set> actual = new HashSet<>(); +// try (Statement statement = connection.createStatement(); +// ResultSet resultSet = +// statement.executeQuery( +// String.format("select * from %s.%s", DATABASE, SINK_TABLE))) { +// while (resultSet.next()) { +// List row = +// Arrays.asList( +// resultSet.getLong("pk_id"), +// resultSet.getString("name"), +// resultSet.getInt("score")); +// actual.add(row); +// } +// } +// Set> expected = +// Stream.>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100)) +// .collect(Collectors.toSet()); +// if (!Arrays.equals(actual.toArray(), expected.toArray())) { +// throw new IllegalStateException( +// String.format( +// "Actual results %s not equal expected results %s", +// Arrays.toString(actual.toArray()), +// Arrays.toString(expected.toArray()))); +// } +// } +// +// private void dropSinkTable() { +// try (Statement statement = connection.createStatement()) { +// statement.execute( +// String.format("drop table if exists %s.%s sync", DATABASE, SINK_TABLE)); +// } catch (SQLException e) { +// throw new RuntimeException("Test clickhouse server image error", e); +// } +// } +//} From 56ed9da8b033bb106d246389448ba6d9ad3545bf Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 17 Oct 2024 15:47:11 +0800 Subject: [PATCH 16/26] 1 --- .../ClickhouseSinkCDCChangelogIT.java | 83 ++++++++++--------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 29b77bd684b..ca932c3f0cd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -1,4 +1,4 @@ -///* +/// * // * Licensed to the Apache Software Foundation (ASF) under one or more // * contributor license agreements. See the NOTICE file distributed with // * this work for additional information regarding copyright ownership. @@ -15,42 +15,43 @@ // * limitations under the License. // */ // -//package org.apache.seatunnel.connectors.seatunnel.clickhouse; -// -//import org.apache.seatunnel.e2e.common.TestResource; -//import org.apache.seatunnel.e2e.common.TestSuiteBase; -//import org.apache.seatunnel.e2e.common.container.TestContainer; -// -//import org.awaitility.Awaitility; -//import org.junit.jupiter.api.AfterAll; -//import org.junit.jupiter.api.Assertions; -//import org.junit.jupiter.api.BeforeAll; -//import org.junit.jupiter.api.TestTemplate; -//import org.testcontainers.containers.ClickHouseContainer; -//import org.testcontainers.containers.Container; -//import org.testcontainers.containers.output.Slf4jLogConsumer; -//import org.testcontainers.lifecycle.Startables; -//import org.testcontainers.utility.DockerLoggerFactory; -// -//import lombok.extern.slf4j.Slf4j; -// -//import java.sql.Connection; -//import java.sql.Driver; -//import java.sql.ResultSet; -//import java.sql.SQLException; -//import java.sql.Statement; -//import java.util.Arrays; -//import java.util.HashSet; -//import java.util.List; -//import java.util.Properties; -//import java.util.Set; -//import java.util.concurrent.TimeUnit; -//import java.util.stream.Collectors; -//import java.util.stream.Stream; -// -//@Slf4j -//public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { -// private static final String CLICKHOUSE_DOCKER_IMAGE = "clickhouse/clickhouse-server:23.3.13.6"; +// package org.apache.seatunnel.connectors.seatunnel.clickhouse; +// +// import org.apache.seatunnel.e2e.common.TestResource; +// import org.apache.seatunnel.e2e.common.TestSuiteBase; +// import org.apache.seatunnel.e2e.common.container.TestContainer; +// +// import org.awaitility.Awaitility; +// import org.junit.jupiter.api.AfterAll; +// import org.junit.jupiter.api.Assertions; +// import org.junit.jupiter.api.BeforeAll; +// import org.junit.jupiter.api.TestTemplate; +// import org.testcontainers.containers.ClickHouseContainer; +// import org.testcontainers.containers.Container; +// import org.testcontainers.containers.output.Slf4jLogConsumer; +// import org.testcontainers.lifecycle.Startables; +// import org.testcontainers.utility.DockerLoggerFactory; +// +// import lombok.extern.slf4j.Slf4j; +// +// import java.sql.Connection; +// import java.sql.Driver; +// import java.sql.ResultSet; +// import java.sql.SQLException; +// import java.sql.Statement; +// import java.util.Arrays; +// import java.util.HashSet; +// import java.util.List; +// import java.util.Properties; +// import java.util.Set; +// import java.util.concurrent.TimeUnit; +// import java.util.stream.Collectors; +// import java.util.stream.Stream; +// +// @Slf4j +// public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { +// private static final String CLICKHOUSE_DOCKER_IMAGE = +// "clickhouse/clickhouse-server:23.3.13.6"; // private static final String HOST = "clickhouse"; // private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; // private static final String DATABASE = "default"; @@ -179,7 +180,8 @@ // + " `pk_id` Int64,\n" // + " `name` String,\n" // + " `score` Int32\n" -// + ")engine=ReplacingMergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", +// + ")engine=ReplacingMergeTree ORDER BY(pk_id) PRIMARY +// KEY(pk_id)", // DATABASE, SINK_TABLE); // statement.execute(sql); // } catch (SQLException e) { @@ -203,7 +205,8 @@ // } // } // Set> expected = -// Stream.>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100)) +// Stream.>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", +// 100)) // .collect(Collectors.toSet()); // if (!Arrays.equals(actual.toArray(), expected.toArray())) { // throw new IllegalStateException( @@ -222,4 +225,4 @@ // throw new RuntimeException("Test clickhouse server image error", e); // } // } -//} +// } From 7e6dc6553bd915d0c1a491058c85291467968b39 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 25 Oct 2024 17:31:11 +0800 Subject: [PATCH 17/26] 1 --- .../seatunnel/clickhouse/ClickhouseIT.java | 1 + .../ClickhouseSinkCDCChangelogIT.java | 453 +++++++++--------- 2 files changed, 226 insertions(+), 228 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index aecf30121e6..f016136c3e7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -112,6 +112,7 @@ public void testClickhouse(TestContainer container) throws Exception { disabledReason = "The multi-catalog does not currently support the Spark Flink engine") public void testClickhouseMultiSource(TestContainer container) throws Exception { Container.ExecResult execResult = container.executeJob(CLICKHOUSE_MULTI_LIST_TABLE_CONFIG); + Thread.sleep(3000); Assertions.assertEquals(0, execResult.getExitCode()); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index ca932c3f0cd..5d9c1c848b2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -1,228 +1,225 @@ -/// * -// * Licensed to the Apache Software Foundation (ASF) under one or more -// * contributor license agreements. See the NOTICE file distributed with -// * this work for additional information regarding copyright ownership. -// * The ASF licenses this file to You under the Apache License, Version 2.0 -// * (the "License"); you may not use this file except in compliance with -// * the License. You may obtain a copy of the License at -// * -// * http://www.apache.org/licenses/LICENSE-2.0 -// * -// * Unless required by applicable law or agreed to in writing, software -// * distributed under the License is distributed on an "AS IS" BASIS, -// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// * See the License for the specific language governing permissions and -// * limitations under the License. -// */ -// -// package org.apache.seatunnel.connectors.seatunnel.clickhouse; -// -// import org.apache.seatunnel.e2e.common.TestResource; -// import org.apache.seatunnel.e2e.common.TestSuiteBase; -// import org.apache.seatunnel.e2e.common.container.TestContainer; -// -// import org.awaitility.Awaitility; -// import org.junit.jupiter.api.AfterAll; -// import org.junit.jupiter.api.Assertions; -// import org.junit.jupiter.api.BeforeAll; -// import org.junit.jupiter.api.TestTemplate; -// import org.testcontainers.containers.ClickHouseContainer; -// import org.testcontainers.containers.Container; -// import org.testcontainers.containers.output.Slf4jLogConsumer; -// import org.testcontainers.lifecycle.Startables; -// import org.testcontainers.utility.DockerLoggerFactory; -// -// import lombok.extern.slf4j.Slf4j; -// -// import java.sql.Connection; -// import java.sql.Driver; -// import java.sql.ResultSet; -// import java.sql.SQLException; -// import java.sql.Statement; -// import java.util.Arrays; -// import java.util.HashSet; -// import java.util.List; -// import java.util.Properties; -// import java.util.Set; -// import java.util.concurrent.TimeUnit; -// import java.util.stream.Collectors; -// import java.util.stream.Stream; -// -// @Slf4j -// public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { -// private static final String CLICKHOUSE_DOCKER_IMAGE = -// "clickhouse/clickhouse-server:23.3.13.6"; -// private static final String HOST = "clickhouse"; -// private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; -// private static final String DATABASE = "default"; -// private static final String SINK_TABLE = "sink_table"; -// private ClickHouseContainer container; -// private Connection connection; -// -// @BeforeAll -// @Override -// public void startUp() throws Exception { -// this.container = -// new ClickHouseContainer(CLICKHOUSE_DOCKER_IMAGE) -// .withNetwork(NETWORK) -// .withNetworkAliases(HOST) -// .withExposedPorts(8123) -// .withLogConsumer( -// new Slf4jLogConsumer( -// DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE))); -// Startables.deepStart(Stream.of(this.container)).join(); -// log.info("Clickhouse container started"); -// Awaitility.given() -// .ignoreExceptions() -// .await() -// .atMost(360L, TimeUnit.SECONDS) -// .untilAsserted(this::initConnection); -// } -// -// @AfterAll -// @Override -// public void tearDown() throws Exception { -// if (this.connection != null) { -// this.connection.close(); -// } -// if (this.container != null) { -// this.container.stop(); -// } -// } -// -// @TestTemplate -// public void testClickhouseMergeTreeTable(TestContainer container) throws Exception { -// initializeClickhouseMergeTreeTable(); -// -// Container.ExecResult execResult = -// container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); -// Assertions.assertEquals(0, execResult.getExitCode()); -// -// checkSinkTableRows(); -// dropSinkTable(); -// } -// -// @TestTemplate -// public void testClickhouseMergeTreeTableWithEnableDelete(TestContainer container) -// throws Exception { -// initializeClickhouseMergeTreeTable(); -// -// Container.ExecResult execResult = -// container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); -// Assertions.assertEquals(0, execResult.getExitCode()); -// -// Awaitility.given() -// .ignoreExceptions() -// .await() -// .atLeast(100L, TimeUnit.MILLISECONDS) -// .atMost(20L, TimeUnit.SECONDS) -// .untilAsserted(this::checkSinkTableRows); -// dropSinkTable(); -// } -// -// @TestTemplate -// public void testClickhouseReplacingMergeTreeTable(TestContainer container) throws Exception { -// initializeClickhouseReplacingMergeTreeTable(); -// -// Container.ExecResult execResult = -// container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); -// Assertions.assertEquals(0, execResult.getExitCode()); -// -// checkSinkTableRows(); -// dropSinkTable(); -// } -// -// @TestTemplate -// public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer container) -// throws Exception { -// initializeClickhouseReplacingMergeTreeTable(); -// -// Container.ExecResult execResult = -// container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); -// Assertions.assertEquals(0, execResult.getExitCode()); -// -// checkSinkTableRows(); -// dropSinkTable(); -// } -// -// private void initConnection() throws Exception { -// final Properties info = new Properties(); -// info.put("user", this.container.getUsername()); -// info.put("password", this.container.getPassword()); -// this.connection = -// ((Driver) Class.forName(DRIVER_CLASS).newInstance()) -// .connect(this.container.getJdbcUrl(), info); -// } -// -// private void initializeClickhouseMergeTreeTable() { -// try { -// Statement statement = this.connection.createStatement(); -// String sql = -// String.format( -// "create table if not exists %s.%s(\n" -// + " `pk_id` Int64,\n" -// + " `name` String,\n" -// + " `score` Int32\n" -// + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", -// DATABASE, SINK_TABLE); -// statement.execute(sql); -// } catch (SQLException e) { -// throw new RuntimeException("Initializing Clickhouse table failed!", e); -// } -// } -// -// private void initializeClickhouseReplacingMergeTreeTable() { -// try { -// Statement statement = this.connection.createStatement(); -// String sql = -// String.format( -// "create table if not exists %s.%s(\n" -// + " `pk_id` Int64,\n" -// + " `name` String,\n" -// + " `score` Int32\n" -// + ")engine=ReplacingMergeTree ORDER BY(pk_id) PRIMARY -// KEY(pk_id)", -// DATABASE, SINK_TABLE); -// statement.execute(sql); -// } catch (SQLException e) { -// throw new RuntimeException("Initializing Clickhouse table failed!", e); -// } -// } -// -// private void checkSinkTableRows() throws SQLException { -// Set> actual = new HashSet<>(); -// try (Statement statement = connection.createStatement(); -// ResultSet resultSet = -// statement.executeQuery( -// String.format("select * from %s.%s", DATABASE, SINK_TABLE))) { -// while (resultSet.next()) { -// List row = -// Arrays.asList( -// resultSet.getLong("pk_id"), -// resultSet.getString("name"), -// resultSet.getInt("score")); -// actual.add(row); -// } -// } -// Set> expected = -// Stream.>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", -// 100)) -// .collect(Collectors.toSet()); -// if (!Arrays.equals(actual.toArray(), expected.toArray())) { -// throw new IllegalStateException( -// String.format( -// "Actual results %s not equal expected results %s", -// Arrays.toString(actual.toArray()), -// Arrays.toString(expected.toArray()))); -// } -// } -// -// private void dropSinkTable() { -// try (Statement statement = connection.createStatement()) { -// statement.execute( -// String.format("drop table if exists %s.%s sync", DATABASE, SINK_TABLE)); -// } catch (SQLException e) { -// throw new RuntimeException("Test clickhouse server image error", e); -// } -// } -// } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.ClickHouseContainer; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestResource { + private static final String CLICKHOUSE_DOCKER_IMAGE = "clickhouse/clickhouse-server:23.3.13.6"; + private static final String HOST = "clickhouse"; + private static final String DRIVER_CLASS = "com.clickhouse.jdbc.ClickHouseDriver"; + private static final String DATABASE = "default"; + private static final String SINK_TABLE = "sink_table"; + private ClickHouseContainer container; + private Connection connection; + + @BeforeAll + @Override + public void startUp() throws Exception { + this.container = + new ClickHouseContainer(CLICKHOUSE_DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withExposedPorts(8123) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE))); + Startables.deepStart(Stream.of(this.container)).join(); + log.info("Clickhouse container started"); + Awaitility.given() + .ignoreExceptions() + .await() + .atMost(360L, TimeUnit.SECONDS) + .untilAsserted(this::initConnection); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (this.connection != null) { + this.connection.close(); + } + if (this.container != null) { + this.container.stop(); + } + } + + @TestTemplate + public void testClickhouseMergeTreeTable(TestContainer container) throws Exception { + initializeClickhouseMergeTreeTable(); + + Container.ExecResult execResult = + container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + checkSinkTableRows(); + dropSinkTable(); + } + + @TestTemplate + public void testClickhouseMergeTreeTableWithEnableDelete(TestContainer container) + throws Exception { + initializeClickhouseMergeTreeTable(); + + Container.ExecResult execResult = + container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + Awaitility.given() + .ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(20L, TimeUnit.SECONDS) + .untilAsserted(this::checkSinkTableRows); + dropSinkTable(); + } + + @TestTemplate + public void testClickhouseReplacingMergeTreeTable(TestContainer container) throws Exception { + initializeClickhouseReplacingMergeTreeTable(); + + Container.ExecResult execResult = + container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + checkSinkTableRows(); + dropSinkTable(); + } + + @TestTemplate + public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer container) + throws Exception { + initializeClickhouseReplacingMergeTreeTable(); + + Container.ExecResult execResult = + container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + checkSinkTableRows(); + dropSinkTable(); + } + + private void initConnection() throws Exception { + final Properties info = new Properties(); + info.put("user", this.container.getUsername()); + info.put("password", this.container.getPassword()); + this.connection = + ((Driver) Class.forName(DRIVER_CLASS).newInstance()) + .connect(this.container.getJdbcUrl(), info); + } + + private void initializeClickhouseMergeTreeTable() { + try { + Statement statement = this.connection.createStatement(); + String sql = + String.format( + "create table if not exists %s.%s(\n" + + " `pk_id` Int64,\n" + + " `name` String,\n" + + " `score` Int32\n" + + ")engine=MergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", + DATABASE, SINK_TABLE); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Initializing Clickhouse table failed!", e); + } + } + + private void initializeClickhouseReplacingMergeTreeTable() { + try { + Statement statement = this.connection.createStatement(); + String sql = + String.format( + "create table if not exists %s.%s(\n" + + " `pk_id` Int64,\n" + + " `name` String,\n" + + " `score` Int32\n" + + ")engine=ReplacingMergeTree ORDER BY(pk_id) PRIMARY KEY(pk_id)", + DATABASE, SINK_TABLE); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Initializing Clickhouse table failed!", e); + } + } + + private void checkSinkTableRows() throws SQLException { + Set> actual = new HashSet<>(); + try (Statement statement = connection.createStatement(); + ResultSet resultSet = + statement.executeQuery( + String.format("select * from %s.%s", DATABASE, SINK_TABLE))) { + while (resultSet.next()) { + List row = + Arrays.asList( + resultSet.getLong("pk_id"), + resultSet.getString("name"), + resultSet.getInt("score")); + actual.add(row); + } + } + Set> expected = + Stream.>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100)) + .collect(Collectors.toSet()); + if (!Arrays.equals(actual.toArray(), expected.toArray())) { + throw new IllegalStateException( + String.format( + "Actual results %s not equal expected results %s", + Arrays.toString(actual.toArray()), + Arrays.toString(expected.toArray()))); + } + } + + private void dropSinkTable() { + try (Statement statement = connection.createStatement()) { + statement.execute( + String.format("drop table if exists %s.%s sync", DATABASE, SINK_TABLE)); + } catch (SQLException e) { + throw new RuntimeException("Test clickhouse server image error", e); + } + } +} From 1a6f30294276d8d2090f70c6e472c5da032f599a Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 28 Oct 2024 17:58:26 +0800 Subject: [PATCH 18/26] 1 --- .../seatunnel/clickhouse/ClickhouseIT.java | 12 +++++++++++- .../src/test/resources/init/clickhouse_init.conf | 12 ++++++------ .../src/test/resources/multi_source_clickhouse.conf | 12 +++++++----- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index f016136c3e7..19dbe3695b4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -112,8 +112,18 @@ public void testClickhouse(TestContainer container) throws Exception { disabledReason = "The multi-catalog does not currently support the Spark Flink engine") public void testClickhouseMultiSource(TestContainer container) throws Exception { Container.ExecResult execResult = container.executeJob(CLICKHOUSE_MULTI_LIST_TABLE_CONFIG); - Thread.sleep(3000); Assertions.assertEquals(0, execResult.getExitCode()); + + Thread.sleep(3000); + + String query = "SELECT COUNT(*) FROM `default`.t3"; + try (Statement sourceStatement = connection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(query)) { + if (sourceResultSet.next()) { + int count = sourceResultSet.getInt(1); + Assertions.assertEquals(4, count); + } + } } @BeforeAll diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf index 889609e57d7..774219db533 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf @@ -56,16 +56,16 @@ create table if not exists `default`.t1 ( `id` Int64, field2 String ) ENGINE = Memory; - -INSERT INTO `default`.t1 VALUES (1, 'value1'), (2, 'value2'); - create table if not exists `default`.t2 ( `id` Int64, field2 String ) ENGINE = Memory; - - -INSERT INTO `default`.t2 VALUES (1, 'value1'), (2, 'value2'); +create table if not exists `default`.t3 ( + `id` Int64, + field2 String +) ENGINE = Memory; +INSERT INTO `default`.t1 VALUES (1, 'value1'), (2, 'value2'); +INSERT INTO `default`.t2 VALUES (3, 'value3'), (4, 'value4'); """ diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf index f3f825f6b80..4e8d7dfa8f2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/multi_source_clickhouse.conf @@ -44,9 +44,11 @@ source { } sink { - Assert { - rules { - table-names = ["t1", "t2"] - } + Clickhouse { + host = "clickhouse:8123" + database = "default" + table = "t3" + username = "default" + password = "" } -} +} \ No newline at end of file From 9623a6a71f3f9e7eb8b84c8bb2f8434bf6ddd192 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 28 Oct 2024 20:55:24 +0800 Subject: [PATCH 19/26] 1 --- docs/en/connector-v2/source/Clickhouse.md | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/docs/en/connector-v2/source/Clickhouse.md b/docs/en/connector-v2/source/Clickhouse.md index 36a4067a302..8fda7382e80 100644 --- a/docs/en/connector-v2/source/Clickhouse.md +++ b/docs/en/connector-v2/source/Clickhouse.md @@ -1,3 +1,4 @@ + # Clickhouse > Clickhouse source connector @@ -101,6 +102,7 @@ sink { ```bash + env { parallelism = 1 job.mode = "BATCH" @@ -112,11 +114,6 @@ source { database = "default" username = "default" password = "" - server_time_zone = "UTC" - result_table_name = "test" - clickhouse.config = { - "socket_timeout": "300000" - } table_list = [ { table_path = "t1" @@ -132,10 +129,12 @@ source { } sink { - Assert { - rules { - table-names = ["t1", "t2"] - } + Clickhouse { + host = "clickhouse:8123" + database = "default" + table = "t3" + username = "default" + password = "" } } ``` From 719df6a7592def85d314356821b6646f11ec57a1 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Tue, 29 Oct 2024 17:19:52 +0800 Subject: [PATCH 20/26] 1 --- .../clickhouse/source/ClickhouseSource.java | 3 +- .../source/ClickhouseSourceReader.java | 74 ++++++++++--------- .../ClickhouseSourceSplitEnumerator.java | 2 + 3 files changed, 41 insertions(+), 38 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index 6f405d40863..9995c54b401 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -79,11 +79,10 @@ public class ClickhouseSource SupportColumnProjection { private List servers; - private SeaTunnelRowType rowTypeInfo; private Map tableClickhouseCatalogConfigMap = new HashedMap<>(); - private final String defaultTablePath = "default"; + private static final String defaultTablePath = "default"; @Override public String getPluginName() { diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index 26769069579..c44d9a7e84e 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -33,6 +34,7 @@ import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseResponse; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.ArrayList; @@ -41,6 +43,7 @@ import java.util.List; import java.util.Random; +@Slf4j public class ClickhouseSourceReader implements SourceReader { private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSourceReader.class); @@ -52,7 +55,7 @@ public class ClickhouseSourceReader implements SourceReader splits = new LinkedList<>(); - boolean noMoreSplit; + private volatile boolean noMoreSplit; ClickhouseSourceReader(List servers, SourceReader.Context readerContext) { this.servers = servers; @@ -76,44 +79,43 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { + ClickhouseSourceSplit split; + synchronized (output.getCheckpointLock()) { - ClickhouseSourceSplit split = splits.poll(); - if (split != null) { - TablePath tablePath = split.getTablePath(); - ClickhouseCatalogConfig clickhouseCatalogConfig = - split.getClickhouseCatalogConfig(); - String sql = clickhouseCatalogConfig.getSql(); - SeaTunnelRowType seaTunnelRowType = - clickhouseCatalogConfig.getCatalogTable().getSeaTunnelRowType(); - try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { - response.stream() - .forEach( - record -> { - Object[] values = - new Object[seaTunnelRowType.getFieldNames().length]; - for (int i = 0; i < record.size(); i++) { - if (record.getValue(i).isNullOrEmpty()) { - values[i] = null; - } else { - values[i] = - TypeConvertUtil.valueUnwrap( - seaTunnelRowType.getFieldType(i), + split = splits.poll(); + } + + if (split != null) { + TablePath tablePath = split.getTablePath(); + ClickhouseCatalogConfig catalogConfig = split.getClickhouseCatalogConfig(); + String sql = catalogConfig.getSql(); + SeaTunnelRowType rowType = catalogConfig.getCatalogTable().getSeaTunnelRowType(); + + try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { + Object[] values = new Object[rowType.getFieldNames().length]; + response.stream() + .forEach( + record -> { + for (int i = 0; i < record.size(); i++) { + values[i] = + record.getValue(i).isNullOrEmpty() + ? null + : TypeConvertUtil.valueUnwrap( + rowType.getFieldType(i), record.getValue(i)); - } - } - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(values); - if (seaTunnelRow != null) { - seaTunnelRow.setTableId(String.valueOf(tablePath)); - output.collect(seaTunnelRow); - } - }); - } - } else if (splits.isEmpty() && noMoreSplit) { - // signal to the source that we have reached the end of the data. - readerContext.signalNoMoreElement(); - } else { - Thread.sleep(1000L); + } + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(values); + seaTunnelRow.setTableId(String.valueOf(tablePath)); + output.collect(seaTunnelRow); + }); } + } else if (splits.isEmpty() + && noMoreSplit + && Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { + readerContext.signalNoMoreElement(); + log.info( + "Closed the bounded ClickHouse source reader task {}.", + readerContext.getIndexOfSubtask()); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index 1fdd5daf89b..32bd4d64712 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -92,6 +92,8 @@ public void run() throws Exception { assignSplit(readers); } } + log.info("No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); } @Override From af1b2b2827aef343b4c72e28ef1bee422ee3e227 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Wed, 30 Oct 2024 10:28:17 +0800 Subject: [PATCH 21/26] 1 --- .../clickhouse/source/ClickhouseSourceReader.java | 7 ++----- .../connectors/seatunnel/clickhouse/ClickhouseIT.java | 1 + 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index b15358414cd..8432e60a8e6 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -26,9 +26,6 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseCatalogConfig; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseNode; @@ -99,8 +96,8 @@ record -> { record.getValue(i).isNullOrEmpty() ? null : TypeConvertUtil.valueUnwrap( - rowType.getFieldType(i), - record.getValue(i)); + rowType.getFieldType(i), + record.getValue(i)); } SeaTunnelRow seaTunnelRow = new SeaTunnelRow(values); seaTunnelRow.setTableId(String.valueOf(tablePath)); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 3f82b1fd3d0..ffc2c788941 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -110,6 +110,7 @@ public void testSourceParallelism(TestContainer container) throws Exception { Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } + @TestTemplate @DisabledOnContainer( value = {}, From 6d63449106b99574b2c67f76b625299b8d57cb73 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Wed, 30 Oct 2024 11:42:48 +0800 Subject: [PATCH 22/26] 1 --- .../clickhouse/source/ClickhouseSourceSplitEnumerator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index c5e4e50d8bc..ac38f5b87ac 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -131,7 +131,6 @@ private void assignSplit(Collection readers) { log.info("Assign splits {} to reader {}", assignmentForReader, reader); try { context.assignSplit(reader, assignmentForReader); - context.signalNoMoreSplits(reader); } catch (Exception e) { log.error( "Failed to assign splits {} to reader {}", @@ -186,7 +185,6 @@ public void registerReader(int subtaskId) { assignSplit(Collections.singletonList(subtaskId)); } } - context.signalNoMoreSplits(subtaskId); } @Override From 4ec9c4f363f10918cf301fe6a2ef8c2b7854c6d1 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Wed, 30 Oct 2024 17:58:55 +0800 Subject: [PATCH 23/26] 1 --- .../clickhouse/source/ClickhouseSource.java | 22 +++--- .../source/ClickhouseSourceFactory.java | 79 ------------------- 2 files changed, 11 insertions(+), 90 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java index 4be67c1427d..9995c54b401 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java @@ -75,8 +75,8 @@ @AutoService(SeaTunnelSource.class) public class ClickhouseSource implements SeaTunnelSource, - SupportParallelism, - SupportColumnProjection { + SupportParallelism, + SupportColumnProjection { private List servers; private Map tableClickhouseCatalogConfigMap = @@ -100,10 +100,10 @@ public void prepare(Config config) throws PrepareFailException { Map customConfig = CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key()) ? config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().unwrapped().toString())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().unwrapped().toString())) : null; servers = @@ -171,11 +171,11 @@ private CatalogTable createCatalogTable(SeaTunnelRowType rowType, TablePath tabl public SeaTunnelRowType getClickhouseRowType(ClickHouseNode currentServer, String sql) { try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); - ClickHouseResponse response = - client.connect(currentServer) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql)) - .executeAndWait()) { + ClickHouseResponse response = + client.connect(currentServer) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query(String.format("SELECT * FROM (%s) s LIMIT 1", sql)) + .executeAndWait()) { int columnSize = response.getColumns().size(); String[] fieldNames = new String[columnSize]; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java index 89803e7c1d7..dcacd5fbd94 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java @@ -17,37 +17,13 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.source.SourceSplit; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; -import org.apache.seatunnel.api.table.catalog.TableSchema; -import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil; -import com.clickhouse.client.ClickHouseClient; -import com.clickhouse.client.ClickHouseColumn; -import com.clickhouse.client.ClickHouseException; -import com.clickhouse.client.ClickHouseFormat; -import com.clickhouse.client.ClickHouseNode; -import com.clickhouse.client.ClickHouseResponse; import com.google.auto.service.AutoService; -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; @@ -63,61 +39,6 @@ public String factoryIdentifier() { return "Clickhouse"; } - @Override - public - TableSource createSource(TableSourceFactoryContext context) { - ReadonlyConfig readonlyConfig = context.getOptions(); - List nodes = ClickhouseUtil.createNodes(readonlyConfig); - - String sql = readonlyConfig.get(SQL); - ClickHouseNode currentServer = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); - try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol()); - ClickHouseResponse response = - client.connect(currentServer) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(modifySQLToLimit1(sql)) - .executeAndWait()) { - TableSchema.Builder builder = TableSchema.builder(); - List columns = response.getColumns(); - columns.forEach( - column -> { - PhysicalColumn physicalColumn = - PhysicalColumn.of( - column.getColumnName(), - TypeConvertUtil.convert(column), - (long) column.getEstimatedLength(), - column.getScale(), - column.isNullable(), - null, - null); - builder.column(physicalColumn); - }); - String catalogName = "clickhouse_catalog"; - CatalogTable catalogTable = - CatalogTable.of( - TableIdentifier.of( - catalogName, readonlyConfig.get(DATABASE), "default"), - builder.build(), - Collections.emptyMap(), - Collections.emptyList(), - "", - catalogName); - return () -> - (SeaTunnelSource) - new ClickhouseSource(nodes, catalogTable, sql); - } catch (ClickHouseException e) { - throw new ClickhouseConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - factoryIdentifier(), PluginType.SOURCE, e.getMessage())); - } - } - - private String modifySQLToLimit1(String sql) { - return String.format("SELECT * FROM (%s) s LIMIT 1", sql); - } - @Override public OptionRule optionRule() { return OptionRule.builder() From ac979ef8c92a6bbc51a55e5aaa4046e6872ab74a Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 31 Oct 2024 11:37:39 +0800 Subject: [PATCH 24/26] 1 --- .../seatunnel/prometheus/sink/PrometheusSink.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java index 93d4e931e17..eaca954290f 100644 --- a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java +++ b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; @@ -28,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; public class PrometheusSink extends AbstractSimpleSink implements SupportMultiTableSink { @@ -65,6 +67,11 @@ public String getPluginName() { return "Prometheus"; } + @Override + public Optional getWriteCatalogTable() { + return super.getWriteCatalogTable(); + } + @Override public PrometheusWriter createWriter(SinkWriter.Context context) { return new PrometheusWriter(seaTunnelRowType, httpParameter, pluginConfig); From 4c12b008d33352d68e95b5c903ca0e718eacaec8 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 31 Oct 2024 16:28:11 +0800 Subject: [PATCH 25/26] 1 --- .../seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java index 5d9c1c848b2..031b6caa014 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java @@ -95,6 +95,7 @@ public void testClickhouseMergeTreeTable(TestContainer container) throws Excepti Container.ExecResult execResult = container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); + Thread.sleep(100); Assertions.assertEquals(0, execResult.getExitCode()); checkSinkTableRows(); @@ -108,6 +109,7 @@ public void testClickhouseMergeTreeTableWithEnableDelete(TestContainer container Container.ExecResult execResult = container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); + Thread.sleep(100); Assertions.assertEquals(0, execResult.getExitCode()); Awaitility.given() @@ -125,6 +127,7 @@ public void testClickhouseReplacingMergeTreeTable(TestContainer container) throw Container.ExecResult execResult = container.executeJob("/clickhouse_sink_cdc_changelog_case1.conf"); + Thread.sleep(100); Assertions.assertEquals(0, execResult.getExitCode()); checkSinkTableRows(); @@ -138,6 +141,7 @@ public void testClickhouseReplacingMergeTreeTableWithEnableDelete(TestContainer Container.ExecResult execResult = container.executeJob("/clickhouse_sink_cdc_changelog_case2.conf"); + Thread.sleep(100); Assertions.assertEquals(0, execResult.getExitCode()); checkSinkTableRows(); From f22f18780c068bd5d7a27db862989c2b0fabf108 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 7 Nov 2024 11:13:54 +0800 Subject: [PATCH 26/26] 1 --- .../connectors/seatunnel/prometheus/sink/PrometheusSink.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java index 946f8a92ef2..35ec257fc93 100644 --- a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java +++ b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/sink/PrometheusSink.java @@ -66,11 +66,6 @@ public String getPluginName() { return "Prometheus"; } - @Override - public Optional getWriteCatalogTable() { - return super.getWriteCatalogTable(); - } - @Override public PrometheusWriter createWriter(SinkWriter.Context context) { return new PrometheusWriter(