executorService;
+
+ private ExecutorServiceKeeper(Client client) {
+ final ClientConfig config = ((JerseyClient) client).getConfiguration();
+ executorService = Optional.ofNullable(config.getExecutorService());
+ }
+
+ private ExecutorService getExecutorService(ClientRequest request) {
+ if (!executorService.isPresent()) {
+ // cache for multiple requests
+ executorService = Optional.ofNullable(request.getInjectionManager()
+ .getInstance(ExecutorServiceProvider.class, ClientAsyncExecutorLiteral.INSTANCE).getExecutorService());
+ }
+
+ return executorService.get();
+ }
+ }
+}
diff --git a/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonConnectorProvider.java b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonConnectorProvider.java
new file mode 100644
index 00000000000..acb14f5c7bf
--- /dev/null
+++ b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonConnectorProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import org.glassfish.jersey.client.spi.Connector;
+import org.glassfish.jersey.client.spi.ConnectorProvider;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Configuration;
+import java.io.OutputStream;
+
+/**
+ * Provider for Helidon WebClient {@link Connector} that utilizes the Helidon HTTP Client to send and receive
+ * HTTP request and responses.
+ *
+ * The following properties are only supported at construction of this class:
+ *
+ * - {@link org.glassfish.jersey.client.ClientProperties#CONNECT_TIMEOUT}
+ * - {@link org.glassfish.jersey.client.ClientProperties#FOLLOW_REDIRECTS}
+ * - {@link org.glassfish.jersey.client.ClientProperties#PROXY_URI}
+ * - {@link org.glassfish.jersey.client.ClientProperties#PROXY_USERNAME}
+ * - {@link org.glassfish.jersey.client.ClientProperties#PROXY_PASSWORD}
+ * - {@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT}
+ * - {@link HelidonProperties#CONFIG}
+ *
+ *
+ * If a {@link org.glassfish.jersey.client.ClientResponse} is obtained and an
+ * entity is not read from the response then
+ * {@link org.glassfish.jersey.client.ClientResponse#close()} MUST be called
+ * after processing the response to release connection-based resources.
+ *
+ *
+ * Client operations are thread safe, the HTTP connection may
+ * be shared between different threads.
+ *
+ *
+ * If a response entity is obtained that is an instance of {@link java.io.Closeable}
+ * then the instance MUST be closed after processing the entity to release
+ * connection-based resources.
+ *
+ *
+ * This connector uses {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} to buffer the entity
+ * written for instance by {@link javax.ws.rs.core.StreamingOutput}. Should the buffer be small and
+ * {@link javax.ws.rs.core.StreamingOutput#write(OutputStream)} be called many times, the performance can drop. The Content-Length
+ * or the Content_Encoding header is set by the underlaying Helidon WebClient regardless of the
+ * {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} size, however.
+ *
+ *
+ */
+public class HelidonConnectorProvider implements ConnectorProvider {
+ @Override
+ public Connector getConnector(Client client, Configuration runtimeConfig) {
+ return new HelidonConnector(client, runtimeConfig);
+ }
+}
diff --git a/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonEntity.java b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonEntity.java
new file mode 100644
index 00000000000..40881fad637
--- /dev/null
+++ b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonEntity.java
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.ProcessingException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Flow;
+import java.util.function.Function;
+
+import io.helidon.common.GenericType;
+import io.helidon.common.http.DataChunk;
+import io.helidon.common.http.MediaType;
+import io.helidon.common.reactive.Multi;
+import io.helidon.common.reactive.MultiFromOutputStream;
+import io.helidon.common.reactive.Single;
+import io.helidon.media.common.ContentWriters;
+import io.helidon.media.common.MessageBodyWriter;
+import io.helidon.media.common.MessageBodyWriterContext;
+import io.helidon.webclient.WebClientRequestBuilder;
+import io.helidon.webclient.WebClientResponse;
+
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+
+/**
+ * A utility class that converts outbound client entity to a class understandable by Helidon.
+ * Based on the {@link HelidonEntityType} an entity writer is provided to be registered by Helidon client
+ * and an Entity is provided to be submitted by the Helidon Client.
+ */
+class HelidonEntity {
+ /**
+ * HelidonEnity type chosen by HelidonEntityType
+ */
+ enum HelidonEntityType {
+ /**
+ * Simplest structure. Loads all data to the memory.
+ */
+ BYTE_ARRAY_OUTPUT_STREAM,
+ /**
+ * Readable ByteChannel that is capable of sending data in chunks.
+ * Capable of caching of bytes before the data are consumed by Helidon.
+ */
+ READABLE_BYTE_CHANNEL,
+ /**
+ * Helidon most native entity. Could be slower than {@link #READABLE_BYTE_CHANNEL}.
+ */
+ // Check LargeDataTest with OUTPUT_STREAM_MULTI
+ OUTPUT_STREAM_MULTI
+ }
+
+ /**
+ * Get optional entity writer to be registered by the Helidon Client. For some default providers,
+ * nothing is needed to be registered.
+ * @param type the type of the entity class that works best for the Http Client request use case.
+ * @return possible writer to be registerd by the Helidon Client.
+ */
+ static Optional> helidonWriter(HelidonEntityType type) {
+ switch (type) {
+ case BYTE_ARRAY_OUTPUT_STREAM:
+ return Optional.of(new OutputStreamBodyWriter());
+ case OUTPUT_STREAM_MULTI:
+ //Helidon default
+ return Optional.empty();
+ case READABLE_BYTE_CHANNEL:
+ return Optional.empty();
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Convert Jersey {@code OutputStream} to an entity based on the client request use case and submits to the provided
+ * {@code WebClientRequestBuilder}.
+ * @param type the type of the Helidon entity.
+ * @param requestContext Jersey {@link ClientRequest} providing the entity {@code OutputStream}.
+ * @param requestBuilder Helidon {@code WebClientRequestBuilder} which is used to submit the entity
+ * @param executorService {@link ExecutorService} that fills the entity instance for Helidon with data from Jersey
+ * {@code OutputStream}.
+ * @return Helidon Client response completion stage.
+ */
+ static CompletionStage submit(HelidonEntityType type,
+ ClientRequest requestContext,
+ WebClientRequestBuilder requestBuilder,
+ ExecutorService executorService) {
+ CompletionStage stage = null;
+ if (type != null) {
+ final int bufferSize = requestContext.resolveProperty(
+ ClientProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 8192);
+ switch (type) {
+ case BYTE_ARRAY_OUTPUT_STREAM:
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
+ requestContext.setStreamProvider(contentLength -> baos);
+ ((ProcessingRunnable) () -> requestContext.writeEntity()).run();
+ stage = requestBuilder.submit(baos);
+ break;
+ case READABLE_BYTE_CHANNEL:
+ final OutputStreamChannel channel = new OutputStreamChannel(bufferSize);
+ requestContext.setStreamProvider(contentLength -> channel);
+ executorService.execute((ProcessingRunnable) () -> requestContext.writeEntity());
+ stage = requestBuilder.submit(channel);
+ break;
+ case OUTPUT_STREAM_MULTI:
+ final MultiFromOutputStream publisher = new MultiFromOutputStream() {};
+ requestContext.setStreamProvider(contentLength -> publisher);
+ executorService.execute((ProcessingRunnable) () -> {
+ requestContext.writeEntity();
+ publisher.close();
+ });
+ stage = requestBuilder.submit(Multi.create(publisher).map(DataChunk::create));
+ break;
+ }
+ }
+ return stage;
+ }
+
+ @FunctionalInterface
+ private interface ProcessingRunnable extends Runnable {
+ void runOrThrow() throws IOException;
+
+ @Override
+ default void run() {
+ try {
+ runOrThrow();
+ } catch (IOException e) {
+ throw new ProcessingException("Error writing entity:", e);
+ }
+ }
+ }
+
+ private static class OutputStreamBodyWriter implements MessageBodyWriter {
+ private OutputStreamBodyWriter() {
+ }
+
+ @Override
+ public Flow.Publisher write(
+ Single extends ByteArrayOutputStream> content,
+ GenericType extends ByteArrayOutputStream> type,
+ MessageBodyWriterContext context) {
+ context.contentType(MediaType.APPLICATION_OCTET_STREAM);
+ return content.flatMap(new ByteArrayOutputStreamToChunks());
+ }
+
+ @Override
+ public PredicateResult accept(GenericType> type, MessageBodyWriterContext messageBodyWriterContext) {
+ return PredicateResult.supports(ByteArrayOutputStream.class, type);
+ }
+
+ private static class ByteArrayOutputStreamToChunks implements Function> {
+ @Override
+ public Flow.Publisher apply(ByteArrayOutputStream byteArrayOutputStream) {
+ return ContentWriters.writeBytes(byteArrayOutputStream.toByteArray(), false);
+ }
+ }
+ }
+}
diff --git a/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonProperties.java b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonProperties.java
new file mode 100644
index 00000000000..16bed24fbb3
--- /dev/null
+++ b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonProperties.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import io.helidon.config.Config;
+import io.helidon.webclient.WebClient;
+
+/**
+ * Configuration options specific to the Client API that utilizes {@link HelidonConnector}
+ */
+public final class HelidonProperties {
+
+ /**
+ * A Helidon {@link Config} instance that is passed to {@link WebClient.Builder#config(Config)} if available.
+ * This property is settable on {@link javax.ws.rs.core.Configurable#property(String, Object)} objects.
+ */
+ public static final String CONFIG = "jersey.connector.helidon.config";
+}
diff --git a/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonStructures.java b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonStructures.java
new file mode 100644
index 00000000000..13736ca9efa
--- /dev/null
+++ b/jersey/connector/src/main/java/io/helidon/jersey/connector/HelidonStructures.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package io.helidon.jersey.connector;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.core.Configuration;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+
+import io.helidon.common.http.Headers;
+import io.helidon.common.http.Http;
+import io.helidon.common.http.ReadOnlyParameters;
+import io.helidon.config.Config;
+import io.helidon.media.common.DefaultMediaSupport;
+import io.helidon.media.common.MessageBodyReader;
+import io.helidon.webclient.Proxy;
+import io.helidon.webclient.Ssl;
+import io.helidon.webclient.WebClientResponse;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientRequest;
+
+/**
+ * Helidon specific classes and implementations.
+ */
+class HelidonStructures {
+
+ static Headers createHeaders(Map> data) {
+ return new ReadOnlyHeaders(data);
+ }
+
+ static MessageBodyReader createInputStreamBodyReader() {
+ return DefaultMediaSupport.inputStreamReader();
+ }
+
+ static Optional helidonConfig(Configuration configuration) {
+ final Object helidonConfig = configuration.getProperty(HelidonProperties.CONFIG);
+ if (helidonConfig != null) {
+ if (!Config.class.isInstance(helidonConfig)) {
+ HelidonConnector.LOGGER.warning(
+ String.format("Given instance of %s is not Helidon config. Provided HelidonProperties.CONFIG is ignored.",
+ helidonConfig.getClass().getName())
+ );
+ return Optional.empty();
+ } else {
+ return Optional.of((Config) helidonConfig);
+ }
+ }
+ return Optional.empty();
+ }
+
+ static Optional createProxy(Configuration config) {
+ return ProxyBuilder.createProxy(config);
+ }
+
+ static Optional createProxy(ClientRequest request) {
+ return ProxyBuilder.createProxy(request);
+ }
+
+ static Optional createSSL(SSLContext context) {
+ return context == null ? Optional.empty() : Optional.of(Ssl.builder().sslContext(context).build());
+ }
+
+ static boolean hasEntity(WebClientResponse webClientResponse) {
+ final ReadOnlyParameters headers = webClientResponse.content().readerContext().headers();
+ final Optional contentLenth = headers.first(Http.Header.CONTENT_LENGTH);
+ final Optional encoding = headers.first(Http.Header.TRANSFER_ENCODING);
+
+ return ((contentLenth.isPresent() && !contentLenth.get().equals("0"))
+ || (encoding.isPresent() && encoding.get().equals(HttpHeaderValues.CHUNKED.toString())));
+ }
+
+ private static class ReadOnlyHeaders extends ReadOnlyParameters implements Headers {
+ public ReadOnlyHeaders(Map> data) {
+ super(data);
+ }
+ }
+
+ private static class ProxyBuilder {
+ private static Optional createProxy(Configuration config) {
+ final Object proxyUri = config.getProperty(ClientProperties.PROXY_URI);
+ final String userName
+ = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_USERNAME, String.class);
+ final String password
+ = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_PASSWORD, String.class);
+ return createProxy(proxyUri, userName, password);
+ }
+
+ private static Optional createProxy(ClientRequest clientRequest) {
+ final Object proxyUri = clientRequest.resolveProperty(ClientProperties.PROXY_URI, Object.class);
+ final String userName = clientRequest.resolveProperty(ClientProperties.PROXY_USERNAME, String.class);
+ final String password = clientRequest.resolveProperty(ClientProperties.PROXY_PASSWORD, String.class);
+ return createProxy(proxyUri, userName, password);
+ }
+
+ private static Optional createProxy(Object proxyUri, String userName, String password) {
+ if (proxyUri != null) {
+ final URI u = getProxyUri(proxyUri);
+ final Proxy.Builder builder = Proxy.builder();
+ if (u.getScheme().toUpperCase(Locale.ROOT).equals("DIRECT")) {
+ builder.type(Proxy.ProxyType.NONE);
+ } else {
+ builder.host(u.getHost()).port(u.getPort());
+ switch (u.getScheme().toUpperCase(Locale.ROOT)) {
+ case "HTTP":
+ builder.type(Proxy.ProxyType.HTTP);
+ break;
+ case "SOCKS":
+ builder.type(Proxy.ProxyType.SOCKS_4);
+ break;
+ case "SOCKS5":
+ builder.type(Proxy.ProxyType.SOCKS_5);
+ break;
+ default:
+ HelidonConnector.LOGGER.warning(String.format("Proxy schema %s not supported.", u.getScheme()));
+ return Optional.empty();
+ }
+ }
+ if (userName != null) {
+ builder.username(userName);
+
+ if (password != null) {
+ builder.password(password.toCharArray());
+ }
+ }
+ return Optional.of(builder.build());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private static URI getProxyUri(final Object proxy) {
+ if (proxy instanceof URI) {
+ return (URI) proxy;
+ } else if (proxy instanceof String) {
+ return URI.create((String) proxy);
+ } else {
+ throw new ProcessingException("The proxy URI (" + proxy + ") property MUST be an instance of String or URI");
+ }
+ }
+ }
+}
diff --git a/jersey/connector/src/main/java/io/helidon/jersey/connector/OutputStreamChannel.java b/jersey/connector/src/main/java/io/helidon/jersey/connector/OutputStreamChannel.java
new file mode 100644
index 00000000000..1bf573d75fa
--- /dev/null
+++ b/jersey/connector/src/main/java/io/helidon/jersey/connector/OutputStreamChannel.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A ReadableByteChannel implementation that provides cache to store {@link OutputStream#write(int) written}
+ * http entity before it is {@link ReadableByteChannel#read(ByteBuffer) read} by the {@link ReadableByteChannel} consumer.
+ *
+ *
+ * The implementation is backed by the {@link LinkedBlockingDeque}, with default {@link #READ_TIMEOUT} read timeout
+ * and {@link #WRITE_TIMEOUT} write timeout, by default 10 seconds.
+ *
+ *
+ * The {@link LinkedBlockingDeque queue} stores up to {@link #CAPACITY} of ByteBuffer, each of which is minimum
+ * 8192 bytes long by default. The default can be overridden by {@code ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER}.
+ *
+ *
+ */
+class OutputStreamChannel extends OutputStream implements ReadableByteChannel {
+
+ private ReentrantLock lock = new ReentrantLock();
+ private static final ByteBuffer VOID = ByteBuffer.allocate(0);
+ private static final int CAPACITY = Integer.getInteger("helidon.connector.osc.capacity", 8);
+ private static final int WRITE_TIMEOUT = Integer.getInteger("helidon.connector.osc.read.timeout", 10000);
+ private static final int READ_TIMEOUT = Integer.getInteger("helidon.connector.osc.write.timeout", 10000);
+ private final int bufferSize;
+
+ /**
+ * The minimum capacity of a buffer in bytes. The {@link LinkedBlockingDeque queue} stores up to {@link #CAPACITY} of them.
+ * The actual buffer size can be greater than this {@code bufferSize}, if a greater amount of data is sent to any of
+ * {@link OutputStreamChannel#write(byte[]) write} methods.
+ * @param bufferSize
+ */
+ OutputStreamChannel(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ private final LinkedBlockingDeque queue = new LinkedBlockingDeque<>(CAPACITY);
+
+ private volatile boolean open = true;
+ private ByteBuffer remainingByteBuffer;
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+
+ int sum = 0;
+
+ do {
+ ByteBuffer top;
+ try {
+ top = poll(READ_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ open = false;
+ throw new ClosedByInterruptException();
+ }
+
+ if (top == null) {
+ return sum;
+ }
+
+ if (top == VOID) {
+ if (sum == 0) {
+ open = false;
+ return -1;
+ } else {
+ queue.addFirst(top);
+ return sum;
+ }
+ }
+
+ final int topSize = top.remaining();
+ final int dstAvailable = dst.remaining();
+ final int minSize = Math.min(topSize, dstAvailable);
+
+ if (top.hasArray()) {
+ dst.put(top.array(), top.arrayOffset() + top.position(), minSize);
+ top.position(top.position() + minSize);
+ } else {
+ while (dst.hasRemaining() && top.hasRemaining()) {
+ dst.put(top.get());
+ }
+ }
+
+ sum += minSize;
+
+ if (top.hasRemaining()) {
+ remainingByteBuffer = top;
+ }
+ } while (dst.hasRemaining());
+
+ return sum;
+ }
+
+ private ByteBuffer poll(long timeout, TimeUnit unit) throws InterruptedException {
+ if (remainingByteBuffer != null) {
+ final ByteBuffer remaining = remainingByteBuffer;
+ remainingByteBuffer = null;
+ return remaining;
+ } else {
+ // do not modify head
+ lock.lock();
+ final ByteBuffer peek = queue.poll(timeout, unit);
+ // can modify head
+ lock.unlock();
+ return peek;
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ write(new byte[]{(byte) b}, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ super.write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkClosed();
+
+ if (lock.tryLock()) {
+ if (len < bufferSize && queue.size() > 0) {
+ final ByteBuffer buffer = queue.getLast();
+ if (buffer != null && (buffer.capacity() - buffer.limit()) > len) {
+ //set for write
+ buffer.position(buffer.limit());
+ buffer.limit(buffer.capacity());
+ buffer.put(b, off, len);
+ //set for read
+ buffer.flip();
+ lock.unlock();
+ return;
+ }
+ }
+ lock.unlock();
+ }
+
+ final int maxLen = Math.max(len, bufferSize);
+ final byte[] bytes = new byte[maxLen];
+ System.arraycopy(b, off, bytes, 0, len);
+
+ final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ buffer.limit(len);
+ buffer.position(0);
+
+ write(buffer);
+ }
+
+ private void write(ByteBuffer buffer) throws IOException {
+ try {
+ boolean queued = queue.offer(buffer, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!queued) {
+ throw new IOException("Buffer overflow.");
+ }
+
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ boolean offer = false;
+
+ try {
+ offer = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // ignore.
+ }
+
+ if (!offer) {
+ lock.lock();
+ queue.removeLast();
+ queue.add(VOID);
+ lock.unlock();
+ }
+ }
+
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ private void checkClosed() throws IOException {
+ if (!open) {
+ throw new IOException("Stream already closed.");
+ }
+ }
+}
diff --git a/jersey/connector/src/test/java/io/helidon/jersey/connector/AbstractTest.java b/jersey/connector/src/test/java/io/helidon/jersey/connector/AbstractTest.java
new file mode 100644
index 00000000000..e432860a401
--- /dev/null
+++ b/jersey/connector/src/test/java/io/helidon/jersey/connector/AbstractTest.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.HttpHeaders;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
+import com.github.tomakehurst.wiremock.common.FileSource;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.extension.Extension;
+import com.github.tomakehurst.wiremock.extension.Parameters;
+import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
+import com.github.tomakehurst.wiremock.http.HttpHeader;
+import com.github.tomakehurst.wiremock.http.Request;
+import com.github.tomakehurst.wiremock.http.Response;
+import org.glassfish.jersey.client.ClientConfig;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public abstract class AbstractTest {
+
+ protected static WireMockServer wireMock;
+ protected static ThreadLocal rules = new ThreadLocal<>();
+ protected static ThreadLocal extensions = new ThreadLocal<>();
+
+ // The port to match wiremock port in MP Rest Client TCK
+ protected static final int PORT = 8765;
+
+ protected WebTarget target(String uri) {
+ return target(uri, null);
+ }
+
+ protected WebTarget target(String uri, String entityType) {
+ final ClientConfig config = new ClientConfig();
+ config.connectorProvider(new HelidonConnectorProvider());
+ if (entityType != null) {
+ config.property(HelidonConnector.INTERNAL_ENTITY_TYPE, entityType);
+ }
+ final Client client = ClientBuilder.newClient(config);
+ return client.target(getBaseUri()).path(uri);
+ }
+
+ protected static String getBaseUri() {
+ return "http://localhost:" + PORT;
+ }
+
+ public static void setup() {
+ wireMock = new WireMockServer(
+ WireMockConfiguration.options()
+ .extensions(extensions.get())
+ // debug logging
+ // .notifier(new ConsoleNotifier(true))
+ .port(PORT)
+ );
+ rules.get().addRules();
+
+ wireMock.start();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ wireMock.shutdown();
+ while(wireMock.isRunning()) {
+ try {
+ Thread.sleep(100L);
+ } catch (InterruptedException e) {
+ // We finish the tests
+ }
+ }
+ }
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @ParameterizedTest
+ @ValueSource(strings = { "BYTE_ARRAY_OUTPUT_STREAM", "READABLE_BYTE_CHANNEL", "OUTPUT_STREAM_MULTI" })
+ @interface ParamTest { }
+
+ protected interface Rules {
+ void addRules();
+ }
+
+
+ protected static class ContentLengthSetter extends ResponseTransformer {
+ @Override
+ public com.github.tomakehurst.wiremock.http.Response transform(
+ Request request,
+ com.github.tomakehurst.wiremock.http.Response response,
+ FileSource files,
+ Parameters parameters) {
+ final String content = response.getBodyAsString();
+ com.github.tomakehurst.wiremock.http.Response.Builder builder =
+ com.github.tomakehurst.wiremock.http.Response.response();
+ if (content != null && content.length() != 0) {
+ builder = builder
+ .body(content)
+ .headers(response.getHeaders().plus(
+ HttpHeader.httpHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(content.length()))
+ ));
+ } else {
+ builder = builder.headers(response.getHeaders());
+ }
+ return builder.status(response.getStatus()).build();
+ }
+
+ @Override
+ public String getName() {
+ return "content-length-transformer";
+ }
+ }
+
+ /**
+ * Usable when the method contains an operation that needs to be executed everytime.
+ * The stub is cached by default.
+ */
+ static class UncachedResponseMethodExecutor extends ResponseTransformer {
+
+ private final Supplier methodSupplier;
+
+ UncachedResponseMethodExecutor(Supplier methodSupplier) {
+ this.methodSupplier = methodSupplier;
+ }
+
+ @Override
+ public Response transform(Request request, Response response, FileSource files, Parameters parameters) {
+ javax.ws.rs.core.Response original = methodSupplier.get();
+ com.github.tomakehurst.wiremock.http.Response.Builder builder =
+ com.github.tomakehurst.wiremock.http.Response.response()
+ .status(original.getStatus());
+ if (original.hasEntity()) {
+ builder = builder.body(String.valueOf(original.getEntity()));
+ }
+
+ com.github.tomakehurst.wiremock.http.HttpHeaders newHeaders = com.github.tomakehurst.wiremock.http.HttpHeaders.noHeaders();
+ for (Map.Entry> entry : original.getStringHeaders().entrySet()) {
+ if (javax.ws.rs.core.HttpHeaders.LOCATION.equals(entry.getKey())) {
+ newHeaders = newHeaders.plus(
+ HttpHeader.httpHeader(entry.getKey(), getBaseUri() + entry.getValue().get(0))
+ );
+ } else {
+ newHeaders = newHeaders.plus(
+ HttpHeader.httpHeader(entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]))
+ );
+ }
+ }
+
+ builder = builder.headers(newHeaders);
+ return builder.build();
+ }
+
+ @Override
+ public String getName() {
+ return "uncached-response-executor";
+ }
+
+ @Override
+ public boolean applyGlobally() {
+ return false;
+ }
+ }
+
+ /**
+ * Usable when the method contains an operation that needs to be executed everytime.
+ * The stub is cached by default.
+ */
+ static class UncachedStringMethodExecutor extends ResponseTransformer {
+
+ private final Supplier methodSupplier;
+
+ UncachedStringMethodExecutor(Supplier methodSupplier) {
+ this.methodSupplier = methodSupplier;
+ }
+
+ @Override
+ public Response transform(Request request, Response response, FileSource files, Parameters parameters) {
+ return com.github.tomakehurst.wiremock.http.Response.response().body(methodSupplier.get()).build();
+ }
+
+ @Override
+ public String getName() {
+ return "uncached-string-executor";
+ }
+
+ @Override
+ public boolean applyGlobally() {
+ return false;
+ }
+ }
+}
diff --git a/jersey/connector/src/test/java/io/helidon/jersey/connector/AsyncRequestTest.java b/jersey/connector/src/test/java/io/helidon/jersey/connector/AsyncRequestTest.java
new file mode 100644
index 00000000000..05fa82faff7
--- /dev/null
+++ b/jersey/connector/src/test/java/io/helidon/jersey/connector/AsyncRequestTest.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.extension.Extension;
+import org.glassfish.jersey.client.ClientConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+
+public class AsyncRequestTest extends AbstractTest {
+
+ private static AsyncResource asyncResource = new AsyncResource();
+
+ @Path("async")
+ public static class AsyncResource {
+ private CountDownLatch shortLong = null;
+
+ @GET
+ @Path("reset")
+ public String reset() {
+ shortLong = new CountDownLatch(1);
+ return null;
+ }
+
+ @Path("long")
+ @GET
+ public String longGet() {
+ try {
+ shortLong.await(10000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return shortLong.getCount() == 0 ? "long" : "shortLong CountDownLatch has not been hit";
+ }
+
+ @Path("short")
+ @GET
+ public String shortGet() {
+ shortLong.countDown();
+ return "short";
+ }
+ }
+
+ @BeforeAll
+ public static void setup() {
+ final UncachedStringMethodExecutor executor = new UncachedStringMethodExecutor(asyncResource::longGet);
+
+ AbstractTest.extensions.set(new Extension[] {
+ executor,
+ new ContentLengthSetter()
+ });
+
+ AbstractTest.rules.set(
+ () -> {
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/async/reset")).willReturn(
+ WireMock.ok(asyncResource.reset()).withStatus(204)
+ )
+ );
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/async/short")).willReturn(
+ WireMock.ok(asyncResource.shortGet())
+ )
+ );
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/async/long")).willReturn(
+ WireMock.ok().withTransformers(executor.getName())
+ )
+ );
+ });
+
+ AbstractTest.setup();
+ }
+
+ @ParamTest
+ public void testTwoClientsAsync(String entityType) throws ExecutionException, InterruptedException {
+ try (Response resetResponse = target("async", entityType).path("reset").request().get()) {
+ Assertions.assertEquals(204, resetResponse.getStatus());
+ }
+
+ ClientConfig config = new ClientConfig();
+ config.connectorProvider(new HelidonConnectorProvider());
+
+ Client longClient = ClientBuilder.newClient(config);
+ Invocation.Builder longRequest = longClient.target(getBaseUri()).path("async/long").request();
+
+ Client shortClient = ClientBuilder.newClient(config);
+ Invocation.Builder shortRequest = shortClient.target(getBaseUri()).path("async/short").request();
+
+ Future futureLongResponse = longRequest.async().get();
+ Future futureShortResponse = shortRequest.async().get();
+
+ try (Response shortResponse = futureShortResponse.get()) {
+ Assertions.assertEquals(200, shortResponse.getStatus());
+ Assertions.assertEquals("short", shortResponse.readEntity(String.class));
+ }
+
+ try (Response longResponse = futureLongResponse.get()) {
+ Assertions.assertEquals(200, longResponse.getStatus());
+ Assertions.assertEquals("long", longResponse.readEntity(String.class));
+ }
+
+ Assertions.assertEquals(0, asyncResource.shortLong.getCount());
+ }
+}
diff --git a/jersey/connector/src/test/java/io/helidon/jersey/connector/BasicRequestTest.java b/jersey/connector/src/test/java/io/helidon/jersey/connector/BasicRequestTest.java
new file mode 100644
index 00000000000..f07898be440
--- /dev/null
+++ b/jersey/connector/src/test/java/io/helidon/jersey/connector/BasicRequestTest.java
@@ -0,0 +1,361 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Cookie;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.common.FileSource;
+import com.github.tomakehurst.wiremock.extension.Extension;
+import com.github.tomakehurst.wiremock.extension.Parameters;
+import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
+import com.github.tomakehurst.wiremock.http.HttpHeader;
+import com.github.tomakehurst.wiremock.http.Request;
+import com.github.tomakehurst.wiremock.matching.EqualToPattern;
+import org.glassfish.jersey.client.JerseyCompletionStageRxInvoker;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+public class BasicRequestTest extends AbstractTest {
+ private static BasicResource basicResource = new BasicResource();
+
+ @BeforeAll
+ public static void setup() {
+ AbstractTest.extensions.set(new Extension[] {
+ new HeadersSetter(), new ContentLengthSetter()
+ });
+
+ AbstractTest.rules.set(() -> {
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/basic/get")).willReturn(
+ WireMock.ok(basicResource.get())
+ )
+ );
+
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/basic/getquery?first=hello&second=world"))
+ .willReturn(WireMock.ok(basicResource.getQuery("hello", "world")))
+ );
+
+ wireMock.stubFor(
+ WireMock.post(WireMock.urlEqualTo("/basic/post"))
+ .withRequestBody(new EqualToPattern("ok"))
+ .willReturn(WireMock.ok(basicResource.post("ok")))
+ );
+
+ wireMock.stubFor(
+ WireMock.post(WireMock.urlEqualTo("/basic/post"))
+ .withRequestBody(new EqualToPattern("ok"))
+ .willReturn(WireMock.ok(basicResource.post("ok")))
+ );
+
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/basic/headers")).willReturn(
+ WireMock.ok().withTransformers("response-headers-setter")
+ )
+ );
+
+ wireMock.stubFor(
+ WireMock.put(WireMock.urlEqualTo("/basic/produces/consumes"))
+ .withHeader(HttpHeaders.ACCEPT, new EqualToPattern("test/z-test"))
+ .willReturn(WireMock.status(406))
+ );
+
+ wireMock.stubFor(
+ WireMock.put(WireMock.urlEqualTo("/basic/produces/consumes"))
+ .withHeader(HttpHeaders.CONTENT_TYPE, new EqualToPattern("test/z-test"))
+ .willReturn(WireMock.status(415))
+ );
+
+ wireMock.stubFor(
+ WireMock.put(WireMock.urlEqualTo("/basic/produces/consumes"))
+ .withHeader(HttpHeaders.CONTENT_TYPE, new EqualToPattern("test/x-test"))
+ .withHeader(HttpHeaders.ACCEPT, new EqualToPattern("test/y-test"))
+ .willReturn(WireMock.ok(basicResource.putConsumesProduces("ok"))
+ .withHeader(HttpHeaders.CONTENT_TYPE, "test/y-test"))
+ );
+ });
+
+ AbstractTest.setup();
+ }
+
+ @Path("basic")
+ public static class BasicResource {
+ @Path("get")
+ @GET
+ public String get() {
+ return "ok";
+ }
+
+ @Path("getquery")
+ @GET
+ public String getQuery(@QueryParam("first") String first, @QueryParam("second") String second) {
+ return first + second;
+ }
+
+ @POST
+ @Path("post")
+ public String post(String entity) {
+ return entity + entity;
+ }
+
+ @GET
+ @Path("headers")
+ public Response headers(@Context HttpHeaders headers) {
+ final Response.ResponseBuilder response = Response.ok("ok");
+ for (Map.Entry> set : headers.getRequestHeaders().entrySet()) {
+ if (set.getKey().toUpperCase(Locale.ROOT).startsWith("X-TEST")) {
+ response.header(set.getKey(), set.getValue().iterator().next());
+ }
+ }
+ return response.build();
+ }
+
+ @PUT
+ @Consumes("test/x-test")
+ @Produces("test/y-test")
+ @Path("produces/consumes")
+ public String putConsumesProduces(String content) {
+ return content + content;
+ }
+ }
+
+ @ParamTest
+ public void testBasicGet(String entityType) {
+ try (Response response = target("basic", entityType).path("get").request().get()) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("ok", response.readEntity(String.class));
+ }
+ }
+
+ @ParamTest
+ public void testBasicPost(String entityType) {
+ try (Response response = target("basic", entityType).path("post").request()
+ .buildPost(Entity.entity("ok", MediaType.TEXT_PLAIN_TYPE)).invoke()) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("okok", response.readEntity(String.class));
+ }
+ }
+
+ @ParamTest
+ public void queryGetTest(String entityType) {
+ try (Response response = target("basic", entityType).path("getquery")
+ .queryParam("first", "hello")
+ .queryParam("second", "world")
+ .request().get()) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("helloworld", response.readEntity(String.class));
+ }
+ }
+
+ @ParamTest
+ public void testHeaders(String entityType) {
+ String[][] headers = new String[][]{{"X-TEST-ONE", "ONE"}, {"X-TEST-TWO", "TWO"}, {"X-TEST-THREE", "THREE"}};
+ MultivaluedHashMap map = new MultivaluedHashMap<>();
+ Arrays.stream(headers).forEach(a -> map.add(a[0], a[1]));
+ try (Response response = target("basic", entityType).path("headers").request().headers(map).get()) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("ok", response.readEntity(String.class));
+ for (int i = 0; i != headers.length; i++) {
+ Assertions.assertTrue(response.getHeaders().containsKey(headers[i][0]));
+ Assertions.assertEquals(headers[i][1], response.getStringHeaders().getFirst(headers[i][0]));
+ }
+ }
+ }
+
+ @ParamTest
+ public void testProduces(String entityType) {
+ try (Response response = target("basic", entityType).path("produces/consumes").request("test/z-test")
+ .put(Entity.entity("ok", new MediaType("test", "x-test")))) {
+ Assertions.assertEquals(406, response.getStatus());
+ }
+
+ try (Response response = target("basic", entityType).path("produces/consumes").request("test/y-test")
+ .put(Entity.entity("ok", new MediaType("test", "x-test")))) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("okok", response.readEntity(String.class));
+ Assertions.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE));
+ }
+ }
+
+ @ParamTest
+ public void testAsyncGet(String entityType) throws ExecutionException, InterruptedException {
+ Future futureResponse = target("basic", entityType).path("get").request().async().get();
+ try (Response response = futureResponse.get()) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("ok", response.readEntity(String.class));
+ }
+ }
+
+ @ParamTest
+ public void testConsumes(String entityType) {
+ try (Response response = target("basic", entityType).path("produces/consumes").request("test/y-test")
+ .put(Entity.entity("ok", new MediaType("test", "z-test")))) {
+ Assertions.assertEquals(415, response.getStatus());
+ }
+
+ try (Response response = target("basic", entityType).path("produces/consumes").request("test/y-test")
+ .put(Entity.entity("ok", new MediaType("test", "x-test")))) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("okok", response.readEntity(String.class));
+ Assertions.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE));
+ }
+ }
+
+ @ParamTest
+ public void testRxGet(String entityType) throws ExecutionException, InterruptedException {
+ @SuppressWarnings("unchecked")
+ final CompletableFuture futureResponse =
+ target("basic", entityType).path("get").request().rx(JerseyCompletionStageRxInvoker.class).get();
+
+ try (Response response = futureResponse.get()) {
+ Assertions.assertEquals(200, response.getStatus());
+ Assertions.assertEquals("ok", response.readEntity(String.class));
+ }
+ }
+
+ @ParamTest
+ public void testInputStreamEntity(String entityType) throws IOException {
+ try (Response response = target("basic", entityType).path("get").request().get()) {
+ Assertions.assertEquals(200, response.getStatus());
+ final InputStream is = response.readEntity(InputStream.class);
+ Assertions.assertEquals('o', is.read());
+ Assertions.assertEquals('k', is.read());
+ is.close();
+ }
+ }
+
+ private static class HeadersSetter extends ResponseTransformer {
+
+ @Override
+ public com.github.tomakehurst.wiremock.http.Response transform(
+ Request request,
+ com.github.tomakehurst.wiremock.http.Response response,
+ FileSource files,
+ Parameters parameters) {
+
+ final com.github.tomakehurst.wiremock.http.HttpHeaders requestHeaders = request.getHeaders();
+
+ final HttpHeaders rsRequestHeaders = new HttpHeaders() {
+ @Override
+ public List getRequestHeader(String name) {
+ return requestHeaders.getHeader(name).values();
+ }
+
+ @Override
+ public String getHeaderString(String name) {
+ return requestHeaders.getHeader(name).firstValue();
+ }
+
+ @Override
+ public MultivaluedMap getRequestHeaders() {
+ MultivaluedMap mapHeaders = new MultivaluedHashMap<>(requestHeaders.size());
+ for (String key : requestHeaders.keys()) {
+ mapHeaders.addAll(key, requestHeaders.getHeader(key).values());
+ }
+ return mapHeaders;
+ }
+
+ @Override
+ public List getAcceptableMediaTypes() {
+ String accept = request.getHeader(HttpHeaders.ACCEPT);
+ String [] splitAccept = accept.split("/");
+ return Collections.singletonList(new MediaType(splitAccept[0], splitAccept[1]));
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public List getAcceptableLanguages() {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public MediaType getMediaType() {
+ String content = request.getHeader(HttpHeaders.CONTENT_TYPE);
+ String [] splitContent = content.split("/");
+ return new MediaType(splitContent[0], splitContent[1]);
+ }
+
+ @Override
+ public Locale getLanguage() {
+ return Locale.ROOT;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Map getCookies() {
+ return Collections.EMPTY_MAP;
+ }
+
+ @Override
+ public Date getDate() {
+ return new Date();
+ }
+
+ @Override
+ public int getLength() {
+ return 0; //no entity
+ }
+ };
+
+ final Response rsResponse = basicResource.headers(rsRequestHeaders);
+ com.github.tomakehurst.wiremock.http.HttpHeaders responseHeaders =
+ com.github.tomakehurst.wiremock.http.HttpHeaders.noHeaders();
+ for (Map.Entry> entry : rsResponse.getHeaders().entrySet()) {
+ responseHeaders = responseHeaders.plus(new HttpHeader(entry.getKey(), entry.getValue().get(0).toString()));
+ }
+
+ return com.github.tomakehurst.wiremock.http.Response.response()
+ .headers(responseHeaders).body(rsResponse.getEntity().toString()).build();
+ }
+
+ @Override
+ public String getName() {
+ return "response-headers-setter";
+ }
+
+ @Override
+ public boolean applyGlobally() {
+ return false;
+ }
+ }
+}
diff --git a/jersey/connector/src/test/java/io/helidon/jersey/connector/FollowRedirectsTest.java b/jersey/connector/src/test/java/io/helidon/jersey/connector/FollowRedirectsTest.java
new file mode 100644
index 00000000000..e85d16954bc
--- /dev/null
+++ b/jersey/connector/src/test/java/io/helidon/jersey/connector/FollowRedirectsTest.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.ClientRequestContext;
+import javax.ws.rs.client.ClientResponseContext;
+import javax.ws.rs.client.ClientResponseFilter;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.extension.Extension;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.ClientResponse;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Helidon connector follow redirect tests.
+ */
+public class FollowRedirectsTest extends AbstractTest {
+
+ private static RedirectResource redirectResource;
+
+ @Path("/test")
+ public static class RedirectResource {
+ @GET
+ public String get() {
+ return "GET";
+ }
+
+ @GET
+ @Path("redirect")
+ public Response redirect() {
+ return Response.seeOther(UriBuilder.fromResource(RedirectResource.class).build()).build();
+ }
+ }
+
+ @BeforeAll
+ public static void setup() {
+ redirectResource = new RedirectResource();
+ UncachedResponseMethodExecutor executor = new UncachedResponseMethodExecutor(redirectResource::redirect);
+ AbstractTest.extensions.set(new Extension[] {
+ executor,
+ new ContentLengthSetter()
+ });
+
+ AbstractTest.rules.set(
+ () -> {
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/test/redirect")).willReturn(
+ WireMock.ok().withTransformers(executor.getName())
+ )
+ );
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/test")).willReturn(
+ WireMock.ok(redirectResource.get())
+ )
+ );
+ });
+
+ AbstractTest.setup();
+ }
+
+ @Override
+ protected WebTarget target(String uri, String entityType) {
+ WebTarget target = super.target(uri, entityType);
+ target.register(RedirectTestFilter.class);
+ return target;
+ }
+
+ private static class RedirectTestFilter implements ClientResponseFilter {
+ public static final String RESOLVED_URI_HEADER = "resolved-uri";
+
+ @Override
+ public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext) throws IOException {
+ if (responseContext instanceof ClientResponse) {
+ ClientResponse clientResponse = (ClientResponse) responseContext;
+ responseContext.getHeaders().putSingle(RESOLVED_URI_HEADER, clientResponse.getResolvedRequestUri().toString());
+ }
+ }
+ }
+
+ @ParamTest
+ public void testDoFollow(String entityType) {
+ Response r = target("test/redirect", entityType).register(RedirectTestFilter.class).request().get();
+ Assertions.assertEquals(200, r.getStatus());
+ Assertions.assertEquals("GET", r.readEntity(String.class));
+
+ Assertions.assertEquals(
+ UriBuilder.fromUri(getBaseUri()).path(RedirectResource.class).build().toString(),
+ r.getHeaderString(RedirectTestFilter.RESOLVED_URI_HEADER));
+ }
+
+ @ParamTest
+ public void testDontFollow(String entityType) {
+ WebTarget t = target("test/redirect", entityType);
+ t.property(ClientProperties.FOLLOW_REDIRECTS, false);
+ Assertions.assertEquals(303, t.request().get().getStatus());
+ }
+}
diff --git a/jersey/connector/src/test/java/io/helidon/jersey/connector/LargeDataTest.java b/jersey/connector/src/test/java/io/helidon/jersey/connector/LargeDataTest.java
new file mode 100644
index 00000000000..ea99d120c12
--- /dev/null
+++ b/jersey/connector/src/test/java/io/helidon/jersey/connector/LargeDataTest.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.ServerErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.common.FileSource;
+import com.github.tomakehurst.wiremock.extension.Extension;
+import com.github.tomakehurst.wiremock.extension.Parameters;
+import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
+import com.github.tomakehurst.wiremock.http.Request;
+import org.glassfish.jersey.client.ClientProperties;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * The LargeDataTest reproduces a problem when bytes of large data sent are incorrectly sent.
+ * As a result, the request body is different than what was sent by the client.
+ *
+ * In order to be able to inspect the request body, the generated data is a sequence of numbers
+ * delimited with new lines. Such as
+ *
+ * 1
+ * 2
+ * 3
+ *
+ * ...
+ *
+ * 57234
+ * 57235
+ * 57236
+ *
+ * ...
+ *
+ * It is also possible to send the data to netcat: {@code nc -l 8080} and verify the problem is
+ * on the client side.
+ */
+public class LargeDataTest extends AbstractTest {
+
+ private static final String ENTITY_NAME = HelidonEntity.HelidonEntityType.READABLE_BYTE_CHANNEL.name();
+
+ private static final int LONG_DATA_SIZE = 100_000; // for large set around 5GB, try e.g.: 536_870_912;
+ private static volatile Throwable exception;
+ private static LongDataReceiver receiver = new LongDataReceiver();
+
+ @BeforeAll
+ public static void setup() {
+ AbstractTest.extensions.set(new Extension[] {
+ receiver, new ContentLengthSetter()
+ });
+
+ AbstractTest.rules.set(
+ () -> wireMock.stubFor(
+ WireMock.any(WireMock.anyUrl()).willReturn(
+ WireMock.ok()
+ )
+ )
+ );
+
+ AbstractTest.setup();
+ }
+
+ @AfterAll
+ public static void tearDown() {
+ receiver.close();
+
+ AbstractTest.tearDown();
+ }
+
+ protected WebTarget target(String uri, String entityType) {
+ WebTarget target = super.target(uri, entityType);
+ target.property(ClientProperties.READ_TIMEOUT, (int) TimeUnit.MINUTES.toMillis(1L));
+ return target;
+ }
+
+ @Test
+ public void postWithLargeData() throws Throwable {
+ long milis = System.currentTimeMillis();
+ WebTarget webTarget = target("test", ENTITY_NAME);
+
+ Response response = webTarget.request().post(Entity.entity(longData(LONG_DATA_SIZE), MediaType.TEXT_PLAIN_TYPE));
+
+ try {
+ if (exception != null) {
+
+ // the reason to throw the exception is that IntelliJ gives you an option to compare the expected with the actual
+ throw exception;
+ }
+
+ Assertions.assertEquals(
+ Response.Status.Family.SUCCESSFUL,
+ response.getStatusInfo().getFamily(),
+ "Unexpected error: " + response.getStatus());
+ } finally {
+ response.close();
+ }
+ if (LONG_DATA_SIZE > 9_999) {
+ System.out.println("Large Data Test took " + (System.currentTimeMillis() - milis) + "milis");
+ }
+ }
+
+ private static StreamingOutput longData(long sequence) {
+ return out -> {
+ long offset = 0;
+ while (offset < sequence) {
+ out.write(Long.toString(offset).getBytes());
+ out.write('\n');
+ offset++;
+ }
+ };
+ }
+
+ static class LongDataReceiver extends ResponseTransformer implements AutoCloseable {
+
+ final BlockingQueue queue = new ArrayBlockingQueue<>(8192);
+ final DataVerifier verifier = new DataVerifier();
+ final Thread thread;
+ LongDataReceiver() {
+ thread = new Thread(verifier);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ @Override
+ public void close() {
+ thread.interrupt();
+ }
+
+ class DataVerifier implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ longData(LONG_DATA_SIZE).write(new OutputStream() {
+
+ private long position = 0;
+ // private long mbRead = 0;
+
+ @Override
+ public void write(final int generated) throws IOException {
+ int received = 0;
+ try {
+ received = queue.take().intValue();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+
+ if (received != generated) {
+ throw new IOException("Bytes don't match at position " + position
+ + ": received=" + received
+ + ", generated=" + generated);
+ }
+
+ // position++;
+ // System.out.println("position" + position);
+ // if (position % (1024 * 1024) == 0) {
+ // mbRead++;
+ // System.out.println("MB read: " + mbRead);
+ // }
+ }
+ });
+
+ } catch (IOException e) {
+ exception = e;
+ throw new ServerErrorException(e.getMessage(), 500, e);
+ }
+ }
+ }
+
+ @Override
+ public com.github.tomakehurst.wiremock.http.Response transform(
+ Request request,
+ com.github.tomakehurst.wiremock.http.Response response,
+ FileSource files,
+ Parameters parameters) {
+
+ byte [] content = request.getBody();
+ for (Byte b : content) {
+ do {
+ if (0 < queue.remainingCapacity()) {
+ queue.add(b);
+ break;
+ }
+ } while (true);
+ }
+ return com.github.tomakehurst.wiremock.http.Response.response().build();
+ }
+
+ @Override
+ public String getName() {
+ return "long-data-transformer";
+ }
+ }
+}
diff --git a/jersey/connector/src/test/java/io/helidon/jersey/connector/ParallelTest.java b/jersey/connector/src/test/java/io/helidon/jersey/connector/ParallelTest.java
new file mode 100644
index 00000000000..1e3c78f2c4b
--- /dev/null
+++ b/jersey/connector/src/test/java/io/helidon/jersey/connector/ParallelTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.extension.Extension;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+
+/**
+ * Tests the parallel execution of multiple requests.
+ */
+public class ParallelTest extends AbstractTest {
+ private static final Logger LOGGER = Logger.getLogger(ParallelTest.class.getName());
+
+ private static final int PARALLEL_CLIENTS = 10;
+ private static final String PATH = "/test";
+ private static final AtomicInteger receivedCounter = new AtomicInteger(0);
+ private static final AtomicInteger resourceCounter = new AtomicInteger(0);
+ private static final CyclicBarrier startBarrier = new CyclicBarrier(PARALLEL_CLIENTS + 1);
+ private static final CountDownLatch doneLatch = new CountDownLatch(PARALLEL_CLIENTS);
+ private static final MyResource resource = new MyResource();
+
+ @Path(PATH)
+ public static class MyResource {
+
+ @GET
+ public String get() {
+ sleep();
+ resourceCounter.addAndGet(1);
+ return "GET";
+ }
+
+ private void sleep() {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ LOGGER.log(Level.SEVERE, null, ex);
+ }
+ }
+ }
+
+ @BeforeAll
+ public static void setup() {
+ LOGGER.addHandler(new ConsoleHandler());
+ final UncachedStringMethodExecutor executor = new UncachedStringMethodExecutor(resource::get);
+
+ AbstractTest.extensions.set(new Extension[] {
+ executor,
+ new ContentLengthSetter()
+ });
+
+ AbstractTest.rules.set(
+ () -> {
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo(PATH)).willReturn(
+ WireMock.ok().withTransformers(executor.getName())
+ )
+ );
+ });
+
+ AbstractTest.setup();
+ }
+
+
+ @Test
+ public void testParallel() throws BrokenBarrierException, InterruptedException, TimeoutException {
+ final ScheduledExecutorService executor = Executors.newScheduledThreadPool(PARALLEL_CLIENTS);
+
+ try {
+ final WebTarget target = target("");
+ for (int i = 1; i <= PARALLEL_CLIENTS; i++) {
+ final int id = i;
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ startBarrier.await();
+ Response response;
+ response = target.path(PATH).request().get();
+ Assertions.assertEquals("GET", response.readEntity(String.class));
+ receivedCounter.incrementAndGet();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ LOGGER.log(Level.WARNING, "Client thread " + id + " interrupted.", ex);
+ } catch (BrokenBarrierException ex) {
+ LOGGER.log(Level.INFO, "Client thread " + id + " failed on broken barrier.", ex);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ LOGGER.log(Level.WARNING, "Client thread " + id + " failed on unexpected exception.", t);
+ } finally {
+ doneLatch.countDown();
+ }
+ }
+ });
+ }
+
+ startBarrier.await(1, TimeUnit.SECONDS);
+
+ Assertions.assertTrue(
+ doneLatch.await(10, TimeUnit.SECONDS),
+ "Waiting for clients to finish has timed out."
+ );
+
+ Assertions.assertEquals(PARALLEL_CLIENTS, resourceCounter.get(), "Resource counter");
+
+ Assertions.assertEquals(PARALLEL_CLIENTS, receivedCounter.get(), "Received counter");
+ } finally {
+ executor.shutdownNow();
+ Assertions.assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor termination");
+ }
+ }
+}
diff --git a/jersey/connector/src/test/java/io/helidon/jersey/connector/TimeoutTest.java b/jersey/connector/src/test/java/io/helidon/jersey/connector/TimeoutTest.java
new file mode 100644
index 00000000000..1f1d0ac1cbd
--- /dev/null
+++ b/jersey/connector/src/test/java/io/helidon/jersey/connector/TimeoutTest.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.jersey.connector;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.core.Response;
+
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.extension.Extension;
+import org.glassfish.jersey.client.ClientProperties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TimeoutTest extends AbstractTest {
+ private static TimeoutResource timeoutResource;
+
+ @Path("/test")
+ public static class TimeoutResource {
+ @GET
+ public String get() {
+ return "GET";
+ }
+
+ @GET
+ @Path("timeout")
+ public String getTimeout() {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return "GET";
+ }
+ }
+
+ @BeforeAll
+ public static void setup() {
+ timeoutResource = new TimeoutResource();
+ UncachedStringMethodExecutor sleepExecutor = new UncachedStringMethodExecutor(timeoutResource::getTimeout);
+
+ AbstractTest.extensions.set(new Extension[] {
+ sleepExecutor,
+ new ContentLengthSetter()
+ });
+
+ AbstractTest.rules.set(
+ () -> {
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/test")).willReturn(
+ WireMock.ok(timeoutResource.get())
+ )
+ );
+ wireMock.stubFor(
+ WireMock.get(WireMock.urlEqualTo("/test/timeout")).willReturn(
+ WireMock.ok().withTransformers(sleepExecutor.getName())
+ )
+ );
+ });
+
+ AbstractTest.setup();
+ }
+
+ @Test
+ public void testFast() {
+ Response r = target("test").request().get();
+ Assertions.assertEquals(200, r.getStatus());
+ Assertions.assertEquals("GET", r.readEntity(String.class));
+ }
+
+ @Test
+ public void testSlow() {
+ try {
+ target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request().get();
+ Assertions.fail("Timeout expected.");
+ } catch (ProcessingException e) {
+ assertTimeoutException(e);
+ }
+ }
+
+ @Test
+ public void testTimeoutInRequest() {
+ try {
+ target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000).get();
+ Assertions.fail("Timeout expected.");
+ } catch (ProcessingException e) {
+ assertTimeoutException(e);
+ }
+ }
+
+ private void assertTimeoutException(Exception e) {
+ String exceptionName = "TimeoutException"; // check netty or JDK TimeoutException
+ Throwable t = e.getCause();
+ while (t != null) {
+ if (t.getClass().getSimpleName().contains(exceptionName)) {
+ break;
+ }
+ t = t.getCause();
+ }
+ if (t == null) {
+ if (e.getCause() != null) {
+ if (e.getCause().getCause() != null) {
+ Assertions.fail("Unexpected processing exception cause" + e.getCause().getCause().getMessage());
+ } else {
+ Assertions.fail("Unexpected processing exception cause" + e.getCause().getMessage());
+ }
+ } else {
+ Assertions.fail("Unexpected processing exception cause" + e.getMessage());
+ }
+ }
+ }
+}
diff --git a/jersey/pom.xml b/jersey/pom.xml
index 660e267f5c2..6f2541ee22f 100644
--- a/jersey/pom.xml
+++ b/jersey/pom.xml
@@ -37,6 +37,7 @@
client
+ connector
server
jsonp
common