diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
new file mode 100644
index 00000000000..3c6d9091cb1
--- /dev/null
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -0,0 +1,215 @@
+# OssFile
+
+> OssJindo file sink connector
+
+## Description
+
+Output data to the Aliyun OSS file system using the Jindo API.
+
+> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OSS, and this connector needs some Hadoop dependencies.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use a 2PC commit to ensure `exactly-once`.
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+  - [x] text
+  - [x] csv
+  - [x] parquet
+  - [x] orc
+  - [x] json
+
+## Options
+
+| name                             | type    | required | default value                                              |
+|----------------------------------|---------|----------|------------------------------------------------------------|
+| path                             | string  | yes      | -                                                          |
+| bucket                           | string  | yes      | -                                                          |
+| access_key                       | string  | yes      | -                                                          |
+| access_secret                    | string  | yes      | -                                                          |
+| endpoint                         | string  | yes      | -                                                          |
+| file_name_expression             | string  | no       | "${transactionId}"                                         |
+| file_format                      | string  | no       | "text"                                                     |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                               |
+| field_delimiter                  | string  | no       | '\001'                                                     |
+| row_delimiter                    | string  | no       | "\n"                                                       |
+| partition_by                     | array   | no       | -                                                          |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                 |
+| is_partition_field_write_in_file | boolean | no       | false                                                      |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns  |
+| is_enable_transaction            | boolean | no       | true                                                       |
+| common-options                   |         | no       | -                                                          |
+
+### path [string]
+
+The target directory path.
+
+### bucket [string]
+
+The bucket address of the OSS file system, for example: `oss://tyrantlucifer-image-bed`
+
+### access_key [string]
+
+The access key of the OSS file system.
+
+### access_secret [string]
+
+The access secret of the OSS file system.
+
+### endpoint [string]
+
+The endpoint of the OSS file system.
+
+### file_name_expression [string]
+
+`file_name_expression` describes the expression for the file names that will be created under `path`. We can use the variables `${now}` or `${uuid}` in `file_name_expression`, like `test_${uuid}_${now}`.
+`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` will be automatically prepended to the file name.
+
+### file_format [string]
+
+The following file types are supported:
+
+`text` `csv` `parquet` `orc` `json`
+
+Please note that the final file name will end with the file format's suffix; the suffix of a text file is `txt`.
+
+### filename_time_format [string]
+
+When the format in the `file_name_expression` parameter is `xxxx-${now}`, `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd`. The commonly used time formats are listed as follows:
+
+| Symbol | Description        |
+|--------|--------------------|
+| y      | Year               |
+| M      | Month              |
+| d      | Day of month       |
+| H      | Hour in day (0-23) |
+| m      | Minute in hour     |
+| s      | Second in minute   |
+
+See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
+
+### field_delimiter [string]
+
+The separator between columns in a row of data.
+Only needed by the `text` and `csv` file formats.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by the `text` and `csv` file formats.
+
+### partition_by [array]
+
+Partition the data based on the selected fields.
+
+### partition_dir_expression [string]
+
+If `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.
+
+Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
+
+### is_partition_field_write_in_file [boolean]
+
+If `is_partition_field_write_in_file` is `true`, the partition field and its value will be written into the data file.
+
+For example, if you want to write a Hive data file, its value should be `false`.
+
+### sink_columns [array]
+
+Which columns need to be written to the file; the default value is all of the columns obtained from the `Transform` or `Source`.
+The order of the fields determines the order in which the file is actually written.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is `true`, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` will be automatically prepended to the file name.
+
+Only `true` is supported now.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Example
+
+For the text file format
+
+```hocon
+
+  OssFile {
+    path="/seatunnel/sink"
+    bucket = "oss://tyrantlucifer-image-bed"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "oss-cn-beijing.aliyuncs.com"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+
+```
+
+For the parquet file format
+
+```hocon
+
+  OssFile {
+    path = "/seatunnel/sink"
+    bucket = "oss://tyrantlucifer-image-bed"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxxxxxxxx"
+    endpoint = "oss-cn-beijing.aliyuncs.com"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "parquet"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+
+```
+
+For the orc file format
+
+```hocon
+
+  OssFile {
+    path="/seatunnel/sink"
+    bucket = "oss://tyrantlucifer-image-bed"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "oss-cn-beijing.aliyuncs.com"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "orc"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+
+```
+
+## Changelog
+
+### Next version
+
+- Add OSS Jindo File Sink Connector
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
new file mode 100644
index 00000000000..26409c72b2d
--- /dev/null
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -0,0 +1,247 @@
+# OssFile
+
+> OssJindo file source connector
+
+## Description
+
+Read data from the Aliyun OSS file system using the Jindo API.
+
+> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OSS, and this connector needs some Hadoop dependencies.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+All the data in a split is read in a single pollNext call. The splits that have been read are saved in the snapshot.
+
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] file format
+  - [x] text
+  - [x] csv
+  - [x] parquet
+  - [x] orc
+  - [x] json
+
+## Options
+
+| name                      | type    | required | default value       |
+|---------------------------|---------|----------|---------------------|
+| path                      | string  | yes      | -                   |
+| type                      | string  | yes      | -                   |
+| bucket                    | string  | yes      | -                   |
+| access_key                | string  | yes      | -                   |
+| access_secret             | string  | yes      | -                   |
+| endpoint                  | string  | yes      | -                   |
+| delimiter                 | string  | no       | \001                |
+| parse_partition_from_path | boolean | no       | true                |
+| date_format               | string  | no       | yyyy-MM-dd          |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format               | string  | no       | HH:mm:ss            |
+| schema                    | config  | no       | -                   |
+| common-options            |         | no       | -                   |
+
+### path [string]
+
+The source file path.
+
+### delimiter [string]
+
+Field delimiter, used to tell the connector how to slice fields when reading text files.
+
+Default is `\001`, the same as Hive's default delimiter.
+
+### parse_partition_from_path [boolean]
+
+Controls whether to parse the partition keys and values from the file path.
+
+For example, if you read a file from the path `oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+
+these two fields will be added to every record read from the file:
+
+| name          | age |
+|---------------|-----|
+| tyrantlucifer | 26  |
+
+Tips: **Do not define partition fields in the schema option**
+
+### date_format [string]
+
+Date type format, used to tell the connector how to convert a string to a date. The following formats are supported:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+Default is `yyyy-MM-dd`.
+
+### datetime_format [string]
+
+Datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+Default is `yyyy-MM-dd HH:mm:ss`.
+
+### time_format [string]
+
+Time type format, used to tell the connector how to convert a string to a time. The following formats are supported:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+Default is `HH:mm:ss`.
+
+### type [string]
+
+File type. The following file types are supported:
+
+`text` `csv` `parquet` `orc` `json`
+
+If you assign the file type to `json`, you should also assign the schema option to tell the connector how to parse the data into the rows you want.
+
+For example:
+
+The upstream data is the following:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+You can also save multiple pieces of data in one file and split them by newline:
+
+```json lines
+
+{"code": 200, "data": "get success", "success": true}
+{"code": 300, "data": "get failed", "success": false}
+
+```
+
+You should assign the schema as follows:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+The connector will generate data as follows:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+If you assign the file type to `parquet` or `orc`, the schema option is not required; the connector can find the schema of the upstream data automatically.
+
+If you assign the file type to `text` or `csv`, you can choose whether to specify the schema information.
+
+For example, the upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
+
+```
+
+If you do not assign a data schema, the connector will treat the upstream data as the following:
+
+| content               |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+If you assign a data schema, you should also assign the `delimiter` option, except for the CSV file type.
+
+You should assign the schema and delimiter as follows:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string
+    }
+}
+
+```
+
+The connector will generate data as follows:
+
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
+
+### bucket [string]
+
+The bucket address of the OSS file system, for example: `oss://tyrantlucifer-image-bed`
+
+### access_key [string]
+
+The access key of the OSS file system.
+
+### access_secret [string]
+
+The access secret of the OSS file system.
+
+### endpoint [string]
+
+The endpoint of the OSS file system.
+
+### schema [config]
+
+#### fields [Config]
+
+The schema of the upstream data.
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
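+
+For instance, combining the `type`, `delimiter` and `schema` options described above, a text-file source could be configured roughly as shown below (a minimal sketch; the path, bucket, endpoint and credentials are placeholder values):
+
+```hocon
+
+  OssFile {
+    path = "/seatunnel/text"
+    bucket = "oss://tyrantlucifer-image-bed"
+    access_key = "xxxxxxxxxxx"
+    access_secret = "xxxxxxxxxxx"
+    endpoint = "oss-cn-beijing.aliyuncs.com"
+    type = "text"
+    delimiter = "#"
+    schema {
+      fields {
+        name = string
+        age = int
+        gender = string
+      }
+    }
+  }
+
+```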
+ +## Example + +```hocon + + OssFile { + path = "/seatunnel/orc" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + type = "orc" + } + +``` + +```hocon + + OssFile { + path = "/seatunnel/json" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + type = "json" + schema { + fields { + id = int + name = string + } + } + } + +``` + +## Changelog + +### next version + +- Add OSS Jindo File Source Connector diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e73f8c283d2..00bc5b39cca 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -115,6 +115,8 @@ seatunnel.source.LocalFile = connector-file-local seatunnel.sink.LocalFile = connector-file-local seatunnel.source.OssFile = connector-file-oss seatunnel.sink.OssFile = connector-file-oss +seatunnel.source.OssJindoFile = connector-file-oss-jindo +seatunnel.sink.OssJindoFile = connector-file-oss-jindo seatunnel.source.Pulsar = connector-pulsar seatunnel.source.Hudi = connector-hudi seatunnel.sink.DingTalk = connector-dingtalk diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java index 2aca9ded0b1..8d50cee4697 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java @@ -23,6 +23,7 @@ public enum FileSystemType implements Serializable { HDFS("HdfsFile"), LOCAL("LocalFile"), OSS("OssFile"), + OSS_JINDO("OssJindoFile"), FTP("FtpFile"), SFTP("SftpFile"), S3("S3File"); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml new file mode 100644 index 00000000000..fba755c5449 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml @@ -0,0 +1,79 @@ + + + + + connector-file + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-file-oss-jindo + + + 4.6.1 + 2.9.2 + + + + + jindodata + https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/ + + + + + + + org.apache.seatunnel + connector-file-base + ${project.version} + + + + com.aliyun.jindodata + jindo-core + ${jindo-sdk.version} + + + + com.aliyun.jindodata + jindosdk + ${jindo-sdk.version} + + + + org.apache.hadoop + hadoop-common + ${hadoop-common.version} + provided + + + avro + org.apache.avro + + + + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java new file mode 100644 index 00000000000..da6392044f8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java @@ -0,0 +1,55 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.config; + +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.HashMap; + +public class OssConf extends HadoopConf { + private static final String HDFS_IMPL = "com.aliyun.emr.fs.oss.JindoOssFileSystem"; + private static final String SCHEMA = "oss"; + + @Override + public String getFsHdfsImpl() { + return HDFS_IMPL; + } + + @Override + public String getSchema() { + return SCHEMA; + } + + public OssConf(String hdfsNameKey) { + super(hdfsNameKey); + } + + public static HadoopConf buildWithConfig(Config config) { + HadoopConf hadoopConf = new OssConf(config.getString(OssConfig.BUCKET.key())); + HashMap ossOptions = new HashMap<>(); + ossOptions.put("fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS"); + ossOptions.put("fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem"); + ossOptions.put("fs.oss.accessKeyId", config.getString(OssConfig.ACCESS_KEY.key())); + ossOptions.put("fs.oss.accessKeySecret", config.getString(OssConfig.ACCESS_SECRET.key())); + ossOptions.put("fs.oss.endpoint", config.getString(OssConfig.ENDPOINT.key())); + hadoopConf.setExtraOptions(ossOptions); + return hadoopConf; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java new file mode 100644 index 00000000000..8c110dc3ced --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; + +public class OssConfig extends BaseSourceConfig { + public static final Option ACCESS_KEY = Options.key("access_key") + .stringType() + .noDefaultValue() + .withDescription("OSS bucket access key"); + public static final Option ACCESS_SECRET = Options.key("access_secret") + .stringType() + .noDefaultValue() + .withDescription("OSS bucket access secret"); + public static final Option ENDPOINT = Options.key("endpoint") + .stringType() + .noDefaultValue() + .withDescription("OSS endpoint"); + public static final Option BUCKET = Options.key("bucket") + .stringType() + .noDefaultValue() + .withDescription("OSS bucket"); +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java new file mode 100644 index 00000000000..cb718c5f2d6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class OssJindoConnectorException extends SeaTunnelRuntimeException { + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java new file mode 100644 index 00000000000..bac6220da40 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +@AutoService(SeaTunnelSink.class) +public class OssFileSink extends BaseFileSink { + @Override + public String getPluginName() { + return FileSystemType.OSS_JINDO.getFileSystemPluginName(); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + super.prepare(pluginConfig); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, + OssConfig.FILE_PATH.key(), + OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(), + OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key()); + if (!result.isSuccess()) { + throw new OssJindoConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, result.getMsg())); + } + hadoopConf = OssConf.buildWithConfig(pluginConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java new file mode 100644 index 00000000000..fe5c13b62e7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class OssFileSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return FileSystemType.OSS.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(OssConfig.FILE_PATH) + .required(OssConfig.BUCKET) + .required(OssConfig.ACCESS_KEY) + .required(OssConfig.ACCESS_SECRET) + .required(OssConfig.ENDPOINT) + .optional(BaseSinkConfig.FILE_NAME_EXPRESSION) + .optional(BaseSinkConfig.FILE_FORMAT) + .optional(BaseSinkConfig.FILENAME_TIME_FORMAT) + .optional(BaseSinkConfig.FIELD_DELIMITER) + .optional(BaseSinkConfig.ROW_DELIMITER) + .optional(BaseSinkConfig.PARTITION_BY) + .optional(BaseSinkConfig.PARTITION_DIR_EXPRESSION) + .optional(BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE) + .optional(BaseSinkConfig.SINK_COLUMNS) + .optional(BaseSinkConfig.IS_ENABLE_TRANSACTION) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java new file mode 100644 index 00000000000..440f0bc96b8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSource.class) +public class OssFileSource extends BaseFileSource { + @Override + public String getPluginName() { + return FileSystemType.OSS_JINDO.getFileSystemPluginName(); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, + OssConfig.FILE_PATH.key(), OssConfig.FILE_TYPE.key(), + OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(), + OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key()); + if (!result.isSuccess()) { + throw new OssJindoConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); + } + readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key())); + readStrategy.setPluginConfig(pluginConfig); + String path = pluginConfig.getString(OssConfig.FILE_PATH.key()); + hadoopConf = OssConf.buildWithConfig(pluginConfig); + try { + filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); + } catch (IOException e) { + String errorMsg = String.format("Get file list from this path [%s] failed", path); + throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e); + } + // support user-defined schema + FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase()); + // only json text csv type support user-defined schema now + if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) { + switch (fileFormat) { + case CSV: + case TEXT: + case JSON: + Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key()); + SeaTunnelRowType userDefinedSchema = SeaTunnelSchema + .buildWithConfig(schemaConfig) + .getSeaTunnelRowType(); + readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); + break; + case ORC: + case PARQUET: + throw new 
OssJindoConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, + "SeaTunnel does not support user-defined schema for [parquet, orc] files"); + default: + // never got in there + throw new OssJindoConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, + "SeaTunnel does not supported this file format"); + } + } else { + try { + rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); + } catch (FileConnectorException e) { + String errorMsg = String.format("Get table schema from file [%s] failed", filePaths.get(0)); + throw new FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java new file mode 100644 index 00000000000..cd875c0e841 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.source; + +import org.apache.seatunnel.api.configuration.util.Condition; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class OssFileSourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return FileSystemType.OSS.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(OssConfig.FILE_PATH) + .required(OssConfig.FILE_TYPE) + .required(OssConfig.BUCKET) + .required(OssConfig.ACCESS_KEY) + .required(OssConfig.ACCESS_SECRET) + .required(OssConfig.ENDPOINT) + .optional(OssConfig.DELIMITER) + .optional(OssConfig.PARSE_PARTITION_FROM_PATH) + .optional(OssConfig.DATE_FORMAT) + .optional(OssConfig.DATETIME_FORMAT) + .optional(OssConfig.TIME_FORMAT) + .conditional(Condition.of(OssConfig.FILE_TYPE, "text"), SeaTunnelSchema.SCHEMA) + .conditional(Condition.of(OssConfig.FILE_TYPE, "json"), SeaTunnelSchema.SCHEMA) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 00000000000..de01ef278ef --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.aliyun.emr.fs.oss.JindoOssFileSystem \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/test/java/org/apache/seatunnel/connectors/test/OssJindoFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/test/java/org/apache/seatunnel/connectors/test/OssJindoFactoryTest.java new file mode 100644 index 00000000000..85955f086f9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/test/java/org/apache/seatunnel/connectors/test/OssJindoFactoryTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.test; + +import org.apache.seatunnel.connectors.seatunnel.file.oss.sink.OssFileSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.file.oss.source.OssFileSourceFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class OssJindoFactoryTest { + @Test + public void testOptionRule() { + Assertions.assertNotNull((new OssFileSourceFactory()).optionRule()); + Assertions.assertNotNull((new OssFileSinkFactory()).optionRule()); + } +} diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml index 773ae5143b3..2bb9426690a 100644 --- a/seatunnel-connectors-v2/connector-file/pom.xml +++ b/seatunnel-connectors-v2/connector-file/pom.xml @@ -42,6 +42,7 @@ connector-file-base-hadoop connector-file-sftp connector-file-s3 + connector-file-oss-jindo diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index bbbd55f299e..a1500941661 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -225,6 +225,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-file-oss-jindo + ${project.version} + provided + org.apache.seatunnel connector-file-ftp