Skip to content

Commit

Permalink
Add API to buffer BinaryData. (Azure#29605)
Browse files Browse the repository at this point in the history
* first draft.

* tests.

* more tests.

* Javadoc.

* chlog.

* snippets.

* pr feedback.

* smart buffers.

* improvements.

* harden the stream.

* more testing.

* more testing.

* orly?

* tests

* Update sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/StreamUtil.java

Co-authored-by: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com>

* PR feedback.

* cache replayable flux inside.

* cold supplier test.

Co-authored-by: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com>
  • Loading branch information
kasobol-msft and alzimmermsft authored Jun 27, 2022
1 parent 513c87e commit 04598ba
Show file tree
Hide file tree
Showing 15 changed files with 1,403 additions and 39 deletions.
2 changes: 2 additions & 0 deletions sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Features Added

- Added `BinaryData.isReplayable()` to indicate if multiple consumptions of the content are safe.
- Added `BinaryData.toReplayableBinaryData` and `BinaryData.toReplayableBinaryDataAsync` to allow
transforming `BinaryData` instances into replayable `BinaryData` for all content types.
- Added support for sending synchronous requests using `sendSync` in `HttpPipeline`:
- Added `HttpPipelinePolicy.processSync(HttpPipelineCallContext context, HttpPipelineNextSyncPolicy next)` to allow processing policies synchronously.
- Added `HttpPipelineSyncPolicy` to represent synchronous `HttpPipelinePolicy`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.InputStream;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -78,7 +79,44 @@ public abstract class BinaryDataContent {
/**
* Returns a flag indicating whether the content can be repeatedly consumed using all accessors, including
* {@link #toStream()} and {@link #toFluxByteBuffer()}.
*
* <p>
* Replayability does not imply thread-safety. The caller must not use data accessors simultaneously.
* </p>
*
* @return {@code true} if the content can be repeatedly consumed, {@code false} otherwise.
*/
public abstract boolean isReplayable();

/**
* Converts the {@link BinaryDataContent} into a {@link BinaryDataContent} that is replayable, i.e. content
* that can be consumed repeatedly using all accessors.
*
* <p>
* A {@link BinaryDataContent} that is already replayable is returned as is. Otherwise, techniques like
* marking and resetting a stream or buffering in memory are employed to ensure replayability.
* </p>
*
* <p>
* Replayability does not imply thread-safety. The caller must not use data accessors simultaneously.
* </p>
*
* @return Replayable {@link BinaryDataContent}.
*/
public abstract BinaryDataContent toReplayableContent();

/**
* Asynchronously converts the {@link BinaryDataContent} into a {@link BinaryDataContent} that is replayable,
* i.e. content that can be consumed repeatedly using all accessors.
*
* <p>
* A {@link BinaryDataContent} that is already replayable is returned as is. Otherwise, techniques like
* marking and resetting a stream or buffering in memory are employed to ensure replayability.
* </p>
*
* <p>
* Replayability does not imply thread-safety. The caller must not use data accessors simultaneously.
* </p>
*
* @return Mono that emits replayable {@link BinaryDataContent}.
*/
public abstract Mono<BinaryDataContent> toReplayableContentAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,14 @@ public Flux<ByteBuffer> toFluxByteBuffer() {
public boolean isReplayable() {
// This content implementation unconditionally reports itself as replayable.
return true;
}

@Override
public BinaryDataContent toReplayableContent() {
// isReplayable() always returns true for this content, so no conversion is needed.
return this;
}

@Override
public Mono<BinaryDataContent> toReplayableContentAsync() {
// isReplayable() always returns true for this content, so this instance is emitted unchanged.
return Mono.just(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.core.util.serializer.TypeReference;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
Expand Down Expand Up @@ -192,6 +193,16 @@ public boolean isReplayable() {
return true;
}

@Override
public BinaryDataContent toReplayableContent() {
// isReplayable() returns true for this content, so no conversion is needed.
return this;
}

@Override
public Mono<BinaryDataContent> toReplayableContentAsync() {
// isReplayable() returns true for this content, so this instance is emitted unchanged.
return Mono.just(this);
}

private byte[] getBytes() {
if (length > MAX_ARRAY_SIZE) {
throw LOGGER.logExceptionAsError(new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

/**
* A {@link BinaryDataContent} implementation which is backed by a {@link Flux} of {@link ByteBuffer}.
Expand All @@ -22,7 +25,9 @@ public final class FluxByteBufferContent extends BinaryDataContent {

private final Flux<ByteBuffer> content;
private final AtomicReference<byte[]> bytes = new AtomicReference<>();
private final AtomicReference<FluxByteBufferContent> cachedReplayableContent = new AtomicReference<>();
private final Long length;
private final boolean isReplayable;

/**
* Creates an instance of {@link FluxByteBufferContent}.
Expand All @@ -40,8 +45,15 @@ public FluxByteBufferContent(Flux<ByteBuffer> content) {
* @throws NullPointerException if {@code content} is null.
*/
public FluxByteBufferContent(Flux<ByteBuffer> content, Long length) {
// There's currently no way to tell whether a Flux is replayable or not, so content supplied
// through this constructor is conservatively marked as non-replayable.
// https://github.com/reactor/reactor-core/issues/1977
this(content, length, false);
}

/**
 * Creates an instance of {@link FluxByteBufferContent} with an explicit replayability flag.
 *
 * @param content The content for this instance.
 * @param length The length value stored for this content.
 * @param isReplayable The value later reported by {@link #isReplayable()}.
 * @throws NullPointerException if {@code content} is null.
 */
private FluxByteBufferContent(Flux<ByteBuffer> content, Long length, boolean isReplayable) {
    // Validate first so no field is assigned when the content is missing.
    Objects.requireNonNull(content, "'content' cannot be null.");

    this.content = content;
    this.length = length;
    this.isReplayable = isReplayable;
}

@Override
Expand Down Expand Up @@ -89,7 +101,43 @@ public Flux<ByteBuffer> toFluxByteBuffer() {

@Override
public boolean isReplayable() {
return false;
return isReplayable;
}

/**
 * Returns a replayable view of this content.
 *
 * <p>
 * If this instance is already replayable it is returned as is. Otherwise a wrapper is created whose
 * {@link Flux} copies every emitted {@link ByteBuffer} into a list, caches that list and replays
 * duplicates of the cached buffers on each subscription. The wrapper is cached so that repeated calls
 * return the same instance.
 * </p>
 *
 * @return Replayable {@link BinaryDataContent}.
 */
@Override
public BinaryDataContent toReplayableContent() {
    if (isReplayable) {
        return this;
    }

    FluxByteBufferContent cached = cachedReplayableContent.get();
    if (cached != null) {
        return cached;
    }

    Flux<ByteBuffer> bufferedFlux = content
        .map(buffer -> {
            // deep copy direct buffers
            ByteBuffer copy = ByteBuffer.allocate(buffer.remaining());
            copy.put(buffer);
            copy.flip();
            return copy;
        })
        // collectList() uses ArrayList. We don't want to be bound by array capacity
        // and we don't need random access.
        .collect(LinkedList::new, (BiConsumer<LinkedList<ByteBuffer>, ByteBuffer>) LinkedList::add)
        .cache()
        .flatMapMany(
            // Duplicate buffers on re-subscription.
            listOfBuffers -> Flux.fromIterable(listOfBuffers).map(ByteBuffer::duplicate));

    // Building the wrapper is cheap because nothing is subscribed yet. Publish it with compareAndSet
    // so that, if several callers race here, all of them end up with the same wrapper and the
    // underlying (possibly non-replayable) Flux is only ever consumed through a single cache().
    FluxByteBufferContent candidate = new FluxByteBufferContent(bufferedFlux, length, true);
    if (cachedReplayableContent.compareAndSet(null, candidate)) {
        return candidate;
    }

    // Another caller published first; this method never resets the reference to null.
    return cachedReplayableContent.get();
}

/**
 * Returns a {@link Mono} that calls {@link #toReplayableContent()} when subscribed and emits its result.
 *
 * @return Mono that emits replayable {@link BinaryDataContent}.
 */
@Override
public Mono<BinaryDataContent> toReplayableContentAsync() {
    return Mono.fromCallable(() -> toReplayableContent());
}

private byte[] getBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,32 @@
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* A {@link BinaryDataContent} implementation which is backed by an {@link InputStream}.
*/
public final class InputStreamContent extends BinaryDataContent {
private static final ClientLogger LOGGER = new ClientLogger(InputStreamContent.class);
private final InputStream content;
private static final int INITIAL_BUFFER_CHUNK_SIZE = 8 * 1024;
private static final int MAX_BUFFER_CHUNK_SIZE = 8 * 1024 * 1024;
private static final int MAX_ARRAY_LENGTH = Integer.MAX_VALUE - 8;
private final Supplier<InputStream> content;
private final Long length;
private final AtomicReference<byte[]> bytes = new AtomicReference<>();
private final boolean isReplayable;


/**
Expand All @@ -33,16 +42,25 @@ public final class InputStreamContent extends BinaryDataContent {
* @param inputStream The inputStream that is used as the content for this instance.
* @throws NullPointerException if {@code inputStream} is null.
*/
public InputStreamContent(InputStream inputStream) {
this.content = Objects.requireNonNull(inputStream, "'inputStream' cannot be null.");
public InputStreamContent(InputStream inputStream, Long length) {
Objects.requireNonNull(inputStream, "'inputStream' cannot be null.");
this.content = () -> inputStream;
this.length = length;
isReplayable = false;
}

/**
 * Creates an instance of {@link InputStreamContent} backed by a supplier of streams.
 *
 * @param inputStreamSupplier Supplier invoked whenever the content stream is requested.
 * @param length The length value stored for this content.
 * @param isReplayable The value later reported by {@link #isReplayable()}.
 * @throws NullPointerException if {@code inputStreamSupplier} is null.
 */
private InputStreamContent(Supplier<InputStream> inputStreamSupplier, Long length, boolean isReplayable) {
    // Validate first so no field is assigned when the supplier is missing.
    Objects.requireNonNull(inputStreamSupplier, "'inputStreamSupplier' cannot be null.");

    this.content = inputStreamSupplier;
    this.length = length;
    this.isReplayable = isReplayable;
}

@Override
public Long getLength() {
if (bytes.get() != null) {
return (long) bytes.get().length;
}
return null;
return length;
}

@Override
Expand All @@ -67,7 +85,7 @@ public <T> T toObject(TypeReference<T> typeReference, ObjectSerializer serialize

@Override
public InputStream toStream() {
return this.content;
return this.content.get();
}

@Override
Expand All @@ -77,20 +95,82 @@ public ByteBuffer toByteBuffer() {

@Override
public Flux<ByteBuffer> toFluxByteBuffer() {
return FluxUtil.toFluxByteBuffer(this.content, STREAM_READ_SIZE);
return FluxUtil.toFluxByteBuffer(this.content.get(), STREAM_READ_SIZE);
}

@Override
public boolean isReplayable() {
return false;
return isReplayable;
}

/**
 * Returns a replayable view of this content.
 *
 * <p>
 * A replayable instance returns itself. Otherwise the underlying stream is either wrapped using
 * mark/reset (when {@code canMarkReset} allows it) or read fully into buffers.
 * </p>
 *
 * @return Replayable {@link BinaryDataContent}.
 */
@Override
public BinaryDataContent toReplayableContent() {
    if (!isReplayable) {
        InputStream source = this.content.get();
        return canMarkReset(source, length)
            ? createMarkResetContent(source, length)
            : readAndBuffer(source, length);
    }

    return this;
}

/**
 * Asynchronous counterpart of {@link #toReplayableContent()}.
 *
 * <p>
 * A replayable instance is emitted as is. Otherwise the stream is obtained immediately and the returned
 * {@link Mono} either sets up mark/reset based content or reads the stream into buffers on the bounded
 * elastic scheduler.
 * </p>
 *
 * @return Mono that emits replayable {@link BinaryDataContent}.
 */
@Override
public Mono<BinaryDataContent> toReplayableContentAsync() {
    if (isReplayable) {
        return Mono.just(this);
    }

    final InputStream source = this.content.get();
    final Mono<BinaryDataContent> replayable;
    if (canMarkReset(source, length)) {
        replayable = Mono.fromCallable(() -> createMarkResetContent(source, length));
    } else {
        replayable = Mono.just(source)
            .publishOn(Schedulers.boundedElastic()) // reading stream can be blocking.
            .map(stream -> readAndBuffer(stream, length));
    }

    return replayable;
}

/**
 * Determines whether mark/reset can be used to replay the given stream.
 *
 * @param inputStream The stream to inspect.
 * @param length The known content length, may be null.
 * @return {@code true} only when the length is known, is below {@code MAX_ARRAY_LENGTH} and the stream
 * reports mark support.
 */
private static boolean canMarkReset(InputStream inputStream, Long length) {
    // Without a known, small enough length there is no valid read limit to pass to mark().
    if (length == null || length >= MAX_ARRAY_LENGTH) {
        return false;
    }

    return inputStream.markSupported();
}

/**
 * Creates replayable content on top of a stream that supports mark/reset.
 *
 * <p>
 * The stream is marked at its current position with {@code length} as the read limit. Every time the
 * content stream is requested, the same stream is reset to that mark and handed out again.
 * </p>
 *
 * @param inputStream The stream to mark.
 * @param length The content length, used as the mark read limit.
 * @return Replayable {@link InputStreamContent}.
 */
private static InputStreamContent createMarkResetContent(InputStream inputStream, Long length) {
    inputStream.mark(length.intValue());

    Supplier<InputStream> rewindingSupplier = () -> {
        try {
            inputStream.reset();
        } catch (IOException e) {
            throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
        }
        return inputStream;
    };

    return new InputStreamContent(rewindingSupplier, length, true);
}

/**
 * Reads the stream into a list of buffers and creates replayable content on top of them.
 *
 * <p>
 * Each request for the content stream creates a new {@code IterableOfByteBuffersInputStream} over the
 * same list of buffers.
 * </p>
 *
 * @param inputStream The stream to read.
 * @param length The content length, may be null; passed through to the buffering utility.
 * @return Replayable {@link InputStreamContent}.
 * @throws UncheckedIOException if reading the stream fails.
 */
private static InputStreamContent readAndBuffer(InputStream inputStream, Long length) {
    final List<ByteBuffer> bufferedChunks;
    try {
        bufferedChunks = StreamUtil.readStreamToListOfByteBuffers(
            inputStream, length, INITIAL_BUFFER_CHUNK_SIZE, MAX_BUFFER_CHUNK_SIZE);
    } catch (IOException e) {
        throw LOGGER.logExceptionAsError(new UncheckedIOException(e));
    }

    return new InputStreamContent(
        () -> new IterableOfByteBuffersInputStream(bufferedChunks),
        length, true);
}

private byte[] getBytes() {
try {
ByteArrayOutputStream dataOutputBuffer = new ByteArrayOutputStream();
int nRead;
byte[] data = new byte[STREAM_READ_SIZE];
while ((nRead = this.content.read(data, 0, data.length)) != -1) {
InputStream inputStream = this.content.get();
while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
dataOutputBuffer.write(data, 0, nRead);
}
return dataOutputBuffer.toByteArray();
Expand Down
Loading

0 comments on commit 04598ba

Please sign in to comment.