diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index 591334d9722..3ad0ec041e6 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -28,6 +29,7 @@ import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseResponse; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.ArrayList; @@ -35,6 +37,7 @@ import java.util.List; import java.util.Random; +@Slf4j public class ClickhouseSourceReader implements SourceReader { private final List servers; @@ -43,6 +46,7 @@ public class ClickhouseSourceReader implements SourceReader request; private final String sql; + private volatile boolean noMoreSplit; private final List splits; @@ -75,31 +79,43 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { - if (!splits.isEmpty()) { - try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { - response.stream() - .forEach( - record -> { - Object[] values = - new Object[this.rowTypeInfo.getFieldNames().length]; - for (int i = 0; i < record.size(); i++) { - if (record.getValue(i).isNullOrEmpty()) { - values[i] = null; - } else { - values[i] = - TypeConvertUtil.valueUnwrap( - this.rowTypeInfo.getFieldType(i), - record.getValue(i)); + synchronized (output.getCheckpointLock()) { + if (!splits.isEmpty()) { + try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { + response.stream() + .forEach( + record -> { + Object[] values = + new Object[this.rowTypeInfo.getFieldNames().length]; + for (int i = 0; i < record.size(); i++) { + if (record.getValue(i).isNullOrEmpty()) { + values[i] = null; + } else { + values[i] = + TypeConvertUtil.valueUnwrap( + this.rowTypeInfo.getFieldType(i), + record.getValue(i)); + } } - } - output.collect(new SeaTunnelRow(values)); - }); + output.collect(new SeaTunnelRow(values)); + }); + } + signalNoMoreElement(); + } + if (noMoreSplit + && splits.isEmpty() + && Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { + signalNoMoreElement(); } - this.readerContext.signalNoMoreElement(); - this.splits.clear(); } } + private void signalNoMoreElement() { + log.info("Closed the bounded ClickHouse source"); + this.readerContext.signalNoMoreElement(); + this.splits.clear(); + } + @Override public List snapshotState(long checkpointId) throws Exception { return Collections.emptyList(); @@ -111,7 +127,9 @@ public void addSplits(List splits) { } @Override - public void handleNoMoreSplits() {} + public void handleNoMoreSplits() { + noMoreSplit = true; + } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index c0eb4b6c706..f3c1bd0c47b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -78,6 +78,7 @@ public void registerReader(int subtaskId) { assigned = subtaskId; context.assignSplit(subtaskId, new ClickhouseSourceSplit()); } + context.signalNoMoreSplits(subtaskId); } @Override diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 66ee281740c..76bdfaa2816 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -101,6 +101,12 @@ public void testClickhouse(TestContainer container) throws Exception { clearSinkTable(); } + @TestTemplate + public void testSourceParallelism(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + @BeforeAll @Override public void startUp() throws Exception { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf new file mode 100644 index 00000000000..e996be8e4a2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 3 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from source_table" + username = "default" + password = "" + result_table_name = "source_table" + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource +} + +sink { + console { + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml index c9bb71ecc07..8c5a136d1b0 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml @@ -20,6 +20,7 @@ seatunnel: backup-count: 1 queue-type: blockingqueue print-execution-info-interval: 60 + print-job-metrics-info-interval: 60 slot-service: dynamic-slot: true checkpoint: