diff --git a/eng/code-quality-reports/src/main/java/com/azure/tools/checkstyle/checks/GoodLoggingCheck.java b/eng/code-quality-reports/src/main/java/com/azure/tools/checkstyle/checks/GoodLoggingCheck.java index 0ab97e15b83cc..420d566fc4212 100644 --- a/eng/code-quality-reports/src/main/java/com/azure/tools/checkstyle/checks/GoodLoggingCheck.java +++ b/eng/code-quality-reports/src/main/java/com/azure/tools/checkstyle/checks/GoodLoggingCheck.java @@ -32,6 +32,15 @@ public class GoodLoggingCheck extends AbstractCheck { private static final String CLIENT_LOGGER = "ClientLogger"; private static final String LOGGER = "logger"; private static final String STATIC_LOGGER_ERROR = "Use a static ClientLogger instance in a static method."; + private static final int[] REQUIRED_TOKENS = new int[]{ + TokenTypes.IMPORT, + TokenTypes.INTERFACE_DEF, + TokenTypes.CLASS_DEF, + TokenTypes.LITERAL_NEW, + TokenTypes.VARIABLE_DEF, + TokenTypes.METHOD_CALL, + TokenTypes.METHOD_DEF + }; private static final String LOGGER_NAME_ERROR = "ClientLogger instance naming: use ''%s'' instead of ''%s'' for consistency."; @@ -42,7 +51,7 @@ public class GoodLoggingCheck extends AbstractCheck { // Boolean indicator that indicates if the java class imports ClientLogger private boolean hasClientLoggerImported; // A LIFO queue stores the class names, pop top element if exist the class name AST node - private Queue classNameDeque = Collections.asLifoQueue(new ArrayDeque<>()); + private final Queue classNameDeque = Collections.asLifoQueue(new ArrayDeque<>()); // Collection of Invalid logging packages private static final Set INVALID_LOGS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( "org.slf4j", "org.apache.logging.log4j", "java.util.logging" @@ -60,14 +69,7 @@ public int[] getAcceptableTokens() { @Override public int[] getRequiredTokens() { - return new int[] { - TokenTypes.IMPORT, - TokenTypes.CLASS_DEF, - TokenTypes.LITERAL_NEW, - TokenTypes.VARIABLE_DEF, - TokenTypes.METHOD_CALL, - TokenTypes.METHOD_DEF - }; + return REQUIRED_TOKENS; } @Override @@ -96,6 +98,7 @@ public void visitToken(DetailAST ast) { }); break; case TokenTypes.CLASS_DEF: + case TokenTypes.INTERFACE_DEF: classNameDeque.offer(ast.findFirstToken(TokenTypes.IDENT).getText()); break; case TokenTypes.LITERAL_NEW: @@ -194,21 +197,20 @@ private void checkForInvalidStaticLoggerUsage(DetailAST methodDefToken) { // if not a static method if (!(TokenUtil.findFirstTokenByPredicate(methodDefToken, node -> node.branchContains(TokenTypes.LITERAL_STATIC)).isPresent())) { - + // error if static `LOGGER` present, LOGGER.* if (methodDefToken.findFirstToken(TokenTypes.SLIST) != null) { - TokenUtil - .forEachChild(methodDefToken.findFirstToken(TokenTypes.SLIST), TokenTypes.EXPR, (exprToken) -> { - if (exprToken != null) { - DetailAST methodCallToken = exprToken.findFirstToken(TokenTypes.METHOD_CALL); - if (methodCallToken != null && methodCallToken.findFirstToken(TokenTypes.DOT) != null) { - if (methodCallToken.findFirstToken(TokenTypes.DOT) - .findFirstToken(TokenTypes.IDENT).getText().equals(LOGGER.toUpperCase())) { - log(methodDefToken, STATIC_LOGGER_ERROR); - } + TokenUtil.forEachChild(methodDefToken.findFirstToken(TokenTypes.SLIST), TokenTypes.EXPR, exprToken -> { + if (exprToken != null) { + DetailAST methodCallToken = exprToken.findFirstToken(TokenTypes.METHOD_CALL); + if (methodCallToken != null && methodCallToken.findFirstToken(TokenTypes.DOT) != null) { + if (methodCallToken.findFirstToken(TokenTypes.DOT) + .findFirstToken(TokenTypes.IDENT).getText().equals(LOGGER.toUpperCase())) { + log(methodDefToken, STATIC_LOGGER_ERROR); } } - }); + } + }); } } } diff --git a/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/package-info.java b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/package-info.java new file mode 100644 index 0000000000000..1f021260c8e21 --- /dev/null +++ b/sdk/core/azure-core-http-netty/src/main/java/com/azure/core/http/netty/implementation/package-info.java @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Package containing implementation details. + */ +package com.azure.core.http.netty.implementation; diff --git a/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClient.java b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClient.java index a5b2a09cbd278..f0537c3837b15 100644 --- a/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClient.java +++ b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClient.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -import java.util.function.Function; /** * HttpClient implementation for OkHttp. @@ -64,9 +63,13 @@ public Mono send(HttpRequest request, Context context) { // but block on the thread backing flux. This ignore any subscribeOn applied to send(r) // toOkHttpRequest(request).subscribe(okHttpRequest -> { - Call call = httpClient.newCall(okHttpRequest); - call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse)); - sink.onCancel(call::cancel); + try { + Call call = httpClient.newCall(okHttpRequest); + call.enqueue(new OkHttpCallback(sink, request, eagerlyReadResponse)); + sink.onCancel(call::cancel); + } catch (Exception ex) { + sink.error(ex); + } }, sink::error); })); } @@ -78,29 +81,26 @@ public Mono send(HttpRequest request, Context context) { * @return the Mono emitting okhttp request */ private static Mono toOkHttpRequest(HttpRequest request) { - return Mono.just(new okhttp3.Request.Builder()) - .map(rb -> { - rb.url(request.getUrl()); - if (request.getHeaders() != null) { - for (HttpHeader hdr : request.getHeaders()) { - // OkHttp allows for headers with multiple values, but it treats them as separate headers, - // therefore, we must call rb.addHeader for each value, using the same key for all of them - hdr.getValuesList().forEach(value -> rb.addHeader(hdr.getName(), value)); - } - } - return rb; - }) - .flatMap((Function>) rb -> { - if (request.getHttpMethod() == HttpMethod.GET) { - return Mono.just(rb.get()); - } else if (request.getHttpMethod() == HttpMethod.HEAD) { - return Mono.just(rb.head()); - } else { - return toOkHttpRequestBody(request.getBody(), request.getHeaders()) - .map(requestBody -> rb.method(request.getHttpMethod().toString(), requestBody)); - } - }) - .map(Request.Builder::build); + Request.Builder requestBuilder = new Request.Builder() + .url(request.getUrl()); + + if (request.getHeaders() != null) { + for (HttpHeader hdr : request.getHeaders()) { + // OkHttp allows for headers with multiple values, but it treats them as separate headers, + // therefore, we must call rb.addHeader for each value, using the same key for all of them + hdr.getValuesList().forEach(value -> requestBuilder.addHeader(hdr.getName(), value)); + } + } + + if (request.getHttpMethod() == HttpMethod.GET) { + return Mono.just(requestBuilder.get().build()); + } else if (request.getHttpMethod() == HttpMethod.HEAD) { + return Mono.just(requestBuilder.head().build()); + } + + return toOkHttpRequestBody(request.getBody(), request.getHeaders()) + .map(okhttpRequestBody -> requestBuilder.method(request.getHttpMethod().toString(), okhttpRequestBody) + .build()); } /** @@ -117,11 +117,9 @@ private static Mono toOkHttpRequestBody(Flux bbFlux, Ht return bsMono.map(bs -> { String contentType = headers.getValue("Content-Type"); - if (contentType == null) { - return RequestBody.create(bs, null); - } else { - return RequestBody.create(bs, MediaType.parse(contentType)); - } + MediaType mediaType = (contentType == null) ? null : MediaType.parse(contentType); + + return RequestBody.create(bs, mediaType); }); } @@ -146,9 +144,7 @@ private static Mono toByteString(Flux bbFlux) { } catch (IOException ioe) { throw Exceptions.propagate(ioe); } - }) - .map(b -> ByteString.of(b.readByteArray())), - okio.Buffer::clear) + }).map(b -> ByteString.of(b.readByteArray())), okio.Buffer::clear) .switchIfEmpty(EMPTY_BYTE_STRING_MONO); } @@ -163,11 +159,13 @@ private static class OkHttpCallback implements okhttp3.Callback { this.eagerlyReadResponse = eagerlyReadResponse; } + @SuppressWarnings("NullableProblems") @Override public void onFailure(okhttp3.Call call, IOException e) { sink.error(e); } + @SuppressWarnings("NullableProblems") @Override public void onResponse(okhttp3.Call call, okhttp3.Response response) { /* diff --git a/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/package-info.java b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/package-info.java new file mode 100644 index 0000000000000..23b5eea78c143 --- /dev/null +++ b/sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/package-info.java @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Package containing implementation details. + */ +package com.azure.core.http.okhttp.implementation; diff --git a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java index 714abd0994e3f..09a16a9fae575 100644 --- a/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java +++ b/sdk/core/azure-core-http-okhttp/src/test/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClientTests.java @@ -14,7 +14,6 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -40,6 +39,7 @@ import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.get; import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertLinesMatch; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -66,6 +66,7 @@ public static void beforeClass() { server.stubFor(post("/shortPost").willReturn(aResponse().withBody(SHORT_BODY))); server.stubFor(get(RETURN_HEADERS_AS_IS_PATH).willReturn(aResponse() .withTransformers(OkHttpAsyncHttpClientResponseTransformer.NAME))); + server.start(); } @@ -87,25 +88,33 @@ public void testFlowableResponseLongBodyAsByteArrayAsync() { } @Test - @Disabled("This tests behaviour of reactor netty's ByteBufFlux, not applicable for OkHttp") public void testMultipleSubscriptionsEmitsError() { HttpResponse response = getResponse("/short"); + // Subscription:1 - response.getBodyAsByteArray().block(); + StepVerifier.create(response.getBodyAsByteArray()) + .assertNext(Assertions::assertNotNull) + .expectComplete() + .verify(Duration.ofSeconds(20)); + // Subscription:2 + // Getting the bytes of an OkHttp response closes the stream on first read. + // Subsequent reads will return an IllegalStateException due to the stream being closed. StepVerifier.create(response.getBodyAsByteArray()) - .expectNextCount(0) // TODO: Check with smaldini, what is the verifier operator equivalent to .awaitDone(20, TimeUnit.SECONDS) - .verifyError(IllegalStateException.class); + .expectNextCount(0) + .expectError(IllegalStateException.class) + .verify(Duration.ofSeconds(20)); } @Test public void testFlowableWhenServerReturnsBodyAndNoErrorsWhenHttp500Returned() { HttpResponse response = getResponse("/error"); - StepVerifier.create(response.getBodyAsString()) - .expectNext("error") // TODO: .awaitDone(20, TimeUnit.SECONDS) [See previous todo] - .verifyComplete(); assertEquals(500, response.getStatusCode()); + StepVerifier.create(response.getBodyAsString()) + .expectNext("error") + .expectComplete() + .verify(Duration.ofSeconds(20)); } @Test @@ -128,7 +137,7 @@ public void testFlowableBackpressure() { @Test public void testRequestBodyIsErrorShouldPropagateToResponse() { - HttpClient client = HttpClient.createDefault(); + HttpClient client = new OkHttpAsyncClientProvider().createInstance(); HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost")) .setHeader("Content-Length", "123") .setBody(Flux.error(new RuntimeException("boo"))); @@ -140,7 +149,7 @@ public void testRequestBodyIsErrorShouldPropagateToResponse() { @Test public void testRequestBodyEndsInErrorShouldPropagateToResponse() { - HttpClient client = HttpClient.createDefault(); + HttpClient client = new OkHttpAsyncClientProvider().createInstance(); String contentChunk = "abcdefgh"; int repetitions = 1000; HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost")) @@ -149,10 +158,14 @@ public void testRequestBodyEndsInErrorShouldPropagateToResponse() { .repeat(repetitions) .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) .concatWith(Flux.error(new RuntimeException("boo")))); - StepVerifier.create(client.send(request)) - // .awaitDone(10, TimeUnit.SECONDS) - .expectErrorMessage("boo") - .verify(); + + try { + StepVerifier.create(client.send(request)) + .expectErrorMessage("boo") + .verify(Duration.ofSeconds(10)); + } catch (Exception ex) { + assertEquals("boo", ex.getMessage()); + } } @Test @@ -200,12 +213,12 @@ public void testServerShutsDownSocketShouldPushErrorToContentFlowable() { }); } - @Disabled("This flakey test fails often on MacOS. https://github.com/Azure/azure-sdk-for-java/issues/4357.") @Test public void testConcurrentRequests() throws NoSuchAlgorithmException { int numRequests = 100; // 100 = 1GB of data read - HttpClient client = HttpClient.createDefault(); + HttpClient client = new OkHttpAsyncClientProvider().createInstance(); byte[] expectedDigest = digest(LONG_BODY); + long expectedByteCount = (long) numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length; Mono numBytesMono = Flux.range(1, numRequests) .parallel(10) @@ -213,29 +226,17 @@ public void testConcurrentRequests() throws NoSuchAlgorithmException { .flatMap(n -> Mono.fromCallable(() -> getResponse(client, "/long")).flatMapMany(response -> { MessageDigest md = md5Digest(); return response.getBody() - .doOnNext(md::update) - .map(bb -> new NumberedByteBuffer(n, bb)) -// .doOnComplete(() -> System.out.println("completed " + n)) - .doOnComplete(() -> Assertions.assertArrayEquals(expectedDigest, - md.digest(), "wrong digest!")); + .doOnNext(buffer -> md.update(buffer.duplicate())) + .doOnComplete(() -> assertArrayEquals(expectedDigest, md.digest(), "wrong digest!")); })) .sequential() - // enable the doOnNext call to see request numbers and thread names - // .doOnNext(g -> System.out.println(g.n + " " + - // Thread.currentThread().getName())) - .map(nbb -> (long) nbb.bb.limit()) - .reduce(Long::sum) - .subscribeOn(Schedulers.boundedElastic()); + .map(buffer -> (long) buffer.remaining()) + .reduce(Long::sum); StepVerifier.create(numBytesMono) -// .awaitDone(timeoutSeconds, TimeUnit.SECONDS) - .expectNext((long) (numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length)) - .verifyComplete(); -// -// long numBytes = numBytesMono.block(); -// t = System.currentTimeMillis() - t; -// System.out.println("totalBytesRead=" + numBytes / 1024 / 1024 + "MB in " + t / 1000.0 + "s"); -// assertEquals(numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length, numBytes); + .expectNext(expectedByteCount) + .expectComplete() + .verify(Duration.ofSeconds(60)); } @Test @@ -284,16 +285,6 @@ private static byte[] digest(String s) throws NoSuchAlgorithmException { return md.digest(); } - private static final class NumberedByteBuffer { - final long n; - final ByteBuffer bb; - - NumberedByteBuffer(long n, ByteBuffer bb) { - this.n = n; - this.bb = bb; - } - } - private static HttpResponse getResponse(String path) { HttpClient client = new OkHttpAsyncHttpClientBuilder().build(); return getResponse(client, path); diff --git a/sdk/core/azure-core/pom.xml b/sdk/core/azure-core/pom.xml index ab88f5934cd72..e7f2494f7fd8a 100644 --- a/sdk/core/azure-core/pom.xml +++ b/sdk/core/azure-core/pom.xml @@ -265,6 +265,7 @@ --add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=com.fasterxml.jackson.databind --add-opens com.azure.core/com.azure.core.implementation.models.jsonflatten=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.implementation.serializer=ALL-UNNAMED + --add-opens com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.models=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.util=ALL-UNNAMED --add-opens com.azure.core/com.azure.core.util.jsonpatch=ALL-UNNAMED diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java index dad9a966502d6..1f903a7d59823 100644 --- a/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java +++ b/sdk/core/azure-core/src/main/java/com/azure/core/http/HttpRequest.java @@ -3,17 +3,18 @@ package com.azure.core.http; +import com.azure.core.implementation.util.FluxByteBufferContent; +import com.azure.core.util.RequestContent; import com.azure.core.util.logging.ClientLogger; import reactor.core.publisher.Flux; import java.net.MalformedURLException; import java.net.URL; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; /** - * The outgoing Http request. It provides ways to construct {@link HttpRequest} with {@link HttpMethod}, - * {@link URL}, {@link HttpHeader} and request body. + * The outgoing Http request. It provides ways to construct {@link HttpRequest} with {@link HttpMethod}, {@link URL}, + * {@link HttpHeader} and request body. */ public class HttpRequest { private final ClientLogger logger = new ClientLogger(HttpRequest.class); @@ -21,7 +22,7 @@ public class HttpRequest { private HttpMethod httpMethod; private URL url; private HttpHeaders headers; - private Flux body; + private RequestContent requestContent; /** * Create a new HttpRequest instance. @@ -30,9 +31,7 @@ public class HttpRequest { * @param url the target address to send the request to */ public HttpRequest(HttpMethod httpMethod, URL url) { - this.httpMethod = httpMethod; - this.url = url; - this.headers = new HttpHeaders(); + this(httpMethod, url, new HttpHeaders(), (RequestContent) null); } /** @@ -61,10 +60,22 @@ public HttpRequest(HttpMethod httpMethod, String url) { * @param body the request content */ public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, Flux body) { + this(httpMethod, url, headers, new FluxByteBufferContent(body)); + } + + /** + * Creates a new {@link HttpRequest} instance. + * + * @param httpMethod The HTTP request method. + * @param url The target address to send the request. + * @param headers The HTTP headers of the request. + * @param requestContent The {@link RequestContent}. + */ + public HttpRequest(HttpMethod httpMethod, URL url, HttpHeaders headers, RequestContent requestContent) { this.httpMethod = httpMethod; this.url = url; this.headers = headers; - this.body = body; + this.requestContent = requestContent; } /** @@ -144,8 +155,8 @@ public HttpRequest setHeaders(HttpHeaders headers) { } /** - * Set a request header, replacing any existing value. - * A null for {@code value} will remove the header if one with matching name exists. + * Set a request header, replacing any existing value. A null for {@code value} will remove the header if one with + * matching name exists. * * @param name the header name * @param value the header value @@ -162,57 +173,92 @@ public HttpRequest setHeader(String name, String value) { * @return the content to be send */ public Flux getBody() { - return body; + return (requestContent == null) ? null : requestContent.asFluxByteBuffer(); } /** * Set the request content. + *

+ * The Content-Length header will be set based on the given content's length. * * @param content the request content * @return this HttpRequest */ public HttpRequest setBody(String content) { - final byte[] bodyBytes = content.getBytes(StandardCharsets.UTF_8); - return setBody(bodyBytes); + return setRequestContent(RequestContent.fromString(content)); } /** * Set the request content. - * The Content-Length header will be set based on the given content's length + *

+ * The Content-Length header will be set based on the given content's length. * * @param content the request content * @return this HttpRequest */ public HttpRequest setBody(byte[] content) { - headers.set("Content-Length", String.valueOf(content.length)); + setContentLength(content.length); return setBody(Flux.defer(() -> Flux.just(ByteBuffer.wrap(content)))); } /** * Set request content. - * - * Caller must set the Content-Length header to indicate the length of the content, - * or use Transfer-Encoding: chunked. + *

+ * Caller must set the Content-Length header to indicate the length of the content, or use Transfer-Encoding: + * chunked. * * @param content the request content * @return this HttpRequest */ public HttpRequest setBody(Flux content) { - this.body = content; + this.requestContent = new FluxByteBufferContent(content); + return this; + } + + /** + * Gets the HttpRequest's {@link RequestContent}. + * + * @return The {@link RequestContent}. + */ + public RequestContent getRequestContent() { + return this.requestContent; + } + + /** + * Sets the {@link RequestContent}. + *

+ * If {@link RequestContent#getLength()} returns null for the passed {@link RequestContent} the caller must set the + * Content-Length header to indicate the length of the content, or use Transfer-Encoding: chunked. Otherwise, {@link + * RequestContent#getLength()} will be used to set the Content-Length header. + * + * @param requestContent The {@link RequestContent}. + * @return The updated HttpRequest object. + */ + public HttpRequest setRequestContent(RequestContent requestContent) { + Long requestContentLength = requestContent.getLength(); + if (requestContentLength != null) { + setContentLength(requestContentLength); + } + + this.requestContent = requestContent; return this; } + private void setContentLength(long contentLength) { + headers.set("Content-Length", String.valueOf(contentLength)); + } + /** * Creates a copy of the request. * - * The main purpose of this is so that this HttpRequest can be changed and the resulting - * HttpRequest can be a backup. This means that the cloned HttpHeaders and body must - * not be able to change from side effects of this HttpRequest. + * The main purpose of this is so that this HttpRequest can be changed and the resulting HttpRequest can be a + * backup. This means that the cloned HttpHeaders and body must not be able to change from side effects of this + * HttpRequest. * * @return a new HTTP request instance with cloned instances of all mutable properties. */ public HttpRequest copy() { final HttpHeaders bufferedHeaders = new HttpHeaders(headers); - return new HttpRequest(httpMethod, url, bufferedHeaders, body); + return new HttpRequest(httpMethod, url, bufferedHeaders, requestContent); } } diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ArrayContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ArrayContent.java new file mode 100644 index 0000000000000..cae8dd1510fa3 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ArrayContent.java @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import com.azure.core.util.CoreUtils; +import com.azure.core.util.RequestContent; +import reactor.core.publisher.Flux; + +import java.nio.ByteBuffer; + +/** + * A {@link RequestContent} implementation which is backed by a {@code byte[]}. + */ +public final class ArrayContent extends RequestContent { + private final byte[] content; + private final int offset; + private final int length; + + /** + * Creates a new instance of {@link ArrayContent}. + * + * @param content The {@code byte[]} content. + * @param offset The offset in the array to begin reading data. + * @param length The length of the content. + */ + public ArrayContent(byte[] content, int offset, int length) { + this.content = CoreUtils.clone(content); + this.offset = offset; + this.length = length; + } + + @Override + public Flux asFluxByteBuffer() { + return Flux.defer(() -> Flux.just(ByteBuffer.wrap(content, offset, length))); + } + + @Override + public Long getLength() { + return (long) length; + } +} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ByteBufferContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ByteBufferContent.java new file mode 100644 index 0000000000000..14c892da06a3e --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ByteBufferContent.java @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import com.azure.core.util.RequestContent; +import reactor.core.publisher.Flux; + +import java.nio.ByteBuffer; + +/** + * A {@link RequestContent} implementation which is backed by a {@link ByteBuffer}. + */ +public final class ByteBufferContent extends RequestContent { + private final ByteBuffer byteBuffer; + private final long length; + + /** + * Creates a new instance of {@link ByteBufferContent}. + * + * @param byteBuffer The {@link ByteBuffer} content. + */ + public ByteBufferContent(ByteBuffer byteBuffer) { + this.byteBuffer = byteBuffer; + this.length = byteBuffer.remaining(); + } + + @Override + public Flux asFluxByteBuffer() { + // Duplicate the ByteBuffer so that each invocation of this method uses a fully readable ByteBuffer. + return Flux.defer(() -> Flux.just(byteBuffer.duplicate())); + } + + @Override + public Long getLength() { + return length; + } +} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FileContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FileContent.java new file mode 100644 index 0000000000000..251fbf0456b49 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FileContent.java @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import com.azure.core.util.RequestContent; +import com.azure.core.util.logging.ClientLogger; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; + +/** + * A {@link RequestContent} implementation which is backed by a file. + */ +public final class FileContent extends RequestContent { + private final ClientLogger logger = new ClientLogger(FileContent.class); + + private final Path file; + private final long offset; + private final long length; + private final int chunkSize; + + /** + * Creates a new instance of {@link FileContent}. + * + * @param file The {@link Path} content. + * @param offset The offset in the {@link Path} to begin reading data. + * @param length The length of the content. + * @param chunkSize The requested size for each read of the path. + */ + public FileContent(Path file, long offset, long length, int chunkSize) { + this.file = file; + this.offset = offset; + this.length = length; + this.chunkSize = chunkSize; + } + + @Override + public Flux asFluxByteBuffer() { + return Flux.using(() -> FileChannel.open(file), channel -> Flux.generate(() -> 0, (count, sink) -> { + if (count == length) { + sink.complete(); + return count; + } + + int readCount = (int) Math.min(chunkSize, length - count); + try { + sink.next(channel.map(FileChannel.MapMode.READ_ONLY, offset + count, readCount)); + } catch (IOException ex) { + sink.error(ex); + } + + return count + readCount; + }), channel -> { + try { + channel.close(); + } catch (IOException ex) { + throw logger.logExceptionAsError(Exceptions.propagate(ex)); + } + }); + } + + @Override + public Long getLength() { + return length; + } +} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java new file mode 100644 index 0000000000000..14a4b793c7ffc --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import com.azure.core.util.RequestContent; +import reactor.core.publisher.Flux; + +import java.nio.ByteBuffer; + +/** + * A {@link RequestContent} implementation which is backed by a {@link Flux} of {@link ByteBuffer}. + */ +public class FluxByteBufferContent extends RequestContent { + private final Flux content; + private final Long length; + + /** + * Creates a new instance of {@link FluxByteBufferContent}. + * + * @param content The {@link Flux} of {@link ByteBuffer} content. + */ + public FluxByteBufferContent(Flux content) { + this(content, null); + } + + /** + * Creates a new instance of {@link FluxByteBufferContent}. + * + * @param content The {@link Flux} of {@link ByteBuffer} content. + * @param length The length of the content, may be null. + */ + public FluxByteBufferContent(Flux content, Long length) { + this.content = content; + this.length = length; + } + + @Override + public Flux asFluxByteBuffer() { + return content; + } + + @Override + public Long getLength() { + return length; + } +} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/InputStreamContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/InputStreamContent.java new file mode 100644 index 0000000000000..2cb6f81d0cda2 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/InputStreamContent.java @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import com.azure.core.util.FluxUtil; +import com.azure.core.util.RequestContent; +import reactor.core.publisher.Flux; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * A {@link RequestContent} implementation which is backed by an {@link InputStream}. + */ +public class InputStreamContent extends RequestContent { + private final InputStream content; + private final Long length; + private final int chunkSize; + + /** + * Creates a new instance of {@link InputStreamContent}. + * + * @param content The {@link InputStream} content. + * @param length The length of the content, may be null. + * @param chunkSize The requested size for each {@link InputStream#read(byte[])}. + */ + public InputStreamContent(InputStream content, Long length, int chunkSize) { + this.content = content; + this.length = length; + this.chunkSize = chunkSize; + } + + @Override + public Flux asFluxByteBuffer() { + return FluxUtil.toFluxByteBuffer(content, chunkSize); + } + + @Override + public Long getLength() { + return length; + } +} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/SerializableContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/SerializableContent.java new file mode 100644 index 0000000000000..11d6478a36bd4 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/SerializableContent.java @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import com.azure.core.util.RequestContent; +import com.azure.core.util.serializer.ObjectSerializer; +import reactor.core.publisher.Flux; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A {@link RequestContent} implementation which is backed by a serializable object. + */ +public final class SerializableContent extends RequestContent { + private final Object serializable; + private final ObjectSerializer objectSerializer; + + private final AtomicReference serializedObject = new AtomicReference<>(); + + /** + * Creates a new instance of {@link SerializableContent}. + * + * @param serializable The serializable {@link Object} content. + * @param objectSerializer The {@link ObjectSerializer} that will serialize the {@link Object} content. + */ + public SerializableContent(Object serializable, ObjectSerializer objectSerializer) { + this.serializable = serializable; + this.objectSerializer = objectSerializer; + } + + @Override + public Flux asFluxByteBuffer() { + serializedObject.compareAndSet(null, objectSerializer.serializeToBytes(serializable)); + + return Flux.defer(() -> Flux.just(ByteBuffer.wrap(serializedObject.get()).asReadOnlyBuffer())); + } + + @Override + public Long getLength() { + return null; + } +} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/package-info.java b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/package-info.java new file mode 100644 index 0000000000000..5880dc06a0541 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/package-info.java @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +/** + * Package containing implementation utilities. + */ +package com.azure.core.implementation.util; diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/BufferedFluxByteBuffer.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/BufferedFluxByteBuffer.java new file mode 100644 index 0000000000000..126cba09fa0b6 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/BufferedFluxByteBuffer.java @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.util; + +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; + +import java.nio.ByteBuffer; + +/** + * A {@code Flux} implementation which buffers the contents of the passed {@code Flux} before + * emitting them downstream. + */ +final class BufferedFluxByteBuffer extends Flux { + private final Flux flux; + + /** + * Creates a new instance of {@link BufferedFluxByteBuffer}. + * + * @param flux The {@code Flux} to buffer. + */ + BufferedFluxByteBuffer(Flux flux) { + this.flux = flux.map(buffer -> { + ByteBuffer duplicate = ByteBuffer.allocate(buffer.remaining()); + duplicate.put(buffer); + duplicate.rewind(); + return duplicate; + }).cache().map(ByteBuffer::duplicate); + } + + @Override + public void subscribe(CoreSubscriber actual) { + flux.subscribe(actual); + } +} diff --git a/sdk/core/azure-core/src/main/java/com/azure/core/util/RequestContent.java b/sdk/core/azure-core/src/main/java/com/azure/core/util/RequestContent.java new file mode 100644 index 0000000000000..18b0d5754f162 --- /dev/null +++ b/sdk/core/azure-core/src/main/java/com/azure/core/util/RequestContent.java @@ -0,0 +1,304 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.util; + +import com.azure.core.implementation.util.ArrayContent; +import com.azure.core.implementation.util.ByteBufferContent; +import com.azure.core.implementation.util.FileContent; +import com.azure.core.implementation.util.FluxByteBufferContent; +import com.azure.core.implementation.util.InputStreamContent; +import com.azure.core.implementation.util.SerializableContent; +import com.azure.core.util.logging.ClientLogger; +import com.azure.core.util.serializer.JsonSerializerProviders; +import com.azure.core.util.serializer.ObjectSerializer; +import reactor.core.publisher.Flux; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Objects; + +/** + * Represents the content sent as part of a request. + */ +public abstract class RequestContent { + private static final ClientLogger LOGGER = new ClientLogger(RequestContent.class); + + /** + * Converts the {@link RequestContent} into a {@code Flux} for use in reactive streams. + * + * @return The {@link RequestContent} as a {@code Flux}. + */ + public abstract Flux asFluxByteBuffer(); + + /** + * Gets the length of the {@link RequestContent} if it is able to be calculated. + *

+ * If the content length isn't able to be calculated null will be returned. + * + * @return The length of the {@link RequestContent} if it is able to be calculated, otherwise null. + */ + public abstract Long getLength(); + + /** + * Creates a {@link RequestContent} that uses {@code byte[]} as its data. + * + * @param bytes The bytes that will be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code bytes} is null. + */ + public static RequestContent fromBytes(byte[] bytes) { + Objects.requireNonNull(bytes, "'bytes' cannot be null."); + return fromBytes(bytes, 0, bytes.length); + } + + /** + * Creates a {@link RequestContent} that uses {@code byte[]} as its data. + * + * @param bytes The bytes that will be the {@link RequestContent} data. + * @param offset Offset in the bytes where the data will begin. + * @param length Length of the data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code bytes} is null. + * @throws IllegalArgumentException If {@code offset} or {@code length} are negative or {@code offset} plus {@code + * length} is greater than {@code bytes.length}. + */ + public static RequestContent fromBytes(byte[] bytes, int offset, int length) { + Objects.requireNonNull(bytes, "'bytes' cannot be null."); + if (offset < 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'offset' cannot be negative.")); + } + if (length < 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be negative.")); + } + if (offset + length > bytes.length) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "'offset' plus 'length' cannot be greater than 'bytes.length'.")); + } + + return new ArrayContent(bytes, offset, length); + } + + /** + * Creates a {@link RequestContent} that uses {@link String} as its data. + *

+ * The passed {@link String} is converted using {@link StandardCharsets#UTF_8}, if another character set is required + * use {@link #fromBytes(byte[])} and pass {@link String#getBytes(Charset)} using the required character set. + * + * @param content The string that will be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code content} is null. + */ + public static RequestContent fromString(String content) { + Objects.requireNonNull(content, "'content' cannot be null."); + return fromBytes(content.getBytes(StandardCharsets.UTF_8)); + } + + /** + * Creates a {@link RequestContent} that uses {@link BinaryData} as its data. + * + * @param content The {@link BinaryData} that will be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code content} is null. + */ + public static RequestContent fromBinaryData(BinaryData content) { + Objects.requireNonNull(content, "'content' cannot be null."); + return new ByteBufferContent(content.toByteBuffer()); + } + + /** + * Creates a {@link RequestContent} that uses {@link Path} as its data. + * + * @param file The {@link Path} that will be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code file} is null. + */ + public static RequestContent fromFile(Path file) { + Objects.requireNonNull(file, "'file' cannot be null."); + return fromFile(file, 0, file.toFile().length(), 8092); + } + + /** + * Creates a {@link RequestContent} that uses {@link Path} as its data. + * + * @param file The {@link Path} that will be the {@link RequestContent} data. + * @param offset Offset in the {@link Path} where the data will begin. + * @param length Length of the data. + * @param chunkSize The requested size for each read of the path. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code file} is null. + * @throws IllegalArgumentException If {@code offset} or {@code length} are negative or {@code offset} plus {@code + * length} is greater than the file size or {@code chunkSize} is less than or equal to 0. + */ + public static RequestContent fromFile(Path file, long offset, long length, int chunkSize) { + Objects.requireNonNull(file, "'file' cannot be null."); + if (offset < 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'offset' cannot be negative.")); + } + if (length < 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be negative.")); + } + if (offset + length > file.toFile().length()) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "'offset' plus 'length' cannot be greater than the file's size.")); + } + if (chunkSize <= 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "'chunkSize' cannot be less than or equal to 0.")); + } + + return new FileContent(file, offset, length, chunkSize); + } + + /** + * Creates a {@link RequestContent} that uses a serialized {@link Object} as its data. + *

+ * This uses an {@link ObjectSerializer} found on the classpath. + *

+ * The {@link RequestContent} returned has a null {@link #getLength()}, if the length of the content is needed use + * {@link BinaryData#fromObject(Object)} and {@link RequestContent#fromBinaryData(BinaryData)} to create the request + * content. + * + * @param serializable An {@link Object} that will be serialized to be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + */ + public static RequestContent fromObject(Object serializable) { + return fromObject(serializable, JsonSerializerProviders.createInstance(true)); + } + + /** + * Creates a {@link RequestContent} that uses a serialized {@link Object} as its data. + *

+ * The {@link RequestContent} returned has a null {@link #getLength()}, if the length of the content is needed use + * {@link BinaryData#fromObject(Object, ObjectSerializer)} and {@link RequestContent#fromBinaryData(BinaryData)} to + * create the request content. + * + * @param serializable An {@link Object} that will be serialized to be the {@link RequestContent} data. + * @param serializer The {@link ObjectSerializer} that will serialize the {@link Object}. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code serializer} is null. + */ + public static RequestContent fromObject(Object serializable, ObjectSerializer serializer) { + Objects.requireNonNull(serializer, "'serializer' cannot be null."); + return new SerializableContent(serializable, serializer); + } + + /** + * Creates a {@link RequestContent} that uses a {@link Flux} of {@link ByteBuffer} as its data. + *

+ * {@link RequestContent#getLength()} will be null if this factory method is used, if the length needs to be + * non-null use {@link RequestContent#fromFlux(Flux, long)}. + *

+ * The {@link RequestContent} created by this factory method doesn't buffer the passed {@link Flux} of {@link + * ByteBuffer}, if the content must be replay-able the passed {@link Flux} of {@link ByteBuffer} must be replay-able + * as well. + * + * @param content The {@link Flux} of {@link ByteBuffer} that will be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code content} is null. + */ + public static RequestContent fromFlux(Flux content) { + Objects.requireNonNull(content, "'content' cannot be null."); + return new FluxByteBufferContent(content); + } + + /** + * Creates a {@link RequestContent} that uses a {@link Flux} of {@link ByteBuffer} as its data. + *

+ * The {@link RequestContent} created by this factory method doesn't buffer the passed {@link Flux} of {@link + * ByteBuffer}, if the content must be replay-able the passed {@link Flux} of {@link ByteBuffer} must be replay-able + * as well. + * + * @param content The {@link Flux} of {@link ByteBuffer} that will be the {@link RequestContent} data. + * @param length The length of the content. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code content} is null. + * @throws IllegalStateException If {@code length} is less than 0. + */ + public static RequestContent fromFlux(Flux content, long length) { + Objects.requireNonNull(content, "'content' cannot be null."); + if (length < 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be less than 0.")); + } + + return new FluxByteBufferContent(content, length); + } + + /** + * Creates a {@link RequestContent} that uses a {@link BufferedFluxByteBuffer} as its data. + *

+ * {@link RequestContent#getLength()} will be null if this factory method is used, if the length needs to be + * non-null use {@link RequestContent#fromBufferedFlux(BufferedFluxByteBuffer, long)}. + * + * @param content The {@link BufferedFluxByteBuffer} that will be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code content} is null. + */ + static RequestContent fromBufferedFlux(BufferedFluxByteBuffer content) { + Objects.requireNonNull(content, "'content' cannot be null."); + return new FluxByteBufferContent(content); + } + + /** + * Creates a {@link RequestContent} that uses a {@link BufferedFluxByteBuffer} as its data. + * + * @param content The {@link BufferedFluxByteBuffer} that will be the {@link RequestContent} data. + * @param length The length of the content. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code content} is null. + * @throws IllegalStateException If {@code length} is less than 0. + */ + static RequestContent fromBufferedFlux(BufferedFluxByteBuffer content, long length) { + Objects.requireNonNull(content, "'content' cannot be null."); + if (length < 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be less than 0.")); + } + + return new FluxByteBufferContent(content, length); + } + + /** + * Creates a {@link RequestContent} that uses an {@link InputStream} as its data. + *

+ * {@link RequestContent#getLength()} will be null if this factory method is used, if the length needs to be + * non-null use {@link RequestContent#fromInputStream(InputStream, long, int)}. + * + * @param content The {@link InputStream} that will be the {@link RequestContent} data. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code inputStream} is null. + */ + public static RequestContent fromInputStream(InputStream content) { + return fromInputStreamInternal(content, null, 8092); + } + + /** + * Creates a {@link RequestContent} that uses an {@link InputStream} as its data. + * + * @param content The {@link InputStream} that will be the {@link RequestContent} data. + * @param length The length of the content. + * @param chunkSize The requested size for each {@link InputStream#read(byte[])}. + * @return A new {@link RequestContent}. + * @throws NullPointerException If {@code inputStream} is null. + * @throws IllegalArgumentException If {@code length} is less than 0 or {@code chunkSize} is less than or equal to + * 0. + */ + public static RequestContent fromInputStream(InputStream content, long length, int chunkSize) { + return fromInputStreamInternal(content, length, chunkSize); + } + + private static RequestContent fromInputStreamInternal(InputStream content, Long length, int chunkSize) { + Objects.requireNonNull(content, "'content' cannot be null."); + if (length != null && length < 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException("'length' cannot be less than 0.")); + } + if (chunkSize <= 0) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "'chunkSize' cannot be less than or equal to 0.")); + } + + return new InputStreamContent(content, length, chunkSize); + } +} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/FileContentTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/FileContentTests.java new file mode 100644 index 0000000000000..9637fbfbe0032 --- /dev/null +++ b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/FileContentTests.java @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.spi.FileSystemProvider; +import java.util.Objects; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * Tests {@link FileContent}. + */ +public class FileContentTests { + @Test + public void fileChannelOpenErrorReturnsReactively() { + Path notARealPath = Paths.get("fake"); + FileContent fileContent = new FileContent(notARealPath, 0, 1024, 8092); + + StepVerifier.create(fileContent.asFluxByteBuffer()) + .verifyError(IOException.class); + } + + @Test + public void fileChannelCloseErrorReturnsReactively() throws IOException { + MyFileChannel myFileChannel = spy(MyFileChannel.class); + when(myFileChannel.map(any(), anyLong(), anyLong())).thenReturn(mock(MappedByteBuffer.class)); + doThrow(IOException.class).when(myFileChannel).implCloseChannel(); + + FileSystemProvider fileSystemProvider = mock(FileSystemProvider.class); + when(fileSystemProvider.newFileChannel(any(), any(), any())).thenReturn(myFileChannel); + + FileSystem fileSystem = mock(FileSystem.class); + when(fileSystem.provider()).thenReturn(fileSystemProvider); + + Path path = mock(Path.class); + when(path.getFileSystem()).thenReturn(fileSystem); + + FileContent fileContent = new FileContent(path, 0, 1024, 8092); + StepVerifier.create(fileContent.asFluxByteBuffer()) + .thenConsumeWhile(Objects::nonNull) + .verifyError(IOException.class); + } + + @Test + public void fileChannelIsClosedWhenMapErrors() throws IOException { + MyFileChannel myFileChannel = spy(MyFileChannel.class); + when(myFileChannel.map(any(), anyLong(), anyLong())).thenThrow(IOException.class); + + FileSystemProvider fileSystemProvider = mock(FileSystemProvider.class); + when(fileSystemProvider.newFileChannel(any(), any(), any())).thenReturn(myFileChannel); + + FileSystem fileSystem = mock(FileSystem.class); + when(fileSystem.provider()).thenReturn(fileSystemProvider); + + Path path = mock(Path.class); + when(path.getFileSystem()).thenReturn(fileSystem); + + FileContent fileContent = new FileContent(path, 0, 1024, 8092); + StepVerifier.create(fileContent.asFluxByteBuffer()) + .thenConsumeWhile(Objects::nonNull) + .verifyError(IOException.class); + + assertFalse(myFileChannel.isOpen()); + } +} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/MyFileChannel.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/MyFileChannel.java new file mode 100644 index 0000000000000..21d08ee7f9d84 --- /dev/null +++ b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/util/MyFileChannel.java @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.implementation.util; + +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; + +public abstract class MyFileChannel extends FileChannel { + // Needed by Mockito + public MyFileChannel() { + super(); + } + + @Override + public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + return null; + } + + @Override + protected void implCloseChannel() throws IOException { + } +} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/util/BufferedFluxByteBufferTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/util/BufferedFluxByteBufferTests.java new file mode 100644 index 0000000000000..568e7bc9d0d2f --- /dev/null +++ b/sdk/core/azure-core/src/test/java/com/azure/core/util/BufferedFluxByteBufferTests.java @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.util; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.nio.ByteBuffer; +import java.security.SecureRandom; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +/** + * Tests {@link BufferedFluxByteBuffer}. + */ +public class BufferedFluxByteBufferTests { + @Test + public void coldBufferIsBuffered() { + byte[] randomBytes = new byte[1024 * 1024]; + SecureRandom secureRandom = new SecureRandom(); + secureRandom.nextBytes(randomBytes); + + BufferedFluxByteBuffer bufferedFluxByteBuffer = new BufferedFluxByteBuffer( + Flux.fromArray(splitBytesIntoBuffers(randomBytes))); + + // Run once to verify that the results are expected. + StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(bufferedFluxByteBuffer)) + .assertNext(bytes -> assertArrayEquals(randomBytes, bytes)) + .verifyComplete(); + + // Run again to verify that the results are consistent. + StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(bufferedFluxByteBuffer)) + .assertNext(bytes -> assertArrayEquals(randomBytes, bytes)) + .verifyComplete(); + } + + @Test + public void hotBufferIsBuffered() { + byte[] randomBytes = new byte[1024 * 1024]; + SecureRandom secureRandom = new SecureRandom(); + secureRandom.nextBytes(randomBytes); + + BufferedFluxByteBuffer bufferedFluxByteBuffer = new BufferedFluxByteBuffer( + Flux.fromArray(splitBytesIntoBuffers(randomBytes)).share()); + + // Run once to verify that the results are expected. + StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(bufferedFluxByteBuffer)) + .assertNext(bytes -> assertArrayEquals(randomBytes, bytes)) + .verifyComplete(); + + // Run again to verify that the results are consistent. + StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(bufferedFluxByteBuffer)) + .assertNext(bytes -> assertArrayEquals(randomBytes, bytes)) + .verifyComplete(); + } + + private static ByteBuffer[] splitBytesIntoBuffers(byte[] bytes) { + int expectedBuffers = (int) Math.ceil(bytes.length / (double) 1024); + ByteBuffer[] buffers = new ByteBuffer[expectedBuffers]; + + for (int i = 0; i < expectedBuffers; i++) { + int bufferLength = Math.min(1024, bytes.length - (1024 * i)); + buffers[i] = ByteBuffer.wrap(bytes, i * 1024, bufferLength); + } + + return buffers; + } +} diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/util/RequestContentTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/util/RequestContentTests.java new file mode 100644 index 0000000000000..b4fdf1dca4698 --- /dev/null +++ b/sdk/core/azure-core/src/test/java/com/azure/core/util/RequestContentTests.java @@ -0,0 +1,235 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.util; + +import org.junit.jupiter.api.function.Executable; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.SecureRandom; +import java.time.Duration; +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests {@link RequestContent}. + */ +public class RequestContentTests { + @ParameterizedTest + @MethodSource("expectedContentSupplier") + public void expectedContent(RequestContent content, boolean checkLength, byte[] expected) { + if (checkLength) { + assertEquals(expected.length, content.getLength()); + } + + StepVerifier.create(FluxUtil.collectBytesInByteBufferStream(content.asFluxByteBuffer())) + .assertNext(bytes -> assertArrayEquals(expected, bytes)) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + + private static Stream expectedContentSupplier() throws IOException { + byte[] emptyBytes = new byte[0]; + + SecureRandom random = new SecureRandom(); + byte[] randomBytes = new byte[1024 * 1024]; + random.nextBytes(randomBytes); + + String emptyString = new String(emptyBytes, StandardCharsets.UTF_8); + String randomString = new String(randomBytes, StandardCharsets.UTF_8); + // This is done as using random bytes to make a String may result in the String being longer than the initial + // byte array as missing code point bytes are packed. + byte[] randomStringBytes = randomString.getBytes(StandardCharsets.UTF_8); + + Path emptyTempFile = Files.createTempFile("emptyTempFile", ".txt"); + emptyTempFile.toFile().deleteOnExit(); + Files.write(emptyTempFile, emptyBytes); + + Path randomTempFile = Files.createTempFile("randomTempFile", ".txt"); + randomTempFile.toFile().deleteOnExit(); + Files.write(randomTempFile, randomBytes); + + Flux emptyFlux = Flux.defer(() -> Flux.just(ByteBuffer.wrap(emptyBytes))); + Flux randomFlux = Flux.defer(() -> Flux.just(ByteBuffer.wrap(randomBytes))); + Flux chunkedRandomFlux = Flux.generate(() -> 0, (offset, sink) -> { + if (offset == randomBytes.length) { + sink.complete(); + return offset; + } + + int nextLength = Math.min(1024, randomBytes.length - offset); + sink.next(ByteBuffer.wrap(Arrays.copyOfRange(randomBytes, offset, offset + nextLength))); + return offset + nextLength; + }); + + return Stream.of( + Arguments.of(RequestContent.fromBytes(emptyBytes), true, emptyBytes), + Arguments.of(RequestContent.fromBytes(emptyBytes, 0, 0), true, emptyBytes), + + Arguments.of(RequestContent.fromBytes(randomBytes), true, randomBytes), + Arguments.of(RequestContent.fromBytes(randomBytes, 0, randomBytes.length), true, randomBytes), + Arguments.of(RequestContent.fromBytes(randomBytes, 1024, 1024), true, + Arrays.copyOfRange(randomBytes, 1024, 2048)), + + Arguments.of(RequestContent.fromString(emptyString), true, emptyBytes), + Arguments.of(RequestContent.fromString(randomString), true, randomStringBytes), + + Arguments.of(RequestContent.fromFile(emptyTempFile), true, emptyBytes), + Arguments.of(RequestContent.fromFile(emptyTempFile, 0, 0, 8092), true, emptyBytes), + + Arguments.of(RequestContent.fromFile(randomTempFile), true, randomBytes), + Arguments.of(RequestContent.fromFile(randomTempFile, 0, randomBytes.length, 8092), true, randomBytes), + Arguments.of(RequestContent.fromFile(randomTempFile, 1024, 1024, 8092), true, + Arrays.copyOfRange(randomBytes, 1024, 2048)), + + Arguments.of(RequestContent.fromObject(emptyString), false, "\"\"".getBytes(StandardCharsets.UTF_8)), + + Arguments.of(RequestContent.fromFlux(emptyFlux), false, emptyBytes), + Arguments.of(RequestContent.fromFlux(emptyFlux, 0), true, emptyBytes), + + Arguments.of(RequestContent.fromFlux(randomFlux), false, randomBytes), + Arguments.of(RequestContent.fromFlux(randomFlux, randomBytes.length), true, randomBytes), + + Arguments.of(RequestContent.fromFlux(chunkedRandomFlux), false, randomBytes), + Arguments.of(RequestContent.fromFlux(chunkedRandomFlux, randomBytes.length), true, randomBytes), + + Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(emptyFlux)), false, emptyBytes), + Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(emptyFlux), 0), true, emptyBytes), + + Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(randomFlux)), false, randomBytes), + Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(randomFlux), randomBytes.length), + true, randomBytes), + + Arguments.of(RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(chunkedRandomFlux)), false, + randomBytes), + Arguments.of( + RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(chunkedRandomFlux), randomBytes.length), + true, randomBytes), + + Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(emptyBytes)), false, emptyBytes), + Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(emptyBytes), 0, 8092), true, + emptyBytes), + + Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(randomBytes)), false, randomBytes), + Arguments.of(RequestContent.fromInputStream(new ByteArrayInputStream(randomBytes), randomBytes.length, + 8092), true, randomBytes) + ); + } + + @ParameterizedTest + @MethodSource("invalidArgumentSupplier") + public void invalidArgument(Executable requestContentSupplier, Class expectedException) { + assertThrows(expectedException, requestContentSupplier); + } + + private static Stream invalidArgumentSupplier() { + byte[] dummyBytes = new byte[0]; + + File mockFile = mock(File.class); + when(mockFile.length()).thenReturn(0L); + + Path mockPath = mock(Path.class); + when(mockPath.toFile()).thenReturn(mockFile); + + return Stream.of( + // bytes cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromBytes(null)), NullPointerException.class), + Arguments.of(createExecutable(() -> RequestContent.fromBytes(null, 0, 0)), NullPointerException.class), + + // offset cannot be negative + Arguments.of(createExecutable(() -> RequestContent.fromBytes(dummyBytes, -1, 0)), + IllegalArgumentException.class), + + // length cannot be negative + Arguments.of(createExecutable(() -> RequestContent.fromBytes(dummyBytes, 0, -1)), + IllegalArgumentException.class), + + // offset + length cannot be greater than bytes.length + Arguments.of(createExecutable(() -> RequestContent.fromBytes(dummyBytes, 0, 1)), + IllegalArgumentException.class), + + // content cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromString(null)), NullPointerException.class), + + // content cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromBinaryData(null)), NullPointerException.class), + + // file cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromFile(null)), NullPointerException.class), + Arguments.of(createExecutable(() -> RequestContent.fromFile(null, 0, 0, 0)), NullPointerException.class), + + // offset cannot be negative + Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, -1, 0, 0)), + IllegalArgumentException.class), + + // length cannot be negative + Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, -1, 0)), + IllegalArgumentException.class), + + // offset + length cannot be greater than file size + Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, 1, 0)), + IllegalArgumentException.class), + + // chunkSize cannot be less than or equal to 0 + Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, 0, -1)), + IllegalArgumentException.class), + Arguments.of(createExecutable(() -> RequestContent.fromFile(mockPath, 0, 0, 0)), + IllegalArgumentException.class), + + // serializer cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromObject(null, null)), NullPointerException.class), + + // content cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromFlux(null)), NullPointerException.class), + + // length cannot be negative + Arguments.of(createExecutable(() -> RequestContent.fromFlux(Flux.empty(), -1)), + IllegalArgumentException.class), + + // content cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromBufferedFlux(null)), NullPointerException.class), + + // length cannot be negative + Arguments.of( + createExecutable(() -> RequestContent.fromBufferedFlux(new BufferedFluxByteBuffer(Flux.empty()), -1)), + IllegalArgumentException.class), + + // content cannot be null + Arguments.of(createExecutable(() -> RequestContent.fromInputStream(null)), NullPointerException.class), + + // length cannot be negative + Arguments.of( + createExecutable(() -> RequestContent.fromInputStream(new ByteArrayInputStream(dummyBytes), -1, 0)), + IllegalArgumentException.class), + + // chunkSize cannot be zero or negative + Arguments.of( + createExecutable(() -> RequestContent.fromInputStream(new ByteArrayInputStream(dummyBytes), 0, -1)), + IllegalArgumentException.class), + Arguments.of( + createExecutable(() -> RequestContent.fromInputStream(new ByteArrayInputStream(dummyBytes), 0, 0)), + IllegalArgumentException.class) + ); + } + + private static Executable createExecutable(Runnable runnable) { + return runnable::run; + } +}