diff --git a/hdfsreader/pom.xml b/hdfsreader/pom.xml
index ec0e1e0fa..de642a112 100644
--- a/hdfsreader/pom.xml
+++ b/hdfsreader/pom.xml
@@ -35,12 +35,59 @@
             <groupId>io.prestosql.hadoop</groupId>
             <artifactId>hadoop-apache</artifactId>
             <version>${hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>io.prestosql.hive</groupId>
             <artifactId>hive-apache</artifactId>
             <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.10.0</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>1.11.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-api</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
index 7bb483054..9ec765b0b 100644
--- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
+++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java
@@ -705,32 +705,10 @@ public boolean checkHdfsFileType(String filepath, String specifiedFileType)
         Path file = new Path(filepath);
         try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {
-
-            if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.CSV)
-                    || StringUtils.equalsIgnoreCase(specifiedFileType, Constant.TEXT)) {
-
-                boolean isORC = isORCFile(file, fs, in);// 判断是否是 ORC File
-                if (isORC) {
-                    return false;
-                }
-                boolean isRC = isRCFile(filepath, in);// 判断是否是 RC File
-                if (isRC) {
-                    return false;
-                }
-                boolean isSEQ = isSequenceFile(file, in);// 判断是否是 Sequence File
-                if (isSEQ) {
-                    return false;
-                }
-                boolean isParquet = isParquetFile(file, in); //判断是否为Parquet File
-                // 如果不是ORC,RC,PARQUET,SEQ,则默认为是TEXT或CSV类型
-                return !isParquet;
-            }
-            else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.ORC)) {
-
+            if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.ORC)) {
                 return isORCFile(file, fs, in);
             }
             else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.RC)) {
-
                 return isRCFile(filepath, in);
             }
             else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.SEQ)) {
@@ -740,10 +718,14 @@ else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.SEQ)) {
             else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.PARQUET)) {
                 return isParquetFile(file, in);
             }
+            else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.CSV)
+                    || StringUtils.equalsIgnoreCase(specifiedFileType, Constant.TEXT)) {
+                return true;
+            }
         }
         catch (Exception e) {
-            String message = String.format("检查文件[%s]类型失败,目前支持ORC,SEQUENCE,RCFile,TEXT,CSV五种格式的文件," +
-                    "请检查您文件类型和文件是否正确。", filepath);
+            String message = String.format("检查文件[%s]类型失败,目前支持 %s 格式的文件," +
+                    "请检查您文件类型和文件是否正确。", filepath, Constant.SUPPORT_FILE_TYPE);
             LOG.error(message);
             throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
         }
diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java
index 62e27e1f6..61038e756 100644
--- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java
+++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsHelper.java
@@ -52,6 +52,7 @@
 import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
@@ -59,15 +60,16 @@
 import java.util.StringJoiner;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public class HdfsHelper
 {
     public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class);
     public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
     public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS";
-    public FileSystem fileSystem = null;
-    public JobConf conf = null;
-    public org.apache.hadoop.conf.Configuration hadoopConf = null;
+    private FileSystem fileSystem = null;
+    private JobConf conf = null;
+    private org.apache.hadoop.conf.Configuration hadoopConf = null;
     // Kerberos
     private boolean haveKerberos = false;
     private String kerberosKeytabFilePath;
@@ -400,23 +402,18 @@ public boolean isPathDir(String filePath)
      */
    public void deleteFiles(Path[] paths, boolean delDotFile)
     {
-        String fname;
-        for (Path path : paths) {
-            LOG.info("delete file [{}], include dotfile({}).", path, delDotFile);
+        List<Path> needDelPaths;
+        if (delDotFile) {
+            LOG.info("仅删除指定目录下的点(.)开头的文件或文件夹");
+            needDelPaths = Arrays.stream(paths).filter(x -> x.getName().startsWith(".")).collect(Collectors.toList());
+        } else {
+            LOG.info("删除指定目录下的不以点(.)开头的文件夹或文件夹");
+            needDelPaths = Arrays.stream(paths).filter(x -> ! x.getName().startsWith(".")).collect(Collectors.toList());
+        }
+
+        for (Path path : needDelPaths) {
             try {
-                fname = path.getName();
-                // 如果只要删除点开头的文件
-                if (delDotFile) {
-                    if (fname.startsWith(".")) {
-                        fileSystem.delete(path, true);
-                    }
-                }
-                else {
-                    // 保留dot文件,其他删除
-                    if (!fname.startsWith((".")) ) {
-                        fileSystem.delete(path, true);
-                    }
-                }
+                fileSystem.delete(path, true);
             }
             catch (IOException e) {
                 LOG.error("删除文件[{}]时发生IO异常,请检查您的网络是否正常!", path);
@@ -482,7 +479,7 @@ public void renameFile(Set<String> tmpFiles, Set<String> endFiles)
                 LOG.error(message);
                 throw DataXException.asDataXException(HdfsWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
             }
-            LOG.info("finish rename file [{}] to file [{}].", srcFile, dstFile);
+            LOG.info("finish rename file.");
         }
         else {
             LOG.info("文件[{}]内容为空,请检查写入是否正常!", srcFile);
@@ -609,7 +606,7 @@ public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration con
         if ("NONE".equals(compress)) {
             compress = "UNCOMPRESSED";
         }
-//        List columnNames = getColumnNames(columns);
+//        List columnNames = getColumnNames(columns)
 //        List columnTypeInspectors = getparColumnTypeInspectors(columns);
 //        StructObjectInspector inspector = ObjectInspectorFactory
 //                .getStandardStructObjectInspector(columnNames, columnTypeInspectors);
diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
index 1d41362d9..7a1c67883 100644
--- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
+++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java
@@ -205,7 +205,7 @@ public void post()
             if ("overwrite".equals(writeMode)) {
                 hdfsHelper.deleteFiles(existFilePaths, false);
             }
-            hdfsHelper.renameFile(tmpFiles, endFiles);
+            hdfsHelper.renameFile(this.tmpFiles, this.endFiles);
             // 删除临时目录
             hdfsHelper.deleteFiles(existFilePaths, true);
         }
@@ -283,6 +283,9 @@ else if ("ZLIB".equals(compress)) {
                     this.tmpFiles.add(fullFileName);
                     this.endFiles.add(endFullFileName);
                 }
+            } else {
+                this.tmpFiles.add(fullFileName);
+                this.endFiles.add(endFullFileName);
             }
 
             splitedTaskConfig
@@ -399,8 +402,7 @@ public void prepare()
         @Override
         public void startWrite(RecordReceiver lineReceiver)
         {
-            LOG.info("begin do write...");
-            LOG.info(String.format("write to file : [%s]", this.fileName));
+            LOG.info("write to file : [{}]", this.fileName);
             if ("TEXT".equals(fileType)) {
                 //写TEXT FILE
                 hdfsHelper.textFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,