Skip to content

Commit

Permalink
ORC-614: Implement efficient seek() in decompression streams
Browse files Browse the repository at this point in the history
The current implementation of ZlibDecompressionStream::seek and
BlockDecompressionStream::seek resets the state of the decompressor
and the underlying file reader and throws away their buffers.

This commit introduces two optimizations which rely on reusing
the buffers that still contain useful data, and therefore reducing
the time spent reading/uncompressing the buffers again.

The first case is when the seeked position is already read
and decompressed into the output stream.

The second case is when the seeked position is already read from
the input stream, but has not been decompressed yet, ie. it's
not in the output stream.

Tests:
 - Run the ORC tests, and the Impala tests working on ORC tables.
 - The regression that apache#476 would cause is not present anymore.
  • Loading branch information
luksan47 committed Mar 25, 2020
1 parent 06bc0a4 commit f8b3a93
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 25 deletions.
167 changes: 142 additions & 25 deletions c++/src/Compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ DIAGNOSTIC_PUSH
inputBufferEnd = nullptr;
} else {
inputBufferEnd = inputBuffer + length;
inputBufferStartPosition = input->ByteCount() - length;
inputBufferStart = inputBuffer;
}
}

Expand Down Expand Up @@ -373,21 +375,31 @@ DIAGNOSTIC_PUSH
z_stream zstream;
DataBuffer<char> buffer;

// the current state
// The current state.
DecompressState state;

// the start of the current buffer
// This pointer is not owned by us. It is either owned by zstream or
// the underlying stream.
// The starting and current position of the buffer for the uncompressed
// data.
const char* outputBufferStart;
const char* outputBuffer;
// the size of the current buffer
// The original (ie. the overall) and the actual length of the uncompressed
// data.
size_t uncompressedBufferLength;
size_t outputBufferLength;
// the size of the current chunk

// The remaining size of the current chunk that is not yet consumed
// ie. decompressed or returned in output if state==DECOMPRESS_ORIGINAL
size_t remainingLength;

// the last buffer returned from the input
const char *inputBuffer;
const char *inputBufferEnd;
const char* inputBufferStart;
const char* inputBuffer;
const char* inputBufferEnd;

// Variables for saving the position of the header and the start of the
// buffer. Used when we have to seek a position.
size_t headerPosition;
size_t inputBufferStartPosition;

// roughly the number of bytes returned
off_t bytesReturned;
Expand Down Expand Up @@ -434,6 +446,11 @@ DIAGNOSTIC_PUSH
inputBuffer = nullptr;
inputBufferEnd = nullptr;
bytesReturned = 0;
headerPosition = 0;
inputBufferStartPosition = 0;
uncompressedBufferLength = 0;
inputBufferStart = nullptr;
outputBufferStart = nullptr;
}

DIAGNOSTIC_POP
Expand All @@ -447,7 +464,10 @@ DIAGNOSTIC_POP
}

bool ZlibDecompressionStream::Next(const void** data, int*size) {
// if the user pushed back, return them the partial buffer
// If we are starting a new header, we will have to store its positions
// after decompressing.
bool saveBufferPositions = false;
// If the user pushed back or seeked within the same chunk.
if (outputBufferLength) {
*data = outputBuffer;
*size = static_cast<int>(outputBufferLength);
Expand All @@ -457,6 +477,9 @@ DIAGNOSTIC_POP
}
if (state == DECOMPRESS_HEADER || remainingLength == 0) {
readHeader();
headerPosition
= inputBufferStartPosition + (inputBuffer - inputBufferStart);
saveBufferPositions = true;
}
if (state == DECOMPRESS_EOF) {
return false;
Expand Down Expand Up @@ -527,6 +550,10 @@ DIAGNOSTIC_POP
inputBuffer += availSize;
remainingLength -= availSize;
bytesReturned += *size;
if (saveBufferPositions) {
uncompressedBufferLength = *size;
outputBufferStart = reinterpret_cast<const char*>(*data);
}
return true;
}

Expand Down Expand Up @@ -564,16 +591,47 @@ DIAGNOSTIC_POP
return bytesReturned;
}

