Skip to content

Commit

Permalink
Improve ORC double and float performance with batch reads
Browse files Browse the repository at this point in the history
Speedup:

Benchmark                                  Speedup
BenchmarkStreamReaders.readDoubleNoNull      3.93x
BenchmarkStreamReaders.readDoubleWithNull    1.57x
BenchmarkStreamReaders.readFloatNoNull       5.17x
BenchmarkStreamReaders.readFloatWithNull     1.70x

Before:

Benchmark                                  Mode  Cnt  Score   Error  Units
BenchmarkStreamReaders.readDoubleNoNull    avgt   60  0.228 ± 0.004   s/op
BenchmarkStreamReaders.readDoubleWithNull  avgt   60  0.173 ± 0.002   s/op
BenchmarkStreamReaders.readFloatNoNull     avgt   60  0.212 ± 0.003   s/op
BenchmarkStreamReaders.readFloatWithNull   avgt   60  0.179 ± 0.003   s/op

After:

Benchmark                                  Mode  Cnt  Score   Error  Units
BenchmarkStreamReaders.readDoubleNoNull    avgt   60  0.058 ± 0.001   s/op
BenchmarkStreamReaders.readDoubleWithNull  avgt   60  0.110 ± 0.002   s/op
BenchmarkStreamReaders.readFloatNoNull     avgt   60  0.041 ± 0.001   s/op
BenchmarkStreamReaders.readFloatWithNull   avgt   60  0.105 ± 0.001   s/op
  • Loading branch information
dain committed Mar 13, 2019
1 parent 8b95182 commit 63ec248
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.prestosql.orc.stream.InputStreamSource;
import io.prestosql.orc.stream.InputStreamSources;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.RunLengthEncodedBlock;
import io.prestosql.spi.type.Type;
import org.openjdk.jol.info.ClassLayout;
Expand Down Expand Up @@ -98,36 +97,39 @@ public Block readBlock(Type type)
}
}

if (dataStream == null && presentStream != null) {
if (dataStream == null) {
if (presentStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is not present");
}
presentStream.skip(nextBatchSize);
Block nullValueBlock = RunLengthEncodedBlock.create(type, null, nextBatchSize);
readOffset = 0;
nextBatchSize = 0;
return nullValueBlock;
}

BlockBuilder builder = type.createBlockBuilder(null, nextBatchSize);
Block block;
if (presentStream == null) {
if (dataStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is not present");
}
dataStream.nextVector(type, nextBatchSize, builder);
block = dataStream.nextBlock(type, nextBatchSize);
}
else {
for (int i = 0; i < nextBatchSize; i++) {
if (presentStream.nextBit()) {
type.writeDouble(builder, dataStream.next());
}
else {
builder.appendNull();
}
boolean[] isNull = new boolean[nextBatchSize];
int nullCount = presentStream.getUnsetBits(nextBatchSize, isNull);
if (nullCount == 0) {
block = dataStream.nextBlock(type, nextBatchSize);
}
else if (nullCount != nextBatchSize) {
block = dataStream.nextBlock(type, isNull);
}
else {
block = RunLengthEncodedBlock.create(type, null, nextBatchSize);
}
}

readOffset = 0;
nextBatchSize = 0;

return builder.build();
return block;
}

private void openRowGroup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.prestosql.orc.stream.InputStreamSource;
import io.prestosql.orc.stream.InputStreamSources;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.block.RunLengthEncodedBlock;
import io.prestosql.spi.type.Type;
import org.openjdk.jol.info.ClassLayout;
Expand All @@ -38,7 +37,6 @@
import static io.prestosql.orc.metadata.Stream.StreamKind.DATA;
import static io.prestosql.orc.metadata.Stream.StreamKind.PRESENT;
import static io.prestosql.orc.stream.MissingInputStreamSource.missingStreamSource;
import static java.lang.Float.floatToRawIntBits;
import static java.util.Objects.requireNonNull;

public class FloatStreamReader
Expand Down Expand Up @@ -99,36 +97,39 @@ public Block readBlock(Type type)
}
}

