From 40f1993a829fbc0f80900e4993495516eee7eb69 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Wed, 12 Jul 2023 17:48:19 +0800 Subject: [PATCH 01/14] [Feature] support compress on Text File Read --- .../file/config/BaseSourceConfig.java | 6 + .../file/source/reader/TextReadStrategy.java | 31 ++++- .../e2e/connector/file/local/LocalFileIT.java | 8 ++ .../src/test/resources/text/e2e.lzo.txt | Bin 0 -> 2748 bytes .../text/local_file_text_lzo_to_assert.conf | 116 ++++++++++++++++++ 5 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.lzo.txt create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java index fa65628bd56..3624e134836 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java @@ -112,4 +112,10 @@ public class BaseSourceConfig { .stringType() .noDefaultValue() .withDescription("To be read sheet name,only valid for excel files"); + + public static final Option COMPRESS_CODEC = + Options.key("compress_codec") + .enumType(CompressFormat.class) + .defaultValue(CompressFormat.NONE) + .withDescription("Compression codec"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java index 4b931cb8902..51892cf99f5 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; @@ -39,12 +40,17 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import io.airlift.compress.lzo.LzopCodec; +import lombok.extern.slf4j.Slf4j; + import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Map; +@Slf4j public class TextReadStrategy extends AbstractReadStrategy { private DeserializationSchema deserializationSchema; private String fieldDelimiter = BaseSourceConfig.DELIMITER.defaultValue(); @@ -52,6 +58,7 @@ public class TextReadStrategy extends AbstractReadStrategy { private DateTimeUtils.Formatter datetimeFormat = BaseSourceConfig.DATETIME_FORMAT.defaultValue(); private TimeUtils.Formatter timeFormat = BaseSourceConfig.TIME_FORMAT.defaultValue(); + private CompressFormat compressFormat = BaseSourceConfig.COMPRESS_CODEC.defaultValue(); private int[] indexes; @Override @@ -61,9 +68,25 @@ public void read(String path, Collector output) FileSystem fs = FileSystem.get(conf); Path filePath = new Path(path); Map partitionsMap = parsePartitionsByPath(path); + InputStream inputStream; + switch (compressFormat) { + case LZO: + LzopCodec lzo = new LzopCodec(); + inputStream = lzo.createInputStream(fs.open(filePath)); + break; + case NONE: + inputStream = fs.open(filePath); + break; + default: + log.warn( + "Text file does not support this compress type: {}", + compressFormat.getCompressCodec()); + inputStream = fs.open(filePath); + break; + } + try (BufferedReader reader = - new BufferedReader( - new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) { + new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { reader.lines() .skip(skipHeaderNumber) .forEach( @@ -200,5 +223,9 @@ private void initFormatter() { TimeUtils.Formatter.parse( pluginConfig.getString(BaseSourceConfig.TIME_FORMAT.key())); } + if (pluginConfig.hasPath(BaseSourceConfig.COMPRESS_CODEC.key())) { + String compressCodec = pluginConfig.getString(BaseSourceConfig.COMPRESS_CODEC.key()); + compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase()); + } } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index f5c220deabd..a0f96a612b6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -47,6 +47,7 @@ public class LocalFileIT extends TestSuiteBase { Path orcPath = ContainerUtil.getResourcesFile("/orc/e2e.orc").toPath(); Path parquetPath = ContainerUtil.getResourcesFile("/parquet/e2e.parquet").toPath(); Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath(); + Path lzoTextPath = ContainerUtil.getResourcesFile("/text/e2e.lzo.txt").toPath(); Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath(); container.copyFileToContainer( MountableFile.forHostPath(jsonPath), @@ -60,6 +61,9 @@ public class LocalFileIT extends TestSuiteBase { container.copyFileToContainer( MountableFile.forHostPath(textPath), "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"); + container.copyFileToContainer( + MountableFile.forHostPath(lzoTextPath), + "/seatunnel/read/lzo_text/e2e.txt"); container.copyFileToContainer( MountableFile.forHostPath(excelPath), "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx"); @@ -82,6 +86,10 @@ public void testLocalFileReadAndWrite(TestContainer container) Container.ExecResult textWriteResult = container.executeJob("/text/fake_to_local_file_text.conf"); Assertions.assertEquals(0, textWriteResult.getExitCode()); + // test read local lzo text file + Container.ExecResult lzoTextReadResult = + container.executeJob("/text/local_file_text_lzo_to_assert.conf"); + Assertions.assertEquals(0, lzoTextReadResult.getExitCode()); // test read skip header Container.ExecResult textWriteAndSkipResult = container.executeJob("/text/local_file_text_skip_headers.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.lzo.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.lzo.txt new file mode 100644 index 0000000000000000000000000000000000000000..fd8f7b852337c4b51c478c1d83520ae138db3f02 GIT binary patch literal 2748 zcmXX|Id3D$8SSoW4`yT-Sf@LHAvm!UHR?VHV{G1%NKv9Fi4-%6;vpX5C6Xc|JKLGx z;9vs>V`GB}9BklV0~?soaO~e;VE=&OR~i$NP~9q4)%U)4)cb1tJo?$e*9TvHb$IdS zckxf6m(L&mbpPq&&tHZ|uj#*{=-?mt{Sf_Uk)NGa=<#r|IV1UPVO*nU)8fg9R8FqC zOFEtPW(zXeEaxM7xt})cWY=xvF6robUmM0;D`gnh#t^OrvzA%M2YYmf5F&5iY38^{PMQQQC zx$|CNl48zzRg}!eyJ`8p84D)l5obquA(X@{%NR2n%Smn>PqmO%aVZ%So+~Yc{P-&R z%-j*CTz*KFn|3ot$JxQ{LsHMKCIecZ9+zgxuvgeV z)9LMaag$7YrRNE7P}~cvoS*?0uO(B88f&@1ZiZ?ljgi_*DjnxmJ7dXYTfVs^{mx|7 zBTf*UTM=gk$LSO!QX1izv`z`bwUtI@StEm!HFnSUOUc5Q!hO$!&5yCNxG>?c*4-_`EVLszXZs96)ilt$r7 zf)r?l;ph*+{}mTn7)em014K^6liKWI7r(#}OBPE33}X&-e;M^{`2@vqpDB7rX3olC)ZnjeELimhOg0p)|jq(qU^{ z86^+R!u5_m4tk40a@SrR4^go(SX?tl(S0V2S3*w+x~.#emN*vFCB4I{4&QSU8unH=z@e1qpI=2wbHwDK5I`d%+(BjD+7@Z< z_F^z6H`9}Hg?2`l`6X$sJKG&S$!=>8;A3^)ptGym@tSn+M(b-@t+XB&p&wx6oAf*9wL_tR)Tl!cw3BC40!k;IItT_%(zwk%!D=gw8N^a6IUAw{)1T zGzX+TXm|EBH$2bZlTz+xm8a9n_2ZbO2w2myU) zgWU;&Kn62p@DhXOS%zx005s@^i$~Y_{P0sD`h`|dHdq@Y9+(hHBV;u9-eaUZ+(d`o zVp7mLs`>Z(=(V!U05I$c@uLR-!N$Gf?bXe9@1nOMP!LQU0AW6?H}r!exOB=4nxYt} ziDzh0vX8F_GqBu0zK_N#!&Uq^FYM2gZM(c!(OvdAd!Fof<#vzGrjzzlQfW4_J9=B3 zZn{Y^-*30*@oKY~Ce!Zj=?t2H*}?ZXJR*=9J+O>oXIMCniD5_?1)oFSpgXWdAw0=e z7ybDUAEMtd11d;pCsYn+fDuDxC_mUhAPD;Bh)!{Opo=6m#HHx82D~r=%aspMkf)2< z-3|F>0ISdlR{{Y+23QDQE*wM)CZs{hP@EEQFE|)D_}4=87Y;r75GM+8;to2atNSau z+h>bQ(r#wkb9y=&=Q^Y~x~@Ib<*L;@BgMPMph?TURk2A<+w0~ftvAY}Exf|uR0t}# z8)DSq^r4{OACQL-sLfmM0gnoK180PbiTJ)>+LgZCHw0K=u}16xrgy+J79K8-#yJ^u z4pn#)`xL3@z5Ltz=#BCg@Pki4%YY>>Zjl?ex|R6TFQYg92x}?}{1ieMBtXhbi8M>i z?|*!QXaQld5yXmYopYFlJ!CSYM27h|f4bk!l76R|o6+^U-p?n)QZc)vrCk3xo8%_# zNs~@Gx9eq6JFaX>G=F;Cs3+w?clJo&mDmeON5M#meAkxZN`OW?>nQ#jz!Iop;A40s z+it16Am98Ly(1b>c~BjI;YtQJX=0GhLP7&*8-`4K%t=857!7!V8$_x&TmZKP1}nkO z3o(X+M*i;aFbYUS6Tkq!L5@QbpbqHfY>I=z%^j{pFw>wi@$uD98E|tUQbU5UFBjA3 EfAb)6f&c&j literal 0 HcmV?d00001 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf new file mode 100644 index 00000000000..b32c37e367e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf @@ -0,0 +1,116 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/seatunnel/read/lzo_text" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + compress_codec = "lzo" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} \ No newline at end of file From e0e26b3f0eb55db3f49916b11232f88121da6f20 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Fri, 14 Jul 2023 11:08:11 +0800 Subject: [PATCH 02/14] [Feature] fix code style --- .../apache/seatunnel/e2e/connector/file/local/LocalFileIT.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index a0f96a612b6..ac6bd8bfe0d 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -62,8 +62,7 @@ public class LocalFileIT extends TestSuiteBase { MountableFile.forHostPath(textPath), "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt"); container.copyFileToContainer( - MountableFile.forHostPath(lzoTextPath), - "/seatunnel/read/lzo_text/e2e.txt"); + MountableFile.forHostPath(lzoTextPath), "/seatunnel/read/lzo_text/e2e.txt"); container.copyFileToContainer( MountableFile.forHostPath(excelPath), "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx"); From 21521a058a90c61dcfa7a77a71f2c6770e1c82bc Mon Sep 17 00:00:00 2001 From: Jarvis Date: Wed, 19 Jul 2023 17:05:17 +0800 Subject: [PATCH 03/14] [Feature] update local file doc --- docs/en/connector-v2/source/LocalFile.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index d33288b7a57..2d9eebe188d 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | schema | config | no | - | | common-options | | no | - | | sheet_name | string | no | - | +| compress_codec | string | no | none | ### path [string] @@ -225,6 +226,11 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### compress_codec [string] +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` + ## Example ```hocon From 9ac95ca01d54e447ec2a9721e26a52ced0f82b2e Mon Sep 17 00:00:00 2001 From: Jarvis Date: Thu, 20 Jul 2023 16:31:01 +0800 Subject: [PATCH 04/14] [Improve] local file read compression --- docs/en/connector-v2/source/LocalFile.md | 5 ++ .../file/source/reader/JsonReadStrategy.java | 36 +++++++- .../e2e/connector/file/local/LocalFileIT.java | 23 +++++ .../src/test/resources/json/e2e_lzo.json | Bin 0 -> 3348 bytes .../json/local_file_json_lzo_to_console.conf | 81 ++++++++++++++++++ 5 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index 2d9eebe188d..1e52cc65f61 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -227,9 +227,14 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. ### compress_codec [string] + The compress codec of files and the details that supported as the following shown: - txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. ## Example diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java index 4441a28faca..3b3b613aba3 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java @@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.format.json.JsonDeserializationSchema; @@ -30,14 +32,29 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import io.airlift.compress.lzo.LzopCodec; +import lombok.extern.slf4j.Slf4j; + import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Map; +@Slf4j public class JsonReadStrategy extends AbstractReadStrategy { private DeserializationSchema deserializationSchema; + private CompressFormat compressFormat = BaseSourceConfig.COMPRESS_CODEC.defaultValue(); + + @Override + public void init(HadoopConf conf) { + super.init(conf); + if (pluginConfig.hasPath(BaseSourceConfig.COMPRESS_CODEC.key())) { + String compressCodec = pluginConfig.getString(BaseSourceConfig.COMPRESS_CODEC.key()); + compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase()); + } + } @Override public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) { @@ -58,9 +75,24 @@ public void read(String path, Collector output) FileSystem fs = FileSystem.get(conf); Path filePath = new Path(path); Map partitionsMap = parsePartitionsByPath(path); + InputStream inputStream; + switch (compressFormat) { + case LZO: + LzopCodec lzo = new LzopCodec(); + inputStream = lzo.createInputStream(fs.open(filePath)); + break; + case NONE: + inputStream = fs.open(filePath); + break; + default: + log.warn( + "Text file does not support this compress type: {}", + compressFormat.getCompressCodec()); + inputStream = fs.open(filePath); + break; + } try (BufferedReader reader = - new BufferedReader( - new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) { + new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { reader.lines() .forEach( line -> { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index ac6bd8bfe0d..28f01a0a308 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -44,6 +44,7 @@ public class LocalFileIT extends TestSuiteBase { private final ContainerExtendedFactory extendedFactory = container -> { Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath(); + Path lzoJsonPath = ContainerUtil.getResourcesFile("/json/e2e_lzo.json").toPath(); Path orcPath = ContainerUtil.getResourcesFile("/orc/e2e.orc").toPath(); Path parquetPath = ContainerUtil.getResourcesFile("/parquet/e2e.parquet").toPath(); Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath(); @@ -52,6 +53,9 @@ public class LocalFileIT extends TestSuiteBase { container.copyFileToContainer( MountableFile.forHostPath(jsonPath), "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json"); + container.copyFileToContainer( + MountableFile.forHostPath(lzoJsonPath), + "/seatunnel/read/lzo_json/e2e.json"); container.copyFileToContainer( MountableFile.forHostPath(orcPath), "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc"); @@ -71,6 +75,9 @@ public class LocalFileIT extends TestSuiteBase { @TestTemplate public void testLocalFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { + /** + * excel + */ Container.ExecResult excelWriteResult = container.executeJob("/excel/fake_to_local_excel.conf"); Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr()); @@ -81,6 +88,9 @@ public void testLocalFileReadAndWrite(TestContainer container) container.executeJob("/excel/local_excel_projection_to_assert.conf"); Assertions.assertEquals( 0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr()); + /** + * text + */ // test write local text file Container.ExecResult textWriteResult = container.executeJob("/text/fake_to_local_file_text.conf"); @@ -101,6 +111,9 @@ public void testLocalFileReadAndWrite(TestContainer container) Container.ExecResult textProjectionResult = container.executeJob("/text/local_file_text_projection_to_assert.conf"); Assertions.assertEquals(0, textProjectionResult.getExitCode()); + /** + * json + */ // test write local json file Container.ExecResult jsonWriteResult = container.executeJob("/json/fake_to_local_file_json.conf"); @@ -109,6 +122,13 @@ public void testLocalFileReadAndWrite(TestContainer container) Container.ExecResult jsonReadResult = container.executeJob("/json/local_file_json_to_assert.conf"); Assertions.assertEquals(0, jsonReadResult.getExitCode()); + // test read lzo json file + Container.ExecResult lzoJsonReadResult = + container.executeJob("/json/local_file_json_lzo_to_console.conf"); + Assertions.assertEquals(0, lzoJsonReadResult.getExitCode()); + /** + * orc + */ // test write local orc file Container.ExecResult orcWriteResult = container.executeJob("/orc/fake_to_local_file_orc.conf"); @@ -121,6 +141,9 @@ public void testLocalFileReadAndWrite(TestContainer container) Container.ExecResult orcProjectionResult = container.executeJob("/orc/local_file_orc_projection_to_assert.conf"); Assertions.assertEquals(0, orcProjectionResult.getExitCode()); + /** + * parquet + */ // test write local parquet file Container.ExecResult parquetWriteResult = container.executeJob("/parquet/fake_to_local_file_parquet.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json new file mode 100644 index 0000000000000000000000000000000000000000..5abf6283cc84d87cd0cdae3d1d163554abfef8ea GIT binary patch literal 3348 zcmY*c%TJ@%9W{^P%P=!0O+u73qRDusVJ3Vw_x74RqHl%~q+t_3h;= zL`Ip+G9CyWaP|A%oksh@K6o+on^yf`$(3e%adT}BEF~AFQdQu+nMA7qytmYv-2hIt ziwRe{X1cmP0UY1YO=f||miCi#z^%o(+41j}PHIkt#btfUl_AtJD}AH26XzlD){y4CC@GGJ0anE4Z>k!(K!Hjo_a_PiCPu15! zACr=UV#&Y*=LE7j@A_--2#aw6rgV~<6F!*s`lK|P;A|R93M){CH@m+Gy6$dlZDrFm z;dK%bW%~4~!GsrjLNc6}DZ%h;&Z!~XqLM}{ML4%gGa`uAXOEGD7v@n$vqS~N21G>O zc%%3BX*l#^B3-W4!O>K)K2Zi|^?If3I3L@nwM)PoOIvrBftSitnL2PQvAyFso|vEC zTLZ2fjAbW*i}lq)-uJywghK-a8ZtiY;qGh|^w059G6SbDN;L)Hpk8ug3FK@1({NWM zH$B>noV{*-8`h|W*TEz)$FH8g+(WgZ+7N3`ZafJ36a{1WYZ%uOVm0LUNw|l_v@*RJWl;R^akwj4Yh;s6avbi2;3&CxYVZyiwA5;?iP2IBOCQ>qzzft8 zqODcjLJpc9KQnyCy8^WnmL(z(ETH0|H+BpHbL;74gh6DqvNBo&A%*t#yd!93Vxi*j z*jgnw>IjREr^kV}TKUBc@Z?Ui=00C3jgB}GH`Z4+O8%l3dKOnR6c*(^)hH;63aTtf zu$BnnQ{xmV6h7uK8iz9sT7G zkay^D#Rzw&bKk{=u#t>GzBnI;ES0eWem|KXzTba^k|>o~Vgf+|DWgt#nZ459I4+)x z&#yQ)k51%sPD!a!yWId@Us;$c0mql-CyI`!`0Bb-W@)^TssL}Sr5Z`#gY3a{$`@Yf zhMJ!|GcApRHmv2)HdpGMi~H|iqA_GF+)Y03zluI!DAbR30Qd6P z=l$K;d@-IIE6}&$!24Ba+sCby3Q7zp`1j2@FTTUW5I>=dc8510Zz=;rX6#;m5~kC@(!D&`XO+x z)yypd7uF9GZhdlMa(>f)-3wi(3^uXc5+^Dg48AhlM-2t+X&E#}3?jufHO&})jDl1< zbJ^A097Kn!3I9L+=AJkXE@_QLkk)y9@In7If%U}gt_Wdhhwt>>+GM3+}HDLdGz6THcYW6`l7->Avk{P=x6<1 zXq7^%sPX!EOf&7~2soF)r)a4k40SK3ikW>!Qpy(qj{AxbRA8cOwr1Rt~a<>%hw+$;>wJ z==A8U!0`PLJotXkIt`;*+b9>dDZHGrdx6@f+W4}`M zzhI%Oh$hQ1MJi+#9u7mHBlrqK3Unn-{vEa!7Ihr}O;uswW6_U?I=9>V^_`>Z_Rm9< zoXHnphI`TTRu2LYv1lZ}kr;Aw2nGd?DU9GcSSvWk+yTo`h#RVmW`eMdNjMyx!eA{P zt&uw2`LVghENPP|uhWG&J$f3eLQf#*-2TWisxj-JvltqKBdui$t(l-;gvj&92)58s zDza3*6)^Np7i2S~g*_Mf^OLE4C#Yh1C!Yh3r%Q<(aCIVk5C>iwTPe%{x2Bhyj>+xT zQp%0ZcUN1x?(mQ6L14m@mlsK#o2}E5rtThrYmEykSoE z-wk&jG!NI(=T8p4?hWSxbNuYBZ~AfWn@6vl2N*w41dHL~^e5lC@x!$i3?!JlT+$PT z_1#Dy%9xD4`!LarPy#=gqBS(CW96W$vRRCmI8_6L50?Y90JJ5DlUb-up~SE(rWW*R j3#CdN&_cWSo*w>eD2xukNS3ic!m0+7+YkQ#pP&B$rxrB~ literal 0 HcmV?d00001 diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf new file mode 100644 index 00000000000..9c5e382ca83 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/seatunnel/read/lzo_json" + row_delimiter = "\n" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "json" + compress_codec = "lzo" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + + } +} + +sink { + Console { + + } +} \ No newline at end of file From 5ba9c265a900422a8a4eece1926681dbeae382a5 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Thu, 27 Jul 2023 12:09:29 +0800 Subject: [PATCH 05/14] fix code style --- .../e2e/connector/file/local/LocalFileIT.java | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index 28f01a0a308..3da2f65fac8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -75,9 +75,7 @@ public class LocalFileIT extends TestSuiteBase { @TestTemplate public void testLocalFileReadAndWrite(TestContainer container) throws IOException, InterruptedException { - /** - * excel - */ + /** excel */ Container.ExecResult excelWriteResult = container.executeJob("/excel/fake_to_local_excel.conf"); Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr()); @@ -88,9 +86,7 @@ public void testLocalFileReadAndWrite(TestContainer container) container.executeJob("/excel/local_excel_projection_to_assert.conf"); Assertions.assertEquals( 0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr()); - /** - * text - */ + /** text */ // test write local text file Container.ExecResult textWriteResult = container.executeJob("/text/fake_to_local_file_text.conf"); @@ -111,9 +107,7 @@ public void testLocalFileReadAndWrite(TestContainer container) Container.ExecResult textProjectionResult = container.executeJob("/text/local_file_text_projection_to_assert.conf"); Assertions.assertEquals(0, textProjectionResult.getExitCode()); - /** - * json - */ + /** json */ // test write local json file Container.ExecResult jsonWriteResult = container.executeJob("/json/fake_to_local_file_json.conf"); @@ -126,9 +120,7 @@ public void testLocalFileReadAndWrite(TestContainer container) Container.ExecResult lzoJsonReadResult = container.executeJob("/json/local_file_json_lzo_to_console.conf"); Assertions.assertEquals(0, lzoJsonReadResult.getExitCode()); - /** - * orc - */ + /** orc */ // test write local orc file Container.ExecResult orcWriteResult = container.executeJob("/orc/fake_to_local_file_orc.conf"); @@ -141,9 +133,7 @@ public void testLocalFileReadAndWrite(TestContainer container) Container.ExecResult orcProjectionResult = container.executeJob("/orc/local_file_orc_projection_to_assert.conf"); Assertions.assertEquals(0, orcProjectionResult.getExitCode()); - /** - * parquet - */ + /** parquet */ // test write local parquet file Container.ExecResult parquetWriteResult = container.executeJob("/parquet/fake_to_local_file_parquet.conf"); From f4a5d51c93440862ea0ecbf3e2192301f783ba0e Mon Sep 17 00:00:00 2001 From: Jarvis Date: Tue, 1 Aug 2023 13:15:02 +0800 Subject: [PATCH 06/14] code style --- .../seatunnel/e2e/connector/file/local/LocalFileIT.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index 99edcf7d06c..4a204c86587 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -45,17 +45,13 @@ public class LocalFileIT extends TestSuiteBase { "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", container); ContainerUtil.copyFileIntoContainers( - "/json/e2e_lzo.json", - "/seatunnel/read/lzo_json/e2e.json", - container); + "/json/e2e_lzo.json", "/seatunnel/read/lzo_json/e2e.json", container); ContainerUtil.copyFileIntoContainers( "/text/e2e.txt", "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", container); ContainerUtil.copyFileIntoContainers( - "/text/e2e.lzo.txt", - "/seatunnel/read/lzo_text/e2e.txt", - container); + "/text/e2e.lzo.txt", "/seatunnel/read/lzo_text/e2e.txt", container); ContainerUtil.copyFileIntoContainers( "/excel/e2e.xlsx", "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", From c2f4d6e741a3692b212e28a605ec9e7d543c268f Mon Sep 17 00:00:00 2001 From: Jarvis Date: Wed, 9 Aug 2023 10:19:01 +0800 Subject: [PATCH 07/14] [Improve] add compress_codec parameter to file related connector --- docs/en/connector-v2/source/CosFile.md | 11 +++++++++++ docs/en/connector-v2/source/FtpFile.md | 11 +++++++++++ docs/en/connector-v2/source/HdfsFile.md | 11 +++++++++++ docs/en/connector-v2/source/Hive.md | 11 +++++++++++ docs/en/connector-v2/source/OssFile.md | 11 +++++++++++ docs/en/connector-v2/source/OssJindoFile.md | 11 +++++++++++ docs/en/connector-v2/source/S3File.md | 11 +++++++++++ docs/en/connector-v2/source/SftpFile.md | 11 +++++++++++ .../file/cos/source/CosFileSourceFactory.java | 1 + .../file/ftp/source/FtpFileSourceFactory.java | 1 + .../file/hdfs/source/HdfsFileSourceFactory.java | 1 + .../file/oss/source/OssFileSourceFactory.java | 1 + .../file/oss/source/OssFileSourceFactory.java | 1 + .../seatunnel/file/s3/source/S3FileSourceFactory.java | 1 + .../file/sftp/source/SftpFileSourceFactory.java | 1 + 15 files changed, 95 insertions(+) diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md index dd1e77ebcfd..236c4b8ca09 100644 --- a/docs/en/connector-v2/source/CosFile.md +++ b/docs/en/connector-v2/source/CosFile.md @@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | common-options | | no | - | | sheet_name | string | no | - | | file_filter_pattern | string | no | - | +| compress_codec | string | no | none | ### path [string] @@ -252,6 +253,16 @@ Reader the sheet of the workbook,Only used when file_format is excel. Filter pattern, which used for filtering files. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md index 6737511e63d..3c29b859639 100644 --- a/docs/en/connector-v2/source/FtpFile.md +++ b/docs/en/connector-v2/source/FtpFile.md @@ -49,6 +49,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | common-options | | no | - | | sheet_name | string | no | - | | file_filter_pattern | string | no | - | +| compress_codec | string | no | none | ### host [string] @@ -228,6 +229,16 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index 1d285c539a3..5ec8f4f76ff 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -54,6 +54,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | common-options | | no | - | | sheet_name | string | no | - | | file_filter_pattern | string | no | - | +| compress_codec | string | no | none | ### path [string] @@ -250,6 +251,16 @@ Reader the sheet of the workbook,Only used when file_format is excel. Filter pattern, which used for filtering files. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md index f9f35aaf733..dbe87936f59 100644 --- a/docs/en/connector-v2/source/Hive.md +++ b/docs/en/connector-v2/source/Hive.md @@ -44,6 +44,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | read_partitions | list | no | - | | read_columns | list | no | - | | common-options | | no | - | +| compress_codec | string | no | none | ### table_name [string] @@ -84,6 +85,16 @@ The read column list of the data source, user can use it to implement field proj Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Example ```bash diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md index 12f2141cd6e..c6d4fbb733c 100644 --- a/docs/en/connector-v2/source/OssFile.md +++ b/docs/en/connector-v2/source/OssFile.md @@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | common-options | | no | - | | sheet_name | string | no | - | | file_filter_pattern | string | no | - | +| compress_codec | string | no | none | ### path [string] @@ -249,6 +250,16 @@ Source plugin common parameters, please refer to [Source Common Options](common- Reader the sheet of the workbook,Only used when file_format is excel. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md index 913d277683e..de91d661593 100644 --- a/docs/en/connector-v2/source/OssJindoFile.md +++ b/docs/en/connector-v2/source/OssJindoFile.md @@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | common-options | | no | - | | sheet_name | string | no | - | | file_filter_pattern | string | no | - | +| compress_codec | string | no | none | ### path [string] @@ -253,6 +254,16 @@ Reader the sheet of the workbook,Only used when file_format is excel. Filter pattern, which used for filtering files. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Example ```hocon diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md index 79a89be1c27..f41c19321b8 100644 --- a/docs/en/connector-v2/source/S3File.md +++ b/docs/en/connector-v2/source/S3File.md @@ -58,6 +58,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa | common-options | | no | - | | sheet_name | string | no | - | | file_filter_pattern | string | no | - | +| compress_codec | string | no | none | ### path [string] @@ -304,6 +305,16 @@ Reader the sheet of the workbook,Only used when file_format is excel. Filter pattern, which used for filtering files. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Changelog ### 2.3.0-beta 2022-10-20 diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md index 22047d481ed..da432ad0fd8 100644 --- a/docs/en/connector-v2/source/SftpFile.md +++ b/docs/en/connector-v2/source/SftpFile.md @@ -48,6 +48,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | common-options | | no | - | | sheet_name | string | no | - | | file_filter_pattern | string | no | - | +| compress_codec | string | no | none | ### host [string] @@ -231,6 +232,16 @@ Reader the sheet of the workbook,Only used when file_format is excel. Filter pattern, which used for filtering files. +### compress_codec [string] + +The compress codec of files and the details that supported as the following shown: + +- txt: `lzo` `none` +- json: `lzo` `none` +- csv: `lzo` `none` +- orc/parquet: + automatically recognizes the compression type, no additional settings required. + ## Example ```hocon diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java index 496e9277f4e..e2b0285efba 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java @@ -61,6 +61,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) .optional(BaseSourceConfig.FILE_FILTER_PATTERN) + .optional(BaseSourceConfig.COMPRESS_CODEC) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java index 4ab637c4348..a29bb8228f2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java @@ -61,6 +61,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) .optional(BaseSourceConfig.FILE_FILTER_PATTERN) + .optional(BaseSourceConfig.COMPRESS_CODEC) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java index c3d406d62c7..2e4832da8ac 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java @@ -58,6 +58,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) .optional(BaseSourceConfig.FILE_FILTER_PATTERN) + .optional(BaseSourceConfig.COMPRESS_CODEC) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java index eaea7bccb61..74f88bb17a0 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -61,6 +61,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) .optional(BaseSourceConfig.FILE_FILTER_PATTERN) + .optional(BaseSourceConfig.COMPRESS_CODEC) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java index e7d862bd44a..ca22d62ef6d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -61,6 +61,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) .optional(BaseSourceConfig.FILE_FILTER_PATTERN) + .optional(BaseSourceConfig.COMPRESS_CODEC) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java index a3b48088650..2d2f3d749a8 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java @@ -66,6 +66,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) .optional(BaseSourceConfig.FILE_FILTER_PATTERN) + .optional(BaseSourceConfig.COMPRESS_CODEC) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java index e9efe1cdf9b..7b6fc8d52c2 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java @@ -61,6 +61,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfig.DATETIME_FORMAT) .optional(BaseSourceConfig.TIME_FORMAT) .optional(BaseSourceConfig.FILE_FILTER_PATTERN) + .optional(BaseSourceConfig.COMPRESS_CODEC) .build(); } From 3f966dc96d754bc9e9e9aea237b34f51344b3d65 Mon Sep 17 00:00:00 2001 From: jarvis Date: Mon, 4 Sep 2023 22:19:39 +0800 Subject: [PATCH 08/14] code style fix --- docs/en/connector-v2/source/HdfsFile.md | 3 +-- docs/en/connector-v2/source/Hive.md | 26 ++++++++++++------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index 3e8f80b6821..29092296ea7 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -39,7 +39,7 @@ Read data from hdfs file system. ## Source Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |---------------------------|---------|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | path | string | yes | - | The source file path. | | file_format_type | string | yes | - | We supported as the following file types:`text` `json` `csv` `orc` `parquet` `excel`.Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. | @@ -69,7 +69,6 @@ The compress codec of files and the details that supported as the following show - orc/parquet: automatically recognizes the compression type, no additional settings required. - ### Tips > If you use spark/flink, In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x. If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you download and install SeaTunnel Engine. You can check the jar package under ${SEATUNNEL_HOME}/lib to confirm this. diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md index 0c7127cf570..c2ed0ca0669 100644 --- a/docs/en/connector-v2/source/Hive.md +++ b/docs/en/connector-v2/source/Hive.md @@ -33,19 +33,19 @@ Read all the data in a split in a pollNext call. What splits are read will be sa ## Options -| name | type | required | default value | -|-------------------------------|----------|----------|---------------| -| table_name | string | yes | - | -| metastore_uri | string | yes | - | -| kerberos_principal | string | no | - | -| kerberos_keytab_path | string | no | - | -| hdfs_site_path | string | no | - | -| hive_site_path | string | no | - | -| read_partitions | list | no | - | -| read_columns | list | no | - | -| abort_drop_partition_metadata | boolean | no | true | -| compress_codec | string | no | none | -| common-options | | no | - | +| name | type | required | default value | +|-------------------------------|---------|----------|---------------| +| table_name | string | yes | - | +| metastore_uri | string | yes | - | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | +| hdfs_site_path | string | no | - | +| hive_site_path | string | no | - | +| read_partitions | list | no | - | +| read_columns | list | no | - | +| abort_drop_partition_metadata | boolean | no | true | +| compress_codec | string | no | none | +| common-options | | no | - | ### table_name [string] From 01865f24cf7c9c4ab80eaaeb595e82bbc98e23be Mon Sep 17 00:00:00 2001 From: jarvis Date: Fri, 15 Sep 2023 08:12:54 +0800 Subject: [PATCH 09/14] [Feature] add assert sink check --- .../json/local_file_json_lzo_to_console.conf | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf index 9c5e382ca83..12c4e626ffd 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf @@ -78,4 +78,61 @@ sink { Console { } + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 5 + } + ], + field_rules = [ + { + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_boolean + field_type = boolean + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = c_double + field_type = double + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = hobby + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } } \ No newline at end of file From 1af0b31d75cfea3bb815d52374c804bb12547d4a Mon Sep 17 00:00:00 2001 From: jarvis Date: Mon, 18 Sep 2023 19:14:44 +0800 Subject: [PATCH 10/14] [Feature] update assert sink check --- .../src/test/resources/json/e2e_lzo.json | Bin 3348 -> 3306 bytes .../json/local_file_json_lzo_to_console.conf | 51 ++++++++---------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json index 5abf6283cc84d87cd0cdae3d1d163554abfef8ea..ed3a4e789158f9a21b84e8fec6bf8c47677e5e83 100644 GIT binary patch literal 3306 zcmZu!OH5 z*ZIzOzK>5A*D|3S*S~%J_1DL4hTjcD6K^15WOz(>dUs zMsCx+-`?Lj3V?U3jqFPJ6CW#AI-N>C9Q&a$T5ClK@u}jRaU+cM0WAR{##e+WDx_vU z6-Hal06uaH9Cka~tp*k?#yiPrY*Jm{Xt$g7N-G@OtTYenZlt^2>TkEY;aJ3y!KA%P zvpE=`(z3zOOGKF#Oj|Ao>ufg$qe4?=h{Cp%F`S7p;a+bx+Z8NHqlyusRgxPwT5I<< znsp4b=-{i3ez$&z=vJqFhBK}7bnSwLF4k&Yz}#GOZw{|Zy`_EHyDX>o}7iPHk>I zN&@HV>EpHw-pqPDmy4HO@RXF28RlCB?yTn4r&KXS$uf?BDki`l9;Hx3DnltkH?zJP z6*f6bj}eL9m_Ovac08@ieGAhldhEkiXC%INv%&87I_;s9f9$=satp&M3f z#*`3tutsX`>2T;RiJ&lOhg)fwumFhj%=+wNCXUruRB|De;O8df&8#Of`RPU6jz*<1 zTrgpnwN_h;uoCn6_1DTl(kVYk78Zl~{QakI4<+J3a$VL z>Ft$c?0S?+)po(pNu$?V13q{(pX&p+w+buRF1qqH#RX0qnJrnE$GoCQHn+(bcM zzO~%UmD=>7H{{a(@jIYBbPLfFMmX8w!0uZrqLf%I6qqAYP$Hx`f14mgEJ97dSd3$d z(7mN%slJPArxVF?0yG@$OfSQR!l`WVs0UnFY#uld%N*tR_knZqwUndnWNu+~0l2(7 zmvz~Z$sP1IBUtgfI54-!BMsW5gtx=#d>r0FRzh>%GDTqFhWk(k&Wz{r+rJ;C+X=4oX+Y(_7m@wr1l6O@{4OK?4BtPq`=VK$)`K^<+LVSEAfQXQM zMlonkqN2Fn5TB?;6=4d(ms|=6+n$~gOcYK=RK55Ge?At9&Gn98nBky5lX177J__=1 zjPQJ;m2Uy}Q=2)LWsP7uxd`0Ow^B|tbKSkw6X2!UPR13a>S0hn@Uh@6PLwsAYggO| zRN}M@Vvu79JVqJJ!JhqXR8wu>l(5f|H+)oil-PV)wq;K|{^R6t#E!bGH~M8hCGcui}VV=^xjNz}uxgWF=(O+-)2H?_{Ry zj@Vpc^C*f1?5%`Ey7P2PUE8nGbl+bSd+^=L zh*4}KU@%gG0?pw;ChBSjGT%asI3H5&^d|%pDT-7;$ybJ9=Dqn^W!~9!da0P&!2Oaz zFy|`jNn$r}_uZ`pn-$=AKXFh4-l#9G?f^Gt3N_bWah<(-a{#sNF_s| z8E^olps?YQS$M8gNHHe|T+cO6k}D%!0|iGJ-dSjOTc>LiHIKYMWiR&cHaxt$J2rI% zcSD^!|MlI=v6~zeXpQP+QL2okRz3arMQ4;o;Y%3h=f9l19J>KNpl28$nl>lw=$B(7 zxk61;XoFTg8=ic>%R3z?^6?BjSaz|I+*i0@*Fj literal 3348 zcmY*c%TJ@%9W{^P%P=!0O+u73qRDusVJ3Vw_x74RqHl%~q+t_3h;= zL`Ip+G9CyWaP|A%oksh@K6o+on^yf`$(3e%adT}BEF~AFQdQu+nMA7qytmYv-2hIt ziwRe{X1cmP0UY1YO=f||miCi#z^%o(+41j}PHIkt#btfUl_AtJD}AH26XzlD){y4CC@GGJ0anE4Z>k!(K!Hjo_a_PiCPu15! zACr=UV#&Y*=LE7j@A_--2#aw6rgV~<6F!*s`lK|P;A|R93M){CH@m+Gy6$dlZDrFm z;dK%bW%~4~!GsrjLNc6}DZ%h;&Z!~XqLM}{ML4%gGa`uAXOEGD7v@n$vqS~N21G>O zc%%3BX*l#^B3-W4!O>K)K2Zi|^?If3I3L@nwM)PoOIvrBftSitnL2PQvAyFso|vEC zTLZ2fjAbW*i}lq)-uJywghK-a8ZtiY;qGh|^w059G6SbDN;L)Hpk8ug3FK@1({NWM zH$B>noV{*-8`h|W*TEz)$FH8g+(WgZ+7N3`ZafJ36a{1WYZ%uOVm0LUNw|l_v@*RJWl;R^akwj4Yh;s6avbi2;3&CxYVZyiwA5;?iP2IBOCQ>qzzft8 zqODcjLJpc9KQnyCy8^WnmL(z(ETH0|H+BpHbL;74gh6DqvNBo&A%*t#yd!93Vxi*j z*jgnw>IjREr^kV}TKUBc@Z?Ui=00C3jgB}GH`Z4+O8%l3dKOnR6c*(^)hH;63aTtf zu$BnnQ{xmV6h7uK8iz9sT7G zkay^D#Rzw&bKk{=u#t>GzBnI;ES0eWem|KXzTba^k|>o~Vgf+|DWgt#nZ459I4+)x z&#yQ)k51%sPD!a!yWId@Us;$c0mql-CyI`!`0Bb-W@)^TssL}Sr5Z`#gY3a{$`@Yf zhMJ!|GcApRHmv2)HdpGMi~H|iqA_GF+)Y03zluI!DAbR30Qd6P z=l$K;d@-IIE6}&$!24Ba+sCby3Q7zp`1j2@FTTUW5I>=dc8510Zz=;rX6#;m5~kC@(!D&`XO+x z)yypd7uF9GZhdlMa(>f)-3wi(3^uXc5+^Dg48AhlM-2t+X&E#}3?jufHO&})jDl1< zbJ^A097Kn!3I9L+=AJkXE@_QLkk)y9@In7If%U}gt_Wdhhwt>>+GM3+}HDLdGz6THcYW6`l7->Avk{P=x6<1 zXq7^%sPX!EOf&7~2soF)r)a4k40SK3ikW>!Qpy(qj{AxbRA8cOwr1Rt~a<>%hw+$;>wJ z==A8U!0`PLJotXkIt`;*+b9>dDZHGrdx6@f+W4}`M zzhI%Oh$hQ1MJi+#9u7mHBlrqK3Unn-{vEa!7Ihr}O;uswW6_U?I=9>V^_`>Z_Rm9< zoXHnphI`TTRu2LYv1lZ}kr;Aw2nGd?DU9GcSSvWk+yTo`h#RVmW`eMdNjMyx!eA{P zt&uw2`LVghENPP|uhWG&J$f3eLQf#*-2TWisxj-JvltqKBdui$t(l-;gvj&92)58s zDza3*6)^Np7i2S~g*_Mf^OLE4C#Yh1C!Yh3r%Q<(aCIVk5C>iwTPe%{x2Bhyj>+xT zQp%0ZcUN1x?(mQ6L14m@mlsK#o2}E5rtThrYmEykSoE z-wk&jG!NI(=T8p4?hWSxbNuYBZ~AfWn@6vl2N*w41dHL~^e5lC@x!$i3?!JlT+$PT z_1#Dy%9xD4`!LarPy#=gqBS(CW96W$vRRCmI8_6L50?Y90JJ5DlUb-up~SE(rWW*R j3#CdN&_cWSo*w>eD2xukNS3ic!m0+7+YkQ#pP&B$rxrB~ diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf index 12c4e626ffd..5cdb8f49618 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf @@ -27,6 +27,7 @@ env { source { LocalFile { + result_table_name = "fake" path = "/seatunnel/read/lzo_json" row_delimiter = "\n" partition_dir_expression = "${k0}=${v0}" @@ -53,20 +54,20 @@ source { c_decimal = "decimal(38, 18)" c_timestamp = timestamp c_row = { - c_map = "map" - c_array = "array" - c_string = string - c_boolean = boolean - c_tinyint = tinyint - c_smallint = smallint - c_int = int - c_bigint = bigint - c_float = float - c_double = double - c_bytes = bytes - c_date = date - c_decimal = "decimal(38, 18)" - c_timestamp = timestamp + C_MAP = "map" + C_ARRAY = "array" + C_STRING = string + C_BOOLEAN = boolean + C_TINYINT = tinyint + C_SMALLINT = smallint + C_INT = int + C_BIGINT = bigint + C_FLOAT = float + C_DOUBLE = double + C_BYTES = bytes + C_DATE = date + C_DECIMAL = "decimal(38, 18)" + C_TIMESTAMP = timestamp } } } @@ -76,9 +77,10 @@ source { sink { Console { - + source_table_name = "fake" } Assert { + source_table_name = "fake" rules { row_rules = [ { @@ -92,7 +94,7 @@ sink { field_type = string field_value = [ { - rule_type = NOT_NULL + equals_to = "WArEB" } ] }, @@ -110,25 +112,16 @@ sink { field_type = double field_value = [ { - rule_type = NOT_NULL + equals_to = 12.4 } ] }, { - field_name = name - field_type = string + field_name = c_date + field_type = date field_value = [ { - rule_type = NOT_NULL - } - ] - }, - { - field_name = hobby - field_type = string - field_value = [ - { - rule_type = NOT_NULL + equals_to = "2022-05-05" } ] } From 1445f96d178f7ba8818ed0b30481fd34b7a37701 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 13 Oct 2023 17:18:38 +0800 Subject: [PATCH 11/14] update test case --- .../connector-file-local-e2e/pom.xml | 13 ++++ .../e2e/connector/file/local/LocalFileIT.java | 25 ++++++- .../src/test/resources/json/e2e_lzo.json | Bin 3306 -> 0 bytes .../json/local_file_json_lzo_to_console.conf | 25 +++++-- .../src/test/resources/text/e2e.lzo.txt | Bin 2748 -> 0 bytes .../text/local_file_text_lzo_to_assert.conf | 68 ++++++++++++------ .../e2e/common/util/ContainerUtil.java | 8 ++- 7 files changed, 106 insertions(+), 33 deletions(-) delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json delete mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.lzo.txt diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml index 7a581bbc3a9..ea120abdd31 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml @@ -38,6 +38,19 @@ ${project.version} test + + org.apache.seatunnel + seatunnel-hadoop3-3.1.4-uber + ${project.version} + optional + test + + + org.apache.avro + avro + + + org.apache.seatunnel connector-assert diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index 926b88b8e1b..8132a9c7f59 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -28,7 +28,14 @@ import org.junit.jupiter.api.TestTemplate; +import io.airlift.compress.lzo.LzopCodec; + +import java.io.File; import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; @DisabledOnContainer( value = {TestContainerId.SPARK_2_4}, @@ -44,14 +51,19 @@ public class LocalFileIT extends TestSuiteBase { "/json/e2e.json", "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json", container); + + Path jsonLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/json/e2e.json")); ContainerUtil.copyFileIntoContainers( - "/json/e2e_lzo.json", "/seatunnel/read/lzo_json/e2e.json", container); + jsonLzo, "/seatunnel/read/lzo_json/e2e.json", container); + ContainerUtil.copyFileIntoContainers( "/text/e2e.txt", "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt", container); + + Path txtLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt")); ContainerUtil.copyFileIntoContainers( - "/text/e2e.lzo.txt", "/seatunnel/read/lzo_text/e2e.txt", container); + txtLzo, "/seatunnel/read/lzo_text/e2e.txt", container); ContainerUtil.copyFileIntoContainers( "/excel/e2e.xlsx", "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx", @@ -115,4 +127,13 @@ public void testLocalFileReadAndWrite(TestContainer container) helper.execute("/json/local_file_to_console.conf"); helper.execute("/parquet/local_file_to_console.conf"); } + + private Path convertToLzoFile(File file) throws IOException { + LzopCodec lzo = new LzopCodec(); + Path path = Paths.get(file.getAbsolutePath() + ".lzo"); + OutputStream outputStream = lzo.createOutputStream(Files.newOutputStream(path)); + outputStream.write(Files.readAllBytes(file.toPath())); + outputStream.close(); + return path; + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e_lzo.json deleted file mode 100644 index ed3a4e789158f9a21b84e8fec6bf8c47677e5e83..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 3306 zcmZu!OH5 z*ZIzOzK>5A*D|3S*S~%J_1DL4hTjcD6K^15WOz(>dUs zMsCx+-`?Lj3V?U3jqFPJ6CW#AI-N>C9Q&a$T5ClK@u}jRaU+cM0WAR{##e+WDx_vU z6-Hal06uaH9Cka~tp*k?#yiPrY*Jm{Xt$g7N-G@OtTYenZlt^2>TkEY;aJ3y!KA%P zvpE=`(z3zOOGKF#Oj|Ao>ufg$qe4?=h{Cp%F`S7p;a+bx+Z8NHqlyusRgxPwT5I<< znsp4b=-{i3ez$&z=vJqFhBK}7bnSwLF4k&Yz}#GOZw{|Zy`_EHyDX>o}7iPHk>I zN&@HV>EpHw-pqPDmy4HO@RXF28RlCB?yTn4r&KXS$uf?BDki`l9;Hx3DnltkH?zJP z6*f6bj}eL9m_Ovac08@ieGAhldhEkiXC%INv%&87I_;s9f9$=satp&M3f z#*`3tutsX`>2T;RiJ&lOhg)fwumFhj%=+wNCXUruRB|De;O8df&8#Of`RPU6jz*<1 zTrgpnwN_h;uoCn6_1DTl(kVYk78Zl~{QakI4<+J3a$VL z>Ft$c?0S?+)po(pNu$?V13q{(pX&p+w+buRF1qqH#RX0qnJrnE$GoCQHn+(bcM zzO~%UmD=>7H{{a(@jIYBbPLfFMmX8w!0uZrqLf%I6qqAYP$Hx`f14mgEJ97dSd3$d z(7mN%slJPArxVF?0yG@$OfSQR!l`WVs0UnFY#uld%N*tR_knZqwUndnWNu+~0l2(7 zmvz~Z$sP1IBUtgfI54-!BMsW5gtx=#d>r0FRzh>%GDTqFhWk(k&Wz{r+rJ;C+X=4oX+Y(_7m@wr1l6O@{4OK?4BtPq`=VK$)`K^<+LVSEAfQXQM zMlonkqN2Fn5TB?;6=4d(ms|=6+n$~gOcYK=RK55Ge?At9&Gn98nBky5lX177J__=1 zjPQJ;m2Uy}Q=2)LWsP7uxd`0Ow^B|tbKSkw6X2!UPR13a>S0hn@Uh@6PLwsAYggO| zRN}M@Vvu79JVqJJ!JhqXR8wu>l(5f|H+)oil-PV)wq;K|{^R6t#E!bGH~M8hCGcui}VV=^xjNz}uxgWF=(O+-)2H?_{Ry zj@Vpc^C*f1?5%`Ey7P2PUE8nGbl+bSd+^=L zh*4}KU@%gG0?pw;ChBSjGT%asI3H5&^d|%pDT-7;$ybJ9=Dqn^W!~9!da0P&!2Oaz zFy|`jNn$r}_uZ`pn-$=AKXFh4-l#9G?f^Gt3N_bWah<(-a{#sNF_s| z8E^olps?YQS$M8gNHHe|T+cO6k}D%!0|iGJ-dSjOTc>LiHIKYMWiR&cHaxt$J2rI% zcSD^!|MlI=v6~zeXpQP+QL2okRz3arMQ4;o;Y%3h=f9l19J>KNpl28$nl>lw=$B(7 zxk61;XoFTg8=ic>%R3z?^6?BjSaz|I+*i0@*Fj diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf index 5cdb8f49618..bab2c38eddf 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf @@ -71,21 +71,32 @@ source { } } } + } +} +transform { + sql { + source_table_name = "fake" + result_table_name = "sqlresult" + query = "select * from fake where c_string = 'WArEB'" } } sink { Console { - source_table_name = "fake" + source_table_name = "sqlresult" } Assert { - source_table_name = "fake" + source_table_name = "sqlresult" rules { row_rules = [ { rule_type = MAX_ROW - rule_value = 5 + rule_value = 1 + }, + { + rule_type = MIN_ROW + rule_value = 1 } ], field_rules = [ @@ -108,11 +119,11 @@ sink { ] }, { - field_name = c_double - field_type = double + field_name = c_smallint + field_type = short field_value = [ { - equals_to = 12.4 + equals_to = 15920 } ] }, @@ -121,7 +132,7 @@ sink { field_type = date field_value = [ { - equals_to = "2022-05-05" + equals_to = "2022-04-27" } ] } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.lzo.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.lzo.txt deleted file mode 100644 index fd8f7b852337c4b51c478c1d83520ae138db3f02..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2748 zcmXX|Id3D$8SSoW4`yT-Sf@LHAvm!UHR?VHV{G1%NKv9Fi4-%6;vpX5C6Xc|JKLGx z;9vs>V`GB}9BklV0~?soaO~e;VE=&OR~i$NP~9q4)%U)4)cb1tJo?$e*9TvHb$IdS zckxf6m(L&mbpPq&&tHZ|uj#*{=-?mt{Sf_Uk)NGa=<#r|IV1UPVO*nU)8fg9R8FqC zOFEtPW(zXeEaxM7xt})cWY=xvF6robUmM0;D`gnh#t^OrvzA%M2YYmf5F&5iY38^{PMQQQC zx$|CNl48zzRg}!eyJ`8p84D)l5obquA(X@{%NR2n%Smn>PqmO%aVZ%So+~Yc{P-&R z%-j*CTz*KFn|3ot$JxQ{LsHMKCIecZ9+zgxuvgeV z)9LMaag$7YrRNE7P}~cvoS*?0uO(B88f&@1ZiZ?ljgi_*DjnxmJ7dXYTfVs^{mx|7 zBTf*UTM=gk$LSO!QX1izv`z`bwUtI@StEm!HFnSUOUc5Q!hO$!&5yCNxG>?c*4-_`EVLszXZs96)ilt$r7 zf)r?l;ph*+{}mTn7)em014K^6liKWI7r(#}OBPE33}X&-e;M^{`2@vqpDB7rX3olC)ZnjeELimhOg0p)|jq(qU^{ z86^+R!u5_m4tk40a@SrR4^go(SX?tl(S0V2S3*w+x~.#emN*vFCB4I{4&QSU8unH=z@e1qpI=2wbHwDK5I`d%+(BjD+7@Z< z_F^z6H`9}Hg?2`l`6X$sJKG&S$!=>8;A3^)ptGym@tSn+M(b-@t+XB&p&wx6oAf*9wL_tR)Tl!cw3BC40!k;IItT_%(zwk%!D=gw8N^a6IUAw{)1T zGzX+TXm|EBH$2bZlTz+xm8a9n_2ZbO2w2myU) zgWU;&Kn62p@DhXOS%zx005s@^i$~Y_{P0sD`h`|dHdq@Y9+(hHBV;u9-eaUZ+(d`o zVp7mLs`>Z(=(V!U05I$c@uLR-!N$Gf?bXe9@1nOMP!LQU0AW6?H}r!exOB=4nxYt} ziDzh0vX8F_GqBu0zK_N#!&Uq^FYM2gZM(c!(OvdAd!Fof<#vzGrjzzlQfW4_J9=B3 zZn{Y^-*30*@oKY~Ce!Zj=?t2H*}?ZXJR*=9J+O>oXIMCniD5_?1)oFSpgXWdAw0=e z7ybDUAEMtd11d;pCsYn+fDuDxC_mUhAPD;Bh)!{Opo=6m#HHx82D~r=%aspMkf)2< z-3|F>0ISdlR{{Y+23QDQE*wM)CZs{hP@EEQFE|)D_}4=87Y;r75GM+8;to2atNSau z+h>bQ(r#wkb9y=&=Q^Y~x~@Ib<*L;@BgMPMph?TURk2A<+w0~ftvAY}Exf|uR0t}# z8)DSq^r4{OACQL-sLfmM0gnoK180PbiTJ)>+LgZCHw0K=u}16xrgy+J79K8-#yJ^u z4pn#)`xL3@z5Ltz=#BCg@Pki4%YY>>Zjl?ex|R6TFQYg92x}?}{1ieMBtXhbi8M>i z?|*!QXaQld5yXmYopYFlJ!CSYM27h|f4bk!l76R|o6+^U-p?n)QZc)vrCk3xo8%_# zNs~@Gx9eq6JFaX>G=F;Cs3+w?clJo&mDmeON5M#meAkxZN`OW?>nQ#jz!Iop;A40s z+it16Am98Ly(1b>c~BjI;YtQJX=0GhLP7&*8-`4K%t=857!7!V8$_x&TmZKP1}nkO z3o(X+M*i;aFbYUS6Tkq!L5@QbpbqHfY>I=z%^j{pFw>wi@$uD98E|tUQbU5UFBjA3 EfAb)6f&c&j diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf index b32c37e367e..c15333ed3f7 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf @@ -36,8 +36,23 @@ source { filename_time_format = "yyyy.MM.dd" is_enable_transaction = true compress_codec = "lzo" - schema = { - fields { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { c_map = "map" c_array = "array" c_string = string @@ -52,34 +67,32 @@ source { c_date = date c_decimal = "decimal(38, 18)" c_timestamp = timestamp - c_row = { - c_map = "map" - c_array = "array" - c_string = string - c_boolean = boolean - c_tinyint = tinyint - c_smallint = smallint - c_int = int - c_bigint = bigint - c_float = float - c_double = double - c_bytes = bytes - c_date = date - c_decimal = "decimal(38, 18)" - c_timestamp = timestamp - } } } + } + } +} + +transform { + sql { + source_table_name = "fake" + result_table_name = "sqlresult" + query = "select * from fake where c_string = 'MTDna'" } } sink { Assert { + source_table_name = "sqlresult" rules { row_rules = [ { rule_type = MAX_ROW - rule_value = 5 + rule_value = 1 + }, + { + rule_type = MIN_ROW + rule_value = 1 } ], field_rules = [ @@ -88,7 +101,7 @@ sink { field_type = string field_value = [ { - rule_type = NOT_NULL + equals_to = "MTDna" } ] }, @@ -102,11 +115,20 @@ sink { ] }, { - field_name = c_double - field_type = double + field_name = c_smallint + field_type = short field_value = [ { - rule_type = NOT_NULL + equals_to = 13846 + } + ] + }, + { + field_name = c_date + field_type = date + field_value = [ + { + equals_to = "2023-06-07" } ] } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java index ca26d069ecf..6f8c5f597ff 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java @@ -37,6 +37,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -239,7 +240,7 @@ public static List discoverTestContainers() { ServiceLoader.load(TestContainer.class, Thread.currentThread().getContextClassLoader()) .iterator() .forEachRemaining(result::add); - return result; + return Collections.singletonList(result.get(4)); } catch (ServiceConfigurationError e) { log.error("Could not load service provider for containers.", e); throw new FactoryException("Could not load service provider for containers.", e); @@ -249,6 +250,11 @@ public static List discoverTestContainers() { public static void copyFileIntoContainers( String fileName, String targetPath, GenericContainer container) { Path path = getResourcesFile(fileName).toPath(); + copyFileIntoContainers(path, targetPath, container); + } + + public static void copyFileIntoContainers( + Path path, String targetPath, GenericContainer container) { container.copyFileToContainer(MountableFile.forHostPath(path), targetPath); } } From 4a484ac1af0dafa63b716a24103b9fa031bd76ae Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 14 Oct 2023 09:51:49 +0800 Subject: [PATCH 12/14] update --- .../org/apache/seatunnel/e2e/common/util/ContainerUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java index 6f8c5f597ff..628f8d090f3 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java @@ -240,7 +240,7 @@ public static List discoverTestContainers() { ServiceLoader.load(TestContainer.class, Thread.currentThread().getContextClassLoader()) .iterator() .forEachRemaining(result::add); - return Collections.singletonList(result.get(4)); + return result; } catch (ServiceConfigurationError e) { log.error("Could not load service provider for containers.", e); throw new FactoryException("Could not load service provider for containers.", e); From ff83e7cff04943ef5cc5b70c1c167aa5762717b1 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 14 Oct 2023 10:03:44 +0800 Subject: [PATCH 13/14] update --- .../java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java index 628f8d090f3..722a5c18e3a 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java @@ -37,7 +37,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; From 7e5cc2ea0ab42255fc422176d013f05b61cdaec6 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 16 Oct 2023 11:35:16 +0800 Subject: [PATCH 14/14] fix ci --- .../src/test/resources/text/local_file_text_lzo_to_assert.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf index c15333ed3f7..80613ec0fcc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf @@ -70,6 +70,7 @@ source { } } } + result_table_name = "fake" } }