/** There are three possible scenarios when seeking a position:
* 1. The seeked position is already read and decompressed into
* the output stream.
* 2. It is already read from the input stream, but has not been
* decompressed yet, ie. it's not in the output stream.
* 3. It is not read yet from the inputstream.
*/
void ZlibDecompressionStream::seek(PositionProvider& position) {
// clear state to force seek to read from the right position
size_t seekedPosition = position.current();
// Case 3.: the seeked position is the one that is currently buffered and
// decompressed. Here we only need to set the output buffer's pointer to the
// seeked position. Since headerPos saves the position after reading the
// first 3 bytes of the header, it is compared to the looked up pos + 3.
if (headerPosition == seekedPosition + 3
&& inputBufferStartPosition <= headerPosition && inputBufferStart) {
position.next(); // Skip the input level position.
size_t posInChunk = position.next(); // Chunk level position.
outputBufferLength = uncompressedBufferLength - posInChunk;
outputBuffer = outputBufferStart + posInChunk;
return;
}
// Clear state to prepare reading from a new chunk header.
state = DECOMPRESS_HEADER;
outputBuffer = nullptr;
outputBufferLength = 0;
remainingLength = 0;
inputBuffer = nullptr;
inputBufferEnd = nullptr;
if (seekedPosition < static_cast<uint64_t>(input->ByteCount())) {
// Case 2.: The input is buffered, but not yet decompressed. No need to
// force re-reading the inputBuffer, we just have to move it to the
// seeked position.
position.next(); // Skip the input level position.
inputBuffer
= inputBufferStart + (seekedPosition - inputBufferStartPosition);
} else {
// Case 1.: The seeked position is not in the input buffer, here we are
// forcing to read it.
inputBuffer = nullptr;
inputBufferEnd = nullptr;
input->seek(position); // Actually use the input level position.
}

input->seek(position);
bytesReturned = static_cast<off_t>(input->ByteCount());
if (!Skip(static_cast<int>(position.next()))) {
throw ParseError("Bad skip in ZlibDecompressionStream::seek");
Expand Down Expand Up @@ -621,6 +679,8 @@ DIAGNOSTIC_POP
inputBufferPtrEnd = nullptr;
} else {
inputBufferPtrEnd = inputBufferPtr + length;
inputBufferStartPosition = input->ByteCount() - length;
inputBufferPtrStart = inputBufferPtr;
}
}

Expand Down Expand Up @@ -663,17 +723,28 @@ DIAGNOSTIC_POP
// the current state
DecompressState state;

// the start of the current output buffer
// The starting and current position of the buffer for the uncompressed
// data.
const char* outputBufferPtrStart;
const char* outputBufferPtr;
// the size of the current output buffer
// The original (ie. the overall) and the actual length of the uncompressed
// data.
size_t uncompressedBufferLength;
size_t outputBufferLength;

// the size of the current chunk
// The remaining size of the current chunk that is not yet consumed
// ie. decompressed or returned in output if state==DECOMPRESS_ORIGINAL
size_t remainingLength;

// the last buffer returned from the input
const char *inputBufferPtr;
const char *inputBufferPtrEnd;
const char* inputBufferPtrStart;
const char* inputBufferPtr;
const char* inputBufferPtrEnd;

// Variables for saving the position of the header and the start of the
// buffer. Used when we have to seek a position.
size_t headerPosition;
size_t inputBufferStartPosition;

