diff --git a/sdk/core/azure-core-http-netty/pom.xml b/sdk/core/azure-core-http-netty/pom.xml
index d7b0225dfce88..f3393f91520e4 100644
--- a/sdk/core/azure-core-http-netty/pom.xml
+++ b/sdk/core/azure-core-http-netty/pom.xml
@@ -82,6 +82,14 @@
reactor-netty
+
+ com.azure
+ azure-core
+ 1.0.0-preview.3
+ test-jar
+ test
+
+
io.projectreactor
reactor-test
diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithHttpProxyNettyTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithHttpProxyNettyTests.java
similarity index 88%
rename from sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithHttpProxyNettyTests.java
rename to sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithHttpProxyNettyTests.java
index 4cd0e9cb04a42..15bb5cb6fc1e7 100644
--- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithHttpProxyNettyTests.java
+++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithHttpProxyNettyTests.java
@@ -1,11 +1,12 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
-package com.azure.core.implementation;
+package com.azure.core.http.netty;
import com.azure.core.http.HttpClient;
import com.azure.core.http.ProxyOptions;
import com.azure.core.http.ProxyOptions.Type;
+import com.azure.core.implementation.RestProxyTests;
import org.junit.Ignore;
import java.net.InetSocketAddress;
diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithNettyTests.java b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithNettyTests.java
similarity index 78%
rename from sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithNettyTests.java
rename to sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithNettyTests.java
index d5fec7bdffbd0..ae8dad6de72d4 100644
--- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyWithNettyTests.java
+++ b/sdk/core/azure-core-http-netty/src/test/java/com/azure/core/http/netty/RestProxyWithNettyTests.java
@@ -1,9 +1,10 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
-package com.azure.core.implementation;
+package com.azure.core.http.netty;
import com.azure.core.http.HttpClient;
+import com.azure.core.implementation.RestProxyTests;
public class RestProxyWithNettyTests extends RestProxyTests {
diff --git a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java
index cada0ba5cd118..db3c251be2302 100644
--- a/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java
+++ b/sdk/core/azure-core-management/src/test/java/com/azure/core/management/http/MockAzureHttpClient.java
@@ -288,7 +288,7 @@ private static Map queryToMap(String url) {
}
private static String bodyToString(HttpRequest request) throws IOException {
- Mono asyncString = FluxUtil.collectBytesInByteBufferStream(request.body(), false)
+ Mono asyncString = FluxUtil.collectBytesInByteBufferStream(request.body())
.map(bytes -> new String(bytes, StandardCharsets.UTF_8));
return asyncString.block();
}
diff --git a/sdk/core/azure-core/pom.xml b/sdk/core/azure-core/pom.xml
index 400e4255bbc99..a7f6746740d87 100644
--- a/sdk/core/azure-core/pom.xml
+++ b/sdk/core/azure-core/pom.xml
@@ -147,6 +147,20 @@
test
+
+
+
+ maven-jar-plugin
+
+
+ test-jar
+ test-compile
+
+ test-jar
+
+
+
+
diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java
index a4070bdec0bd0..e942dabd6f066 100644
--- a/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java
+++ b/sdk/core/azure-core/src/test/java/com/azure/core/http/HttpPipelineTests.java
@@ -23,7 +23,14 @@
public class HttpPipelineTests {
@Test
public void constructorWithNoArguments() {
- HttpPipeline pipeline = HttpPipeline.builder().build();
+ HttpPipeline pipeline = HttpPipeline.builder()
+ .httpClient(new MockHttpClient() {
+ @Override
+ public Mono send(HttpRequest request) {
+ // do nothing
+ return null;
+ }
+ }).build();
assertEquals(0, pipeline.getPolicyCount());
assertNotNull(pipeline.httpClient());
}
@@ -34,7 +41,13 @@ public void withRequestPolicy() {
.policies(new PortPolicy(80, true),
new ProtocolPolicy("ftp", true),
new RetryPolicy())
- .build();
+ .httpClient(new MockHttpClient() {
+ @Override
+ public Mono send(HttpRequest request) {
+ // do nothing
+ return null;
+ }
+ }).build();
assertEquals(3, pipeline.getPolicyCount());
assertEquals(PortPolicy.class, pipeline.getPolicy(0).getClass());
@@ -49,7 +62,13 @@ public void withRequestOptions() throws MalformedURLException {
.policies(new PortPolicy(80, true),
new ProtocolPolicy("ftp", true),
new RetryPolicy())
- .build();
+ .httpClient(new MockHttpClient() {
+ @Override
+ public Mono send(HttpRequest request) {
+ // do nothing
+ return null;
+ }
+ }).build();
HttpPipelineCallContext context = new HttpPipelineCallContext(new HttpRequest(HttpMethod.GET, new URL("http://foo.com")));
assertNotNull(context);
diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java b/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java
index ed5f7e7f0986d..8628798cd4ddf 100644
--- a/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java
+++ b/sdk/core/azure-core/src/test/java/com/azure/core/http/MockHttpClient.java
@@ -12,9 +12,7 @@
import com.azure.core.implementation.util.FluxUtil;
import reactor.core.publisher.Mono;
-import java.io.ByteArrayOutputStream;
import java.net.URL;
-import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
@@ -25,7 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
/**
* This HttpClient attempts to mimic the behavior of http://httpbin.org without ever making a network call.
@@ -143,7 +140,7 @@ public Mono send(HttpRequest request) {
json.data(createHttpBinResponseDataForRequest(request));
response = new MockHttpResponse(request, 200, json);
} else if (requestPathLower.equals("/post")) {
- if ("x-www-form-urlencoded".equalsIgnoreCase(contentType)) {
+ if (contentType != null && contentType.contains("x-www-form-urlencoded")) {
Map parsed = bodyToMap(request);
final HttpBinFormDataJSON json = new HttpBinFormDataJSON();
Form form = new Form();
@@ -153,6 +150,7 @@ public Mono send(HttpRequest request) {
form.pizzaSize(PizzaSize.valueOf(parsed.get("size")));
form.toppings(Arrays.asList(parsed.get("toppings").split(",")));
json.form(form);
+ response = new MockHttpResponse(request, 200, RESPONSE_HEADERS, json);
} else {
final HttpBinJSON json = new HttpBinJSON();
json.url(request.url().toString());
diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxTestUtils.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxTestUtils.java
deleted file mode 100644
index 68f2d0b13ea5f..0000000000000
--- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/FluxTestUtils.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package com.azure.core.implementation;
-
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoSink;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
-import java.nio.channels.CompletionHandler;
-
-public class FluxTestUtils {
-
- /**
- * Collects byte buffers emitted by a Flux into a ByteBuf.
- *
- * @param stream A stream which emits ByteBuf instances.
- * @param autoReleaseEnabled if ByteBuf instances in stream gets automatically released as they consumed
- * @return A Mono which emits the concatenation of all the byte buffers given by the source Flux.
- */
- public static Mono collectByteBufStream(Flux stream, boolean autoReleaseEnabled) {
-// if (autoReleaseEnabled) {
-// Mono mergedCbb = Mono.using(
-// // Resource supplier
-// () -> {
-// CompositeByteBuf initialCbb = Unpooled.compositeBuffer();
-// return initialCbb;
-// },
-// // source Mono creator
-// (CompositeByteBuf initialCbb) -> {
-// Mono reducedCbb = stream.reduce(initialCbb, (CompositeByteBuf currentCbb, ByteBuf nextBb) -> {
-// CompositeByteBuf updatedCbb = currentCbb.addComponent(nextBb.retain());
-// return updatedCbb;
-// });
-// //
-// return reducedCbb
-// .doOnNext((CompositeByteBuf cbb) -> cbb.writerIndex(cbb.capacity()))
-// .filter((CompositeByteBuf cbb) -> cbb.isReadable());
-// },
-// // Resource cleaner
-// (CompositeByteBuf finalCbb) -> finalCbb.release());
-// return mergedCbb;
-// } else {
-// return stream.collect(Unpooled::compositeBuffer,
-// (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer)))
-// .filter((CompositeByteBuf cbb) -> cbb.isReadable())
-// .map(bb -> bb);
-// }
-
- // TODO
- throw new IllegalStateException("This method is not yet re-implemented");
- }
-
- private static final int DEFAULT_CHUNK_SIZE = 1024 * 64;
-
- /**
- * Writes the bytes emitted by a Flux to an AsynchronousFileChannel.
- *
- * @param content the Flux content
- * @param outFile the file channel
- * @return a Completable which performs the write operation when subscribed
- */
- public static Mono bytebufStreamToFile(Flux content, AsynchronousFileChannel outFile) {
- return bytebufStreamToFile(content, outFile, 0);
- }
-
- /**
- * Writes the bytes emitted by a Flux to an AsynchronousFileChannel
- * starting at the given position in the file.
- *
- * @param content the Flux content
- * @param outFile the file channel
- * @param position the position in the file to begin writing
- * @return a Mono<Void> which performs the write operation when subscribed
- */
- public static Mono bytebufStreamToFile(Flux content, AsynchronousFileChannel outFile, long position) {
- return Mono.create(emitter -> content.subscribe(new ByteBufToFileSubscriber(outFile, position, emitter)));
- }
-
- private static class ByteBufToFileSubscriber implements Subscriber {
- private ByteBufToFileSubscriber(AsynchronousFileChannel outFile, long position, MonoSink emitter) {
- this.outFile = outFile;
- this.pos = position;
- this.emitter = emitter;
- }
-
- // volatile ensures that writes to these fields by one thread will be immediately visible to other threads.
- // An I/O pool thread will write to isWriting and read isCompleted,
- // while another thread may read isWriting and write to isCompleted.
- volatile boolean isWriting = false;
- volatile boolean isCompleted = false;
- volatile Subscription subscription;
- volatile long pos;
- AsynchronousFileChannel outFile;
- MonoSink emitter;
-
- @Override
- public void onSubscribe(Subscription s) {
- subscription = s;
- s.request(1);
- }
-
- @Override
- public void onNext(ByteBuffer bytes) {
- isWriting = true;
- outFile.write(bytes, pos, null, onWriteCompleted);
- }
-
- CompletionHandler onWriteCompleted = new CompletionHandler() {
- @Override
- public void completed(Integer bytesWritten, Object attachment) {
- isWriting = false;
- if (isCompleted) {
- emitter.success();
- }
- //noinspection NonAtomicOperationOnVolatileField
- pos += bytesWritten;
- if (subscription != null) {
- subscription.request(1);
- }
- }
-
- @Override
- public void failed(Throwable exc, Object attachment) {
- if (subscription != null) {
- subscription.cancel();
- }
- emitter.error(exc);
- }
- };
-
- @Override
- public void onError(Throwable throwable) {
- if (subscription != null) {
- subscription.cancel();
- }
- emitter.error(throwable);
- }
-
- @Override
- public void onComplete() {
- isCompleted = true;
- if (!isWriting) {
- emitter.success();
- }
- }
- }
-
- /**
- * Creates a {@link Flux} from an {@link AsynchronousFileChannel}
- * which reads the entire file.
- *
- * @param fileChannel The file channel.
- * @return The AsyncInputStream.
- */
- public static Flux byteBufStreamFromFile(AsynchronousFileChannel fileChannel) {
-// try {
-// long size = fileChannel.size();
-// return byteBufStreamFromFile(fileChannel, DEFAULT_CHUNK_SIZE, 0, size);
-// } catch (IOException e) {
-// return Flux.error(e);
-// }
-
- // TODO
- throw new IllegalStateException("This method is not yet re-implemented");
- }
-}
diff --git a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java
index 07d7b45e90999..d0d08272afcb8 100644
--- a/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java
+++ b/sdk/core/azure-core/src/test/java/com/azure/core/implementation/RestProxyTests.java
@@ -1486,30 +1486,24 @@ interface DownloadService {
@Test
public void simpleDownloadTest() {
-// try (StreamResponse response = createService(DownloadService.class).getBytes()) {
-// int count = 0;
-// for (ByteBuffer byteBuf : response.value().doOnNext(b -> b.retain()).toIterable()) {
-// // assertEquals(1, byteBuf.refCnt());
-// count += byteBuf.readableBytes();
-// ReferenceCountUtil.refCnt(byteBuf);
-// }
-// assertEquals(30720, count);
-// }
-
- Assert.fail("Need to implement this test again");
+ try (StreamResponse response = createService(DownloadService.class).getBytes()) {
+ int count = 0;
+ for (ByteBuffer byteBuf : response.value().toIterable()) {
+ // assertEquals(1, byteBuf.refCnt());
+ count += byteBuf.remaining();
+ }
+ assertEquals(30720, count);
+ }
}
@Test
public void rawFlowableDownloadTest() {
-// Flux response = createService(DownloadService.class).getBytesFlowable();
-// int count = 0;
-// for (ByteBuffer byteBuf : response.doOnNext(b -> b.retain()).toIterable()) {
-// count += byteBuf.readableBytes();
-// ReferenceCountUtil.refCnt(byteBuf);
-// }
-// assertEquals(30720, count);
-
- Assert.fail("Need to implement this test again");
+ Flux response = createService(DownloadService.class).getBytesFlowable();
+ int count = 0;
+ for (ByteBuffer byteBuf : response.toIterable()) {
+ count += byteBuf.remaining();
+ }
+ assertEquals(30720, count);
}
@Host("https://httpbin.org")
@@ -1520,37 +1514,33 @@ interface FlowableUploadService {
}
@Test
- public void flowableUploadTest() throws Exception {
-// Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI());
-// Flux stream = FluxUtil.byteBufferStreamFromFile(AsynchronousFileChannel.open(filePath));
-//
-// final HttpClient httpClient = createHttpClient();
-// // Scenario: Log the body so that body buffering/replay behavior is exercised.
-// //
-// // Order in which policies applied will be the order in which they added to builder
-// //
-// final HttpPipeline httpPipeline = HttpPipeline.builder()
-// .httpClient(httpClient)
-// .policies(new HttpLoggingPolicy(HttpLogDetailLevel.BODY_AND_HEADERS, true))
-// .build();
-// //
-// Response response = RestProxy.create(FlowableUploadService.class, httpPipeline, SERIALIZER).put(stream, Files.size(filePath));
-//
-// assertEquals("The quick brown fox jumps over the lazy dog", response.value().data());
+ public void fluxUploadTest() throws Exception {
+ Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI());
+ Flux stream = FluxUtil.readFile(AsynchronousFileChannel.open(filePath));
+
+ final HttpClient httpClient = createHttpClient();
+ // Scenario: Log the body so that body buffering/replay behavior is exercised.
+ //
+ // Order in which policies applied will be the order in which they added to builder
+ //
+ final HttpPipeline httpPipeline = HttpPipeline.builder()
+ .httpClient(httpClient)
+ .policies(new HttpLoggingPolicy(HttpLogDetailLevel.BODY_AND_HEADERS, true))
+ .build();
+ //
+ Response response = RestProxy.create(FlowableUploadService.class, httpPipeline, SERIALIZER).put(stream, Files.size(filePath));
- Assert.fail("Need to implement this test again");
+ assertEquals("The quick brown fox jumps over the lazy dog", response.value().data());
}
@Test
public void segmentUploadTest() throws Exception {
-// Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI());
-// AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.READ);
-// Response response = createService(FlowableUploadService.class)
-// .put(FluxUtil.byteBufStreamFromFile(fileChannel, 4, 15), 15);
-//
-// assertEquals("quick brown fox", response.value().data());
+ Path filePath = Paths.get(getClass().getClassLoader().getResource("upload.txt").toURI());
+ AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.READ);
+ Response response = createService(FlowableUploadService.class)
+ .put(FluxUtil.readFile(fileChannel, 4, 15), 15);
- Assert.fail("Need to implement this test again");
+ assertEquals("quick brown fox", response.value().data());
}
@Host("{url}")
@@ -1646,7 +1636,7 @@ interface Service26 {
@Test
public void postUrlFormEncoded() {
- Service26 service = RestProxy.create(Service26.class, HttpPipeline.builder().build());
+ Service26 service = createService(Service26.class);
HttpBinFormDataJSON response = service.postForm("Foo", "123", "foo@bar.com", PizzaSize.LARGE, Arrays.asList("Bacon", "Onion"));
assertNotNull(response);
assertNotNull(response.form());