Adds a StreamInterceptor interface to allow users to plug in custom interceptors for formats like Zstd. #930

Draft: wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions build.gradle.kts
@@ -79,6 +79,7 @@ dependencies {
     testImplementation("org.hamcrest:hamcrest:2.2")
     testImplementation("pl.pragmatists:JUnitParams:1.1.1")
     testImplementation("com.google.code.tempus-fugit:tempus-fugit:1.1")
+    testImplementation("com.github.luben:zstd-jni:1.5.6-5")
 }

 group = "com.amazon.ion"
46 changes: 46 additions & 0 deletions src/main/java/com/amazon/ion/GZIPStreamInterceptor.java
@@ -0,0 +1,46 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package com.amazon.ion;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * The interceptor for GZIP streams.
+ */
+public enum GZIPStreamInterceptor implements StreamInterceptor {
+
+    INSTANCE;
+
+    private static final byte[] GZIP_HEADER = {0x1F, (byte) 0x8B};
+
+    @Override
+    public String formatName() {
+        return "GZIP";
+    }
+
+    @Override
+    public int headerLength() {
+        return GZIP_HEADER.length;
+    }
+
+    @Override
+    public boolean matchesHeader(byte[] candidate, int offset, int length) {
+        if (candidate == null || length < GZIP_HEADER.length) {
+            return false;
+        }
+
+        for (int i = 0; i < GZIP_HEADER.length; i++) {
+            if (GZIP_HEADER[i] != candidate[offset + i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public InputStream newInputStream(InputStream interceptedStream) throws IOException {
+        return new GZIPInputStream(interceptedStream);
+    }
+}
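
Following the same pattern as GZIPStreamInterceptor, a user-supplied interceptor might look like the sketch below. It is not part of this PR. ZlibStreamInterceptor and ZlibInterceptorDemo are hypothetical names, zlib (from java.util.zip) stands in for Zstd so that no extra dependency is needed, and the StreamInterceptor interface is a local copy of the one proposed here so that the block compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Local copy of the interface proposed in this PR (assumption: the final API may differ).
interface StreamInterceptor {
    String formatName();
    int headerLength();
    boolean matchesHeader(byte[] candidate, int offset, int length);
    InputStream newInputStream(InputStream interceptedStream) throws IOException;
}

// Hypothetical interceptor for zlib-wrapped (RFC 1950) data.
enum ZlibStreamInterceptor implements StreamInterceptor {
    INSTANCE;

    @Override
    public String formatName() {
        return "ZLIB";
    }

    @Override
    public int headerLength() {
        return 2; // CMF byte followed by FLG byte
    }

    @Override
    public boolean matchesHeader(byte[] candidate, int offset, int length) {
        if (candidate == null || length < headerLength()) {
            return false;
        }
        int cmf = candidate[offset] & 0xFF;
        int flg = candidate[offset + 1] & 0xFF;
        // RFC 1950: the low nibble of CMF is 8 (deflate) and CMF * 256 + FLG is a multiple of 31.
        return (cmf & 0x0F) == 8 && ((cmf << 8) | flg) % 31 == 0;
    }

    @Override
    public InputStream newInputStream(InputStream interceptedStream) {
        return new InflaterInputStream(interceptedStream);
    }
}

final class ZlibInterceptorDemo {
    // Compresses text using the JDK's default zlib wrapper.
    static byte[] zlib(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream deflater = new DeflaterOutputStream(out)) {
            deflater.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Compresses, checks the header, then decompresses through the interceptor.
    static String roundTrip(String text) {
        byte[] compressed = zlib(text);
        StreamInterceptor interceptor = ZlibStreamInterceptor.INSTANCE;
        if (!interceptor.matchesHeader(compressed, 0, compressed.length)) {
            throw new IllegalStateException("zlib header not recognized");
        }
        try (InputStream in = interceptor.newInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream plain = new ByteArrayOutputStream();
            byte[] buffer = new byte[256];
            for (int n; (n = in.read(buffer)) != -1; ) {
                plain.write(buffer, 0, n);
            }
            return new String(plain.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // With this PR applied, registration would presumably be:
        //   IonReaderBuilder.standard().addStreamInterceptor(ZlibStreamInterceptor.INSTANCE)
        System.out.println(roundTrip("{hello: \"world\"}"));
    }
}
```

A two-byte check like this can in principle collide with uncompressed input that happens to begin with the same bytes, so a real interceptor may want a longer or stricter signature where the format offers one.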
48 changes: 48 additions & 0 deletions src/main/java/com/amazon/ion/StreamInterceptor.java
@@ -0,0 +1,48 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+package com.amazon.ion;
+
+import com.amazon.ion.system.IonReaderBuilder;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An interceptor to be consulted by the {@link com.amazon.ion.system.IonReaderBuilder} when creating an
+ * {@link IonReader} over a user-provided stream. This allows users to configure a sequence of interceptors
+ * to allow transformation of the stream's raw bytes into valid text or binary Ion.
+ *
+ * @see com.amazon.ion.system.IonReaderBuilder#addStreamInterceptor(StreamInterceptor)
+ * @see com.amazon.ion.system.IonSystemBuilder#withReaderBuilder(IonReaderBuilder)
+ */
+public interface StreamInterceptor {
Review comment (Contributor):
"Stream" has several meanings in this package; I suggest renaming this to InputStreamInterceptor to be more specific.


+    /**
+     * The name of the format the interceptor recognizes.
+     * @return a constant String.
+     */
+    String formatName();
+
+    /**
+     * The length of the byte header that identifies streams in this format.
+     * @return the length in bytes.
+     */
+    int headerLength();
Review comment (Contributor), on lines +26 to +30:
Since some formats (e.g., Ion itself) may have variable-length headers, I'd make this a bit more general. Maybe headerMatchLength.


+    /**
+     * Determines whether the given candidate byte sequence matches this format.
+     * @param candidate the candidate byte sequence.
+     * @param offset the offset into the candidate bytes to begin matching.
+     * @param length the number of bytes (beginning at 'offset') in the candidate byte sequence.
Review comment (Contributor):
Is there some connection between this length and the headerLength?

The class could use more docs on exactly how the matching process works. After seeing headerLength, I expected this method to receive exactly that number of bytes in candidate, and nothing else.
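
As the diff stands, the connection appears to be this: the builder sizes one buffer to the largest headerLength() across all interceptors (and at least the binary version marker size), reads into it, and passes every interceptor the same buffer with length set to the number of bytes actually read. So matchesHeader may see more or fewer bytes than its own headerLength(). A minimal sketch of that flow follows. HeaderMatchingSketch and GzipInterceptor are hypothetical names, SequenceInputStream stands in for the library's TwoElementSequenceInputStream, IVM_SIZE = 4 is an assumed stand-in for _Private_IonConstants.BINARY_VERSION_MARKER_SIZE, and the read loop is this sketch's choice (the PR issues a single read).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Local copy of the interface proposed in this PR, so the sketch compiles on its own.
interface StreamInterceptor {
    String formatName();
    int headerLength();
    boolean matchesHeader(byte[] candidate, int offset, int length);
    InputStream newInputStream(InputStream interceptedStream) throws IOException;
}

// Same header check as GZIPStreamInterceptor in this PR.
final class GzipInterceptor implements StreamInterceptor {
    private static final byte[] HEADER = {0x1F, (byte) 0x8B};

    public String formatName() { return "GZIP"; }

    public int headerLength() { return HEADER.length; }

    public boolean matchesHeader(byte[] candidate, int offset, int length) {
        return candidate != null && length >= HEADER.length
            && candidate[offset] == HEADER[0] && candidate[offset + 1] == HEADER[1];
    }

    public InputStream newInputStream(InputStream in) throws IOException {
        return new GZIPInputStream(in);
    }
}

final class HeaderMatchingSketch {
    static final int IVM_SIZE = 4; // assumed size of the binary Ion version marker

    static InputStream intercept(InputStream source, List<StreamInterceptor> interceptors) throws IOException {
        // 1. Size the peek buffer for the longest header any participant needs.
        int maxHeaderLength = IVM_SIZE;
        for (StreamInterceptor interceptor : interceptors) {
            maxHeaderLength = Math.max(maxHeaderLength, interceptor.headerLength());
        }
        byte[] head = new byte[maxHeaderLength];
        // 2. Read up to that many bytes; fewer may be available.
        int bytesRead = 0;
        while (bytesRead < head.length) {
            int n = source.read(head, bytesRead, head.length - bytesRead);
            if (n < 0) break;
            bytesRead += n;
        }
        // 3. Put the consumed bytes back in front of the rest of the stream.
        InputStream replay = new SequenceInputStream(new ByteArrayInputStream(head, 0, bytesRead), source);
        // 4. First match wins. Note that 'length' is bytesRead, not the interceptor's headerLength().
        for (StreamInterceptor interceptor : interceptors) {
            if (interceptor.matchesHeader(head, 0, bytesRead)) {
                return interceptor.newInputStream(replay);
            }
        }
        return replay;
    }

    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Runs the matching flow with only the GZIP interceptor configured and returns the decoded text.
    static String decode(byte[] input) {
        List<StreamInterceptor> interceptors = Collections.<StreamInterceptor>singletonList(new GzipInterceptor());
        try (InputStream in = intercept(new ByteArrayInputStream(input), interceptors)) {
            ByteArrayOutputStream plain = new ByteArrayOutputStream();
            byte[] buffer = new byte[256];
            for (int n; (n = in.read(buffer)) != -1; ) {
                plain.write(buffer, 0, n);
            }
            return new String(plain.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(gzip("{a: 1}")));
        System.out.println(decode("{a: 1}".getBytes(StandardCharsets.UTF_8)));
    }
}
```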

+     * @return true if the candidate byte sequence matches; otherwise, false.
+     */
+    boolean matchesHeader(byte[] candidate, int offset, int length);
+
+    /**
+     * Creates a new InputStream that transforms the bytes in the given InputStream into valid text or binary Ion.
+     * @param interceptedStream the stream containing bytes in this format.
+     * @return a new InputStream.
+     * @throws IOException if thrown when constructing the new InputStream.
+     */
+    InputStream newInputStream(InputStream interceptedStream) throws IOException;
Review comment (Contributor):
I'm wondering whether this should have a sibling method for character streams, so one can accomplish transformations of text inputs.

For example, suppose I want to teach the Ion reader to ignore shebang lines atop my Fusion scripts...
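
To make the shebang case concrete, here is a rough sketch of what such a character-stream transformation could do. Nothing here exists in the PR: ShebangSkippingSketch and newReader(Reader) are hypothetical, imagined as a counterpart to newInputStream(InputStream).

```java
import java.io.IOException;
import java.io.PushbackReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

final class ShebangSkippingSketch {
    // Hypothetical character-stream counterpart to newInputStream(InputStream):
    // returns a Reader positioned after a leading "#!" line, if one is present.
    static Reader newReader(Reader intercepted) throws IOException {
        PushbackReader in = new PushbackReader(intercepted, 2);
        char[] head = new char[2];
        int n = 0;
        while (n < head.length) {
            int r = in.read(head, n, head.length - n);
            if (r < 0) break;
            n += r;
        }
        if (n == 2 && head[0] == '#' && head[1] == '!') {
            // Discard the rest of the shebang line, including its newline.
            for (int c; (c = in.read()) != -1 && c != '\n'; ) { }
            return in;
        }
        // Not a shebang: push the peeked characters back so nothing is lost.
        if (n > 0) {
            in.unread(head, 0, n);
        }
        return in;
    }

    // Helper for the demo: reads everything the transformed Reader yields.
    static String readAll(String text) {
        try (Reader reader = newReader(new StringReader(text))) {
            StringBuilder sb = new StringBuilder();
            for (int c; (c = reader.read()) != -1; ) {
                sb.append((char) c);
            }
            return sb.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAll("#!/usr/bin/env fusion\n(+ 1 2)"));
    }
}
```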

+}
65 changes: 32 additions & 33 deletions src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java
@@ -1,13 +1,13 @@
 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
-
 package com.amazon.ion.impl;

 import com.amazon.ion.IonCatalog;
 import com.amazon.ion.IonException;
 import com.amazon.ion.IonReader;
 import com.amazon.ion.IonTextReader;
 import com.amazon.ion.IonValue;
+import com.amazon.ion.StreamInterceptor;
 import com.amazon.ion.system.IonReaderBuilder;
 import com.amazon.ion.util.IonStreamUtils;

@@ -16,7 +16,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
-import java.util.zip.GZIPInputStream;

 import static com.amazon.ion.impl.LocalSymbolTable.DEFAULT_LST_FACTORY;
 import static com.amazon.ion.impl._Private_IonReaderFactory.makeReader;
@@ -200,16 +199,18 @@ static IonReader buildReader(
         IonReaderFromBytesFactoryBinary binary,
         IonReaderFromBytesFactoryText text
     ) {
-        if (IonStreamUtils.isGzip(ionData, offset, length)) {
-            try {
-                return buildReader(
-                    builder,
-                    new GZIPInputStream(new ByteArrayInputStream(ionData, offset, length)),
-                    _Private_IonReaderFactory::makeReaderBinary,
-                    _Private_IonReaderFactory::makeReaderText
-                );
-            } catch (IOException e) {
-                throw new IonException(e);
+        for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) {
+            if (streamInterceptor.matchesHeader(ionData, offset, length)) {
+                try {
+                    return buildReader(
+                        builder,
+                        streamInterceptor.newInputStream(new ByteArrayInputStream(ionData, offset, length)),
+                        _Private_IonReaderFactory::makeReaderBinary,
+                        _Private_IonReaderFactory::makeReaderText
+                    );
+                } catch (IOException e) {
+                    throw new IonException(e);
+                }
             }
         }
         if (IonStreamUtils.isIonBinary(ionData, offset, length)) {
@@ -247,15 +248,6 @@ private static boolean startsWithIvm(byte[] buffer, int length) {
         return true;
     }

-    static final byte[] GZIP_HEADER = {0x1F, (byte) 0x8B};
-
-    private static boolean startsWithGzipHeader(byte[] buffer, int length) {
-        if (length >= GZIP_HEADER.length) {
-            return buffer[0] == GZIP_HEADER[0] && buffer[1] == GZIP_HEADER[1];
-        }
-        return false;
-    }
-
     @FunctionalInterface
     interface IonReaderFromInputStreamFactoryText {
         IonReader makeReader(IonCatalog catalog, InputStream source, _Private_LocalSymbolTableFactory lstFactory);
@@ -275,11 +267,15 @@ static IonReader buildReader(
         if (source == null) {
             throw new NullPointerException("Cannot build a reader from a null InputStream.");
         }
+        int maxHeaderLength = Math.max(
+            _Private_IonConstants.BINARY_VERSION_MARKER_SIZE,
+            builder.getStreamInterceptors().stream().mapToInt(StreamInterceptor::headerLength).max().orElse(0)
+        );
         // Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called
         // from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream,
         // GZIPInputStream -> PushbackInputStream -> ByteArrayInputStream). If this creates a drag on efficiency,
         // alternatives should be evaluated.
-        byte[] possibleIVM = new byte[_Private_IonConstants.BINARY_VERSION_MARKER_SIZE];
+        byte[] possibleIVM = new byte[maxHeaderLength];
         InputStream ionData = source;
         int bytesRead;
         try {
@@ -296,19 +292,22 @@ static IonReader buildReader(
         // stream will always be empty (in which case it doesn't matter whether a text or binary reader is used)
         // or it's a binary stream (in which case the correct reader was created) or it's a growing text stream
         // (which has always been unsupported).
-        if (startsWithGzipHeader(possibleIVM, bytesRead)) {
-            try {
-                ionData = new GZIPInputStream(
-                    new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
-                );
+        for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) {
+            if (streamInterceptor.matchesHeader(possibleIVM, 0, bytesRead)) {
                 try {
-                    bytesRead = ionData.read(possibleIVM);
-                } catch (EOFException e) {
-                    // Only a GZIP header was available, so this may be a binary Ion stream.
-                    bytesRead = 0;
+                    ionData = streamInterceptor.newInputStream(
+                        new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
+                    );
+                    try {
+                        bytesRead = ionData.read(possibleIVM);
+                    } catch (EOFException e) {
+                        // Only a compression format header was available, so this may be a binary Ion stream.
+                        bytesRead = 0;
+                    }
+                } catch (IOException e) {
+                    throw new IonException(e);
                 }
-            } catch (IOException e) {
-                throw new IonException(e);
+                break;
             }
         }
         if (startsWithIvm(possibleIVM, bytesRead)) {
48 changes: 33 additions & 15 deletions src/main/java/com/amazon/ion/system/IonReaderBuilder.java
@@ -1,20 +1,8 @@
-/*
- * Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
 package com.amazon.ion.system;

+import com.amazon.ion.GZIPStreamInterceptor;
 import com.amazon.ion.IonBufferConfiguration;
 import com.amazon.ion.IonCatalog;
 import com.amazon.ion.IonException;
@@ -23,11 +11,15 @@
 import com.amazon.ion.IonSystem;
 import com.amazon.ion.IonTextReader;
 import com.amazon.ion.IonValue;
+import com.amazon.ion.StreamInterceptor;
 import com.amazon.ion.impl._Private_IonReaderBuilder;

 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;

 /**
  * Build a new {@link IonReader} from the given {@link IonCatalog} and data
@@ -45,6 +37,7 @@ public abstract class IonReaderBuilder
     private IonCatalog catalog = null;
     private boolean isIncrementalReadingEnabled = false;
     private IonBufferConfiguration bufferConfiguration = IonBufferConfiguration.DEFAULT;
+    private List<StreamInterceptor> streamInterceptors = new ArrayList<>(Collections.singletonList(GZIPStreamInterceptor.INSTANCE));

     protected IonReaderBuilder()
     {
@@ -55,6 +48,7 @@ protected IonReaderBuilder(IonReaderBuilder that)
         this.catalog = that.catalog;
         this.isIncrementalReadingEnabled = that.isIncrementalReadingEnabled;
         this.bufferConfiguration = that.bufferConfiguration;
+        this.streamInterceptors = new ArrayList<>(that.streamInterceptors);
     }

     /**
@@ -263,6 +257,30 @@ public IonBufferConfiguration getBufferConfiguration() {
         return bufferConfiguration;
     }

+    /**
+     * Adds a {@link StreamInterceptor} to the end of the list that the builder will apply
+     * in order to each stream before creating {@link IonReader} instances over that stream.
+     * {@link GZIPStreamInterceptor} is always consulted first, and need not be added.
+     *
+     * @param streamInterceptor the stream interceptor to add.
+     *
+     * @return this builder instance, if mutable;
+     * otherwise a mutable copy of this builder.
+     */
+    public IonReaderBuilder addStreamInterceptor(StreamInterceptor streamInterceptor) {
+        IonReaderBuilder b = mutable();
+        b.streamInterceptors.add(streamInterceptor);
+        return b;
+    }
+
+    /**
+     * @see #addStreamInterceptor(StreamInterceptor)
+     * @return an unmodifiable view of the stream interceptors currently configured.
+     */
+    public List<StreamInterceptor> getStreamInterceptors() {
+        return Collections.unmodifiableList(streamInterceptors);
+    }
+
     /**
      * Based on the builder's configuration properties, creates a new IonReader
      * instance over the given block of Ion data, detecting whether it's text or
30 changes: 15 additions & 15 deletions src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java
@@ -1,18 +1,5 @@
-/*
- * Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
 package com.amazon.ion.system;

 import static com.amazon.ion.TestUtils.gzippedBytes;
@@ -26,18 +13,21 @@
 import static org.junit.Assert.fail;

 import com.amazon.ion.BitUtils;
+import com.amazon.ion.GZIPStreamInterceptor;
 import com.amazon.ion.IonBufferConfiguration;
 import com.amazon.ion.IonCatalog;
 import com.amazon.ion.IonException;
 import com.amazon.ion.IonReader;
 import com.amazon.ion.IonType;
 import com.amazon.ion.IonWriter;
+import com.amazon.ion.StreamInterceptor;
 import com.amazon.ion.impl.ResizingPipedInputStream;
 import com.amazon.ion.impl._Private_IonBinaryWriterBuilder;

 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.List;
 import java.util.zip.GZIPOutputStream;

 import com.amazon.ion.impl._Private_IonConstants;
@@ -234,4 +224,14 @@ public void incompleteIvmFailsCleanly(boolean isIncremental) throws Exception {
         reader.close();
     }

+    @Test
+    public void gzipInterceptorEnabledByDefault() {
+        IonReaderBuilder builder = IonReaderBuilder.standard();
+        List<StreamInterceptor> interceptors = builder.getStreamInterceptors();
+        assertEquals(1, interceptors.size());
+        assertEquals(GZIPStreamInterceptor.INSTANCE.formatName(), interceptors.get(0).formatName());
+        // The list returned from IonReaderBuilder.getStreamInterceptors() is unmodifiable.
+        assertThrows(UnsupportedOperationException.class, () -> interceptors.add(GZIPStreamInterceptor.INSTANCE));
+    }
+
 }