Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Add read end flag of SSM Small File (#1975)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnillor authored Oct 23, 2018
1 parent 91f7908 commit 90657c7
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,33 @@ public List<LocatedBlock> getAllBlocks() throws IOException {
@Override
public synchronized int read(final byte[] buf, int off, int len) throws IOException {
int realLen = (int) Math.min(len, fileContainerInfo.getLength() - getPos());
return super.read(buf, off, realLen);
if (realLen == 0) {
return -1;
} else {
return super.read(buf, off, realLen);
}
}

@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
int realLen = (int) Math.min(buf.remaining(), fileContainerInfo.getLength() - getPos());
buf.limit(realLen + buf.position());
return super.read(buf);
if (realLen == 0) {
return -1;
} else {
buf.limit(realLen + buf.position());
return super.read(buf);
}
}

@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
int realLen = (int) Math.min(length, fileContainerInfo.getLength() - getPos());
long realPos = position + fileContainerInfo.getOffset();
return super.read(realPos, buffer, offset, realLen);
int realLen = (int) Math.min(length, fileContainerInfo.getLength() - position);
if (realLen == 0) {
return -1;
} else {
return super.read(realPos, buffer, offset, realLen);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,33 @@ public List<LocatedBlock> getAllBlocks() throws IOException {
@Override
public synchronized int read(final byte[] buf, int off, int len) throws IOException {
int realLen = (int) Math.min(len, fileContainerInfo.getLength() - getPos());
return super.read(buf, off, realLen);
if (realLen == 0) {
return -1;
} else {
return super.read(buf, off, realLen);
}
}

@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
int realLen = (int) Math.min(buf.remaining(), fileContainerInfo.getLength() - getPos());
buf.limit(realLen + buf.position());
return super.read(buf);
if (realLen == 0) {
return -1;
} else {
buf.limit(realLen + buf.position());
return super.read(buf);
}
}

@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
int realLen = (int) Math.min(length, fileContainerInfo.getLength() - getPos());
long realPos = position + fileContainerInfo.getOffset();
return super.read(realPos, buffer, offset, realLen);
int realLen = (int) Math.min(length, fileContainerInfo.getLength() - position);
if (realLen == 0) {
return -1;
} else {
return super.read(realPos, buffer, offset, realLen);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.io.IOUtils;
import org.smartdata.SmartConstants;
Expand Down Expand Up @@ -129,6 +131,8 @@ protected void execute() throws Exception {

for (String smallFile : smallFileList) {
if ((smallFile != null) && !smallFile.isEmpty() && dfsClient.exists(smallFile)) {
HdfsDataOutputStream append = dfsClient.append(
smallFile, 1024, EnumSet.of(CreateFlag.APPEND), null, null);
long fileLen = dfsClient.getFileInfo(smallFile).getLen();
if (fileLen > 0) {
try (InputStream in = dfsClient.open(smallFile)) {
Expand All @@ -138,6 +142,7 @@ protected void execute() throws Exception {
// Truncate small file, add file container info to XAttr
CompactFileState compactFileState = new CompactFileState(
smallFile, new FileContainerInfo(containerFile, offset, fileLen));
append.close();
truncateAndSetXAttr(smallFile, compactFileState);

// Update compact file state map, offset, status, and log
Expand All @@ -148,7 +153,10 @@ protected void execute() throws Exception {
appendLog(String.format(
"Compact %s to %s successfully.", smallFile, containerFile));
} catch (IOException e) {
// Close output stream and put compact file state map into action result
// Close append, output streams and put compact file state map into action result
if (append != null) {
append.close();
}
if (out != null) {
out.close();
appendResult(new Gson().toJson(compactFileStates));
Expand Down

0 comments on commit 90657c7

Please sign in to comment.