Skip to content

Commit

Permalink
ORC-557 Fix problem reading large header with uncompressed streams.
Browse files Browse the repository at this point in the history
Fixes #439 #435

Signed-off-by: Owen O'Malley <omalley@apache.org>
  • Loading branch information
omalley committed Oct 22, 2019
1 parent 0635bb3 commit 1b24716
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 38 deletions.
69 changes: 31 additions & 38 deletions java/core/src/java/org/apache/orc/impl/InStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public abstract class InStream extends InputStream {
protected final Object name;
protected final long offset;
protected final long length;
protected DiskRangeList bytes;
// position in the stream (0..length)
protected long position;

public InStream(Object name, long offset, long length) {
this.name = name;
Expand All @@ -59,6 +62,32 @@ public String toString() {
@Override
public abstract void close();

/**
* Make the given range the one this stream reads from next.
* Each concrete stream type supplies its own implementation.
* @param newRange the block that is current; reset(DiskRangeList) passes
*   null here when none of the ranges it was given qualifies
* @param isJump if this was a seek instead of a natural read;
*   reset(DiskRangeList) always passes true
*/
abstract protected void setCurrent(DiskRangeList newRange,
boolean isJump);
/**
* Point this stream at a new list of disk ranges.
* The full list is remembered in {@code bytes}. Leading ranges that end at
* or before {@code offset}, or that begin beyond {@code offset + length},
* are skipped, and the first range left over (possibly null) is handed to
* {@code setCurrent} as a jump.
* @param input the input data
*/
protected void reset(DiskRangeList input) {
  bytes = input;
  final long streamEnd = offset + length;

  // walk forward to the first range that can hold bytes of this stream
  DiskRangeList first = input;
  while (first != null) {
    if (first.getEnd() > offset && first.getOffset() <= streamEnd) {
      break;
    }
    first = first.next;
  }

  // a range that begins after the stream start leaves a gap in front of it
  boolean startsInside = first != null && first.getOffset() > offset;
  position = startsInside ? first.getOffset() - offset : 0;

  setCurrent(first, true);
}

public abstract void changeIv(Consumer<byte[]> modifier);

static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
Expand All @@ -75,9 +104,6 @@ static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
* Implements a stream over an uncompressed stream.
*/
public static class UncompressedStream extends InStream {
private DiskRangeList bytes;
// position in the stream (0..length)
protected long position;
protected ByteBuffer decrypted;
protected DiskRangeList currentRange;
protected long currentOffset;
Expand All @@ -100,16 +126,6 @@ public UncompressedStream(Object name,
reset(input);
}

/**
* UncompressedStream-local copy of reset.
* Stores the list in {@code bytes}, derives {@code position} from the head
* of the list and hands that head to {@code setCurrent} as a jump.
* Unlike the InStream-level reset shown earlier on this page, it has no loop
* that skips ranges ending at or before {@code offset}, so the head of the
* list is used even when it lies entirely in front of the stream.
* NOTE(review): this copy looks like the deletion side of the diff
* (the page reports 38 deletions) - confirm against the repository.
* @param input the input data
*/
protected void reset(DiskRangeList input) {
this.bytes = input;
// a head range at or before the stream start means reading begins at 0
if (input == null || input.getOffset() <= offset) {
position = 0;
} else {
// head range begins inside the stream: start at its relative offset
position = input.getOffset() - offset;
}
setCurrent(input, true);
}

@Override
public int read() {
if (decrypted == null || decrypted.remaining() == 0) {
Expand Down Expand Up @@ -230,7 +246,6 @@ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
*/
static class EncryptionState {
private final Object name;
private final EncryptionAlgorithm algorithm;
private final Key key;
private final byte[] iv;
private final Cipher cipher;
Expand All @@ -240,7 +255,7 @@ static class EncryptionState {
EncryptionState(Object name, long offset, StreamOptions options) {
this.name = name;
this.offset = offset;
algorithm = options.getAlgorithm();
EncryptionAlgorithm algorithm = options.getAlgorithm();
key = options.getKey();
iv = options.getIv();
cipher = algorithm.createCipher();
Expand Down Expand Up @@ -347,9 +362,8 @@ protected void setCurrent(DiskRangeList newRange, boolean isJump) {
// what is the position of the start of the newRange?
currentOffset = newRange.getOffset();
ByteBuffer encrypted = newRange.getData().slice();
int ignoreBytes = 0;
if (currentOffset < offset) {
ignoreBytes = (int) (offset - currentOffset);
int ignoreBytes = (int) (offset - currentOffset);
encrypted.position(ignoreBytes);
currentOffset = offset;
}
Expand Down Expand Up @@ -382,12 +396,10 @@ public String toString() {
}

private static class CompressedStream extends InStream {
private DiskRangeList bytes;
private final int bufferSize;
private ByteBuffer uncompressed;
private final CompressionCodec codec;
protected ByteBuffer compressed;
protected long position;
protected DiskRangeList currentRange;
private boolean isUncompressedOriginal;

Expand Down Expand Up @@ -426,25 +438,6 @@ public CompressedStream(Object name,
reset(input);
}

/**
* Reset the input to a new set of data.
* CompressedStream-local copy: the statements match the InStream-level
* reset shown earlier on this page, but this one is declared without an
* access modifier.
* NOTE(review): this copy looks like the deletion side of the diff
* (the page reports 38 deletions) - confirm against the repository.
* @param input the input data
*/
void reset(DiskRangeList input) {
bytes = input;
// skip ranges that end at or before the stream start, or that begin
// beyond offset + length
while (input != null &&
(input.getEnd() <= offset ||
input.getOffset() > offset + length)) {
input = input.next;
}
// a range at or before the stream start means reading begins at 0
if (input == null || input.getOffset() <= offset) {
position = 0;
} else {
// range begins inside the stream: start at its relative offset
position = input.getOffset() - offset;
}
setCurrent(input, true);
}

/**
* Replace the {@code uncompressed} buffer with the one returned by
* {@code allocateBuffer}.
* @param size passed through to allocateBuffer as the buffer size
* @param isDirect passed through to allocateBuffer
*/
private void allocateForUncompressed(int size, boolean isDirect) {
  this.uncompressed = allocateBuffer(size, isDirect);
}
Expand Down
52 changes: 52 additions & 0 deletions java/core/src/test/org/apache/orc/impl/TestInStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,58 @@ public void testMultiRangeCompressed() throws IOException {
}
}

@Test
public void testExtraFrontUncompressed() throws IOException {
  // Set up a stream that starts at START, which is divided into regions
  // of at most CHUNK_LENGTH bytes. Two byte buffers sit in front of the
  // stream: one that lies entirely before START, and one that holds
  // EXTRA_FRONT filler bytes followed by the first CHUNK_LENGTH bytes of
  // the stream.
  final long START = 1_000_000_000;
  final int EXTRA_FRONT = 3_000;
  final int CHUNK_LENGTH = 100;
  final int STREAM_LENGTH = 4096;

  BufferChunkList list = new BufferChunkList();

  // range 1: nothing but bytes that precede the stream
  list.add(new BufferChunk(ByteBuffer.allocate(EXTRA_FRONT),
      START - 2 * EXTRA_FRONT));

  // range 2: filler (-1) up to START, then stream bytes 0..CHUNK_LENGTH-1
  byte[] extraFront = new byte[EXTRA_FRONT + CHUNK_LENGTH];
  Arrays.fill(extraFront, (byte) -1);
  for (int i = 0; i < CHUNK_LENGTH; ++i) {
    extraFront[EXTRA_FRONT + i] = (byte) i;
  }
  list.add(new BufferChunk(ByteBuffer.wrap(extraFront), START - EXTRA_FRONT));

  // remaining ranges: the rest of the stream, CHUNK_LENGTH bytes at a time
  byte[] expected = new byte[STREAM_LENGTH];
  for (int i = CHUNK_LENGTH; i < expected.length; ++i) {
    expected[i] = (byte) i;
  }
  int posn = CHUNK_LENGTH;
  // strictly less-than: at posn == expected.length there is nothing left,
  // and wrapping zero bytes would append an empty range to the list
  while (posn < expected.length) {
    list.add(new BufferChunk(
        ByteBuffer.wrap(expected, posn,
            Math.min(CHUNK_LENGTH, expected.length - posn)),
        START + posn));
    posn += CHUNK_LENGTH;
  }

  // now set up the stream to read it
  InStream.StreamOptions options = InStream.options();
  InStream inStream = InStream.create("test", list.get(), START, STREAM_LENGTH,
      options);

  // ensure the data is correct: each read is expected to return one range
  byte[] inBuffer = new byte[STREAM_LENGTH];
  posn = 0;
  int read = inStream.read(inBuffer);
  while (read != -1) {
    assertEquals("Read length at " + posn,
        Math.min(STREAM_LENGTH - posn, CHUNK_LENGTH), read);
    for (int i = 0; i < read; ++i) {
      assertEquals("posn " + posn + " + " + i, (byte) (posn + i), inBuffer[i]);
    }
    posn += read;
    read = inStream.read(inBuffer);
  }

  // an early end-of-stream would otherwise pass the loop above unnoticed
  assertEquals("Total bytes read", STREAM_LENGTH, posn);
}

@Test
public void testExtraFrontCompressed() throws IOException {
// Set up a stream that starts at START, which is divided in to regions
Expand Down

0 comments on commit 1b24716

Please sign in to comment.