From 1f3e137c493ccdc7f35273b06da9504224e9110f Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Sun, 23 Oct 2022 00:37:28 +0800 Subject: [PATCH 01/14] [Feature][Connector-V2][Oss-Jindo] Add oss-jindo source & sink connector --- .../seatunnel/file/config/FileSystemType.java | 1 + .../connector-file-oss-jindo/pom.xml | 43 +++++++++ .../seatunnel/file/oss/config/OssConf.java | 49 ++++++++++ .../seatunnel/file/oss/config/OssConfig.java | 27 ++++++ .../seatunnel/file/oss/sink/OssFileSink.java | 53 ++++++++++ .../file/oss/source/OssFileSource.java | 96 +++++++++++++++++++ .../services/org.apache.hadoop.fs.FileSystem | 16 ++++ .../connector-file/pom.xml | 1 + 8 files changed, 286 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java index 2aca9ded0b1..8d50cee4697 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java @@ -23,6 +23,7 @@ public enum FileSystemType implements Serializable { HDFS("HdfsFile"), LOCAL("LocalFile"), OSS("OssFile"), + OSS_JINDO("OssJindoFile"), FTP("FtpFile"), SFTP("SftpFile"), S3("S3File"); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml new file mode 100644 index 00000000000..90e96fcef36 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml @@ -0,0 +1,43 @@ + + + + connector-file + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-file-oss-jindo + + + 3.7.3 + 2.9.2 + + + + + + org.apache.seatunnel + connector-file-base + ${project.version} + + + + com.aliyun.oss + jindo-sdk + ${jindo-sdk.version} + system + ${basedir}/src/libs/jindofs-sdk-3.7.3.jar + + + + org.apache.hadoop + hadoop-aliyun + ${hadoop-aliyun.version} + + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java new file mode 100644 index 00000000000..f4c8dab6366 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.config; + +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.HashMap; + +public class OssConf extends HadoopConf { + private final String fsHdfsImpl = "com.aliyun.emr.fs.oss.JindoOssFileSystem"; + + @Override + public String getFsHdfsImpl() { + return fsHdfsImpl; + } + + public OssConf(String hdfsNameKey) { + super(hdfsNameKey); + } + + public static HadoopConf buildWithConfig(Config config) { + HadoopConf hadoopConf = new OssConf(config.getString(OssConfig.BUCKET)); + HashMap ossOptions = new HashMap<>(); + ossOptions.put("fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS"); + ossOptions.put("fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem"); + ossOptions.put("fs.oss.accessKeyId", config.getString(OssConfig.ACCESS_KEY)); + ossOptions.put("fs.oss.accessKeySecret", config.getString(OssConfig.ACCESS_SECRET)); + ossOptions.put("fs.oss.endpoint", config.getString(OssConfig.ENDPOINT)); + hadoopConf.setExtraOptions(ossOptions); + return hadoopConf; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java new file mode 100644 index 00000000000..7a928e579ea --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.config; + +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; + +public class OssConfig extends BaseSourceConfig { + public static final String ACCESS_KEY = "access_key"; + public static final String ACCESS_SECRET = "access_secret"; + public static final String ENDPOINT = "endpoint"; + public static final String BUCKET = "bucket"; +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java new file mode 100644 index 00000000000..e6c11fee009 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +@AutoService(SeaTunnelSink.class) +public class OssFileSink extends BaseFileSink { + @Override + public String getPluginName() { + return FileSystemType.OSS_JINDO.getFileSystemPluginName(); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + super.prepare(pluginConfig); + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, + OssConfig.FILE_PATH, + OssConfig.BUCKET, OssConfig.ACCESS_KEY, + OssConfig.ACCESS_SECRET, OssConfig.BUCKET); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); + } + hadoopConf = OssConf.buildWithConfig(pluginConfig); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java new file mode 100644 index 00000000000..287ce8cd8eb --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSource.class) +public class OssFileSource extends BaseFileSource { + @Override + public String getPluginName() { + return FileSystemType.OSS_JINDO.getFileSystemPluginName(); + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, + OssConfig.FILE_PATH, OssConfig.FILE_TYPE, + OssConfig.BUCKET, OssConfig.ACCESS_KEY, + OssConfig.ACCESS_SECRET, OssConfig.BUCKET); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE)); + readStrategy.setPluginConfig(pluginConfig); + String path = pluginConfig.getString(OssConfig.FILE_PATH); + hadoopConf = OssConf.buildWithConfig(pluginConfig); + try { + filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); + } catch (IOException e) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail."); + } + // support user-defined schema + FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE).toUpperCase()); + // only json text csv type support user-defined schema now + if (pluginConfig.hasPath(OssConfig.SCHEMA)) { + switch (fileFormat) { + case CSV: + case TEXT: + case JSON: + Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA); + SeaTunnelRowType userDefinedSchema = SeaTunnelSchema + .buildWithConfig(schemaConfig) + .getSeaTunnelRowType(); + readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema); + rowType = readStrategy.getActualSeaTunnelRowTypeInfo(); + break; + case ORC: + case PARQUET: + throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files"); + default: + // never got in there + throw new UnsupportedOperationException("SeaTunnel does not supported this file format"); + } + } else { + try { + rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); + } catch (FilePluginException e) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem new file mode 100644 index 00000000000..de01ef278ef --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.aliyun.emr.fs.oss.JindoOssFileSystem \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml index 773ae5143b3..2bb9426690a 100644 --- a/seatunnel-connectors-v2/connector-file/pom.xml +++ b/seatunnel-connectors-v2/connector-file/pom.xml @@ -42,6 +42,7 @@ connector-file-base-hadoop connector-file-sftp connector-file-s3 + connector-file-oss-jindo From 5a857892f6404d3f20ed0f2f82b5a4dfc8d664f9 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 16 Nov 2022 21:51:50 +0800 Subject: [PATCH 02/14] [Feature][Connector-V2][Oss-Jindo] Update OssConf and pom --- .../connector-file-oss-jindo/pom.xml | 30 +++++++++++++------ .../seatunnel/file/oss/config/OssConf.java | 10 +++++-- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml index 90e96fcef36..8fe4f4cb66c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml @@ -12,10 +12,17 @@ connector-file-oss-jindo - 3.7.3 - 2.9.2 + 4.6.1 + 2.9.2 + + + jindodata + https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/ + + + @@ -25,17 +32,22 @@ - com.aliyun.oss - jindo-sdk - ${jindo-sdk.version} - system - ${basedir}/src/libs/jindofs-sdk-3.7.3.jar + com.aliyun.jindodata + jindo-core + 4.6.1 + + + + com.aliyun.jindodata + jindosdk + 4.6.1 org.apache.hadoop - hadoop-aliyun - ${hadoop-aliyun.version} + hadoop-common + ${hadoop-common.version} + provided diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java index f4c8dab6366..b8ce6cc251b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java @@ -24,11 +24,17 @@ import java.util.HashMap; public class OssConf extends HadoopConf { - private final String fsHdfsImpl = "com.aliyun.emr.fs.oss.JindoOssFileSystem"; + private static final String HDFS_IMPL = "com.aliyun.emr.fs.oss.JindoOssFileSystem"; + private static final String SCHEMA = "oss"; @Override public String getFsHdfsImpl() { - return fsHdfsImpl; + return HDFS_IMPL; + } + + @Override + public String getSchema() { + return SCHEMA; } public OssConf(String hdfsNameKey) { From bda13d00285cb20abf1776d9d7b4069b1628b051 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 16 Nov 2022 21:59:08 +0800 Subject: [PATCH 03/14] [Feature][Connector-V2][Oss-Jindo] Update exception --- .../exception/OssJindoConnectorException.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java new file mode 100644 index 00000000000..cb718c5f2d6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class OssJindoConnectorException extends SeaTunnelRuntimeException { + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} From 42ab6aa958a57455ee7e094c7f51955122771be8 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 17:40:57 +0800 Subject: [PATCH 04/14] [Feature][Connector-V2][Oss-Jindo] Unified exception & add option factory --- .../seatunnel/file/oss/config/OssConfig.java | 22 ++++++-- .../exception/OssJindoConnectorException.java | 35 ++++++++++++ .../seatunnel/file/oss/sink/OssFileSink.java | 12 ++-- .../file/oss/sink/OssFileSinkFactory.java | 56 +++++++++++++++++++ .../file/oss/source/OssFileSource.java | 32 +++++++---- .../file/oss/source/OssFileSourceFactory.java | 55 ++++++++++++++++++ 6 files changed, 192 insertions(+), 20 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java index 7a928e579ea..8c110dc3ced 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConfig.java @@ -17,11 +17,25 @@ package org.apache.seatunnel.connectors.seatunnel.file.oss.config; +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; public class OssConfig extends BaseSourceConfig { - public static final String ACCESS_KEY = "access_key"; - public static final String ACCESS_SECRET = "access_secret"; - public static final String ENDPOINT = "endpoint"; - public static final String BUCKET = "bucket"; + public static final Option ACCESS_KEY = Options.key("access_key") + .stringType() + .noDefaultValue() + .withDescription("OSS bucket access key"); + public static final Option ACCESS_SECRET = Options.key("access_secret") + .stringType() + .noDefaultValue() + .withDescription("OSS bucket access secret"); + public static final Option ENDPOINT = Options.key("endpoint") + .stringType() + .noDefaultValue() + .withDescription("OSS endpoint"); + public static final Option BUCKET = Options.key("bucket") + .stringType() + .noDefaultValue() + .withDescription("OSS bucket"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java new file mode 100644 index 00000000000..cb718c5f2d6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +public class OssJindoConnectorException extends SeaTunnelRuntimeException { + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { + super(seaTunnelErrorCode, errorMessage); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { + super(seaTunnelErrorCode, errorMessage, cause); + } + + public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { + super(seaTunnelErrorCode, cause); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java index e6c11fee009..bac6220da40 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.oss.sink; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; @@ -25,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -42,11 +44,13 @@ public String getPluginName() { public void prepare(Config pluginConfig) throws PrepareFailException { super.prepare(pluginConfig); CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, - OssConfig.FILE_PATH, - OssConfig.BUCKET, OssConfig.ACCESS_KEY, - OssConfig.ACCESS_SECRET, OssConfig.BUCKET); + OssConfig.FILE_PATH.key(), + OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(), + OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key()); if (!result.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg()); + throw new OssJindoConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, result.getMsg())); } hadoopConf = OssConf.buildWithConfig(pluginConfig); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java new file mode 100644 index 00000000000..fe5c13b62e7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class OssFileSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return FileSystemType.OSS.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(OssConfig.FILE_PATH) + .required(OssConfig.BUCKET) + .required(OssConfig.ACCESS_KEY) + .required(OssConfig.ACCESS_SECRET) + .required(OssConfig.ENDPOINT) + .optional(BaseSinkConfig.FILE_NAME_EXPRESSION) + .optional(BaseSinkConfig.FILE_FORMAT) + .optional(BaseSinkConfig.FILENAME_TIME_FORMAT) + .optional(BaseSinkConfig.FIELD_DELIMITER) + .optional(BaseSinkConfig.ROW_DELIMITER) + .optional(BaseSinkConfig.PARTITION_BY) + .optional(BaseSinkConfig.PARTITION_DIR_EXPRESSION) + .optional(BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE) + .optional(BaseSinkConfig.SINK_COLUMNS) + .optional(BaseSinkConfig.IS_ENABLE_TRANSACTION) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java index 287ce8cd8eb..07eedabd799 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java @@ -18,17 +18,20 @@ package org.apache.seatunnel.connectors.seatunnel.file.oss.source; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.config.CheckConfigUtil; import org.apache.seatunnel.common.config.CheckResult; import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; +import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; @@ -48,15 +51,17 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, - OssConfig.FILE_PATH, OssConfig.FILE_TYPE, - OssConfig.BUCKET, OssConfig.ACCESS_KEY, - OssConfig.ACCESS_SECRET, OssConfig.BUCKET); + OssConfig.FILE_PATH.key(), OssConfig.FILE_TYPE.key(), + OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(), + OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key()); if (!result.isSuccess()) { - throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + throw new OssJindoConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SOURCE, result.getMsg())); } - readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE)); + readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key())); readStrategy.setPluginConfig(pluginConfig); - String path = pluginConfig.getString(OssConfig.FILE_PATH); + String path = pluginConfig.getString(OssConfig.FILE_PATH.key()); hadoopConf = OssConf.buildWithConfig(pluginConfig); try { filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); @@ -64,14 +69,14 @@ public void prepare(Config pluginConfig) throws PrepareFailException { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail."); } // support user-defined schema - FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE).toUpperCase()); + FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase()); // only json text csv type support user-defined schema now - if (pluginConfig.hasPath(OssConfig.SCHEMA)) { + if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA.key())) { switch (fileFormat) { case CSV: case TEXT: case JSON: - Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA); + Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA.key()); SeaTunnelRowType userDefinedSchema = SeaTunnelSchema .buildWithConfig(schemaConfig) .getSeaTunnelRowType(); @@ -80,16 +85,19 @@ public void prepare(Config pluginConfig) throws PrepareFailException { break; case ORC: case PARQUET: - throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files"); + throw new OssJindoConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, + "SeaTunnel does not support user-defined schema for [parquet, orc] files"); default: // never got in there - throw new UnsupportedOperationException("SeaTunnel does not supported this file format"); + throw new OssJindoConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION, + "SeaTunnel does not supported this file format"); } } else { try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); } catch (FilePluginException e) { - throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e); + throw new OssJindoConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, + "Get data schema information from file failed", e); } } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java new file mode 100644 index 00000000000..42f8403d276 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.oss.source; + +import org.apache.seatunnel.api.configuration.util.Condition; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; + +import com.google.auto.service.AutoService; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; + +@AutoService(Factory.class) +public class OssFileSourceFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return FileSystemType.OSS.getFileSystemPluginName(); + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(OssConfig.FILE_PATH) + .required(OssConfig.FILE_TYPE) + .required(OssConfig.BUCKET) + .required(OssConfig.ACCESS_KEY) + .required(OssConfig.ACCESS_SECRET) + .required(OssConfig.ENDPOINT) + .optional(OssConfig.DELIMITER) + .optional(OssConfig.PARSE_PARTITION_FROM_PATH) + .optional(OssConfig.DATE_FORMAT) + .optional(OssConfig.DATETIME_FORMAT) + .optional(OssConfig.TIME_FORMAT) + .conditional(Condition.of(OssConfig.FILE_TYPE, "text"), SeaTunnelSchema.SCHEMA) + .conditional(Condition.of(OssConfig.FILE_TYPE, "json"), SeaTunnelSchema.SCHEMA) + .build(); + } +} From ead3f6e0ae07feaf49648b6e0e6aa1a1b64d23a8 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 17:41:33 +0800 Subject: [PATCH 05/14] Revert "[Feature][Connector-V2][Oss-Jindo] Update exception" This reverts commit cf0a9fe61f30d3f785b93ec81d556a754fa3d65e. --- .../exception/OssJindoConnectorException.java | 35 ------------------- 1 file changed, 35 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java deleted file mode 100644 index cb718c5f2d6..00000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/exception/OssJindoConnectorException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.oss.exception; - -import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; -import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; - -public class OssJindoConnectorException extends SeaTunnelRuntimeException { - public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { - super(seaTunnelErrorCode, errorMessage); - } - - public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { - super(seaTunnelErrorCode, errorMessage, cause); - } - - public OssJindoConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { - super(seaTunnelErrorCode, cause); - } -} From 7f8dde924f08b99e43066aea892c909e494103d2 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 17:46:47 +0800 Subject: [PATCH 06/14] [Feature][Connector-V2][Oss-Jindo] Fix code style --- .../seatunnel/file/oss/source/OssFileSourceFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java index 42f8403d276..cd875c0e841 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java @@ -23,9 +23,9 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; import com.google.auto.service.AutoService; -import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; @AutoService(Factory.class) public class OssFileSourceFactory implements TableSourceFactory { From 63c98842a41fb1101e37a3748a1b4757bae9e91b Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 18:03:27 +0800 Subject: [PATCH 07/14] [Feature][Connector-V2][Oss-Jindo] Fix compile error --- .../connectors/seatunnel/file/oss/config/OssConf.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java index b8ce6cc251b..da6392044f8 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/config/OssConf.java @@ -42,13 +42,13 @@ public OssConf(String hdfsNameKey) { } public static HadoopConf buildWithConfig(Config config) { - HadoopConf hadoopConf = new OssConf(config.getString(OssConfig.BUCKET)); + HadoopConf hadoopConf = new OssConf(config.getString(OssConfig.BUCKET.key())); HashMap ossOptions = new HashMap<>(); ossOptions.put("fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS"); ossOptions.put("fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem"); - ossOptions.put("fs.oss.accessKeyId", config.getString(OssConfig.ACCESS_KEY)); - ossOptions.put("fs.oss.accessKeySecret", config.getString(OssConfig.ACCESS_SECRET)); - ossOptions.put("fs.oss.endpoint", config.getString(OssConfig.ENDPOINT)); + ossOptions.put("fs.oss.accessKeyId", config.getString(OssConfig.ACCESS_KEY.key())); + ossOptions.put("fs.oss.accessKeySecret", config.getString(OssConfig.ACCESS_SECRET.key())); + ossOptions.put("fs.oss.endpoint", config.getString(OssConfig.ENDPOINT.key())); hadoopConf.setExtraOptions(ossOptions); return hadoopConf; } From 257e8f888fb715ca9a6753b12d9b291576bcf1ef Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 18:23:10 +0800 Subject: [PATCH 08/14] [Feature][Connector-V2][Oss-Jindo] Update seatunnel-dist --- seatunnel-dist/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index bbbd55f299e..a1500941661 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -225,6 +225,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-file-oss-jindo + ${project.version} + provided + org.apache.seatunnel connector-file-ftp From 203bbb548328e44914456bae4423dd597706fefe Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 18:27:11 +0800 Subject: [PATCH 09/14] [Feature][Connector-V2][Oss-Jindo] Update plugin-mapping.properties --- plugin-mapping.properties | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugin-mapping.properties b/plugin-mapping.properties index e73f8c283d2..00bc5b39cca 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -115,6 +115,8 @@ seatunnel.source.LocalFile = connector-file-local seatunnel.sink.LocalFile = connector-file-local seatunnel.source.OssFile = connector-file-oss seatunnel.sink.OssFile = connector-file-oss +seatunnel.source.OssJindoFile = connector-file-oss-jindo +seatunnel.sink.OssJindoFile = connector-file-oss-jindo seatunnel.source.Pulsar = connector-pulsar seatunnel.source.Hudi = connector-hudi seatunnel.sink.DingTalk = connector-dingtalk From 2b65f18e971999edc95aae18b8d0a07f889e141f Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 18:35:06 +0800 Subject: [PATCH 10/14] [Feature][Connector-V2][Oss-Jindo] Fix license header --- .../connector-file-oss-jindo/pom.xml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml index 8fe4f4cb66c..f2f04a64dea 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml @@ -1,4 +1,22 @@ + From 99c86def031cb272452d2e4376ecbcaea84ae78d Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 17 Nov 2022 22:41:35 +0800 Subject: [PATCH 11/14] [Feature][Connector-V2][Oss-Jindo] Fix avro --- .../connector-file/connector-file-oss-jindo/pom.xml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml index f2f04a64dea..fba755c5449 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/pom.xml @@ -52,13 +52,13 @@ com.aliyun.jindodata jindo-core - 4.6.1 + ${jindo-sdk.version} com.aliyun.jindodata jindosdk - 4.6.1 + ${jindo-sdk.version} @@ -66,6 +66,12 @@ hadoop-common ${hadoop-common.version} provided + + + avro + org.apache.avro + + From 37f3126aa7047b6c4a480827cda8618321b67485 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Sat, 19 Nov 2022 16:46:59 +0800 Subject: [PATCH 12/14] [Feature][Connector-V2][Oss-Jindo] Add docs --- docs/en/connector-v2/sink/OssJindoFile.md | 215 +++++++++++++++++ docs/en/connector-v2/source/OssJindoFile.md | 247 ++++++++++++++++++++ 2 files changed, 462 insertions(+) create mode 100644 docs/en/connector-v2/sink/OssJindoFile.md create mode 100644 docs/en/connector-v2/source/OssJindoFile.md diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md new file mode 100644 index 00000000000..3c6d9091cb1 --- /dev/null +++ b/docs/en/connector-v2/sink/OssJindoFile.md @@ -0,0 +1,215 @@ +# OssFile + +> OssJindo file sink connector + +## Description + +Output data to oss file system using jindo api. + +> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OSS and this connector need some hadoop dependencies. + +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) + +By default, we use 2PC commit to ensure `exactly-once` + +- [ ] [schema projection](../../concept/connector-v2-features.md) +- [x] file format + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json + +## Options + +| name | type | required | default value | +|----------------------------------|---------|----------|-----------------------------------------------------------| +| path | string | yes | - | +| bucket | string | yes | - | +| access_key | string | yes | - | +| access_secret | string | yes | - | +| endpoint | string | yes | - | +| file_name_expression | string | no | "${transactionId}" | +| file_format | string | no | "text" | +| filename_time_format | string | no | "yyyy.MM.dd" | +| field_delimiter | string | no | '\001' | +| row_delimiter | string | no | "\n" | +| partition_by | array | no | - | +| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | +| is_partition_field_write_in_file | boolean | no | false | +| sink_columns | array | no | When this parameter is empty, all fields are sink columns | +| is_enable_transaction | boolean | no | true | +| common-options | | no | - | + +### path [string] + +The target dir path is required. + +### bucket [string] + +The bucket address of oss file system, for example: `oss://tyrantlucifer-image-bed` + +### access_key [string] + +The access key of oss file system. + +### access_secret [string] + +The access secret of oss file system. + +### endpoint [string] + +The endpoint of oss file system. + +### file_name_expression [string] + +`file_name_expression` describes the file expression which will be created into the `path`. We can add the variable `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`, +`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`. + +Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. + +### file_format [string] + +We supported as the following file types: + +`text` `csv` `parquet` `orc` `json` + +Please note that, The final file name will end with the file_format's suffix, the suffix of the text file is `txt`. + +### filename_time_format [string] + +When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows: + +| Symbol | Description | +| ------ | ------------------ | +| y | Year | +| M | Month | +| d | Day of month | +| H | Hour in day (0-23) | +| m | Minute in hour | +| s | Second in minute | + +See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax. + +### field_delimiter [string] + +The separator between columns in a row of data. Only needed by `text` and `csv` file format. + +### row_delimiter [string] + +The separator between rows in a file. Only needed by `text` and `csv` file format. + +### partition_by [array] + +Partition data based on selected fields + +### partition_dir_expression [string] + +If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory. + +Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field. + +### is_partition_field_write_in_file [boolean] + +If `is_partition_field_write_in_file` is `true`, the partition field and the value of it will be written into data file. + +For example, if you want to write a Hive Data File, Its value should be `false`. + +### sink_columns [array] + +Which columns need be written to file, default value is all the columns get from `Transform` or `Source`. +The order of the fields determines the order in which the file is actually written. + +### is_enable_transaction [boolean] + +If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory. + +Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file. + +Only support `true` now. + +### common options + +Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details. + +## Example + +For text file format + +```hocon + + OssFile { + path="/seatunnel/sink" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxx" + access_secret = "xxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + field_delimiter = "\t" + row_delimiter = "\n" + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format = "text" + sink_columns = ["name","age"] + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + } + +``` + +For parquet file format + +```hocon + + OssFile { + path = "/seatunnel/sink" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + field_delimiter = "\t" + row_delimiter = "\n" + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format = "parquet" + sink_columns = ["name","age"] + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + } + +``` + +For orc file format + +```bash + + OssFile { + path="/seatunnel/sink" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxx" + access_secret = "xxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + field_delimiter = "\t" + row_delimiter = "\n" + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format = "orc" + sink_columns = ["name","age"] + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + } + +``` + +## Changelog + +### Next version + +- Add OSS Jindo File Sink Connector \ No newline at end of file diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md new file mode 100644 index 00000000000..26409c72b2d --- /dev/null +++ b/docs/en/connector-v2/source/OssJindoFile.md @@ -0,0 +1,247 @@ +# OssFile + +> OssJindo file source connector + +## Description + +Read data from aliyun oss file system using jindo api. + +> Tips: We made some trade-offs in order to support more file types, so we used the HDFS protocol for internal access to OSS and this connector need some hadoop dependencies. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) + +Read all the data in a split in a pollNext call. What splits are read will be saved in snapshot. + +- [x] [schema projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) +- [x] file format + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json + +## Options + +| name | type | required | default value | +|---------------------------|---------|----------|---------------------| +| path | string | yes | - | +| type | string | yes | - | +| bucket | string | yes | - | +| access_key | string | yes | - | +| access_secret | string | yes | - | +| endpoint | string | yes | - | +| delimiter | string | no | \001 | +| parse_partition_from_path | boolean | no | true | +| date_format | string | no | yyyy-MM-dd | +| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | +| time_format | string | no | HH:mm:ss | +| schema | config | no | - | +| common-options | | no | - | + +### path [string] + +The source file path. + +### delimiter [string] + +Field delimiter, used to tell connector how to slice and dice fields when reading text files + +default `\001`, the same as hive's default delimiter + +### parse_partition_from_path [boolean] + +Control whether parse the partition keys and values from file path + +For example if you read a file from path `oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` + +Every record data from file will be added these two fields: + +| name | age | +|----------------|-----| +| tyrantlucifer | 26 | + +Tips: **Do not define partition fields in schema option** + +### date_format [string] + +Date type format, used to tell connector how to convert string to date, supported as the following formats: + +`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd` + +default `yyyy-MM-dd` + +### datetime_format [string] + +Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats: + +`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss` + +default `yyyy-MM-dd HH:mm:ss` + +### time_format [string] + +Time type format, used to tell connector how to convert string to time, supported as the following formats: + +`HH:mm:ss` `HH:mm:ss.SSS` + +default `HH:mm:ss` + +### type [string] + +File type, supported as the following file types: + +`text` `csv` `parquet` `orc` `json` + +If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want. + +For example: + +upstream data is the following: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +You can also save multiple pieces of data in one file and split them by newline: + +```json lines + +{"code": 200, "data": "get success", "success": true} +{"code": 300, "data": "get failed", "success": false} + +``` + +you should assign schema as the following: + +```hocon + +schema { + fields { + code = int + data = string + success = boolean + } +} + +``` + +connector will generate data as the following: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically. + +If you assign file type to `text` `csv`, you can choose to specify the schema information or not. + +For example, upstream data is the following: + +```text + +tyrantlucifer#26#male + +``` + +If you do not assign data schema connector will treat the upstream data as the following: + +| content | +|------------------------| +| tyrantlucifer#26#male | + +If you assign data schema, you should also assign the option `delimiter` too except CSV file type + +you should assign schema and delimiter as the following: + +```hocon + +delimiter = "#" +schema { + fields { + name = string + age = int + gender = string + } +} + +``` + +connector will generate data as the following: + +| name | age | gender | +|---------------|-----|--------| +| tyrantlucifer | 26 | male | + +### bucket [string] + +The bucket address of oss file system, for example: `oss://tyrantlucifer-image-bed` + +### access_key [string] + +The access key of oss file system. + +### access_secret [string] + +The access secret of oss file system. + +### endpoint [string] + +The endpoint of oss file system. + +### schema [config] + +#### fields [Config] + +The schema of upstream data. + +### common options + +Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. + +## Example + +```hocon + + OssFile { + path = "/seatunnel/orc" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + type = "orc" + } + +``` + +```hocon + + OssFile { + path = "/seatunnel/json" + bucket = "oss://tyrantlucifer-image-bed" + access_key = "xxxxxxxxxxxxxxxxx" + access_secret = "xxxxxxxxxxxxxxxxxxxxxx" + endpoint = "oss-cn-beijing.aliyuncs.com" + type = "json" + schema { + fields { + id = int + name = string + } + } + } + +``` + +## Changelog + +### next version + +- Add OSS Jindo File Source Connector From 8d6b5ba045e689b0747fbccac40fc8f5bbdc37d4 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Sun, 27 Nov 2022 21:52:18 +0800 Subject: [PATCH 13/14] [Feature][Connector-V2][Oss-Jindo] Unified exception --- .../seatunnel/file/oss/source/OssFileSource.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java index 07eedabd799..440f0bc96b8 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java @@ -28,7 +28,8 @@ import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf; import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig; import org.apache.seatunnel.connectors.seatunnel.file.oss.exception.OssJindoConnectorException; @@ -66,7 +67,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException { try { filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); } catch (IOException e) { - throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail."); + String errorMsg = String.format("Get file list from this path [%s] failed", path); + throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e); } // support user-defined schema FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase()); @@ -95,9 +97,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { } else { try { rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0)); - } catch (FilePluginException e) { - throw new OssJindoConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, - "Get data schema information from file failed", e); + } catch (FileConnectorException e) { + String errorMsg = String.format("Get table schema from file [%s] failed", filePaths.get(0)); + throw new FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e); } } } From a120dfa9168f49d3de02e49ad28d42b89756a608 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Wed, 30 Nov 2022 11:16:34 +0800 Subject: [PATCH 14/14] [Feature][Connector-V2][Oss-Jindo] Add factory test --- .../connectors/test/OssJindoFactoryTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/test/java/org/apache/seatunnel/connectors/test/OssJindoFactoryTest.java diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/test/java/org/apache/seatunnel/connectors/test/OssJindoFactoryTest.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/test/java/org/apache/seatunnel/connectors/test/OssJindoFactoryTest.java new file mode 100644 index 00000000000..85955f086f9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/test/java/org/apache/seatunnel/connectors/test/OssJindoFactoryTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.test; + +import org.apache.seatunnel.connectors.seatunnel.file.oss.sink.OssFileSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.file.oss.source.OssFileSourceFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class OssJindoFactoryTest { + @Test + public void testOptionRule() { + Assertions.assertNotNull((new OssFileSourceFactory()).optionRule()); + Assertions.assertNotNull((new OssFileSinkFactory()).optionRule()); + } +}