if (dataStream == null && presentStream != null) {
if (dataStream == null) {
if (presentStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is not present");
}
presentStream.skip(nextBatchSize);
Block nullValueBlock = RunLengthEncodedBlock.create(type, null, nextBatchSize);
readOffset = 0;
nextBatchSize = 0;
return nullValueBlock;
}

BlockBuilder builder = type.createBlockBuilder(null, nextBatchSize);
Block block;
if (presentStream == null) {
if (dataStream == null) {
throw new OrcCorruptionException(streamDescriptor.getOrcDataSourceId(), "Value is not null but data stream is not present");
}
dataStream.nextVector(type, nextBatchSize, builder);
block = dataStream.nextBlock(type, nextBatchSize);
}
else {
for (int i = 0; i < nextBatchSize; i++) {
if (presentStream.nextBit()) {
type.writeLong(builder, floatToRawIntBits(dataStream.next()));
}
else {
builder.appendNull();
}
boolean[] isNull = new boolean[nextBatchSize];
int nullCount = presentStream.getUnsetBits(nextBatchSize, isNull);
if (nullCount == 0) {
block = dataStream.nextBlock(type, nextBatchSize);
}
else if (nullCount != nextBatchSize) {
block = dataStream.nextBlock(type, isNull);
}
else {
block = RunLengthEncodedBlock.create(type, null, nextBatchSize);
}
}

readOffset = 0;
nextBatchSize = 0;

return builder.build();
return block;
}

private void openRowGroup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.prestosql.orc.checkpoint.DoubleStreamCheckpoint;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.type.Type;

import java.io.IOException;

import static io.airlift.slice.SizeOf.SIZE_OF_DOUBLE;
import static java.lang.Integer.min;

public class DoubleInputStream
implements ValueInputStream<DoubleStreamCheckpoint>
{
private static final int BUFFER_SIZE = 128;
private final OrcInputStream input;
private final byte[] buffer = new byte[SIZE_OF_DOUBLE];
private final byte[] buffer = new byte[SIZE_OF_DOUBLE * BUFFER_SIZE];
private final Slice slice = Slices.wrappedBuffer(buffer);

public DoubleInputStream(OrcInputStream input)
Expand Down Expand Up @@ -63,11 +66,51 @@ public double next()
return slice.getDouble(0);
}

public void nextVector(Type type, int items, BlockBuilder builder)
public Block nextBlock(Type type, boolean[] isNull)
throws IOException
{
for (int i = 0; i < items; i++) {
type.writeDouble(builder, next());
int items = isNull.length;
BlockBuilder blockBuilder = type.createBlockBuilder(null, items);

for (int batchBase = 0; batchBase < items; batchBase += BUFFER_SIZE) {
int batchSize = min(items - batchBase, BUFFER_SIZE);

// stream is null suppressed, so count the present values
int nonNullCount = 0;
for (int i = batchBase; i < batchBase + batchSize; i++) {
if (!isNull[i]) {
nonNullCount++;
}
}
input.readFully(buffer, 0, SIZE_OF_DOUBLE * nonNullCount);

int bufferIndex = 0;
for (int i = batchBase; i < batchBase + batchSize; i++) {
if (!isNull[i]) {
type.writeDouble(blockBuilder, slice.getDouble(bufferIndex * SIZE_OF_DOUBLE));
bufferIndex++;
}
else {
blockBuilder.appendNull();
}
}
}
return blockBuilder.build();
}

public Block nextBlock(Type type, int items)
throws IOException
{
BlockBuilder blockBuilder = type.createBlockBuilder(null, items);
for (int batchBase = 0; batchBase < items; batchBase += BUFFER_SIZE) {
int batchSize = min(items - batchBase, BUFFER_SIZE);

input.readFully(buffer, 0, SIZE_OF_DOUBLE * batchSize);

for (int i = 0; i < batchSize; i++) {
type.writeDouble(blockBuilder, slice.getDouble(i * SIZE_OF_DOUBLE));
}
}
return blockBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.prestosql.orc.checkpoint.FloatStreamCheckpoint;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.type.Type;

import java.io.IOException;

import static io.airlift.slice.SizeOf.SIZE_OF_FLOAT;
import static java.lang.Float.floatToRawIntBits;
import static java.lang.Float.floatToIntBits;
import static java.lang.Integer.min;

public class FloatInputStream
implements ValueInputStream<FloatStreamCheckpoint>
{
private static final int BUFFER_SIZE = 128;
private final OrcInputStream input;
private final byte[] buffer = new byte[SIZE_OF_FLOAT];
private final byte[] buffer = new byte[SIZE_OF_FLOAT * BUFFER_SIZE];
private final Slice slice = Slices.wrappedBuffer(buffer);

public FloatInputStream(OrcInputStream input)
Expand Down Expand Up @@ -64,11 +67,51 @@ public float next()
return slice.getFloat(0);
}

public void nextVector(Type type, int items, BlockBuilder builder)
public Block nextBlock(Type type, boolean[] isNull)
throws IOException
{
for (int i = 0; i < items; i++) {
type.writeLong(builder, floatToRawIntBits(next()));
int items = isNull.length;
BlockBuilder blockBuilder = type.createBlockBuilder(null, items);

for (int batchBase = 0; batchBase < items; batchBase += BUFFER_SIZE) {
int batchSize = min(items - batchBase, BUFFER_SIZE);

// stream is null suppressed, so count the present values
int nonNullCount = 0;
for (int i = batchBase; i < batchBase + batchSize; i++) {
if (!isNull[i]) {
nonNullCount++;
}
}
input.readFully(buffer, 0, SIZE_OF_FLOAT * nonNullCount);

int bufferIndex = 0;
for (int i = batchBase; i < batchBase + batchSize; i++) {
if (!isNull[i]) {
type.writeLong(blockBuilder, floatToIntBits(slice.getFloat(bufferIndex * SIZE_OF_FLOAT)));
bufferIndex++;
}
else {
blockBuilder.appendNull();
}
}
}
return blockBuilder.build();
}

public Block nextBlock(Type type, int items)
throws IOException
{
BlockBuilder blockBuilder = type.createBlockBuilder(null, items);
for (int batchBase = 0; batchBase < items; batchBase += BUFFER_SIZE) {
int batchSize = min(items - batchBase, BUFFER_SIZE);

input.readFully(buffer, 0, SIZE_OF_FLOAT * batchSize);

for (int i = 0; i < batchSize; i++) {
type.writeLong(blockBuilder, floatToIntBits(slice.getFloat(i * SIZE_OF_FLOAT)));
}
}
return blockBuilder.build();
}
}

0 comments on commit 63ec248

Please sign in to comment.