Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Trunk with compression (#1672)
Browse files Browse the repository at this point in the history
* Update CompressionCodec to create Decompressor

* add a new field compression_impl in metastore

* reformat some code

* update metastore in onSubmit stage

* Refactor CompressionAction and CompressionScheduler for fault tolerance and optimize empty file compression
  • Loading branch information
timmyyao authored Apr 4, 2018
1 parent 6140397 commit b74579f
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 90 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.model;

/**
 * Carrier object exchanged between the compression scheduler and the compression
 * action. It bundles the {@link CompressionFileState} produced by CompressionAction
 * together with the temp-file path where the action wrote the compressed data.
 * After receiving this object, CompressionScheduler swaps the original file for the
 * temp compressed file and records the CompressionFileState in the metastore.
 */
public class CompressionFileInfo {

  // True when the scheduler must replace the original file with the temp file.
  private boolean needReplace;
  // Path of the temporary compressed file; null when no replacement file exists.
  private String tempPath;
  // Compression metadata produced by the compression action.
  private CompressionFileState compressionFileState;

  /**
   * Creates an info object without a temp file path.
   *
   * @param needReplace          whether the original file should be replaced
   * @param compressionFileState metadata describing the compressed file
   */
  public CompressionFileInfo(boolean needReplace, CompressionFileState compressionFileState) {
    this(needReplace, null, compressionFileState);
  }

  /**
   * Creates a fully-specified info object.
   *
   * @param needReplace          whether the original file should be replaced
   * @param tempPath             location of the temporary compressed file (may be null)
   * @param compressionFileState metadata describing the compressed file
   */
  public CompressionFileInfo(boolean needReplace, String tempPath,
      CompressionFileState compressionFileState) {
    this.needReplace = needReplace;
    this.tempPath = tempPath;
    this.compressionFileState = compressionFileState;
  }

  /** @return whether the scheduler should replace the original file. */
  public boolean needReplace() {
    return needReplace;
  }

  public void setNeedReplace(boolean needReplace) {
    this.needReplace = needReplace;
  }

  /** @return the temp compressed file path, or null if none was set. */
  public String getTempPath() {
    return tempPath;
  }

  public void setTempPath(String tempPath) {
    this.tempPath = tempPath;
  }

  /** @return the compression metadata for this file. */
  public CompressionFileState getCompressionFileState() {
    return compressionFileState;
  }

  public void setCompressionFileState(CompressionFileState compressionFileState) {
    this.compressionFileState = compressionFileState;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public CompressionTrunk locateCompressionTrunk(boolean compressed,
int index = compressed ? getPosIndexByCompressedOffset(offset) :
getPosIndexByOriginalOffset(offset);
CompressionTrunk compressionTrunk = new CompressionTrunk(index);
compressionTrunk.setCompressionImpl(compressionImpl);
compressionTrunk.setOriginOffset(originalPos[index]);
compressionTrunk.setOriginLength(getOriginTrunkSize(index));
compressionTrunk.setCompressedOffset(compressedPos[index]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,30 @@
*/
public class CompressionTrunk {
private int index;
private String compressionImpl;
private long originOffset;
private long originLength;
private long compressedOffset;
private long compressedLength;

public CompressionTrunk(int index) {
this(index, 0, 0, 0, 0);
this(index, "snappy", 0, 0, 0, 0);
}

public CompressionTrunk(int index, long originOffset, long originLength,
long compressedOffset, long compressedLength) {
public CompressionTrunk(int index, String compressionImpl, long originOffset, long originLength,
long compressedOffset, long compressedLength) {
this.index = index;
this.compressionImpl = compressionImpl;
this.originOffset = originOffset;
this.originLength = originLength;
this.compressedOffset = compressedOffset;
this.compressedLength = compressedLength;
}

/** @return the compression codec name used for this trunk (e.g. "snappy"). */
public String getCompressionImpl() {
return compressionImpl;
}

/** Sets the offset of this trunk within the original (uncompressed) file. */
public void setOriginOffset(long originOffset) {
this.originOffset = originOffset;
}
Expand All @@ -60,6 +66,10 @@ public int getIndex() {
return index;
}

/** Sets the compression codec name used for this trunk (e.g. "snappy"). */
public void setCompressionImpl(String compressionImpl) {
this.compressionImpl = compressionImpl;
}

/** @return the offset of this trunk within the original (uncompressed) file. */
public long getOriginOffset() {
return originOffset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public static CompressionFileState convert(String path,
builder.setFileName(path)
.setFileStage(stage)
.setBufferSize(proto.getBufferSize())
.setCompressImpl(proto.getCompressionImpl())
.setOriginalLength(proto.getOriginalLength())
.setCompressedLength(proto.getCompressedLength())
.setOriginalPos(proto.getOriginalPosList())
Expand All @@ -240,6 +241,7 @@ public static CompressionFileState convert(String path,
public static CompressionFileStateProto convert(CompressionFileState fileState) {
CompressionFileStateProto.Builder builder = CompressionFileStateProto.newBuilder();
builder.setBufferSize(fileState.getBufferSize())
.setCompressionImpl(fileState.getCompressionImpl())
.setOriginalLength(fileState.getOriginalLength())
.setCompressedLength(fileState.getCompressedLength());
builder.addAllOriginalPos(Arrays.asList(fileState.getOriginalPos()));
Expand Down
3 changes: 2 additions & 1 deletion smart-common/src/main/proto/ClientServer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ message GetFileStateRequestProto {
}

message CompressionFileStateProto {
required int32 bufferSize = 2;
required int32 bufferSize = 1;
required string compressionImpl = 2;
required int64 originalLength = 3;
required int64 compressedLength = 4;
repeated int64 originalPos = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class TestCompressionFileState {
private CompressionFileState compressionFileState;
private final String fileName = "/testFile";
private final int bufferSize = 10;
private final String compressionImpl = "snappy";
private final long originalLength = 86;
private final long compressedLength = 34;
private final Long[] originalPos = {0L, 10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L};
Expand All @@ -40,6 +41,7 @@ public void init() {
.setFileStage(FileState.FileStage.DONE)
.setFileName(fileName)
.setBufferSize(bufferSize)
.setCompressImpl(compressionImpl)
.setOriginalLength(originalLength)
.setCompressedLength(compressedLength)
.setOriginalPos(originalPos)
Expand All @@ -52,6 +54,7 @@ public void testBasicVariables() {
Assert.assertEquals(FileState.FileStage.DONE, compressionFileState.getFileStage());
Assert.assertEquals(fileName, compressionFileState.getPath());
Assert.assertEquals(bufferSize, compressionFileState.getBufferSize());
Assert.assertEquals(compressionImpl, compressionFileState.getCompressionImpl());
Assert.assertEquals(originalLength, compressionFileState.getOriginalLength());
Assert.assertEquals(compressedLength, compressionFileState.getCompressedLength());
Assert.assertArrayEquals(originalPos, compressionFileState.getOriginalPos());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.smartdata.hdfs.CompressionCodec;
import org.smartdata.model.CompressionFileState;
import org.smartdata.model.CompressionTrunk;
import org.smartdata.model.FileState;
Expand All @@ -40,6 +40,7 @@ public class CompressionInputStream extends SmartInputStream {

private CompressionFileState compressionFileState;
private final long originalLength;
private CompressionCodec compressionCodec;

CompressionInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
FileState fileState) throws IOException, UnresolvedLinkException {
Expand All @@ -51,8 +52,9 @@ public class CompressionInputStream extends SmartInputStream {
}
originalLength = compressionFileState.getOriginalLength();
int bufferSize = compressionFileState.getBufferSize();
this.decompressor = new SnappyDecompressor(bufferSize);
this.compressionCodec = new CompressionCodec();
this.buffer = new byte[bufferSize];
this.decompressor = compressionCodec.creatDecompressor(bufferSize, compressionFileState.getCompressionImpl());
}

@Override
Expand Down Expand Up @@ -104,6 +106,7 @@ private int decompress(byte[] b, int off, int len) throws IOException {
return -1;
}
// Send the read data to the decompressor
decompressor.reset();
decompressor.setInput(buffer, 0, m);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
package org.smartdata.hdfs;

import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.bzip2.Bzip2Compressor;
import org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,32 +39,37 @@
*/
public class CompressionCodec {
static final Logger LOG = LoggerFactory.getLogger(SmartAction.class);
private String hadoopnativePath;
SmartConf conf = new SmartConf();

public CompressionCodec() {
// hadoopnativePath is used to support the Bzip2 compressionImpl
if (!(System.getenv("HADOOP_HOME") == null)) {
this.hadoopnativePath = System.getenv("HADOOP_HOME") + "/lib/native/libhadoop.so";
}else {
this.hadoopnativePath = System.getenv("HADOOP_COMMON_HOME") + "/lib/native/libhadoop.so";
}
System.load(hadoopnativePath);
}

/**
* Create a compressor
*/
public Compressor createCompressor(int bufferSize, String compressionImpl) {
switch (compressionImpl){
switch (compressionImpl) {
case "Lz4" :
return new Lz4Compressor(bufferSize);

case "Bzip2" :
String hadoopnativePath;
if (!(System.getenv("HADOOP_HOME") == null)) {
hadoopnativePath = System.getenv("HADOOP_HOME") + "/lib/native/libhadoop.so";
}else {
hadoopnativePath = System.getenv("HADOOP_COMMON_HOME") + "/lib/native/libhadoop.so";
}
System.load(hadoopnativePath);
if (NativeCodeLoader.isNativeCodeLoaded())
if (NativeCodeLoader.isNativeCodeLoaded()) {
if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
return new Bzip2Compressor(Bzip2Factory.getBlockSize(conf),
Bzip2Factory.getWorkFactor(conf),
bufferSize);
} else {
LOG.error("Failed to load/initialize native-bzip2 library");
}
}

case "Zlib" :
return new ZlibCompressor(ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION,
Expand All @@ -71,4 +81,29 @@ public Compressor createCompressor(int bufferSize, String compressionImpl) {
return new SnappyCompressor(bufferSize);
}
}

/**
* Create a Decompressor
*/
public Decompressor creatDecompressor(int bufferSize, String compressionImpl){
switch (compressionImpl){
case "Lz4" :
return new Lz4Decompressor(bufferSize);

case "Bzip2" :
if (NativeCodeLoader.isNativeCodeLoaded()) {
if (Bzip2Factory.isNativeBzip2Loaded(conf)) {
return new Bzip2Decompressor(false, bufferSize);
} else {
LOG.error("Failed to load/initialize native-bzip2 library");
}
}

case "Zlib" :
return new ZlibDecompressor(ZlibDecompressor.CompressionHeader.DEFAULT_HEADER,bufferSize);

default:
return new SnappyDecompressor(bufferSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public SmartCompressorStream(InputStream inputStream, OutputStream outputStream,
int overHead = bufferSize / 6 + 32;
buffer = new byte[bufferSize + overHead];
this.compressor = compressionCodec.createCompressor(bufferSize + overHead, compressionInfo.getCompressionImpl());
if(compressor instanceof SnappyCompressor){
if (compressor instanceof SnappyCompressor) {
compressionInfo.setCompressionImpl("snappy");
}
}
Expand Down
Loading

0 comments on commit b74579f

Please sign in to comment.