Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-523: Update ReaderImpl to work column encryption. #408

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions java/core/src/java/org/apache/orc/OrcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,17 @@ TypeDescription convertTypeFromProtobuf(List<OrcProto.Type> types,

public static List<StripeInformation> convertProtoStripesToStripes(
List<OrcProto.StripeInformation> stripes) {
List<StripeInformation> result = new ArrayList<StripeInformation>(stripes.size());
for (OrcProto.StripeInformation info : stripes) {
result.add(new ReaderImpl.StripeInformationImpl(info));
List<StripeInformation> result = new ArrayList<>(stripes.size());
long previousStripeId = -1;
byte[][] previousKeys = null;
long stripeId = 0;
for (OrcProto.StripeInformation stripeProto: stripes) {
ReaderImpl.StripeInformationImpl stripe =
new ReaderImpl.StripeInformationImpl(stripeProto, stripeId++,
previousStripeId, previousKeys);
result.add(stripe);
previousStripeId = stripe.getEncryptionStripeId();
previousKeys = stripe.getEncryptedLocalKeys();
}
return result;
}
Expand Down
35 changes: 24 additions & 11 deletions java/core/src/java/org/apache/orc/PhysicalWriter.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -22,6 +22,8 @@
import java.nio.ByteBuffer;

import org.apache.orc.impl.StreamName;
import org.apache.orc.impl.writer.StreamOptions;
import org.apache.orc.impl.writer.WriterEncryptionVariant;

/**
* This interface separates the physical layout of ORC files from the higher
Expand All @@ -39,7 +41,6 @@ interface OutputReceiver {
* Output the given buffer to the final destination
*
* @param buffer the buffer to output
* @throws IOException
*/
void output(ByteBuffer buffer) throws IOException;

Expand All @@ -48,24 +49,22 @@ interface OutputReceiver {
*/
void suppress();
}

/**
* Writes the header of the file, which consists of the magic "ORC" bytes.
* @throws IOException
*/
void writeHeader() throws IOException;

/**
* Create an OutputReceiver for the given name.
* @param name the name of the stream
* @throws IOException
*/
OutputReceiver createDataStream(StreamName name) throws IOException;

/**
* Write an index in the given stream name.
* @param name the name of the stream
* @param index the row index to write
* @param codec the compression codec to use
*/
void writeIndex(StreamName name,
OrcProto.RowIndex.Builder index) throws IOException;
Expand All @@ -74,7 +73,6 @@ void writeIndex(StreamName name,
* Write a bloom filter index in the given stream name.
* @param name the name of the stream
* @param bloom the bloom filter to write
* @param codec the compression codec to use
*/
void writeBloomFilter(StreamName name,
OrcProto.BloomFilterIndex.Builder bloom) throws IOException;
Expand All @@ -89,6 +87,16 @@ void writeBloomFilter(StreamName name,
void finalizeStripe(OrcProto.StripeFooter.Builder footer,
OrcProto.StripeInformation.Builder dirEntry) throws IOException;

/**
* Write stripe or file statistics to the file.
* @param name the name of the stream
* @param statistics the statistics to write
* @throws IOException
*/
void writeStatistics(StreamName name,
OrcProto.ColumnStatistics.Builder statistics
) throws IOException;

/**
* Writes out the file metadata.
* @param builder Metadata builder to finalize and write.
Expand Down Expand Up @@ -122,19 +130,24 @@ void finalizeStripe(OrcProto.StripeFooter.Builder footer,
* @param stripe Stripe data buffer.
* @param dirEntry File metadata entry for the stripe, to be updated with
* relevant data.
* @throws IOException
*/
void appendRawStripe(ByteBuffer stripe,
OrcProto.StripeInformation.Builder dirEntry
) throws IOException;

/** Gets a compression codec used by this writer. */
CompressionCodec getCompressionCodec();

/**
* Get the number of bytes for a file in a given column.
* @param column column from which to get file size
* @param variant the encryption variant to check
* @return number of bytes for the given column
*/
long getFileBytes(int column);
long getFileBytes(int column, WriterEncryptionVariant variant);

/**
* Get the unencrypted stream options for this file. This class needs the
* stream options to write the indexes and footers.
*
* Additionally, the LLAP CacheWriter wants to disable the generic compression.
*/
StreamOptions getStreamOptions();
}
21 changes: 21 additions & 0 deletions java/core/src/java/org/apache/orc/StripeInformation.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,25 @@ public interface StripeInformation {
* @return a count of the number of rows
*/
long getNumberOfRows();

/**
* Get the index of this stripe in the current file.
* @return 0 to number_of_stripes - 1
*/
long getStripeId();

/**
* Get the original stripe id that was used when the stripe was originally
* written. This differs from getStripeId only in merged files.
* @return the original stripe id
*/
long getEncryptionStripeId();

/**
* Get the encrypted keys starting from this stripe until overridden by
* a new set in a following stripe. The top level array is one for each
* encryption variant. Each element is an encrypted key.
* @return the array of encrypted keys
*/
byte[][] getEncryptedLocalKeys();
}
5 changes: 5 additions & 0 deletions java/core/src/java/org/apache/orc/impl/BitFieldWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.orc.impl;

import java.io.IOException;
import java.util.function.Consumer;

public class BitFieldWriter {
private RunLengthByteWriter output;
Expand Down Expand Up @@ -70,4 +71,8 @@ public void getPosition(PositionRecorder recorder) throws IOException {
public long estimateMemory() {
return output.estimateMemory();
}

/**
* Forward the given modifier to the underlying RunLengthByteWriter output
* by calling its changeIv method.
* @param modifier callback that accepts a byte array; presumably the
*   encryption IV of the underlying stream, to be updated in place
*   (NOTE(review): confirm against RunLengthByteWriter.changeIv)
*/
public void changeIv(Consumer<byte[]> modifier) {
output.changeIv(modifier);
}
}
29 changes: 19 additions & 10 deletions java/core/src/java/org/apache/orc/impl/BufferChunk.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
package org.apache.orc.impl;

/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,6 +16,8 @@
* limitations under the License.
*/

package org.apache.orc.impl;

import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.slf4j.Logger;
Expand All @@ -34,15 +34,20 @@ public class BufferChunk extends DiskRangeList {

private static final Logger LOG =
LoggerFactory.getLogger(BufferChunk.class);
final ByteBuffer chunk;
private ByteBuffer chunk;

/**
* Create a chunk that covers the range [offset, offset + length) but has
* no buffer attached yet.
* @param offset the starting offset of the range
* @param length the number of bytes in the range
*/
public BufferChunk(long offset, int length) {
super(offset, offset + length);
// no data yet; the buffer is left unset
chunk = null;
}

/**
* Create a chunk backed by the given buffer. The range starts at offset
* and its length is the number of bytes remaining in the buffer.
* @param chunk the buffer holding the data; must not be null because
*   chunk.remaining() is read here
* @param offset the starting offset of the range
*/
public BufferChunk(ByteBuffer chunk, long offset) {
super(offset, offset + chunk.remaining());
this.chunk = chunk;
}

public ByteBuffer getChunk() {
return chunk;
public void setChunk(ByteBuffer chunk) {
this.chunk = chunk;
}

@Override
Expand All @@ -52,10 +57,14 @@ public boolean hasData() {

@Override
public final String toString() {
boolean makesSense = chunk.remaining() == (end - offset);
return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
+ (makesSense ? "" : "(!)") + " type: " +
(chunk.isDirect() ? "direct" : "array-backed");
if (chunk == null) {
return "data range[" + offset + ", " + end +")";
} else {
boolean makesSense = chunk.remaining() == (end - offset);
return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
+ (makesSense ? "" : "(!)") + " type: " +
(chunk.isDirect() ? "direct" : "array-backed");
}
}

@Override
Expand Down
14 changes: 14 additions & 0 deletions java/core/src/java/org/apache/orc/impl/BufferChunkList.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void add(BufferChunk value) {
} else {
tail.next = value;
value.prev = tail;
value.next = null;
tail = value;
}
}
Expand All @@ -40,6 +41,19 @@ public BufferChunk get() {
return head;
}

/**
 * Look up a chunk by its position in the list, counting from the head.
 * @param chunk the zero-based position of the wanted element; a value of
 *   zero or less yields the head of the list
 * @return the element at that position, or null if the list ends before
 *   that position is reached
 */
public BufferChunk get(int chunk) {
  BufferChunk current = head;
  int remaining = chunk;
  // Walk forward one link per step; once we fall off the end the answer
  // is null no matter how many steps were still requested.
  while (remaining > 0 && current != null) {
    current = (BufferChunk) current.next;
    remaining -= 1;
  }
  return current;
}

public void clear() {
head = null;
tail = null;
Expand Down
Loading