networ
*
* @param readTimeout Read timeout duration.
* @return The updated OkHttpAsyncHttpClientBuilder object.
+ * @see OkHttpClient.Builder#readTimeout(Duration)
*/
public OkHttpAsyncHttpClientBuilder readTimeout(Duration readTimeout) {
// setReadTimeout can be null
@@ -135,6 +138,7 @@ public OkHttpAsyncHttpClientBuilder readTimeout(Duration readTimeout) {
*
* @param writeTimeout Write operation timeout duration.
* @return The updated OkHttpAsyncHttpClientBuilder object.
+ * @see OkHttpClient.Builder#writeTimeout(Duration)
*/
public OkHttpAsyncHttpClientBuilder writeTimeout(Duration writeTimeout) {
this.writeTimeout = writeTimeout;
@@ -152,10 +156,11 @@ public OkHttpAsyncHttpClientBuilder writeTimeout(Duration writeTimeout) {
* applied. When applying the timeout the greatest of one millisecond and the value of {@code connectTimeout} will
* be used.
*
- * By default the connection timeout is 10 seconds.
+ * By default, the connection timeout is 10 seconds.
*
* @param connectionTimeout Connect timeout duration.
* @return The updated OkHttpAsyncHttpClientBuilder object.
+ * @see OkHttpClient.Builder#connectTimeout(Duration)
*/
public OkHttpAsyncHttpClientBuilder connectionTimeout(Duration connectionTimeout) {
// setConnectionTimeout can be null
@@ -163,11 +168,36 @@ public OkHttpAsyncHttpClientBuilder connectionTimeout(Duration connectionTimeout
return this;
}
+ /**
+ * Sets the default timeout for complete calls.
+ *
+ * The call timeout spans the entire call: resolving DNS, connecting, writing the request body,
+ * server processing, and reading the response body.
+ *
+ * Null or {@link Duration#ZERO} means no call timeout, otherwise values
+ * must be between 1 and {@link Integer#MAX_VALUE} when converted to milliseconds.
+ *
+ * By default, call timeout is not enabled.
+ *
+ * @param callTimeout Call timeout duration.
+ * @return The updated OkHttpAsyncHttpClientBuilder object.
+ * @see OkHttpClient.Builder#callTimeout(Duration)
+ */
+ public OkHttpAsyncHttpClientBuilder callTimeout(Duration callTimeout) {
+ // callTimeout can be null
+ if (callTimeout != null && callTimeout.isNegative()) {
+ throw LOGGER.logExceptionAsError(new IllegalArgumentException("'callTimeout' cannot be negative"));
+ }
+ this.callTimeout = callTimeout;
+ return this;
+ }
+
/**
* Sets the Http connection pool.
*
* @param connectionPool The OkHttp connection pool to use.
* @return The updated OkHttpAsyncHttpClientBuilder object.
+ * @see OkHttpClient.Builder#connectionPool(ConnectionPool)
*/
public OkHttpAsyncHttpClientBuilder connectionPool(ConnectionPool connectionPool) {
// Null ConnectionPool is not allowed
@@ -180,6 +210,7 @@ public OkHttpAsyncHttpClientBuilder connectionPool(ConnectionPool connectionPool
*
* @param dispatcher The dispatcher to use.
* @return The updated OkHttpAsyncHttpClientBuilder object.
+ * @see OkHttpClient.Builder#dispatcher(Dispatcher)
*/
public OkHttpAsyncHttpClientBuilder dispatcher(Dispatcher dispatcher) {
// Null Dispatcher is not allowed
@@ -245,9 +276,14 @@ public HttpClient build() {
// Configure operation timeouts.
httpClientBuilder = httpClientBuilder
- .connectTimeout(getTimeoutMillis(connectionTimeout, DEFAULT_CONNECT_TIMEOUT), TimeUnit.MILLISECONDS)
- .writeTimeout(getTimeoutMillis(writeTimeout, DEFAULT_WRITE_TIMEOUT), TimeUnit.MILLISECONDS)
- .readTimeout(getTimeoutMillis(readTimeout, DEFAULT_READ_TIMEOUT), TimeUnit.MILLISECONDS);
+ .connectTimeout(getTimeout(connectionTimeout, DEFAULT_CONNECT_TIMEOUT))
+ .writeTimeout(getTimeout(writeTimeout, DEFAULT_WRITE_TIMEOUT))
+ .readTimeout(getTimeout(readTimeout, DEFAULT_READ_TIMEOUT));
+
+ if (callTimeout != null) {
+ // Call timeout is disabled by default.
+ httpClientBuilder.callTimeout(callTimeout);
+ }
// If set use the configured connection pool.
if (this.connectionPool != null) {
@@ -294,7 +330,7 @@ public HttpClient build() {
* If the timeout is {@code null} the default timeout will be used. If the timeout is less than or equal to zero
* no timeout will be used. If the timeout is less than one millisecond a timeout of one millisecond will be used.
*/
- static long getTimeoutMillis(Duration configuredTimeout, long defaultTimeout) {
+ static Duration getTimeout(Duration configuredTimeout, Duration defaultTimeout) {
// Timeout is null, use the default timeout.
if (configuredTimeout == null) {
return defaultTimeout;
@@ -302,10 +338,14 @@ static long getTimeoutMillis(Duration configuredTimeout, long defaultTimeout) {
// Timeout is less than or equal to zero, return no timeout.
if (configuredTimeout.isZero() || configuredTimeout.isNegative()) {
- return 0;
+ return Duration.ZERO;
}
// Return the maximum of the timeout period and the minimum allowed timeout period.
- return Math.max(configuredTimeout.toMillis(), MINIMUM_TIMEOUT);
+ if (configuredTimeout.compareTo(MINIMUM_TIMEOUT) < 0) {
+ return MINIMUM_TIMEOUT;
+ } else {
+ return configuredTimeout;
+ }
}
}
diff --git a/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpFileRequestBody.java b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpFileRequestBody.java
new file mode 100644
index 0000000000000..babfcd22842ed
--- /dev/null
+++ b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpFileRequestBody.java
@@ -0,0 +1,38 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.http.okhttp.implementation;
+
+import com.azure.core.implementation.util.FileContent;
+import okhttp3.MediaType;
+import okio.BufferedSink;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+/**
+ * An {@link okhttp3.RequestBody} subtype that sends {@link FileContent} in an unbuffered manner.
+ */
+public class OkHttpFileRequestBody extends OkHttpStreamableRequestBody {
+
+ public OkHttpFileRequestBody(FileContent content, long effectiveContentLength, MediaType mediaType) {
+ super(content, effectiveContentLength, mediaType);
+ }
+
+ @Override
+ public void writeTo(BufferedSink bufferedSink) throws IOException {
+ long count = effectiveContentLength;
+ if (count < 0) {
+ // OkHttp marks chunked encoding as -1.
+ // The content length is not specified so sending all remaining content.
+ count = Long.MAX_VALUE;
+ }
+ // RequestBody.create(File) does not support position and length.
+ // BufferedSink implements WritableByteChannel so we can leverage FileChannel as source.
+ // FileChannel supports positional reads.
+ try (FileChannel channel = FileChannel.open(content.getFile(), StandardOpenOption.READ)) {
+ channel.transferTo(0, count, bufferedSink);
+ }
+ }
+}
diff --git a/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpFluxRequestBody.java b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpFluxRequestBody.java
new file mode 100644
index 0000000000000..cf8b5e2225127
--- /dev/null
+++ b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpFluxRequestBody.java
@@ -0,0 +1,82 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.http.okhttp.implementation;
+
+import com.azure.core.implementation.util.BinaryDataContent;
+import com.azure.core.util.logging.ClientLogger;
+import okhttp3.MediaType;
+import okio.BufferedSink;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link okhttp3.RequestBody} subtype that sends {@link BinaryDataContent}
+ * as {@link Flux} of {@link ByteBuffer} in an unbuffered manner.
+ * This class accepts any {@link BinaryDataContent} as catch-all for backwards compatibility
+ * but ideally should be used only with reactive payloads.
+ */
+public class OkHttpFluxRequestBody extends OkHttpStreamableRequestBody {
+
+ private static final ClientLogger LOGGER = new ClientLogger(OkHttpFluxRequestBody.class);
+
+ private final AtomicBoolean bodySent = new AtomicBoolean(false);
+ private final int callTimeoutMillis;
+
+ public OkHttpFluxRequestBody(
+ BinaryDataContent content, long effectiveContentLength, MediaType mediaType, int callTimeoutMillis) {
+ super(content, effectiveContentLength, mediaType);
+ this.callTimeoutMillis = callTimeoutMillis;
+ }
+
+ @Override
+ public void writeTo(BufferedSink bufferedSink) throws IOException {
+ if (bodySent.compareAndSet(false, true)) {
+ Mono requestSendMono = content.toFluxByteBuffer()
+ .flatMapSequential(buffer -> {
+ if (Schedulers.isInNonBlockingThread()) {
+ return Mono.just(buffer)
+ .publishOn(Schedulers.boundedElastic())
+ .map(b -> writeBuffer(bufferedSink, b))
+ .then();
+ } else {
+ writeBuffer(bufferedSink, buffer);
+ return Mono.empty();
+ }
+ }, 1, 1)
+ .then();
+
+ // The blocking happens on OkHttp thread pool.
+ if (callTimeoutMillis > 0) {
+ /*
+ * Default call timeout (in milliseconds). By default there is no timeout for complete calls, but
+ * there is for the connection, write, and read actions within a call.
+ */
+ requestSendMono.block(Duration.ofMillis(callTimeoutMillis));
+ } else {
+ requestSendMono.block();
+ }
+ } else {
+ // Prevent OkHttp from potentially re-sending non-repeatable body outside of retry policies.
+ throw LOGGER.logThrowableAsError(new IOException("Re-attempt to send Flux body is not supported"));
+ }
+ }
+
+ private ByteBuffer writeBuffer(BufferedSink sink, ByteBuffer buffer) {
+ try {
+ while (buffer.hasRemaining()) {
+ sink.write(buffer);
+ }
+ return buffer;
+ } catch (IOException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+}
diff --git a/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpInputStreamRequestBody.java b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpInputStreamRequestBody.java
new file mode 100644
index 0000000000000..dfa66efdb53cb
--- /dev/null
+++ b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpInputStreamRequestBody.java
@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.http.okhttp.implementation;
+
+import com.azure.core.implementation.util.InputStreamContent;
+import com.azure.core.util.logging.ClientLogger;
+import okhttp3.MediaType;
+import okio.BufferedSink;
+import okio.Okio;
+import okio.Source;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An {@link okhttp3.RequestBody} subtype that sends {@link InputStreamContent} in an unbuffered manner.
+ */
+public class OkHttpInputStreamRequestBody extends OkHttpStreamableRequestBody {
+
+ private static final ClientLogger LOGGER = new ClientLogger(OkHttpInputStreamRequestBody.class);
+
+ private final AtomicBoolean bodySent = new AtomicBoolean(false);
+
+ public OkHttpInputStreamRequestBody(InputStreamContent content, long effectiveContentLength, MediaType mediaType) {
+ super(content, effectiveContentLength, mediaType);
+ }
+
+ @Override
+ public void writeTo(BufferedSink bufferedSink) throws IOException {
+ if (bodySent.compareAndSet(false, true)) {
+ Source source = Okio.source(content.toStream());
+ bufferedSink.writeAll(source);
+ } else {
+ // Prevent OkHttp from potentially re-sending non-repeatable body outside of retry policies.
+ throw LOGGER.logThrowableAsError(new IOException("Re-attempt to send InputStream body is not supported."));
+ }
+ }
+}
diff --git a/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpStreamableRequestBody.java b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpStreamableRequestBody.java
new file mode 100644
index 0000000000000..545695956ddca
--- /dev/null
+++ b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpStreamableRequestBody.java
@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.http.okhttp.implementation;
+
+import com.azure.core.implementation.util.BinaryDataContent;
+import okhttp3.MediaType;
+import okhttp3.RequestBody;
+
+import java.util.Objects;
+
+/**
+ * Base class for streamable request bodies.
+ * @param BinaryDataContent.
+ */
+public abstract class OkHttpStreamableRequestBody extends RequestBody {
+ protected final T content;
+ /**
+ * Content length or -1 if unspecified (i.e. chunked encoding)
+ */
+ protected final long effectiveContentLength;
+ private final MediaType mediaType;
+
+ public OkHttpStreamableRequestBody(T content, long effectiveContentLength, MediaType mediaType) {
+ this.content = Objects.requireNonNull(content, "'content' cannot be null.");
+ this.effectiveContentLength = effectiveContentLength;
+ this.mediaType = mediaType;
+ }
+
+ @Override
+ public final MediaType contentType() {
+ return mediaType;
+ }
+
+ @Override
+ public final long contentLength() {
+ return effectiveContentLength;
+ }
+}
diff --git a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientBuilderTests.java b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientBuilderTests.java
index b6e487a54b310..fe0f700602534 100644
--- a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientBuilderTests.java
+++ b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientBuilderTests.java
@@ -262,6 +262,60 @@ public void buildWithReadTimeout() {
.verifyComplete();
}
+ /**
+ * Tests building a client with a given {@code callTimeout}.
+ */
+ @Test
+ public void buildWithCallTimeout() {
+ long expectedCallTimeoutNanos = 3600000000000L;
+ Interceptor validatorInterceptor = chain -> {
+ assertEquals(expectedCallTimeoutNanos, chain.call().timeout().timeoutNanos());
+ return chain.proceed(chain.request());
+ };
+
+ HttpClient okClient = new OkHttpAsyncHttpClientBuilder()
+ .addNetworkInterceptor(validatorInterceptor)
+ .callTimeout(Duration.ofSeconds(3600))
+ .build();
+
+ StepVerifier.create(okClient.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
+ .assertNext(response -> assertEquals(200, response.getStatusCode()))
+ .verifyComplete();
+ }
+
+ /**
+ * Tests building a client with negative callTimeout.
+ */
+ @Test
+ public void throwsWithNegativeCallTimeout() {
+ assertThrows(IllegalArgumentException.class, () -> {
+ new OkHttpAsyncHttpClientBuilder()
+ .callTimeout(Duration.ofSeconds(-1));
+ });
+ }
+
+ /**
+ * Tests building a client with default timeouts.
+ */
+ @Test
+ public void buildWithDefaultTimeouts() {
+ Interceptor validatorInterceptor = chain -> {
+ assertEquals(0L, chain.call().timeout().timeoutNanos());
+ assertEquals(60000, chain.readTimeoutMillis());
+ assertEquals(60000, chain.writeTimeoutMillis());
+ assertEquals(10000, chain.connectTimeoutMillis());
+ return chain.proceed(chain.request());
+ };
+
+ HttpClient okClient = new OkHttpAsyncHttpClientBuilder()
+ .addNetworkInterceptor(validatorInterceptor)
+ .build();
+
+ StepVerifier.create(okClient.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
+ .assertNext(response -> assertEquals(200, response.getStatusCode()))
+ .verifyComplete();
+ }
+
/**
* Tests building a client with a given {@code connectionPool}.
*/
diff --git a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientHttpClientTests.java b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientHttpClientTests.java
index 59f61ba4f3371..e51d004c32957 100644
--- a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientHttpClientTests.java
+++ b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientHttpClientTests.java
@@ -4,11 +4,22 @@
package com.azure.core.http.okhttp;
import com.azure.core.http.HttpClient;
+import com.azure.core.http.HttpHeaders;
+import com.azure.core.http.HttpMethod;
+import com.azure.core.http.HttpRequest;
import com.azure.core.test.HttpClientTestsWireMockServer;
import com.azure.core.test.http.HttpClientTests;
+import com.azure.core.util.BinaryData;
import com.github.tomakehurst.wiremock.WireMockServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
public class OkHttpAsyncHttpClientHttpClientTests extends HttpClientTests {
private static WireMockServer server;
@@ -35,4 +46,46 @@ protected int getWireMockPort() {
protected HttpClient createHttpClient() {
return new OkHttpAsyncClientProvider().createInstance();
}
+
+ @Test
+ public void testVerySlowFluxGetsInterruptedByOkHttpInternals() {
+ HttpClient httpClient = new OkHttpAsyncHttpClientBuilder()
+ .callTimeout(Duration.ofMillis(1000)) // this caps full req-res round trip.
+ .build();
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ Flux delayedFlux = Flux.just(byteBuffer).map(ByteBuffer::duplicate).repeat(101)
+ .delayElements(Duration.ofMillis(10))
+ // append last element that takes a day to emit.
+ .concatWith(Flux.just(byteBuffer).map(ByteBuffer::duplicate).delayElements(Duration.ofDays(1)));
+ Mono requestBodyMono = BinaryData.fromFlux(delayedFlux, null, false);
+
+ StepVerifier.create(
+ requestBodyMono.flatMap(data -> {
+ HttpRequest request = new HttpRequest(
+ HttpMethod.PUT, getRequestUrl(ECHO_RESPONSE), new HttpHeaders(), data);
+ return httpClient.send(request);
+ })
+ ).verifyError();
+ }
+
+ @Test
+ public void testUnresponsiveFluxGetsInterruptedInFluxRequestBody() {
+ HttpClient httpClient = new OkHttpAsyncHttpClientBuilder()
+ .callTimeout(Duration.ofMillis(1000)) // this caps full req-res round trip.
+ .build();
+
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ Flux delayedFlux = Flux.just(byteBuffer).map(ByteBuffer::duplicate)
+ .delayElements(Duration.ofDays(1));
+ Mono requestBodyMono = BinaryData.fromFlux(delayedFlux, null, false);
+
+ StepVerifier.create(
+ requestBodyMono.flatMap(data -> {
+ HttpRequest request = new HttpRequest(
+ HttpMethod.PUT, getRequestUrl(ECHO_RESPONSE), new HttpHeaders(), data);
+ return httpClient.send(request);
+ })
+ ).verifyError();
+ }
}
diff --git a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java
index 09a16a9fae575..b8eab3064fd98 100644
--- a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java
+++ b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java
@@ -11,6 +11,7 @@
import com.azure.core.http.HttpResponse;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import okhttp3.Dispatcher;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -143,7 +144,7 @@ public void testRequestBodyIsErrorShouldPropagateToResponse() {
.setBody(Flux.error(new RuntimeException("boo")));
StepVerifier.create(client.send(request))
- .expectErrorMessage("boo")
+ .expectErrorMatches(e -> e.getMessage().contains("boo"))
.verify();
}
@@ -161,7 +162,7 @@ public void testRequestBodyEndsInErrorShouldPropagateToResponse() {
try {
StepVerifier.create(client.send(request))
- .expectErrorMessage("boo")
+ .expectErrorMatches(e -> e.getMessage().contains("boo"))
.verify(Duration.ofSeconds(10));
} catch (Exception ex) {
assertEquals("boo", ex.getMessage());
@@ -216,17 +217,25 @@ public void testServerShutsDownSocketShouldPushErrorToContentFlowable() {
@Test
public void testConcurrentRequests() throws NoSuchAlgorithmException {
int numRequests = 100; // 100 = 1GB of data read
- HttpClient client = new OkHttpAsyncClientProvider().createInstance();
+ int concurrency = 10;
+ Dispatcher dispatcher = new Dispatcher();
+ dispatcher.setMaxRequestsPerHost(concurrency); // this is 5 by default.
+ HttpClient client = new OkHttpAsyncHttpClientBuilder()
+ .dispatcher(dispatcher)
+ .build();
byte[] expectedDigest = digest(LONG_BODY);
long expectedByteCount = (long) numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length;
Mono numBytesMono = Flux.range(1, numRequests)
- .parallel(10)
+ .parallel(concurrency)
.runOn(Schedulers.boundedElastic())
.flatMap(n -> Mono.fromCallable(() -> getResponse(client, "/long")).flatMapMany(response -> {
MessageDigest md = md5Digest();
return response.getBody()
- .doOnNext(buffer -> md.update(buffer.duplicate()))
+ .map(buffer -> {
+ md.update(buffer.duplicate());
+ return buffer;
+ })
.doOnComplete(() -> assertArrayEquals(expectedDigest, md.digest(), "wrong digest!"));
}))
.sequential()
diff --git a/sdk/core/azure-core-test/src/main/java/com/azure/core/test/http/HttpClientTests.java b/sdk/core/azure-core-test/src/main/java/com/azure/core/test/http/HttpClientTests.java
index b794e1b849daa..8213f690185e0 100644
--- a/sdk/core/azure-core-test/src/main/java/com/azure/core/test/http/HttpClientTests.java
+++ b/sdk/core/azure-core-test/src/main/java/com/azure/core/test/http/HttpClientTests.java
@@ -9,6 +9,7 @@
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.BinaryData;
+import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import org.junit.jupiter.api.Named;
@@ -31,6 +32,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -43,6 +45,8 @@
* Generic test suite for {@link HttpClient HttpClients}.
*/
public abstract class HttpClientTests {
+ private static final ClientLogger LOGGER = new ClientLogger(HttpClientTests.class);
+
private static final String REQUEST_HOST = "http://localhost";
private static final String PLAIN_RESPONSE = "plainBytesNoHeader";
private static final String HEADER_RESPONSE = "plainBytesWithHeader";
@@ -54,7 +58,7 @@ public abstract class HttpClientTests {
private static final String UTF_32LE_BOM_RESPONSE = "utf32LeBomBytes";
private static final String BOM_WITH_SAME_HEADER = "bomBytesWithSameHeader";
private static final String BOM_WITH_DIFFERENT_HEADER = "bomBytesWithDifferentHeader";
- private static final String ECHO_RESPONSE = "echo";
+ protected static final String ECHO_RESPONSE = "echo";
private static final Random RANDOM = new Random();
@@ -206,7 +210,7 @@ public void canSendBinaryData(BinaryData requestBody, byte[] expectedResponseBod
throws MalformedURLException {
HttpRequest request = new HttpRequest(
HttpMethod.PUT,
- new URL(REQUEST_HOST + ":" + getWireMockPort() + "/" + ECHO_RESPONSE),
+ getRequestUrl(ECHO_RESPONSE),
new HttpHeaders(),
requestBody);
@@ -233,15 +237,40 @@ private static Stream getBinaryDataBodyVariants() {
BinaryData streamData = BinaryData.fromStream(new ByteArrayInputStream(bytes));
List bufferList = new ArrayList<>();
- int bufferSize = 10;
+ int bufferSize = 113;
for (int startIndex = 0; startIndex < bytes.length; startIndex += bufferSize) {
bufferList.add(
ByteBuffer.wrap(
bytes, startIndex, Math.min(bytes.length - startIndex, bufferSize)));
}
- BinaryData fluxBinaryData = BinaryData.fromFlux(Flux.fromIterable(bufferList),
+ BinaryData fluxBinaryData = BinaryData.fromFlux(
+ Flux.fromIterable(bufferList)
+ .map(ByteBuffer::duplicate),
+ null, false).block();
+
+ BinaryData fluxBinaryDataWithLength = BinaryData.fromFlux(
+ Flux.fromIterable(bufferList)
+ .map(ByteBuffer::duplicate),
+ size.longValue(), false).block();
+
+ BinaryData asyncFluxBinaryData = BinaryData.fromFlux(
+ Flux.fromIterable(bufferList)
+ .map(ByteBuffer::duplicate)
+ .delayElements(Duration.ofNanos(10))
+ .flatMapSequential(
+ buffer -> Mono.delay(Duration.ofNanos(10)).map(i -> buffer)
+ ),
null, false).block();
+ BinaryData asyncFluxBinaryDataWithLength = BinaryData.fromFlux(
+ Flux.fromIterable(bufferList)
+ .map(ByteBuffer::duplicate)
+ .delayElements(Duration.ofNanos(10))
+ .flatMapSequential(
+ buffer -> Mono.delay(Duration.ofNanos(10)).map(i -> buffer)
+ ),
+ size.longValue(), false).block();
+
BinaryData objectBinaryData = BinaryData.fromObject(bytes, new ByteArraySerializer());
@@ -258,6 +287,9 @@ private static Stream getBinaryDataBodyVariants() {
Arguments.of(Named.named("InputStream",
streamData), Named.named("" + size, bytes)),
Arguments.of(Named.named("Flux", fluxBinaryData), Named.named("" + size, bytes)),
+ Arguments.of(Named.named("Flux with length", fluxBinaryDataWithLength), Named.named("" + size, bytes)),
+ Arguments.of(Named.named("async Flux", asyncFluxBinaryData), Named.named("" + size, bytes)),
+ Arguments.of(Named.named("async Flux with length", asyncFluxBinaryDataWithLength), Named.named("" + size, bytes)),
Arguments.of(Named.named("Object", objectBinaryData), Named.named("" + size, bytes)),
Arguments.of(Named.named("File", fileData), Named.named("" + size, bytes))
);
@@ -269,10 +301,24 @@ private static Stream getBinaryDataBodyVariants() {
private Mono sendRequest(String requestPath) {
return createHttpClient()
- .send(new HttpRequest(HttpMethod.GET, REQUEST_HOST + ":" + getWireMockPort() + "/" + requestPath))
+ .send(new HttpRequest(HttpMethod.GET, getRequestUrl(requestPath)))
.flatMap(HttpResponse::getBodyAsString);
}
+ /**
+ * Gets the request URL for given path.
+ * @param requestPath The path.
+ * @return The request URL for given path.
+ * @throws RuntimeException if url is invalid.
+ */
+ protected URL getRequestUrl(String requestPath) {
+ try {
+ return new URL(REQUEST_HOST + ":" + getWireMockPort() + "/" + requestPath);
+ } catch (MalformedURLException e) {
+ throw LOGGER.logExceptionAsError(new RuntimeException(e));
+ }
+ }
+
private static class ByteArraySerializer implements ObjectSerializer {
@Override
public T deserialize(InputStream stream, TypeReference typeReference) {
diff --git a/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java b/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java
index 995f6b8f7bf4c..5a27437fe75cc 100644
--- a/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java
+++ b/sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java
@@ -539,7 +539,7 @@ public void syncPutRequestWithBodyAndEqualContentLength() {
@Test
public void syncPutRequestWithBodyLessThanContentLength() {
ByteBuffer body = ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8));
- UnexpectedLengthException unexpectedLengthException = assertThrows(UnexpectedLengthException.class, () -> {
+ Exception unexpectedLengthException = assertThrows(Exception.class, () -> {
createService(Service9.class).putBodyAndContentLength(body, 5L);
body.clear();
});
@@ -549,7 +549,7 @@ public void syncPutRequestWithBodyLessThanContentLength() {
@Test
public void syncPutRequestWithBodyMoreThanContentLength() {
ByteBuffer body = ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8));
- UnexpectedLengthException unexpectedLengthException = assertThrows(UnexpectedLengthException.class, () -> {
+ Exception unexpectedLengthException = assertThrows(Exception.class, () -> {
createService(Service9.class).putBodyAndContentLength(body, 3L);
body.clear();
});
@@ -573,7 +573,9 @@ public void asyncPutRequestWithBodyAndLessThanContentLength() {
Flux body = Flux.just(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)));
StepVerifier.create(createService(Service9.class).putAsyncBodyAndContentLength(body, 5L))
.verifyErrorSatisfies(exception -> {
- assertTrue(exception instanceof UnexpectedLengthException);
+ assertTrue(exception instanceof UnexpectedLengthException
+ || (exception.getSuppressed().length > 0
+ && exception.getSuppressed()[0] instanceof UnexpectedLengthException));
assertTrue(exception.getMessage().contains("less than"));
});
}
@@ -583,7 +585,9 @@ public void asyncPutRequestWithBodyAndMoreThanContentLength() {
Flux body = Flux.just(ByteBuffer.wrap("test".getBytes(StandardCharsets.UTF_8)));
StepVerifier.create(createService(Service9.class).putAsyncBodyAndContentLength(body, 3L))
.verifyErrorSatisfies(exception -> {
- assertTrue(exception instanceof UnexpectedLengthException);
+ assertTrue(exception instanceof UnexpectedLengthException
+ || (exception.getSuppressed().length > 0
+ && exception.getSuppressed()[0] instanceof UnexpectedLengthException));
assertTrue(exception.getMessage().contains("more than"));
});
}
diff --git a/sdk/core/azure-core-tracing-opentelemetry/pom.xml b/sdk/core/azure-core-tracing-opentelemetry/pom.xml
index ac91cb7ed171d..acfa3575b2a3c 100644
--- a/sdk/core/azure-core-tracing-opentelemetry/pom.xml
+++ b/sdk/core/azure-core-tracing-opentelemetry/pom.xml
@@ -115,7 +115,7 @@
com.azure
azure-security-keyvault-secrets
- 4.4.1
+ 4.4.2
test
diff --git a/sdk/cosmos/azure-cosmos-benchmark/pom.xml b/sdk/cosmos/azure-cosmos-benchmark/pom.xml
index 7bf5959607b99..c36e0a7fae47b 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/pom.xml
+++ b/sdk/cosmos/azure-cosmos-benchmark/pom.xml
@@ -171,7 +171,7 @@ Licensed under the MIT License.
com.azure
azure-security-keyvault-keys
- 4.4.1
+ 4.4.2
compile