diff --git a/smart-hadoop-support/smart-hadoop-2/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java b/smart-hadoop-support/smart-hadoop-2/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java index 6b61a20db64..c84819de7f0 100644 --- a/smart-hadoop-support/smart-hadoop-2/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java +++ b/smart-hadoop-support/smart-hadoop-2/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java @@ -74,21 +74,33 @@ public List getAllBlocks() throws IOException { @Override public synchronized int read(final byte[] buf, int off, int len) throws IOException { int realLen = (int) Math.min(len, fileContainerInfo.getLength() - getPos()); - return super.read(buf, off, realLen); + if (realLen == 0) { + return -1; + } else { + return super.read(buf, off, realLen); + } } @Override public synchronized int read(final ByteBuffer buf) throws IOException { int realLen = (int) Math.min(buf.remaining(), fileContainerInfo.getLength() - getPos()); - buf.limit(realLen + buf.position()); - return super.read(buf); + if (realLen == 0) { + return -1; + } else { + buf.limit(realLen + buf.position()); + return super.read(buf); + } } @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { - int realLen = (int) Math.min(length, fileContainerInfo.getLength() - getPos()); long realPos = position + fileContainerInfo.getOffset(); - return super.read(realPos, buffer, offset, realLen); + int realLen = (int) Math.min(length, fileContainerInfo.getLength() - position); + if (realLen == 0) { + return -1; + } else { + return super.read(realPos, buffer, offset, realLen); + } } @Override diff --git a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java index 3d192f54aa9..f29b5775f61 100644 --- a/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java +++ b/smart-hadoop-support/smart-hadoop-3.1/src/main/java/org/apache/hadoop/hdfs/CompactInputStream.java @@ -74,21 +74,33 @@ public List getAllBlocks() throws IOException { @Override public synchronized int read(final byte[] buf, int off, int len) throws IOException { int realLen = (int) Math.min(len, fileContainerInfo.getLength() - getPos()); - return super.read(buf, off, realLen); + if (realLen == 0) { + return -1; + } else { + return super.read(buf, off, realLen); + } } @Override public synchronized int read(final ByteBuffer buf) throws IOException { int realLen = (int) Math.min(buf.remaining(), fileContainerInfo.getLength() - getPos()); - buf.limit(realLen + buf.position()); - return super.read(buf); + if (realLen == 0) { + return -1; + } else { + buf.limit(realLen + buf.position()); + return super.read(buf); + } } @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { - int realLen = (int) Math.min(length, fileContainerInfo.getLength() - getPos()); long realPos = position + fileContainerInfo.getOffset(); - return super.read(realPos, buffer, offset, realLen); + int realLen = (int) Math.min(length, fileContainerInfo.getLength() - position); + if (realLen == 0) { + return -1; + } else { + return super.read(realPos, buffer, offset, realLen); + } } @Override diff --git a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/SmallFileCompactAction.java b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/SmallFileCompactAction.java index f66476edcfb..b4cb472135b 100644 --- a/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/SmallFileCompactAction.java +++ b/smart-hadoop-support/smart-hadoop/src/main/java/org/smartdata/hdfs/action/SmallFileCompactAction.java @@ -21,8 +21,10 @@ import com.google.gson.reflect.TypeToken; import org.apache.commons.lang.SerializationUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.io.IOUtils; import org.smartdata.SmartConstants; @@ -129,6 +131,8 @@ protected void execute() throws Exception { for (String smallFile : smallFileList) { if ((smallFile != null) && !smallFile.isEmpty() && dfsClient.exists(smallFile)) { + HdfsDataOutputStream append = dfsClient.append( + smallFile, 1024, EnumSet.of(CreateFlag.APPEND), null, null); long fileLen = dfsClient.getFileInfo(smallFile).getLen(); if (fileLen > 0) { try (InputStream in = dfsClient.open(smallFile)) { @@ -138,6 +142,7 @@ protected void execute() throws Exception { // Truncate small file, add file container info to XAttr CompactFileState compactFileState = new CompactFileState( smallFile, new FileContainerInfo(containerFile, offset, fileLen)); + append.close(); truncateAndSetXAttr(smallFile, compactFileState); // Update compact file state map, offset, status, and log @@ -148,7 +153,10 @@ protected void execute() throws Exception { appendLog(String.format( "Compact %s to %s successfully.", smallFile, containerFile)); } catch (IOException e) { - // Close output stream and put compact file state map into action result + // Close append, output streams and put compact file state map into action result + if (append != null) { + append.close(); + } if (out != null) { out.close(); appendResult(new Gson().toJson(compactFileStates));