diff --git a/build.gradle.kts b/build.gradle.kts index 9df5b7f987..9451d2550c 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -79,6 +79,7 @@ dependencies { testImplementation("org.hamcrest:hamcrest:2.2") testImplementation("pl.pragmatists:JUnitParams:1.1.1") testImplementation("com.google.code.tempus-fugit:tempus-fugit:1.1") + testImplementation("com.github.luben:zstd-jni:1.5.6-5") } group = "com.amazon.ion" diff --git a/src/main/java/com/amazon/ion/GZIPStreamInterceptor.java b/src/main/java/com/amazon/ion/GZIPStreamInterceptor.java new file mode 100644 index 0000000000..a3706ca741 --- /dev/null +++ b/src/main/java/com/amazon/ion/GZIPStreamInterceptor.java @@ -0,0 +1,46 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazon.ion; + +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; + +/** + * The interceptor for GZIP streams. + */ +public enum GZIPStreamInterceptor implements StreamInterceptor { + + INSTANCE; + + private static final byte[] GZIP_HEADER = {0x1F, (byte) 0x8B}; + + @Override + public String formatName() { + return "GZIP"; + } + + @Override + public int headerLength() { + return GZIP_HEADER.length; + } + + @Override + public boolean matchesHeader(byte[] candidate, int offset, int length) { + if (candidate == null || length < GZIP_HEADER.length) { + return false; + } + + for (int i = 0; i < GZIP_HEADER.length; i++) { + if (GZIP_HEADER[i] != candidate[offset + i]) { + return false; + } + } + return true; + } + + @Override + public InputStream newInputStream(InputStream interceptedStream) throws IOException { + return new GZIPInputStream(interceptedStream); + } +} diff --git a/src/main/java/com/amazon/ion/StreamInterceptor.java b/src/main/java/com/amazon/ion/StreamInterceptor.java new file mode 100644 index 0000000000..9e771d17ab --- /dev/null +++ b/src/main/java/com/amazon/ion/StreamInterceptor.java @@ -0,0 +1,48 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazon.ion; + +import com.amazon.ion.system.IonReaderBuilder; + +import java.io.IOException; +import java.io.InputStream; + +/** + * An interceptor to be consulted by the {@link com.amazon.ion.system.IonReaderBuilder} when creating an + * {@link IonReader} over a user-provided stream. This allows users to configure a sequence of interceptors + * to allow transformation of the stream's raw bytes into valid text or binary Ion. + * + * @see com.amazon.ion.system.IonReaderBuilder#addStreamInterceptor(StreamInterceptor) + * @see com.amazon.ion.system.IonSystemBuilder#withReaderBuilder(IonReaderBuilder) + */ +public interface StreamInterceptor { + + /** + * The name of the format the interceptor recognizes. + * @return a constant String. + */ + String formatName(); + + /** + * The length of the byte header that identifies streams in this format. + * @return the length in bytes. + */ + int headerLength(); + + /** + * Determines whether the given candidate byte sequence matches this format. + * @param candidate the candidate byte sequence. + * @param offset the offset into the candidate bytes to begin matching. + * @param length the number of bytes (beginning at 'offset') in the candidate byte sequence. + * @return true if the candidate byte sequence matches; otherwise, false. + */ + boolean matchesHeader(byte[] candidate, int offset, int length); + + /** + * Creates a new InputStream that transforms the bytes in the given InputStream into valid text or binary Ion. + * @param interceptedStream the stream containing bytes in this format. + * @return a new InputStream. + * @throws IOException if thrown when constructing the new InputStream. + */ + InputStream newInputStream(InputStream interceptedStream) throws IOException; +} diff --git a/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java b/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java index c7e56f8abd..96acca6e50 100644 --- a/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java +++ b/src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java @@ -1,6 +1,5 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 - package com.amazon.ion.impl; import com.amazon.ion.IonCatalog; @@ -8,6 +7,7 @@ import com.amazon.ion.IonReader; import com.amazon.ion.IonTextReader; import com.amazon.ion.IonValue; +import com.amazon.ion.StreamInterceptor; import com.amazon.ion.system.IonReaderBuilder; import com.amazon.ion.util.IonStreamUtils; @@ -16,7 +16,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.Reader; -import java.util.zip.GZIPInputStream; import static com.amazon.ion.impl.LocalSymbolTable.DEFAULT_LST_FACTORY; import static com.amazon.ion.impl._Private_IonReaderFactory.makeReader; @@ -200,16 +199,18 @@ static IonReader buildReader( IonReaderFromBytesFactoryBinary binary, IonReaderFromBytesFactoryText text ) { - if (IonStreamUtils.isGzip(ionData, offset, length)) { - try { - return buildReader( - builder, - new GZIPInputStream(new ByteArrayInputStream(ionData, offset, length)), - _Private_IonReaderFactory::makeReaderBinary, - _Private_IonReaderFactory::makeReaderText - ); - } catch (IOException e) { - throw new IonException(e); + for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) { + if (streamInterceptor.matchesHeader(ionData, offset, length)) { + try { + return buildReader( + builder, + streamInterceptor.newInputStream(new ByteArrayInputStream(ionData, offset, length)), + _Private_IonReaderFactory::makeReaderBinary, + _Private_IonReaderFactory::makeReaderText + ); + } catch (IOException e) { + throw new IonException(e); + } } } if (IonStreamUtils.isIonBinary(ionData, offset, length)) { @@ -247,15 +248,6 @@ private static boolean startsWithIvm(byte[] buffer, int length) { return true; } - static final byte[] GZIP_HEADER = {0x1F, (byte) 0x8B}; - - private static boolean startsWithGzipHeader(byte[] buffer, int length) { - if (length >= GZIP_HEADER.length) { - return buffer[0] == GZIP_HEADER[0] && buffer[1] == GZIP_HEADER[1]; - } - return false; - } - @FunctionalInterface interface IonReaderFromInputStreamFactoryText { IonReader makeReader(IonCatalog catalog, InputStream source, _Private_LocalSymbolTableFactory lstFactory); @@ -275,11 +267,15 @@ static IonReader buildReader( if (source == null) { throw new NullPointerException("Cannot build a reader from a null InputStream."); } + int maxHeaderLength = Math.max( + _Private_IonConstants.BINARY_VERSION_MARKER_SIZE, + builder.getStreamInterceptors().stream().mapToInt(StreamInterceptor::headerLength).max().orElse(0) + ); // Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called // from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream, // GZIPInputStream -> PushbackInputStream -> ByteArrayInputStream). If this creates a drag on efficiency, // alternatives should be evaluated. - byte[] possibleIVM = new byte[_Private_IonConstants.BINARY_VERSION_MARKER_SIZE]; + byte[] possibleIVM = new byte[maxHeaderLength]; InputStream ionData = source; int bytesRead; try { @@ -296,19 +292,22 @@ static IonReader buildReader( // stream will always be empty (in which case it doesn't matter whether a text or binary reader is used) // or it's a binary stream (in which case the correct reader was created) or it's a growing text stream // (which has always been unsupported). - if (startsWithGzipHeader(possibleIVM, bytesRead)) { - try { - ionData = new GZIPInputStream( - new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData) - ); + for (StreamInterceptor streamInterceptor : builder.getStreamInterceptors()) { + if (streamInterceptor.matchesHeader(possibleIVM, 0, bytesRead)) { try { - bytesRead = ionData.read(possibleIVM); - } catch (EOFException e) { - // Only a GZIP header was available, so this may be a binary Ion stream. - bytesRead = 0; + ionData = streamInterceptor.newInputStream( + new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData) + ); + try { + bytesRead = ionData.read(possibleIVM); + } catch (EOFException e) { + // Only a compression format header was available, so this may be a binary Ion stream. + bytesRead = 0; + } + } catch (IOException e) { + throw new IonException(e); } - } catch (IOException e) { - throw new IonException(e); + break; } } if (startsWithIvm(possibleIVM, bytesRead)) { diff --git a/src/main/java/com/amazon/ion/system/IonReaderBuilder.java b/src/main/java/com/amazon/ion/system/IonReaderBuilder.java index 0c0771749b..b143eb6380 100644 --- a/src/main/java/com/amazon/ion/system/IonReaderBuilder.java +++ b/src/main/java/com/amazon/ion/system/IonReaderBuilder.java @@ -1,20 +1,8 @@ -/* - * Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 package com.amazon.ion.system; +import com.amazon.ion.GZIPStreamInterceptor; import com.amazon.ion.IonBufferConfiguration; import com.amazon.ion.IonCatalog; import com.amazon.ion.IonException; @@ -23,11 +11,15 @@ import com.amazon.ion.IonSystem; import com.amazon.ion.IonTextReader; import com.amazon.ion.IonValue; +import com.amazon.ion.StreamInterceptor; import com.amazon.ion.impl._Private_IonReaderBuilder; import java.io.IOException; import java.io.InputStream; import java.io.Reader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * Build a new {@link IonReader} from the given {@link IonCatalog} and data @@ -45,6 +37,7 @@ public abstract class IonReaderBuilder private IonCatalog catalog = null; private boolean isIncrementalReadingEnabled = false; private IonBufferConfiguration bufferConfiguration = IonBufferConfiguration.DEFAULT; + private List streamInterceptors = new ArrayList<>(Collections.singletonList(GZIPStreamInterceptor.INSTANCE)); protected IonReaderBuilder() { @@ -55,6 +48,7 @@ protected IonReaderBuilder(IonReaderBuilder that) this.catalog = that.catalog; this.isIncrementalReadingEnabled = that.isIncrementalReadingEnabled; this.bufferConfiguration = that.bufferConfiguration; + this.streamInterceptors = new ArrayList<>(that.streamInterceptors); } /** @@ -263,6 +257,30 @@ public IonBufferConfiguration getBufferConfiguration() { return bufferConfiguration; } + /** + * Adds a {@link StreamInterceptor} to the end of the list that the builder will apply + * in order to each stream before creating {@link IonReader} instances over that stream. + * {@link GZIPStreamInterceptor} is always consulted first, and need not be added. + * + * @param streamInterceptor the stream interceptor to add. + * + * @return this builder instance, if mutable; + * otherwise a mutable copy of this builder. + */ + public IonReaderBuilder addStreamInterceptor(StreamInterceptor streamInterceptor) { + IonReaderBuilder b = mutable(); + b.streamInterceptors.add(streamInterceptor); + return b; + } + + /** + * @see #addStreamInterceptor(StreamInterceptor) + * @return an unmodifiable view of the stream interceptors currently configured. + */ + public List getStreamInterceptors() { + return Collections.unmodifiableList(streamInterceptors); + } + /** * Based on the builder's configuration properties, creates a new IonReader * instance over the given block of Ion data, detecting whether it's text or diff --git a/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java b/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java index b7d379922e..7c20416bdc 100644 --- a/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java +++ b/src/test/java/com/amazon/ion/system/IonReaderBuilderTest.java @@ -1,18 +1,5 @@ -/* - * Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 package com.amazon.ion.system; import static com.amazon.ion.TestUtils.gzippedBytes; @@ -26,18 +13,21 @@ import static org.junit.Assert.fail; import com.amazon.ion.BitUtils; +import com.amazon.ion.GZIPStreamInterceptor; import com.amazon.ion.IonBufferConfiguration; import com.amazon.ion.IonCatalog; import com.amazon.ion.IonException; import com.amazon.ion.IonReader; import com.amazon.ion.IonType; import com.amazon.ion.IonWriter; +import com.amazon.ion.StreamInterceptor; import com.amazon.ion.impl.ResizingPipedInputStream; import com.amazon.ion.impl._Private_IonBinaryWriterBuilder; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.List; import java.util.zip.GZIPOutputStream; import com.amazon.ion.impl._Private_IonConstants; @@ -234,4 +224,14 @@ public void incompleteIvmFailsCleanly(boolean isIncremental) throws Exception { reader.close(); } + @Test + public void gzipInterceptorEnabledByDefault() { + IonReaderBuilder builder = IonReaderBuilder.standard(); + List interceptors = builder.getStreamInterceptors(); + assertEquals(1, interceptors.size()); + assertEquals(GZIPStreamInterceptor.INSTANCE.formatName(), interceptors.get(0).formatName()); + // The list returned from IonReaderBuilder.getStreamInterceptors() is unmodifiable. + assertThrows(UnsupportedOperationException.class, () -> interceptors.add(GZIPStreamInterceptor.INSTANCE)); + } + } diff --git a/src/test/java/com/amazon/ion/system/ZstdStreamInterceptorTest.java b/src/test/java/com/amazon/ion/system/ZstdStreamInterceptorTest.java new file mode 100644 index 0000000000..d2a90955b4 --- /dev/null +++ b/src/test/java/com/amazon/ion/system/ZstdStreamInterceptorTest.java @@ -0,0 +1,153 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package com.amazon.ion.system; + +import com.amazon.ion.IonReader; +import com.amazon.ion.IonType; +import com.amazon.ion.IonWriter; +import com.amazon.ion.StreamInterceptor; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import com.github.luben.zstd.ZstdInputStream; +import com.github.luben.zstd.ZstdOutputStream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Demonstrates how a StreamInterceptor that recognizes Zstd streams can be plugged into the IonReaderBuilder and + * IonSystem. + */ +public class ZstdStreamInterceptorTest { + + enum ZstdStreamInterceptor implements StreamInterceptor { + INSTANCE; + + private static final byte[] ZSTD_HEADER = {(byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD}; + + @Override + public String formatName() { + return "Zstd"; + } + + @Override + public int headerLength() { + return ZSTD_HEADER.length; + } + + @Override + public boolean matchesHeader(byte[] candidate, int offset, int length) { + if (candidate == null || length < ZSTD_HEADER.length) { + return false; + } + + for (int i = 0; i < ZSTD_HEADER.length; i++) { + if (ZSTD_HEADER[i] != candidate[offset + i]) { + return false; + } + } + return true; + } + + @Override + public InputStream newInputStream(InputStream interceptedStream) throws IOException { + return new ZstdInputStream(interceptedStream).setContinuous(true); + } + } + + public enum ZstdStream { + BINARY_STREAM_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(new ByteArrayInputStream(BINARY_BYTES)); + } + }, + TEXT_STREAM_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(new ByteArrayInputStream(TEXT_BYTES)); + } + }, + BINARY_BYTES_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(BINARY_BYTES); + } + }, + TEXT_BYTES_READER { + @Override + IonReader newReader(IonReaderBuilder builder) { + return builder.build(TEXT_BYTES); + } + }, + BINARY_STREAM_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(new ByteArrayInputStream(BINARY_BYTES)); + } + }, + TEXT_STREAM_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(new ByteArrayInputStream(TEXT_BYTES)); + } + }, + BINARY_BYTES_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(BINARY_BYTES); + } + }, + TEXT_BYTES_SYSTEM { + @Override + IonReader newReader(IonReaderBuilder builder) { + return IonSystemBuilder.standard() + .withReaderBuilder(builder) + .build() + .newReader(TEXT_BYTES); + } + }; + + abstract IonReader newReader(IonReaderBuilder builder); + + private static byte[] writeCompressedStream(boolean isText) { + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + try (IonWriter writer = isText + ? IonTextWriterBuilder.standard().build(new ZstdOutputStream(bytes)) + : IonBinaryWriterBuilder.standard().build(new ZstdOutputStream(bytes)) + ) { + writer.writeInt(123); + } catch (IOException e) { + throw new IllegalStateException(e); + } + return bytes.toByteArray(); + } + + private static final byte[] TEXT_BYTES = writeCompressedStream(true); + private static final byte[] BINARY_BYTES = writeCompressedStream(false); + } + + @ParameterizedTest + @EnumSource(ZstdStream.class) + public void interceptedViaIonReader(ZstdStream stream) throws IOException { + IonReaderBuilder builder = IonReaderBuilder.standard().addStreamInterceptor(ZstdStreamInterceptor.INSTANCE); + try (IonReader reader = stream.newReader(builder)) { + assertEquals(IonType.INT, reader.next()); + assertEquals(123, reader.intValue()); + } + } +}