From baa7cb89e40c55159414cc4d4c9c6ad5b2875d82 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Tue, 22 Oct 2024 17:28:28 +0800 Subject: [PATCH 01/11] [Fix][Doc] Fix LocalFile doc (#7887) Supplement and optimize the description of the LocalFile connector on filtering files [(#7887)](https://github.com/apache/seatunnel/issues/7887) --- docs/en/connector-v2/source/LocalFile.md | 29 ++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 6d11b992e3a..533d7fa91bf 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -254,6 +254,12 @@ Specifies Whether to process data using the tag attribute format. Filter pattern, which used for filtering files. +The filtering format is similar to wildcard matching file names in Linux. + +However, it should be noted that unlike Linux wildcard characters, when encountering file suffixes, the middle dot cannot be omitted. + +For example, `abc20241022.csv`, the normal Linux wildcard `abc*` is sufficient, but here we need to use `abc*.*` , Pay attention to a point in the middle. 
+ ### compress_codec [string] The compress codec of files and the details that supported as the following shown: @@ -406,6 +412,29 @@ sink { ``` +### Filter File + +```hocon +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/seatunnel/read/" + file_format_type = "csv" + skip_header_row_number = 1 + // file example abcD2024.csv + file_filter_pattern = "abc[DX]*.*" + } +} +sink { + Console { + } +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 From 427668128471aea7fb34dcca10666bd079fe79ea Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Fri, 25 Oct 2024 18:34:23 +0800 Subject: [PATCH 02/11] [Fix][Connector-V2][ClickHouse] Fix ClickHouse Bug (#7897) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](https://github.com/apache/seatunnel/issues/7897) 2、Added E2E test cases for this issue [(#7897)](https://github.com/apache/seatunnel/issues/7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to modify the following configuration. The configuration in config is invalid ``` seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml ``` --- .../clickhouse/source/ClickhouseSourceReader.java | 13 +++++++++++-- .../source/ClickhouseSourceSplitEnumerator.java | 2 ++ .../src/main/resources/seatunnel.yaml | 1 + 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index 591334d9722..b21519083b7 100644 --- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -34,7 +36,7 @@ import java.util.Collections; import java.util.List; import java.util.Random; - +@Slf4j public class ClickhouseSourceReader implements SourceReader { private final List servers; @@ -43,6 +45,7 @@ public class ClickhouseSourceReader implements SourceReader request; private final String sql; + private volatile boolean noMoreSplit; private final List splits; @@ -97,6 +100,12 @@ record -> { } this.readerContext.signalNoMoreElement(); this.splits.clear(); + } else if (noMoreSplit + && splits.isEmpty() + && Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { + log.info("Closed the bounded ClickHouse source"); + this.readerContext.signalNoMoreElement(); + this.splits.clear(); } } @@ -111,7 +120,7 @@ public void addSplits(List splits) { } @Override - public void handleNoMoreSplits() {} + public void handleNoMoreSplits() {noMoreSplit = true;} @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index c0eb4b6c706..21937db9a81 100644 --- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -77,6 +77,8 @@ public void registerReader(int subtaskId) { if (assigned < 0) { assigned = subtaskId; context.assignSplit(subtaskId, new ClickhouseSourceSplit()); + } else { + context.signalNoMoreSplits(subtaskId); } } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml index c9bb71ecc07..8c5a136d1b0 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml +++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml @@ -20,6 +20,7 @@ seatunnel: backup-count: 1 queue-type: blockingqueue print-execution-info-interval: 60 + print-job-metrics-info-interval: 60 slot-service: dynamic-slot: true checkpoint: From 1b8066765f2328f4624fb4137b867566f4d2fa69 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Fri, 25 Oct 2024 22:33:01 +0800 Subject: [PATCH 03/11] [Fix][Connector-V2][ClickHouse] Fix ClickHouse Bug (#7897) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](https://github.com/apache/seatunnel/issues/7897) 2、Added E2E test cases for this issue [(#7897)](https://github.com/apache/seatunnel/issues/7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to modify the following configuration. The configuration in config is invalid ``` seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml ``` --- .../source/ClickhouseSourceReader.java | 7 ++- 
.../seatunnel/clickhouse/ClickhouseIT.java | 7 +++ .../test/resources/clickhouse_to_console.conf | 45 +++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index b21519083b7..fd3c6300b5d 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.source; -import lombok.extern.slf4j.Slf4j; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; @@ -30,12 +29,14 @@ import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseResponse; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; + @Slf4j public class ClickhouseSourceReader implements SourceReader { @@ -120,7 +121,9 @@ public void addSplits(List splits) { } @Override - public void handleNoMoreSplits() {noMoreSplit = true;} + public void handleNoMoreSplits() { + noMoreSplit = true; + } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {} diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 66ee281740c..3e8b0a090fc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -101,6 +101,13 @@ public void testClickhouse(TestContainer container) throws Exception { clearSinkTable(); } + @TestTemplate + public void testSourceParallelism(TestContainer container) throws Exception { + System.out.println("=========多并行度测试==========="); + Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf"); + System.out.println(execResult.getExitCode()); + } + @BeforeAll @Override public void startUp() throws Exception { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf new file mode 100644 index 00000000000..755131276ba --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + parallelism = 2 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for test and demonstrate the feature source plugin** + Clickhouse { + host = "clickhouse:8123" + database = "default" + sql = "select * from source_table" + username = "default" + password = "" + result_table_name = "source_table" + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource +} + +sink { + console { + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink +} \ No newline at end of file From e64b8a67ddd8ae6730813c66cb2e929e2768d6eb Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Sat, 26 Oct 2024 12:02:28 +0800 Subject: [PATCH 04/11] [Fix][Connector-V2][ClickHouse] Fix ClickHouse Bug (#7897) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、When the ClickHouse connector is set to multi parallelism, the task extraction is completed but cannot be stopped normally [(#7897)](https://github.com/apache/seatunnel/issues/7897) 2、Added E2E test cases for this issue [(#7897)](https://github.com/apache/seatunnel/issues/7897) 3、Local developers want to observe **Job Progress Information** in a timely manner, Need to 
modify the following configuration. The configuration in config is invalid ``` seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml ``` --- .../connectors/seatunnel/clickhouse/ClickhouseIT.java | 4 ++-- .../src/test/resources/clickhouse_to_console.conf | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index 3e8b0a090fc..e0a09dc56dc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -103,9 +103,9 @@ public void testClickhouse(TestContainer container) throws Exception { @TestTemplate public void testSourceParallelism(TestContainer container) throws Exception { - System.out.println("=========多并行度测试==========="); + LOG.info("=========多并行度测试==========="); Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf"); - System.out.println(execResult.getExitCode()); + Assertions.assertEquals(0, execResult.getExitCode()); } @BeforeAll diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf index 755131276ba..e996be8e4a2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf @@ 
-19,7 +19,7 @@ ###### env { - parallelism = 2 + parallelism = 3 job.mode = "BATCH" } From 42e591925056841d1299effa0112d8134d5d3070 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Sat, 26 Oct 2024 14:21:46 +0800 Subject: [PATCH 05/11] [Fix][Doc] Fix LocalFile doc (#7887) Continue to optimize the document about filtering files and add some examples [(#7887)](https://github.com/apache/seatunnel/issues/7887) --- docs/en/connector-v2/source/LocalFile.md | 60 ++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 533d7fa91bf..077537f6887 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -256,10 +256,70 @@ Filter pattern, which used for filtering files. The filtering format is similar to wildcard matching file names in Linux. +| Wildcard | Meaning | Example | +|--------------|--------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------| +| * | Match 0 or more characters | f*     Any file starting with f
b*.txt   Any file starting with b, any character in the middle, and ending with. txt | +| [] | Match a single character in parentheses | [abc]*   A file that starts with any one of the characters a, b, or c | +| ? | Match any single character | f?.txt   Any file starting with 'f' followed by a character and ending with '. txt' | +| [!] | Match any single character not in parentheses | [!abc]*   Any file that does not start with abc | +| [a-z] | Match any single character from a to z | [a-z]*   Any file starting with a to z | +| {a,b,c}/a..z | When separated by commas, it represents individual characters
When separated by two dots, represents continuous characters | {a,b,c}*   Files starting with any character from abc
{a..Z}*    Files starting with any character from a to z | + However, it should be noted that unlike Linux wildcard characters, when encountering file suffixes, the middle dot cannot be omitted. For example, `abc20241022.csv`, the normal Linux wildcard `abc*` is sufficient, but here we need to use `abc*.*` , Pay attention to a point in the middle. +File Structure Example: +``` +report.txt +notes.txt +input.csv +abch20241022.csv +abcw20241022.csv +abcx20241022.csv +abcq20241022.csv +abcg20241022.csv +abcv20241022.csv +abcb20241022.csv +old_data.csv +logo.png +script.sh +helpers.sh +``` +Matching Rules Example: + +**Example 1**: *Match all .txt files*,Regular Expression: +``` +*.txt +``` +The result of this example matching is: +``` +report.txt +notes.txt +``` +**Example 2**: *Match all Any file starting with abc*,Regular Expression: +``` +abc*.csv +``` +The result of this example matching is: +``` +abch20241022.csv +abcw20241022.csv +abcx20241022.csv +abcq20241022.csv +abcg20241022.csv +abcv20241022.csv +abcb20241022.csv +``` +**Example 3**: *Match all Any file starting with abc,And the fourth character is either x or g*, the Regular Expression: +``` +abc[x,g]*.csv +``` +The result of this example matching is: +``` +abcx20241022.csv +abcg20241022.csv +``` ### compress_codec [string] The compress codec of files and the details that supported as the following shown: From 2e9162d61e5e7074daee1e3a053a795fad8f2d40 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Sat, 26 Oct 2024 21:44:17 +0800 Subject: [PATCH 06/11] [Fix][Doc] Fix LocalFile doc (#7887) Change to English log [(#7887)](https://github.com/apache/seatunnel/issues/7887) --- .../seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index e0a09dc56dc..b4093fd4f12 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -103,7 +103,7 @@ public void testClickhouse(TestContainer container) throws Exception { @TestTemplate public void testSourceParallelism(TestContainer container) throws Exception { - LOG.info("=========多并行度测试==========="); + LOG.info("=========Multi parallelism testing begins==========="); Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } From be22d936cb3641cc2c1d4dcb36df3944f893b9a4 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Sun, 27 Oct 2024 00:00:03 +0800 Subject: [PATCH 07/11] Revert "[Fix][Doc] Fix LocalFile doc (#7887)" This reverts commit 42e591925056841d1299effa0112d8134d5d3070. --- docs/en/connector-v2/source/LocalFile.md | 60 ------------------------ 1 file changed, 60 deletions(-) diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 077537f6887..533d7fa91bf 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -256,70 +256,10 @@ Filter pattern, which used for filtering files. The filtering format is similar to wildcard matching file names in Linux. 
-| Wildcard | Meaning | Example | -|--------------|--------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------| -| * | Match 0 or more characters | f*     Any file starting with f
b*.txt   Any file starting with b, any character in the middle, and ending with. txt | -| [] | Match a single character in parentheses | [abc]*   A file that starts with any one of the characters a, b, or c | -| ? | Match any single character | f?.txt   Any file starting with 'f' followed by a character and ending with '. txt' | -| [!] | Match any single character not in parentheses | [!abc]*   Any file that does not start with abc | -| [a-z] | Match any single character from a to z | [a-z]*   Any file starting with a to z | -| {a,b,c}/a..z | When separated by commas, it represents individual characters
When separated by two dots, represents continuous characters | {a,b,c}*   Files starting with any character from abc
{a..Z}*    Files starting with any character from a to z | - However, it should be noted that unlike Linux wildcard characters, when encountering file suffixes, the middle dot cannot be omitted. For example, `abc20241022.csv`, the normal Linux wildcard `abc*` is sufficient, but here we need to use `abc*.*` , Pay attention to a point in the middle. -File Structure Example: -``` -report.txt -notes.txt -input.csv -abch20241022.csv -abcw20241022.csv -abcx20241022.csv -abcq20241022.csv -abcg20241022.csv -abcv20241022.csv -abcb20241022.csv -old_data.csv -logo.png -script.sh -helpers.sh -``` -Matching Rules Example: - -**Example 1**: *Match all .txt files*,Regular Expression: -``` -*.txt -``` -The result of this example matching is: -``` -report.txt -notes.txt -``` -**Example 2**: *Match all Any file starting with abc*,Regular Expression: -``` -abc*.csv -``` -The result of this example matching is: -``` -abch20241022.csv -abcw20241022.csv -abcx20241022.csv -abcq20241022.csv -abcg20241022.csv -abcv20241022.csv -abcb20241022.csv -``` -**Example 3**: *Match all Any file starting with abc,And the fourth character is either x or g*, the Regular Expression: -``` -abc[x,g]*.csv -``` -The result of this example matching is: -``` -abcx20241022.csv -abcg20241022.csv -``` ### compress_codec [string] The compress codec of files and the details that supported as the following shown: From 64fa711fb682463150972115d7792d2741ed75f0 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Sun, 27 Oct 2024 00:00:26 +0800 Subject: [PATCH 08/11] Revert "[Fix][Doc] Fix LocalFile doc (#7887)" This reverts commit baa7cb89e40c55159414cc4d4c9c6ad5b2875d82. 
--- docs/en/connector-v2/source/LocalFile.md | 29 ------------------------ 1 file changed, 29 deletions(-) diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 533d7fa91bf..6d11b992e3a 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -254,12 +254,6 @@ Specifies Whether to process data using the tag attribute format. Filter pattern, which used for filtering files. -The filtering format is similar to wildcard matching file names in Linux. - -However, it should be noted that unlike Linux wildcard characters, when encountering file suffixes, the middle dot cannot be omitted. - -For example, `abc20241022.csv`, the normal Linux wildcard `abc*` is sufficient, but here we need to use `abc*.*` , Pay attention to a point in the middle. - ### compress_codec [string] The compress codec of files and the details that supported as the following shown: @@ -412,29 +406,6 @@ sink { ``` -### Filter File - -```hocon -env { - parallelism = 1 - job.mode = "BATCH" -} - -source { - LocalFile { - path = "/seatunnel/read/" - file_format_type = "csv" - skip_header_row_number = 1 - // file example abcD2024.csv - file_filter_pattern = "abc[DX]*.*" - } -} -sink { - Console { - } -} -``` - ## Changelog ### 2.2.0-beta 2022-09-26 From 9109024b2887961dab9a2294fe799687e3989664 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Tue, 29 Oct 2024 10:32:45 +0800 Subject: [PATCH 09/11] [Fix][Connector-V2] Fixed clickhouse connectors cannot stop under multiple parallelism Adjust the bug regarding the Clickhouse connector not being able to stop properly --- .../source/ClickhouseSourceReader.java | 53 ++++++++++--------- .../ClickhouseSourceSplitEnumerator.java | 1 + .../seatunnel/clickhouse/ClickhouseIT.java | 1 - 3 files changed, 28 insertions(+), 27 deletions(-) diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index fd3c6300b5d..da5dc7e5211 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -79,34 +79,35 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { - if (!splits.isEmpty()) { - try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { - response.stream() - .forEach( - record -> { - Object[] values = - new Object[this.rowTypeInfo.getFieldNames().length]; - for (int i = 0; i < record.size(); i++) { - if (record.getValue(i).isNullOrEmpty()) { - values[i] = null; - } else { - values[i] = - TypeConvertUtil.valueUnwrap( - this.rowTypeInfo.getFieldType(i), - record.getValue(i)); + synchronized (output.getCheckpointLock()) { + if (!splits.isEmpty()) { + try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) { + response.stream() + .forEach( + record -> { + Object[] values = + new Object[this.rowTypeInfo.getFieldNames().length]; + for (int i = 0; i < record.size(); i++) { + if (record.getValue(i).isNullOrEmpty()) { + values[i] = null; + } else { + values[i] = + TypeConvertUtil.valueUnwrap( + this.rowTypeInfo.getFieldType(i), + record.getValue(i)); + } } - } - output.collect(new SeaTunnelRow(values)); - }); + output.collect(new SeaTunnelRow(values)); + }); + } + this.readerContext.signalNoMoreElement(); + this.splits.clear(); + } + if (noMoreSplit && splits.isEmpty()) { + log.info("Closed the bounded ClickHouse source"); 
+ this.readerContext.signalNoMoreElement(); + this.splits.clear(); } - this.readerContext.signalNoMoreElement(); - this.splits.clear(); - } else if (noMoreSplit - && splits.isEmpty() - && Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { - log.info("Closed the bounded ClickHouse source"); - this.readerContext.signalNoMoreElement(); - this.splits.clear(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index 21937db9a81..418764560e5 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -77,6 +77,7 @@ public void registerReader(int subtaskId) { if (assigned < 0) { assigned = subtaskId; context.assignSplit(subtaskId, new ClickhouseSourceSplit()); + context.signalNoMoreSplits(subtaskId); } else { context.signalNoMoreSplits(subtaskId); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java index b4093fd4f12..76bdfaa2816 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java @@ -103,7 
+103,6 @@ public void testClickhouse(TestContainer container) throws Exception { @TestTemplate public void testSourceParallelism(TestContainer container) throws Exception { - LOG.info("=========Multi parallelism testing begins==========="); Container.ExecResult execResult = container.executeJob("/clickhouse_to_console.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } From ace9fb0af3b96a5d2fafc6d7c499de558899bdf2 Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Tue, 29 Oct 2024 12:01:20 +0800 Subject: [PATCH 10/11] [Fix][Connector-V2] Fixed clickhouse connectors cannot stop under multiple parallelism Optimize code structure --- .../clickhouse/source/ClickhouseSourceReader.java | 15 +++++++++------ .../source/ClickhouseSourceSplitEnumerator.java | 4 +--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index da5dc7e5211..1b46375545a 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -100,17 +100,20 @@ record -> { output.collect(new SeaTunnelRow(values)); }); } - this.readerContext.signalNoMoreElement(); - this.splits.clear(); + signalNoMoreElement(); } - if (noMoreSplit && splits.isEmpty()) { - log.info("Closed the bounded ClickHouse source"); - this.readerContext.signalNoMoreElement(); - this.splits.clear(); + if (noMoreSplit && splits.isEmpty() && Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { + signalNoMoreElement(); } } } + private void 
signalNoMoreElement() { + log.info("Closed the bounded ClickHouse source"); + this.readerContext.signalNoMoreElement(); + this.splits.clear(); + } + @Override public List snapshotState(long checkpointId) throws Exception { return Collections.emptyList(); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java index 418764560e5..f3c1bd0c47b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java @@ -77,10 +77,8 @@ public void registerReader(int subtaskId) { if (assigned < 0) { assigned = subtaskId; context.assignSplit(subtaskId, new ClickhouseSourceSplit()); - context.signalNoMoreSplits(subtaskId); - } else { - context.signalNoMoreSplits(subtaskId); } + context.signalNoMoreSplits(subtaskId); } @Override From 802f1f93a4fb1f77c6c6211e6348118f011d65dc Mon Sep 17 00:00:00 2001 From: "YOMO.LEE" <75362129@qq.com> Date: Tue, 29 Oct 2024 14:15:51 +0800 Subject: [PATCH 11/11] [Fix][Connector-V2] Fixed clickhouse connectors cannot stop under multiple parallelism format code --- .../seatunnel/clickhouse/source/ClickhouseSourceReader.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java index 1b46375545a..3ad0ec041e6 100644 --- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java @@ -102,7 +102,9 @@ record -> { } signalNoMoreElement(); } - if (noMoreSplit && splits.isEmpty() && Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { + if (noMoreSplit + && splits.isEmpty() + && Boundedness.BOUNDED.equals(readerContext.getBoundedness())) { signalNoMoreElement(); } }