From 856d0d7c933db3d7ac0d70c6f2d070ec11b793fc Mon Sep 17 00:00:00 2001 From: TyrantLucifer Date: Wed, 7 Sep 2022 11:07:22 +0800 Subject: [PATCH] [Improve][Connector-V2] Refactor local file sink connector code structure (#2655) * [Improve][Connector-V2] Refactor local file code struct --- .../LocalConf.java} | 27 +- .../file/local/sink/LocalFileSink.java | 22 +- .../file/local/sink/LocalFileSinkPlugin.java | 69 ----- .../filesystem/LocalFileSystemCommitter.java | 57 ---- .../file/local/sink/util/FileUtils.java | 104 -------- .../LocalJsonTransactionStateFileWriter.java | 123 --------- .../LocalOrcTransactionStateFileWriter.java | 245 ------------------ ...ocalParquetTransactionStateFileWriter.java | 171 ------------ ...LocalTransactionStateFileWriteFactory.java | 115 -------- .../LocalTxtTransactionStateFileWriter.java | 127 --------- .../file/local/source/LocalFileSource.java | 4 +- .../FileSinkAggregatedCommitterTest.java | 146 ----------- ...estLocalTxtTransactionStateFileWriter.java | 106 -------- 13 files changed, 26 insertions(+), 1290 deletions(-) rename seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/{sink/filesystem/LocalFileSystem.java => config/LocalConf.java} (53%) delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkPlugin.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/filesystem/LocalFileSystemCommitter.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/util/FileUtils.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalJsonTransactionStateFileWriter.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalOrcTransactionStateFileWriter.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalParquetTransactionStateFileWriter.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTransactionStateFileWriteFactory.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTxtTransactionStateFileWriter.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/FileSinkAggregatedCommitterTest.java delete mode 100644 seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/TestLocalTxtTransactionStateFileWriter.java diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/filesystem/LocalFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java similarity index 53% rename from seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/filesystem/LocalFileSystem.java rename to seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java index 83939f30171a..918b8e4868ed 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/filesystem/LocalFileSystem.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/config/LocalConf.java @@ -15,29 +15,14 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.filesystem; +package org.apache.seatunnel.connectors.seatunnel.file.local.config; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem; +import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; +public class LocalConf extends HadoopConf { + private final String fsHdfsImpl = "org.apache.hadoop.fs.LocalFileSystem"; -public class LocalFileSystem implements FileSystem { - @Override - public void deleteFile(String path) throws IOException { - File file = new File(path); - file.delete(); - } - - @Override - public List dirList(String dirPath) throws IOException { - File file = new File(dirPath); - String[] list = file.list(); - if (list == null) { - return null; - } - return Arrays.asList(list); + public LocalConf(String hdfsNameKey) { + super(hdfsNameKey); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java index 5051c7465d4b..64966a9078ff 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSink.java @@ -17,16 +17,28 @@ package org.apache.seatunnel.connectors.seatunnel.file.local.sink; +import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.sink.SeaTunnelSink; -import org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin; +import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; +import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf; +import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; import com.google.auto.service.AutoService; +import org.apache.hadoop.fs.CommonConfigurationKeys; @AutoService(SeaTunnelSink.class) -public class LocalFileSink extends AbstractFileSink { +public class LocalFileSink extends BaseFileSink { + + @Override + public String getPluginName() { + return FileSystemType.LOCAL.getFileSystemPluginName(); + } + @Override - public SinkFileSystemPlugin getSinkFileSystemPlugin() { - return new LocalFileSinkPlugin(); + public void prepare(Config pluginConfig) throws PrepareFailException { + super.prepare(pluginConfig); + hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkPlugin.java deleted file mode 100644 index 58990bdd2265..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkPlugin.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink; - -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.filesystem.LocalFileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.filesystem.LocalFileSystemCommitter; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writer.LocalTransactionStateFileWriteFactory; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator; - -import lombok.NonNull; - -import java.util.List; -import java.util.Optional; - -public class LocalFileSinkPlugin implements SinkFileSystemPlugin { - @Override - public String getPluginName() { - return FileSystemType.LOCAL.getFileSystemPluginName(); - } - - @Override - public Optional getTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, - @NonNull TransactionFileNameGenerator transactionFileNameGenerator, - @NonNull PartitionDirNameGenerator partitionDirNameGenerator, - @NonNull List sinkColumnsIndexInRow, - @NonNull String tmpPath, - @NonNull String targetPath, - @NonNull String jobId, - int subTaskIndex, - @NonNull String fieldDelimiter, - @NonNull String rowDelimiter, - @NonNull FileSystem fileSystem) { - // using factory to generate transaction state file writer - TransactionStateFileWriter writer = LocalTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fieldDelimiter, rowDelimiter, fileSystem); - return Optional.of(writer); - } - - @Override - public Optional getFileSystemCommitter() { - return Optional.of(new LocalFileSystemCommitter()); - } - - @Override - public Optional getFileSystem() { - return Optional.of(new LocalFileSystem()); - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/filesystem/LocalFileSystemCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/filesystem/LocalFileSystemCommitter.java deleted file mode 100644 index 54ca54c03fc4..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/filesystem/LocalFileSystemCommitter.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.filesystem; - -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.util.FileUtils; -import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter; - -import lombok.NonNull; - -import java.io.File; -import java.io.IOException; -import java.util.Map; - -public class LocalFileSystemCommitter implements FileSystemCommitter { - @Override - public void commitTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException { - for (Map.Entry> entry : aggregateCommitInfo.getTransactionMap().entrySet()) { - for (Map.Entry mvFileEntry : entry.getValue().entrySet()) { - FileUtils.renameFile(mvFileEntry.getKey(), mvFileEntry.getValue()); - } - // delete the transaction dir - FileUtils.deleteFile(entry.getKey()); - } - } - - @Override - public void abortTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException { - for (Map.Entry> entry : aggregateCommitInfo.getTransactionMap().entrySet()) { - // rollback the file - for (Map.Entry mvFileEntry : entry.getValue().entrySet()) { - File oldFile = new File(mvFileEntry.getKey()); - File newFile = new File(mvFileEntry.getValue()); - if (newFile.exists() && !oldFile.exists()) { - FileUtils.renameFile(mvFileEntry.getValue(), mvFileEntry.getKey()); - } - } - // delete the transaction dir - FileUtils.deleteFile(entry.getKey()); - } - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/util/FileUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/util/FileUtils.java deleted file mode 100644 index 5a37b715929b..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/util/FileUtils.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.util; - -import lombok.NonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; - -public class FileUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(FileUtils.class); - public static File createDir(@NonNull String dirPath) { - if (dirPath == null || "".equals(dirPath)) { - return null; - } - File file = new File(dirPath); - if (!file.exists() || !file.isDirectory()) { - file.mkdirs(); - } - return file; - } - - public static File createFile(@NonNull String filePath) throws IOException { - if (filePath == null || "".equals(filePath)) { - return null; - } - File file = new File(filePath); - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - - if (!file.exists() || !file.isFile()) { - file.createNewFile(); - } - return file; - } - - public static boolean fileExist(@NonNull String filePath) { - File file = new File(filePath); - return file.exists(); - } - - public static void renameFile(@NonNull String oldName, @NonNull String newName) throws IOException { - LOGGER.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]"); - File oldPath = new File(oldName); - File newPath = new File(newName); - - if (!newPath.getParentFile().exists()) { - newPath.getParentFile().mkdirs(); - } - - if (oldPath.renameTo(newPath)) { - LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "] finish"); - } else { - throw new IOException("rename file :[" + oldPath + "] to [" + newPath + "] error"); - } - } - - public static void deleteFile(@NonNull String filePath) throws IOException { - File file = new File(filePath); - if (file.exists()) { - if (file.isDirectory()) { - deleteFiles(file); - } - file.delete(); - } - } - - private static boolean deleteFiles(@NonNull File file) { - try { - File[] files = file.listFiles(); - for (int i = 0; i < files.length; i++) { - File thisFile = files[i]; - if (thisFile.isDirectory()) { - deleteFiles(thisFile); - } - thisFile.delete(); - } - file.delete(); - - } catch (Exception e) { - LOGGER.error("delete file [" + file.getPath() + "] error"); - return false; - } - return true; - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalJsonTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalJsonTransactionStateFileWriter.java deleted file mode 100644 index c492e575023d..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalJsonTransactionStateFileWriter.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writer; - -import org.apache.seatunnel.api.serialization.SerializationSchema; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.util.FileUtils; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator; -import org.apache.seatunnel.format.json.JsonSerializationSchema; - -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; - -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Slf4j -public class LocalJsonTransactionStateFileWriter extends AbstractTransactionStateFileWriter { - - private static final long serialVersionUID = -3834472539886339383L; - - private final byte[] rowDelimiter; - private final SerializationSchema serializationSchema; - private Map beingWrittenOutputStream; - - public LocalJsonTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, - @NonNull TransactionFileNameGenerator transactionFileNameGenerator, - @NonNull PartitionDirNameGenerator partitionDirNameGenerator, - @NonNull List sinkColumnsIndexInRow, - @NonNull String tmpPath, - @NonNull String targetPath, - @NonNull String jobId, - int subTaskIndex, - @NonNull String rowDelimiter, - @NonNull FileSystem fileSystem) { - super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem); - - this.rowDelimiter = rowDelimiter.getBytes(); - this.serializationSchema = new JsonSerializationSchema(seaTunnelRowTypeInfo); - this.beingWrittenOutputStream = new HashMap<>(); - } - - @Override - public void beginTransaction(String transactionId) { - this.beingWrittenOutputStream = new HashMap<>(); - } - - @Override - public void abortTransaction(String transactionId) { - this.beingWrittenOutputStream = new HashMap<>(); - } - - @Override - public void write(@NonNull SeaTunnelRow seaTunnelRow) { - String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow); - FileOutputStream fileOutputStream = getOrCreateOutputStream(filePath); - try { - byte[] rowBytes = serializationSchema.serialize(seaTunnelRow); - fileOutputStream.write(rowBytes); - fileOutputStream.write(rowDelimiter); - } catch (IOException e) { - log.error("write data to file {} error", filePath); - throw new RuntimeException(e); - } - } - - @Override - public void finishAndCloseWriteFile() { - beingWrittenOutputStream.entrySet().forEach(entry -> { - try { - entry.getValue().flush(); - } catch (IOException e) { - log.error("error when flush file {}", entry.getKey()); - throw new RuntimeException(e); - } finally { - try { - entry.getValue().close(); - } catch (IOException e) { - log.error("error when close output stream {}", entry.getKey()); - } - } - - needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey())); - }); - } - - private FileOutputStream getOrCreateOutputStream(@NonNull String filePath) { - FileOutputStream fileOutputStream = beingWrittenOutputStream.get(filePath); - if (fileOutputStream == null) { - try { - FileUtils.createFile(filePath); - fileOutputStream = new FileOutputStream(filePath); - beingWrittenOutputStream.put(filePath, fileOutputStream); - } catch (IOException e) { - log.error("can not get output file stream"); - throw new RuntimeException(e); - } - } - return fileOutputStream; - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalOrcTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalOrcTransactionStateFileWriter.java deleted file mode 100644 index bd317db3a543..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalOrcTransactionStateFileWriter.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writer; - -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator; - -import lombok.NonNull; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.CompressionKind; -import org.apache.orc.OrcFile; -import org.apache.orc.TypeDescription; -import org.apache.orc.Writer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigInteger; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class LocalOrcTransactionStateFileWriter extends AbstractTransactionStateFileWriter { - private static final Logger LOGGER = LoggerFactory.getLogger(LocalOrcTransactionStateFileWriter.class); - private final Configuration configuration = new Configuration(); - private Map beingWrittenWriter; - - public LocalOrcTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, @NonNull TransactionFileNameGenerator transactionFileNameGenerator, - @NonNull PartitionDirNameGenerator partitionDirNameGenerator, - @NonNull List sinkColumnsIndexInRow, - @NonNull String tmpPath, - @NonNull String targetPath, - @NonNull String jobId, - int subTaskIndex, - @NonNull FileSystem fileSystem) { - super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem); - this.beingWrittenWriter = new HashMap<>(); - } - - @Override - public void write(@NonNull SeaTunnelRow seaTunnelRow) { - String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow); - Writer writer = getOrCreateWriter(filePath); - TypeDescription schema = buildSchemaWithRowType(); - VectorizedRowBatch rowBatch = schema.createRowBatch(); - int i = 0; - int row = rowBatch.size++; - for (Integer index : sinkColumnsIndexInRow) { - Object value = seaTunnelRow.getField(index); - ColumnVector vector = rowBatch.cols[i]; - setColumn(value, vector, row); - i++; - } - try { - writer.addRowBatch(rowBatch); - rowBatch.reset(); - } catch (IOException e) { - String errorMsg = String.format("Write data to orc file [%s] error", filePath); - throw new RuntimeException(errorMsg, e); - } - } - - @Override - public void finishAndCloseWriteFile() { - this.beingWrittenWriter.forEach((k, v) -> { - try { - v.close(); - } catch (IOException e) { - String errorMsg = String.format("Close file [%s] orc writer failed, error msg: [%s]", k, e.getMessage()); - throw new RuntimeException(errorMsg, e); - } catch (NullPointerException e) { - // Because orc writer not support be closed multi times, so if the second time close orc writer it will throw NullPointerException - // In a whole process of file sink, it will experience four stages: - // 1. beginTransaction 2. prepareCommit 3. commit 4. close - // In the first stage, it will not close any writers, start with the second stage, writer will be closed. - // In the last stage, it will not close any writers - // So orc writer will be closed one extra time after is closed. - LOGGER.info("Close file [{}] orc writer", k); - } - needMoveFiles.put(k, getTargetLocation(k)); - }); - } - - @Override - public void beginTransaction(String transactionId) { - this.beingWrittenWriter = new HashMap<>(); - } - - @Override - public void abortTransaction(String transactionId) { - this.beingWrittenWriter = new HashMap<>(); - } - - private Writer getOrCreateWriter(@NonNull String filePath) { - Writer writer = this.beingWrittenWriter.get(filePath); - if (writer == null) { - TypeDescription schema = buildSchemaWithRowType(); - Path path = new Path(filePath); - try { - OrcFile.WriterOptions options = OrcFile.writerOptions(configuration) - .setSchema(schema) - // temporarily used snappy - .compress(CompressionKind.SNAPPY) - // use orc version 0.12 - .version(OrcFile.Version.V_0_12) - .overwrite(true); - Writer newWriter = OrcFile.createWriter(path, options); - this.beingWrittenWriter.put(filePath, newWriter); - return newWriter; - } catch (IOException e) { - String errorMsg = String.format("Get orc writer for file [%s] error", filePath); - throw new RuntimeException(errorMsg, e); - } - } - return writer; - } - - private TypeDescription buildFieldWithRowType(SeaTunnelDataType type) { - if (BasicType.BOOLEAN_TYPE.equals(type)) { - return TypeDescription.createBoolean(); - } - if (BasicType.SHORT_TYPE.equals(type)) { - return TypeDescription.createShort(); - } - if (BasicType.INT_TYPE.equals(type)) { - return TypeDescription.createInt(); - } - if (BasicType.LONG_TYPE.equals(type)) { - return TypeDescription.createLong(); - } - if (BasicType.FLOAT_TYPE.equals(type)) { - return TypeDescription.createFloat(); - } - if (BasicType.DOUBLE_TYPE.equals(type)) { - return TypeDescription.createDouble(); - } - if (BasicType.BYTE_TYPE.equals(type)) { - return TypeDescription.createByte(); - } - return TypeDescription.createString(); - } - - private TypeDescription buildSchemaWithRowType() { - TypeDescription schema = TypeDescription.createStruct(); - for (Integer i : sinkColumnsIndexInRow) { - TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowTypeInfo.getFieldType(i)); - schema.addField(seaTunnelRowTypeInfo.getFieldName(i), fieldType); - } - return schema; - } - - private void setColumn(Object value, ColumnVector vector, int row) { - if (value == null) { - vector.isNull[row] = true; - vector.noNulls = false; - } else { - switch (vector.type) { - case LONG: - LongColumnVector longVector = (LongColumnVector) vector; - setLongColumnVector(value, longVector, row); - break; - case DOUBLE: - DoubleColumnVector doubleColumnVector = (DoubleColumnVector) vector; - setDoubleVector(value, doubleColumnVector, row); - break; - case BYTES: - BytesColumnVector bytesColumnVector = (BytesColumnVector) vector; - setByteColumnVector(value, bytesColumnVector, row); - break; - default: - throw new RuntimeException("Unexpected ColumnVector subtype"); - } - } - } - - private void setLongColumnVector(Object value, LongColumnVector longVector, int row) { - if (value instanceof Boolean) { - Boolean bool = (Boolean) value; - longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0); - } else if (value instanceof Integer) { - longVector.vector[row] = (Integer) value; - } else if (value instanceof Long) { - longVector.vector[row] = (Long) value; - } else if (value instanceof BigInteger) { - BigInteger bigInt = (BigInteger) value; - longVector.vector[row] = bigInt.longValue(); - } else { - throw new RuntimeException("Long or Integer type expected for field"); - } - } - - private void setByteColumnVector(Object value, BytesColumnVector bytesColVector, int rowNum) { - if (value instanceof byte[] || value instanceof String) { - byte[] byteVec; - if (value instanceof String) { - String strVal = (String) value; - byteVec = strVal.getBytes(StandardCharsets.UTF_8); - } else { - byteVec = (byte[]) value; - } - bytesColVector.setRef(rowNum, byteVec, 0, byteVec.length); - } else { - throw new RuntimeException("byte[] or String type expected for field "); - } - } - - private void setDoubleVector(Object value, DoubleColumnVector doubleVector, int rowNum) { - if (value instanceof Double) { - doubleVector.vector[rowNum] = (Double) value; - } else if (value instanceof Float) { - Float floatValue = (Float) value; - doubleVector.vector[rowNum] = floatValue.doubleValue(); - } else { - throw new RuntimeException("Double or Float type expected for field "); - } - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalParquetTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalParquetTransactionStateFileWriter.java deleted file mode 100644 index beed657cd17d..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalParquetTransactionStateFileWriter.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writer; - -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator; - -import lombok.NonNull; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.avro.AvroParquetWriter; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; -import org.apache.parquet.hadoop.util.HadoopOutputFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class LocalParquetTransactionStateFileWriter extends AbstractTransactionStateFileWriter { - private static final Logger LOGGER = LoggerFactory.getLogger(LocalParquetTransactionStateFileWriter.class); - private final Configuration configuration = new Configuration(); - private Map> beingWrittenWriter; - - public LocalParquetTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, - @NonNull TransactionFileNameGenerator transactionFileNameGenerator, - @NonNull PartitionDirNameGenerator partitionDirNameGenerator, - @NonNull List sinkColumnsIndexInRow, @NonNull String tmpPath, - @NonNull String targetPath, - @NonNull String jobId, - int subTaskIndex, - @NonNull FileSystem fileSystem) { - super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem); - beingWrittenWriter = new HashMap<>(); - } - - @Override - public void write(@NonNull SeaTunnelRow seaTunnelRow) { - String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow); - ParquetWriter writer = getOrCreateWriter(filePath); - Schema schema = buildSchemaWithRowType(); - GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); - sinkColumnsIndexInRow.forEach(index -> recordBuilder.set(seaTunnelRowTypeInfo.getFieldName(index), seaTunnelRow.getField(index))); - GenericData.Record record = recordBuilder.build(); - try { - writer.write(record); - } catch (IOException e) { - String errorMsg = String.format("Write data to parquet file [%s] error", filePath); - throw new RuntimeException(errorMsg, e); - } - } - - @Override - public void finishAndCloseWriteFile() { - this.beingWrittenWriter.forEach((k, v) -> { - try { - v.close(); - } catch (IOException e) { - String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage()); - throw new RuntimeException(errorMsg, e); - } - needMoveFiles.put(k, getTargetLocation(k)); - }); - } - - @Override - public void beginTransaction(String transactionId) { - this.beingWrittenWriter = new HashMap<>(); - } - - @Override - public void abortTransaction(String transactionId) { - this.beingWrittenWriter = new HashMap<>(); - } - - private ParquetWriter getOrCreateWriter(@NonNull String filePath) { - ParquetWriter writer = this.beingWrittenWriter.get(filePath); - if (writer == null) { - Schema schema = buildSchemaWithRowType(); - Path path = new Path(filePath); - try { - // In order to write file to local file system we should use empty configuration object - HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, configuration); - ParquetWriter newWriter = AvroParquetWriter.builder(outputFile) - .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) - // use parquet v1 to improve compatibility - .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) - // Temporarily use snappy compress - // I think we can use the compress option in config to control this - .withCompressionCodec(CompressionCodecName.SNAPPY) - .withSchema(schema) - .build(); - this.beingWrittenWriter.put(filePath, newWriter); - return newWriter; - } catch (IOException e) { - String errorMsg = String.format("Get parquet writer for file [%s] error", filePath); - throw new RuntimeException(errorMsg, e); - } - } - return writer; - } - - private Schema buildSchemaWithRowType() { - ArrayList fields = new ArrayList<>(); - SeaTunnelDataType[] fieldTypes = seaTunnelRowTypeInfo.getFieldTypes(); - String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames(); - sinkColumnsIndexInRow.forEach(index -> { - if (BasicType.BOOLEAN_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BOOLEAN), null, null); - fields.add(field); - } else if (BasicType.SHORT_TYPE.equals(fieldTypes[index]) || BasicType.INT_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.INT), null, null); - fields.add(field); - } else if (BasicType.LONG_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.LONG), null, null); - fields.add(field); - } else if (BasicType.FLOAT_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.FLOAT), null, null); - fields.add(field); - } else if (BasicType.DOUBLE_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.DOUBLE), null, null); - fields.add(field); - } else if (BasicType.STRING_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.STRING), null, null); - fields.add(field); - } else if (BasicType.BYTE_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.BYTES), null, null); - fields.add(field); - } else if (BasicType.VOID_TYPE.equals(fieldTypes[index])) { - Schema.Field field = new Schema.Field(fieldNames[index], Schema.create(Schema.Type.NULL), null, null); - fields.add(field); - } - }); - return Schema.createRecord("SeatunnelRecord", - "The record generated by seatunnel file connector", - "org.apache.parquet.avro", - false, - fields); - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTransactionStateFileWriteFactory.java deleted file mode 100644 index 31b767ee5366..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTransactionStateFileWriteFactory.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writer; - -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator; - -import lombok.NonNull; - -import java.util.List; - -public class LocalTransactionStateFileWriteFactory { - - private LocalTransactionStateFileWriteFactory() {} - - public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, - @NonNull TransactionFileNameGenerator transactionFileNameGenerator, - @NonNull PartitionDirNameGenerator partitionDirNameGenerator, - @NonNull List sinkColumnsIndexInRow, - @NonNull String tmpPath, - @NonNull String targetPath, - @NonNull String jobId, - int subTaskIndex, - @NonNull String fieldDelimiter, - @NonNull String rowDelimiter, - @NonNull FileSystem fileSystem) { - FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator; - FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat(); - if (fileFormat.equals(FileFormat.CSV)) { - // #2133 wait this issue closed, there will be replaced using csv writer - return new LocalTxtTransactionStateFileWriter( - seaTunnelRowTypeInfo, - transactionFileNameGenerator, - partitionDirNameGenerator, - sinkColumnsIndexInRow, - tmpPath, - targetPath, - jobId, - subTaskIndex, - fieldDelimiter, - rowDelimiter, - fileSystem); - } - if (fileFormat.equals(FileFormat.PARQUET)) { - return new LocalParquetTransactionStateFileWriter( - seaTunnelRowTypeInfo, - transactionFileNameGenerator, - partitionDirNameGenerator, - sinkColumnsIndexInRow, - tmpPath, - targetPath, - jobId, - subTaskIndex, - fileSystem); - } - if (fileFormat.equals(FileFormat.ORC)) { - return new LocalOrcTransactionStateFileWriter( - seaTunnelRowTypeInfo, - transactionFileNameGenerator, - partitionDirNameGenerator, - sinkColumnsIndexInRow, - tmpPath, - targetPath, - jobId, - subTaskIndex, - fileSystem); - } - if (fileFormat.equals(FileFormat.JSON)) { - return new LocalJsonTransactionStateFileWriter( - seaTunnelRowTypeInfo, - transactionFileNameGenerator, - partitionDirNameGenerator, - sinkColumnsIndexInRow, - tmpPath, - targetPath, - jobId, - subTaskIndex, - rowDelimiter, - fileSystem); - } - // if file type not supported by file connector, default txt writer will be generated - return new LocalTxtTransactionStateFileWriter( - seaTunnelRowTypeInfo, - transactionFileNameGenerator, - partitionDirNameGenerator, - sinkColumnsIndexInRow, - tmpPath, - targetPath, - jobId, - subTaskIndex, - fieldDelimiter, - rowDelimiter, - fileSystem); - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTxtTransactionStateFileWriter.java deleted file mode 100644 index 309d9dfa149c..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/writer/LocalTxtTransactionStateFileWriter.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local.sink.writer; - -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.util.FileUtils; -import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator; - -import lombok.NonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class LocalTxtTransactionStateFileWriter extends AbstractTransactionStateFileWriter { - private static final Logger LOGGER = LoggerFactory.getLogger(LocalTxtTransactionStateFileWriter.class); - private Map beingWrittenOutputStream; - - private String fieldDelimiter; - private String rowDelimiter; - - public LocalTxtTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo, - @NonNull TransactionFileNameGenerator transactionFileNameGenerator, - @NonNull PartitionDirNameGenerator partitionDirNameGenerator, - @NonNull List sinkColumnsIndexInRow, - @NonNull String tmpPath, - @NonNull String targetPath, - @NonNull String jobId, - int subTaskIndex, - @NonNull String fieldDelimiter, - @NonNull String rowDelimiter, - @NonNull FileSystem fileSystem) { - super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem); - - this.fieldDelimiter = fieldDelimiter; - this.rowDelimiter = rowDelimiter; - beingWrittenOutputStream = new HashMap<>(); - } - - @Override - public void beginTransaction(String transactionId) { - this.beingWrittenOutputStream = new HashMap<>(); - } - - @Override - public void abortTransaction(String transactionId) { - this.beingWrittenOutputStream = new HashMap<>(); - } - - @Override - public void write(@NonNull SeaTunnelRow seaTunnelRow) { - String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow); - FileOutputStream fileOutputStream = getOrCreateOutputStream(filePath); - String line = transformRowToLine(seaTunnelRow); - try { - fileOutputStream.write(line.getBytes()); - fileOutputStream.write(rowDelimiter.getBytes()); - } catch (IOException e) { - LOGGER.error("write data to file {} error", filePath); - throw new RuntimeException(e); - } - } - - @Override - public void finishAndCloseWriteFile() { - beingWrittenOutputStream.entrySet().forEach(entry -> { - try { - entry.getValue().flush(); - } catch (IOException e) { - LOGGER.error("error when flush file {}", entry.getKey()); - throw new RuntimeException(e); - } finally { - try { - entry.getValue().close(); - } catch (IOException e) { - LOGGER.error("error when close output stream {}", entry.getKey()); - } - } - - needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey())); - }); - } - - private FileOutputStream getOrCreateOutputStream(@NonNull String filePath) { - FileOutputStream fileOutputStream = beingWrittenOutputStream.get(filePath); - if (fileOutputStream == null) { - try { - FileUtils.createFile(filePath); - fileOutputStream = new FileOutputStream(new File(filePath)); - beingWrittenOutputStream.put(filePath, fileOutputStream); - } catch (IOException e) { - LOGGER.error("can not get output file stream"); - throw new RuntimeException(e); - } - } - return fileOutputStream; - } - - private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) { - return this.sinkColumnsIndexInRow.stream().map(index -> seaTunnelRow.getFields()[index] == null ? "" : seaTunnelRow.getFields()[index].toString()).collect(Collectors.joining(fieldDelimiter)); - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java index 36a9a6b27254..2196227de5eb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType; import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException; +import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf; import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig; import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory; @@ -33,6 +34,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import com.google.auto.service.AutoService; +import org.apache.hadoop.fs.CommonConfigurationKeys; import java.io.IOException; @@ -52,7 +54,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException { } readStrategy = ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE)); String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH); - hadoopConf = null; + hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT); try { filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/FileSinkAggregatedCommitterTest.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/FileSinkAggregatedCommitterTest.java deleted file mode 100644 index dfb315156ae9..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/FileSinkAggregatedCommitterTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local; - -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.filesystem.LocalFileSystemCommitter; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.util.FileUtils; -import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo; -import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo; -import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkAggregatedCommitter; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.stream.Collectors; - -public class FileSinkAggregatedCommitterTest { - @SuppressWarnings("checkstyle:UnnecessaryParentheses") - public void testCommit() throws Exception { - FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new LocalFileSystemCommitter()); - Map> transactionFiles = new HashMap<>(); - Random random = new Random(); - Long jobIdLong = random.nextLong(); - String jobId = "Job_" + jobIdLong; - String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId); - String targetDir = String.format("/tmp/hive/warehouse/%s", jobId); - Map needMoveFiles = new HashMap<>(); - needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir + "/c3=4/c4=rrr/test1.txt"); - needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt"); - FileUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt"); - FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt"); - - transactionFiles.put(transactionDir, needMoveFiles); - - Map> partitionDirAndVals = new HashMap<>(); - partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList())); - partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList())); - - FileAggregatedCommitInfo fileAggregatedCommitInfo = new FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals); - List fileAggregatedCommitInfoList = new ArrayList<>(); - fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo); - fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList); - - Assertions.assertTrue(FileUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt")); - Assertions.assertTrue(FileUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt")); - Assertions.assertTrue(!FileUtils.fileExist(transactionDir)); - } - - @SuppressWarnings("checkstyle:UnnecessaryParentheses") - @Test - public void testCombine() throws Exception { - FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new LocalFileSystemCommitter()); - Map> transactionFiles = new HashMap<>(); - Random random = new Random(); - Long jobIdLong = random.nextLong(); - String jobId = "Job_" + jobIdLong; - String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId); - String targetDir = String.format("/tmp/hive/warehouse/%s", jobId); - Map needMoveFiles = new HashMap<>(); - needMoveFiles.put(transactionDir + "/c3=3/c4=rrr/test1.txt", targetDir + "/c3=3/c4=rrr/test1.txt"); - needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt"); - Map> partitionDirAndVals = new HashMap<>(); - partitionDirAndVals.put("/c3=3/c4=rrr", Arrays.stream((new String[]{"3", "rrr"})).collect(Collectors.toList())); - partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList())); - FileCommitInfo fileCommitInfo = new FileCommitInfo(needMoveFiles, partitionDirAndVals, transactionDir); - FileUtils.createFile(transactionDir + "/c3=3/c4=rrr/test1.txt"); - FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt"); - - Map needMoveFiles1 = new HashMap<>(); - needMoveFiles1.put(transactionDir + "/c3=4/c4=rrr/test2.txt", targetDir + "/c3=4/c4=rrr/test2.txt"); - needMoveFiles1.put(transactionDir + "/c3=4/c4=bbb/test2.txt", targetDir + "/c3=4/c4=bbb/test2.txt"); - Map> partitionDirAndVals1 = new HashMap<>(); - partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList())); - partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList())); - FileCommitInfo fileCommitInfo1 = new FileCommitInfo(needMoveFiles1, partitionDirAndVals1, transactionDir); - List fileCommitInfoList = new ArrayList<>(); - fileCommitInfoList.add(fileCommitInfo); - fileCommitInfoList.add(fileCommitInfo1); - - FileAggregatedCommitInfo combine = fileSinkAggregatedCommitter.combine(fileCommitInfoList); - Assertions.assertEquals(1, combine.getTransactionMap().size()); - Assertions.assertEquals(4, combine.getTransactionMap().get(transactionDir).size()); - Assertions.assertEquals(targetDir + "/c3=3/c4=rrr/test1.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=3/c4=rrr/test1.txt")); - Assertions.assertEquals(targetDir + "/c3=4/c4=bbb/test1.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=bbb/test1.txt")); - Assertions.assertEquals(targetDir + "/c3=4/c4=rrr/test2.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=rrr/test2.txt")); - Assertions.assertEquals(targetDir + "/c3=4/c4=bbb/test2.txt", combine.getTransactionMap().get(transactionDir).get(transactionDir + "/c3=4/c4=bbb/test2.txt")); - Assertions.assertEquals(3, combine.getPartitionDirAndValsMap().keySet().size()); - } - - @SuppressWarnings("checkstyle:UnnecessaryParentheses") - @Test - public void testAbort() throws Exception { - FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(new LocalFileSystemCommitter()); - Map> transactionFiles = new HashMap<>(); - Random random = new Random(); - Long jobIdLong = random.nextLong(); - String jobId = "Job_" + jobIdLong; - String transactionDir = String.format("/tmp/seatunnel/seatunnel/%s/T_%s_0_1", jobId, jobId); - String targetDir = String.format("/tmp/hive/warehouse/%s", jobId); - Map needMoveFiles = new HashMap<>(); - needMoveFiles.put(transactionDir + "/c3=4/c4=rrr/test1.txt", targetDir + "/c3=4/c4=rrr/test1.txt"); - needMoveFiles.put(transactionDir + "/c3=4/c4=bbb/test1.txt", targetDir + "/c3=4/c4=bbb/test1.txt"); - Map> partitionDirAndVals = new HashMap<>(); - partitionDirAndVals.put("/c3=4/c4=rrr", Arrays.stream((new String[]{"4", "rrr"})).collect(Collectors.toList())); - partitionDirAndVals.put("/c3=4/c4=bbb", Arrays.stream((new String[]{"4", "bbb"})).collect(Collectors.toList())); - FileUtils.createFile(transactionDir + "/c3=4/c4=rrr/test1.txt"); - FileUtils.createFile(transactionDir + "/c3=4/c4=bbb/test1.txt"); - - transactionFiles.put(transactionDir, needMoveFiles); - FileAggregatedCommitInfo fileAggregatedCommitInfo = new FileAggregatedCommitInfo(transactionFiles, partitionDirAndVals); - List fileAggregatedCommitInfoList = new ArrayList<>(); - fileAggregatedCommitInfoList.add(fileAggregatedCommitInfo); - fileSinkAggregatedCommitter.commit(fileAggregatedCommitInfoList); - - Assertions.assertTrue(FileUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt")); - Assertions.assertTrue(FileUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt")); - Assertions.assertTrue(!FileUtils.fileExist(transactionDir)); - - fileSinkAggregatedCommitter.abort(fileAggregatedCommitInfoList); - Assertions.assertTrue(!FileUtils.fileExist(targetDir + "/c3=4/c4=bbb/test1.txt")); - Assertions.assertTrue(!FileUtils.fileExist(targetDir + "/c3=4/c4=rrr/test1.txt")); - - // transactionDir will being delete when abort - Assertions.assertTrue(!FileUtils.fileExist(transactionDir)); - } -} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/TestLocalTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/TestLocalTxtTransactionStateFileWriter.java deleted file mode 100644 index ced0d8c8a0f4..000000000000 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/local/TestLocalTxtTransactionStateFileWriter.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.file.local; - -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.filesystem.LocalFileSystem; -import org.apache.seatunnel.connectors.seatunnel.file.local.sink.writer.LocalTxtTransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo; -import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator; -import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -public class TestLocalTxtTransactionStateFileWriter { - - @SuppressWarnings("checkstyle:MagicNumber") - @Test - public void testHdfsTextTransactionStateFileWriter() throws Exception { - String[] fieldNames = new String[]{"c1", "c2", "c3", "c4"}; - SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[]{BasicType.BOOLEAN_TYPE, BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE}; - SeaTunnelRowType seaTunnelRowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes); - - List sinkColumnIndexInRow = new ArrayList<>(); - sinkColumnIndexInRow.add(0); - sinkColumnIndexInRow.add(1); - - List hivePartitionFieldList = new ArrayList<>(); - hivePartitionFieldList.add("c3"); - hivePartitionFieldList.add("c4"); - - List partitionFieldIndexInRow = new ArrayList<>(); - partitionFieldIndexInRow.add(2); - partitionFieldIndexInRow.add(3); - - String jobId = System.currentTimeMillis() + ""; - String targetPath = "/tmp/hive/warehouse/seatunnel.db/test1"; - String tmpPath = "/tmp/seatunnel"; - - TransactionStateFileWriter fileWriter = new LocalTxtTransactionStateFileWriter(seaTunnelRowTypeInfo, - new FileSinkTransactionFileNameGenerator(FileFormat.TEXT, null, "yyyy.MM.dd"), - new FileSinkPartitionDirNameGenerator(hivePartitionFieldList, partitionFieldIndexInRow, "${k0}=${v0}/${k1}=${v1}"), - sinkColumnIndexInRow, - tmpPath, - targetPath, - jobId, - 0, - String.valueOf('\001'), - "\n", - new LocalFileSystem()); - - String transactionId = fileWriter.beginTransaction(1L); - - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{true, 1, "str1", "str2"}); - fileWriter.write(seaTunnelRow); - - SeaTunnelRow seaTunnelRow1 = new SeaTunnelRow(new Object[]{true, 1, "str1", "str3"}); - fileWriter.write(seaTunnelRow1); - - Optional fileCommitInfoOptional = fileWriter.prepareCommit(); - //check file exists and file content - Assertions.assertTrue(fileCommitInfoOptional.isPresent()); - FileCommitInfo fileCommitInfo = fileCommitInfoOptional.get(); - String transactionDir = tmpPath + "/seatunnel/" + jobId + "/" + transactionId; - Assertions.assertEquals(transactionDir, fileCommitInfo.getTransactionDir()); - Assertions.assertEquals(2, fileCommitInfo.getNeedMoveFiles().size()); - Map needMoveFiles = fileCommitInfo.getNeedMoveFiles(); - Assertions.assertEquals(targetPath + "/c3=str1/c4=str2/" + transactionId + ".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str2/" + transactionId + ".txt")); - Assertions.assertEquals(targetPath + "/c3=str1/c4=str3/" + transactionId + ".txt", needMoveFiles.get(transactionDir + "/c3=str1/c4=str3/" + transactionId + ".txt")); - - Map> partitionDirAndValsMap = fileCommitInfo.getPartitionDirAndValsMap(); - Assertions.assertEquals(2, partitionDirAndValsMap.size()); - Assertions.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str2")); - Assertions.assertTrue(partitionDirAndValsMap.keySet().contains("c3=str1/c4=str3")); - Assertions.assertTrue(partitionDirAndValsMap.get("c3=str1/c4=str2").size() == 2); - Assertions.assertEquals("str1", partitionDirAndValsMap.get("c3=str1/c4=str2").get(0)); - Assertions.assertEquals("str2", partitionDirAndValsMap.get("c3=str1/c4=str2").get(1)); - Assertions.assertEquals("str1", partitionDirAndValsMap.get("c3=str1/c4=str3").get(0)); - Assertions.assertEquals("str3", partitionDirAndValsMap.get("c3=str1/c4=str3").get(1)); - } -}