Skip to content

Commit

Permalink
Send BinaryData based HttpRequests via OkHttp client. (Azure#28889)
Browse files Browse the repository at this point in the history
* add weird flux.

* that works.

* Handling BinaryData requests. Unbuffered writes.

* do this.

* this is better.

* right constant size.

* more tests.

* call timeout.

* expose call timeout.

* pr feedback.

* undo formatting change.

* un-flaky this test.

* ugh..

* .

* don't use side effect operator for this. it's flaky.

* improve flux send.

* validate timeout.

* more pr feedback.

* changelog.

* chlog.

* fix build.
  • Loading branch information
kasobol-msft authored May 24, 2022
1 parent 42844e2 commit a29b606
Show file tree
Hide file tree
Showing 34 changed files with 541 additions and 107 deletions.
2 changes: 1 addition & 1 deletion common/smoke-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-security-keyvault-secrets</artifactId>
<version>4.4.1</version> <!-- {x-version-update;com.azure:azure-security-keyvault-secrets;dependency} -->
<version>4.4.2</version> <!-- {x-version-update;com.azure:azure-security-keyvault-secrets;dependency} -->
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2666,6 +2666,16 @@
<Bug pattern="BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"/>
</Match>

<!-- False-positive when using ClientLogger.logThrowableAsError/Warning -->
<Match>
<Or>
<Class name="com.azure.core.http.okhttp.implementation.OkHttpFluxRequestBody"/>
<Class name="com.azure.core.http.okhttp.implementation.OkHttpInputStreamRequestBody"/>
</Or>
<Method name="writeTo"/>
<Bug pattern="BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"/>
</Match>

<!-- Exclude BC_UNCONFIRMED_CAST_OF_RETURN_VALUE as false positive -->
<Match>
<Class name="com.azure.ai.textanalytics.AnalyzeActionsAsyncClient"/>
Expand Down Expand Up @@ -2728,5 +2738,4 @@
<Method name="getRecognizePiiEntitiesResponse"/>
<Bug pattern="BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"/>
</Match>

</FindBugsFilter>
8 changes: 4 additions & 4 deletions sdk/aot/azure-aot-graalvm-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</developers>

<dependencies>

<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-data-appconfiguration</artifactId>
Expand All @@ -71,7 +71,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-security-keyvault-keys</artifactId>
<version>4.4.1</version> <!-- {x-version-update;com.azure:azure-security-keyvault-keys;dependency} -->
<version>4.4.2</version> <!-- {x-version-update;com.azure:azure-security-keyvault-keys;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand All @@ -82,12 +82,12 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-security-keyvault-secrets</artifactId>
<version>4.4.1</version> <!-- {x-version-update;com.azure:azure-security-keyvault-secrets;dependency} -->
<version>4.4.2</version> <!-- {x-version-update;com.azure:azure-security-keyvault-secrets;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-security-keyvault-certificates</artifactId>
<version>4.3.1</version> <!-- {x-version-update;com.azure:azure-security-keyvault-certificates;dependency} -->
<version>4.3.2</version> <!-- {x-version-update;com.azure:azure-security-keyvault-certificates;dependency} -->
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-security-keyvault-secrets</artifactId>
<version>4.4.1</version> <!-- {x-version-update;com.azure:azure-security-keyvault-secrets;dependency} -->
<version>4.4.2</version> <!-- {x-version-update;com.azure:azure-security-keyvault-secrets;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand All @@ -104,9 +104,9 @@
<scope>test</scope>
</dependency>

<!-- Added this dependency to include necessary annotations used
by reactor core. Without this dependency, javadoc throws a warning as it
cannot find enum When.MAYBE which is used in @Nullable annotation in reactor
<!-- Added this dependency to include necessary annotations used
by reactor core. Without this dependency, javadoc throws a warning as it
cannot find enum When.MAYBE which is used in @Nullable annotation in reactor
core classes -->
<dependency>
<groupId>com.google.code.findbugs</groupId>
Expand Down
5 changes: 5 additions & 0 deletions sdk/core/azure-core-http-okhttp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

### Features Added

- This client can now stream bodies larger than 2GB. The buffering for `Flux<ByteBuffer>` request bodies has been removed.

### Breaking Changes

### Bugs Fixed

### Other Changes

- Added specialized consumption for `HttpRequest.getBodyAsBinaryData()`.


## 1.9.0 (2022-05-06)

### Features Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,38 @@
import com.azure.core.http.HttpResponse;
import com.azure.core.http.okhttp.implementation.OkHttpAsyncBufferedResponse;
import com.azure.core.http.okhttp.implementation.OkHttpAsyncResponse;
import com.azure.core.http.okhttp.implementation.OkHttpFileRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpFluxRequestBody;
import com.azure.core.http.okhttp.implementation.OkHttpInputStreamRequestBody;
import com.azure.core.implementation.util.BinaryDataContent;
import com.azure.core.implementation.util.BinaryDataHelper;
import com.azure.core.implementation.util.ByteArrayContent;
import com.azure.core.implementation.util.FileContent;
import com.azure.core.implementation.util.InputStreamContent;
import com.azure.core.implementation.util.SerializableContent;
import com.azure.core.implementation.util.StringContent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okio.ByteString;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

/**
* HttpClient implementation for OkHttp.
*/
class OkHttpAsyncHttpClient implements HttpClient {

private static final Mono<RequestBody> EMPTY_REQUEST_BODY_MONO = Mono.just(RequestBody.create(new byte[0]));

final OkHttpClient httpClient;
//
private static final Mono<okio.ByteString> EMPTY_BYTE_STRING_MONO = Mono.just(okio.ByteString.EMPTY);

OkHttpAsyncHttpClient(OkHttpClient httpClient) {
this.httpClient = httpClient;
Expand Down Expand Up @@ -80,7 +88,7 @@ public Mono<HttpResponse> send(HttpRequest request, Context context) {
* @param request the azure-core request
* @return the Mono emitting okhttp request
*/
private static Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
private Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
Request.Builder requestBuilder = new Request.Builder()
.url(request.getUrl());

Expand All @@ -98,54 +106,62 @@ private static Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
return Mono.just(requestBuilder.head().build());
}

return toOkHttpRequestBody(request.getBody(), request.getHeaders())
return toOkHttpRequestBody(request.getBodyAsBinaryData(), request.getHeaders())
.map(okhttpRequestBody -> requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody)
.build());
}

/**
* Create a Mono of okhttp3.RequestBody from the given java.nio.ByteBuffer Flux.
* Create a Mono of okhttp3.RequestBody from the given BinaryData.
*
* @param bbFlux stream of java.nio.ByteBuffer representing request content
* @param bodyContent The request body content
* @param headers the headers associated with the original request
* @return the Mono emitting okhttp3.RequestBody
* @return the Mono emitting okhttp request
*/
private static Mono<RequestBody> toOkHttpRequestBody(Flux<ByteBuffer> bbFlux, HttpHeaders headers) {
Mono<okio.ByteString> bsMono = bbFlux == null
? EMPTY_BYTE_STRING_MONO
: toByteString(bbFlux);
private Mono<RequestBody> toOkHttpRequestBody(BinaryData bodyContent, HttpHeaders headers) {
String contentType = headers.getValue("Content-Type");
MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);

return bsMono.map(bs -> {
String contentType = headers.getValue("Content-Type");
MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType);
if (bodyContent == null) {
return EMPTY_REQUEST_BODY_MONO;
}

return RequestBody.create(bs, mediaType);
});
BinaryDataContent content = BinaryDataHelper.getContent(bodyContent);

if (content instanceof ByteArrayContent) {
return Mono.just(RequestBody.create(content.toBytes(), mediaType));
} else if (content instanceof StringContent
|| content instanceof SerializableContent) {
return Mono.fromCallable(() -> RequestBody.create(content.toBytes(), mediaType));
} else {
long effectiveContentLength = getRequestContentLength(content, headers);
if (content instanceof InputStreamContent) {
// The OkHttpInputStreamRequestBody doesn't read bytes until it's triggered by OkHttp dispatcher.
return Mono.just(new OkHttpInputStreamRequestBody(
(InputStreamContent) content, effectiveContentLength, mediaType));
} else if (content instanceof FileContent) {
// The OkHttpFileRequestBody doesn't read bytes until it's triggered by OkHttp dispatcher.
return Mono.just(new OkHttpFileRequestBody((FileContent) content, effectiveContentLength, mediaType));
} else {
// The OkHttpFluxRequestBody doesn't read bytes until it's triggered by OkHttp dispatcher.
return Mono.just(new OkHttpFluxRequestBody(
content, effectiveContentLength, mediaType, httpClient.callTimeoutMillis()));
}
}
}

/**
* Aggregate Flux of java.nio.ByteBuffer to single okio.ByteString.
*
* Pooled okio.Buffer type is used to buffer emitted ByteBuffer instances. Content of each ByteBuffer will be
* written (i.e copied) to the internal okio.Buffer slots. Once the stream terminates, the contents of all slots get
* copied to one single byte array and okio.ByteString will be created referring this byte array. Finally the
* initial okio.Buffer will be returned to the pool.
*
* @param bbFlux the Flux of ByteBuffer to aggregate
* @return a mono emitting aggregated ByteString
*/
private static Mono<ByteString> toByteString(Flux<ByteBuffer> bbFlux) {
Objects.requireNonNull(bbFlux, "'bbFlux' cannot be null.");
return Mono.using(okio.Buffer::new,
buffer -> bbFlux.reduce(buffer, (b, byteBuffer) -> {
try {
b.write(byteBuffer);
return b;
} catch (IOException ioe) {
throw Exceptions.propagate(ioe);
}
}).map(b -> ByteString.of(b.readByteArray())), okio.Buffer::clear)
.switchIfEmpty(EMPTY_BYTE_STRING_MONO);
private static long getRequestContentLength(BinaryDataContent content, HttpHeaders headers) {
Long contentLength = content.getLength();
if (contentLength == null) {
String contentLengthHeaderValue = headers.getValue("Content-Length");
if (contentLengthHeaderValue != null) {
contentLength = Long.parseLong(contentLengthHeaderValue);
} else {
// -1 means that content length is unknown.
contentLength = -1L;
}
}
return contentLength;
}

private static class OkHttpCallback implements okhttp3.Callback {
Expand Down
Loading

0 comments on commit a29b606

Please sign in to comment.