// bytes returned by this stream
off_t bytesReturned;
Expand All @@ -688,16 +759,25 @@ DIAGNOSTIC_POP
inputBuffer(pool, bufferSize),
outputBuffer(pool, bufferSize),
state(DECOMPRESS_HEADER),
outputBufferPtrStart(nullptr),
outputBufferPtr(nullptr),
uncompressedBufferLength(0),
outputBufferLength(0),
remainingLength(0),
inputBufferPtrStart(nullptr),
inputBufferPtr(nullptr),
inputBufferPtrEnd(nullptr),
bytesReturned(0) {
headerPosition(0),
inputBufferStartPosition(0),
bytesReturned(0)
{
}

bool BlockDecompressionStream::Next(const void** data, int*size) {
// if the user pushed back, return them the partial buffer
// If we are starting a new header, we will have to store its positions
// after decompressing.
bool saveBufferPositions = false;
// If the user pushed back or seeked within the same chunk.
if (outputBufferLength) {
*data = outputBufferPtr;
*size = static_cast<int>(outputBufferLength);
Expand All @@ -708,6 +788,9 @@ DIAGNOSTIC_POP
}
if (state == DECOMPRESS_HEADER || remainingLength == 0) {
readHeader();
headerPosition
= inputBufferStartPosition + (inputBufferPtr - inputBufferPtrStart);
saveBufferPositions = true;
}
if (state == DECOMPRESS_EOF) {
return false;
Expand Down Expand Up @@ -763,7 +846,10 @@ DIAGNOSTIC_POP
outputBufferPtr = outputBuffer.data() + outputBufferLength;
outputBufferLength = 0;
}

if (saveBufferPositions) {
uncompressedBufferLength = *size;
outputBufferPtrStart = reinterpret_cast<const char*>(*data);
}
bytesReturned += *size;
return true;
}
Expand Down Expand Up @@ -801,16 +887,47 @@ DIAGNOSTIC_POP
return bytesReturned;
}

/** There are three possible scenarios when seeking a position:
* 1. The seeked position is already read and decompressed into
* the output stream.
* 2. It is already read from the input stream, but has not been
* decompressed yet, ie. it's not in the output stream.
* 3. It is not read yet from the inputstream.
*/
void BlockDecompressionStream::seek(PositionProvider& position) {
// clear state to force seek to read from the right position
size_t seekedPosition = position.current();
// Case 1.: the seeked position is the one that is currently buffered and
// decompressed. Here we only need to set the output buffer's pointer to the
// seeked position. Since headerPos saves the position after reading the
// first 3 bytes of the header, it is compared to the looked up pos + 3.
if (headerPosition == seekedPosition + 3
&& inputBufferStartPosition <= headerPosition && inputBufferPtrStart) {
position.next(); // Skip the input level position.
size_t posInChunk = position.next(); // Chunk level position.
outputBufferLength = uncompressedBufferLength - posInChunk;
outputBufferPtr = outputBufferPtrStart + posInChunk;
return;
}
// Clear state to prepare reading from a new chunk header.
state = DECOMPRESS_HEADER;
outputBufferPtr = nullptr;
outputBufferLength = 0;
remainingLength = 0;
inputBufferPtr = nullptr;
inputBufferPtrEnd = nullptr;
if (seekedPosition < static_cast<uint64_t>(input->ByteCount())) {
// Case 2.: The input is buffered, but not yet decompressed. No need to
// force re-reading the inputBuffer, we just have to move it to the
// seeked position.
position.next(); // Skip the input level position.
inputBufferPtr
= inputBufferPtrStart + (seekedPosition - inputBufferStartPosition);
} else {
// Case 3.: The seeked position is not in the input buffer, here we are
// forcing to read it.
inputBufferPtr = nullptr;
inputBufferPtrEnd = nullptr;
input->seek(position); // Actually use the input level position.
}

input->seek(position);
if (!Skip(static_cast<int>(position.next()))) {
throw ParseError("Bad skip in " + getName());
}
Expand Down
4 changes: 4 additions & 0 deletions c++/src/io/InputStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ namespace orc {
return result;
}

uint64_t PositionProvider::current() {
return *position;
}

SeekableInputStream::~SeekableInputStream() {
// PASS
}
Expand Down
1 change: 1 addition & 0 deletions c++/src/io/InputStream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace orc {
public:
PositionProvider(const std::list<uint64_t>& positions);
uint64_t next();
uint64_t current();
};

/**
Expand Down

0 comments on commit f8b3a93

Please sign in to comment.