Fixed: failed to read text file type from HDFS #66
wgzhao committed Dec 12, 2020
1 parent c66bff7 commit 22f18e4
Showing 4 changed files with 77 additions and 49 deletions.
47 changes: 47 additions & 0 deletions hdfsreader/pom.xml
@@ -35,12 +35,59 @@
<groupId>io.prestosql.hadoop</groupId>
<artifactId>hadoop-apache</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.prestosql.hive</groupId>
<artifactId>hive-apache</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.11.1</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
@@ -705,32 +705,10 @@ public boolean checkHdfsFileType(String filepath, String specifiedFileType)
Path file = new Path(filepath);

try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {

if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.CSV)
|| StringUtils.equalsIgnoreCase(specifiedFileType, Constant.TEXT)) {

boolean isORC = isORCFile(file, fs, in); // check whether it is an ORC file
if (isORC) {
return false;
}
boolean isRC = isRCFile(filepath, in); // check whether it is an RC file
if (isRC) {
return false;
}
boolean isSEQ = isSequenceFile(file, in); // check whether it is a Sequence file
if (isSEQ) {
return false;
}
boolean isParquet = isParquetFile(file, in); // check whether it is a Parquet file
// if it is none of ORC, RC, PARQUET or SEQ, assume it is TEXT or CSV
return !isParquet;
}
else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.ORC)) {

if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.ORC)) {
return isORCFile(file, fs, in);
}
else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.RC)) {

return isRCFile(filepath, in);
}
else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.SEQ)) {
@@ -740,10 +718,14 @@ else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.SEQ)) {
else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.PARQUET)) {
return isParquetFile(file, in);
}
else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.CSV)
|| StringUtils.equalsIgnoreCase(specifiedFileType, Constant.TEXT)) {
return true;
}
}
catch (Exception e) {
String message = String.format("Failed to check the type of file [%s]. Currently ORC, SEQUENCE, RCFile, TEXT and CSV formats are supported; " +
"please verify that your file type and file are correct.", filepath);
String message = String.format("Failed to check the type of file [%s]. Currently %s formats are supported; " +
"please verify that your file type and file are correct.", filepath, Constant.SUPPORT_FILE_TYPE);
LOG.error(message);
throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
}
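
Note: because the diff above interleaves removed and added lines, the following is a minimal sketch of how the reordered check reads after this commit; it assumes the existing helper methods isORCFile, isRCFile, isSequenceFile and isParquetFile and the Constant file-type names shown above, and is an illustration rather than the verbatim project code. The key change is that each concrete format is probed explicitly and TEXT/CSV is accepted directly, instead of being inferred by ruling out the other formats.

public boolean checkHdfsFileType(String filepath, String specifiedFileType)
{
    Path file = new Path(filepath);
    try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {
        if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.ORC)) {
            return isORCFile(file, fs, in);
        }
        else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.RC)) {
            return isRCFile(filepath, in);
        }
        else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.SEQ)) {
            return isSequenceFile(file, in);
        }
        else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.PARQUET)) {
            return isParquetFile(file, in);
        }
        else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.CSV)
                || StringUtils.equalsIgnoreCase(specifiedFileType, Constant.TEXT)) {
            // TEXT/CSV is no longer rejected just because the magic-number probes are inconclusive
            return true;
        }
    }
    catch (Exception e) {
        String message = String.format("Failed to check the type of file [%s].", filepath);
        throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
    }
    return false;
}
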
@@ -52,22 +52,24 @@

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class HdfsHelper
{
public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class);
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS";
public FileSystem fileSystem = null;
public JobConf conf = null;
public org.apache.hadoop.conf.Configuration hadoopConf = null;
private FileSystem fileSystem = null;
private JobConf conf = null;
private org.apache.hadoop.conf.Configuration hadoopConf = null;
// Kerberos
private boolean haveKerberos = false;
private String kerberosKeytabFilePath;
@@ -400,23 +402,18 @@ public boolean isPathDir(String filePath)
*/
public void deleteFiles(Path[] paths, boolean delDotFile)
{
String fname;
for (Path path : paths) {
LOG.info("delete file [{}], include dotfile({}).", path, delDotFile);
List<Path> needDelPaths;
if (delDotFile) {
LOG.info("仅删除指定目录下的点(.)开头的文件或文件夹");
needDelPaths = Arrays.stream(paths).filter(x -> x.getName().startsWith(".")).collect(Collectors.toList());
} else {
LOG.info("删除指定目录下的不以点(.)开头的文件夹或文件夹");
needDelPaths = Arrays.stream(paths).filter(x -> ! x.getName().startsWith(".")).collect(Collectors.toList());
}

for (Path path : needDelPaths) {
try {
fname = path.getName();
// if only files starting with a dot should be deleted
if (delDotFile) {
if (fname.startsWith(".")) {
fileSystem.delete(path, true);
}
}
else {
// keep dot files, delete everything else
if (!fname.startsWith(("."))) {
fileSystem.delete(path, true);
}
}
fileSystem.delete(path, true);
}
catch (IOException e) {
LOG.error("删除文件[{}]时发生IO异常,请检查您的网络是否正常!", path);
@@ -482,7 +479,7 @@ public void renameFile(Set<String> tmpFiles, Set<String> endFiles)
LOG.error(message);
throw DataXException.asDataXException(HdfsWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
}
LOG.info("finish rename file [{}] to file [{}].", srcFile, dstFile);
LOG.info("finish rename file.");
}
else {
LOG.info("文件[{}]内容为空,请检查写入是否正常!", srcFile);
@@ -609,7 +606,7 @@ public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration con
if ("NONE".equals(compress)) {
compress = "UNCOMPRESSED";
}
// List<String> columnNames = getColumnNames(columns);
// List<String> columnNames = getColumnNames(columns)
// List<ObjectInspector> columnTypeInspectors = getparColumnTypeInspectors(columns);
// StructObjectInspector inspector = ObjectInspectorFactory
// .getStandardStructObjectInspector(columnNames, columnTypeInspectors);
@@ -205,7 +205,7 @@ public void post()
if ("overwrite".equals(writeMode)) {
hdfsHelper.deleteFiles(existFilePaths, false);
}
hdfsHelper.renameFile(tmpFiles, endFiles);
hdfsHelper.renameFile(this.tmpFiles, this.endFiles);
// delete the temporary directory
hdfsHelper.deleteFiles(existFilePaths, true);
}
@@ -283,6 +283,9 @@ else if ("ZLIB".equals(compress)) {
this.tmpFiles.add(fullFileName);
this.endFiles.add(endFullFileName);
}
} else {
this.tmpFiles.add(fullFileName);
this.endFiles.add(endFullFileName);
}

splitedTaskConfig
@@ -399,8 +402,7 @@ public void prepare()
@Override
public void startWrite(RecordReceiver lineReceiver)
{
LOG.info("begin do write...");
LOG.info(String.format("write to file : [%s]", this.fileName));
LOG.info("write to file : [{}]", this.fileName);
if ("TEXT".equals(fileType)) {
// write TEXT file
hdfsHelper.textFileStartWrite(lineReceiver, this.writerSliceConfig, this.fileName,
