diff --git a/lra/coordinator/client/narayana-client/pom.xml b/lra/coordinator/client/narayana-client/pom.xml
index e124e0d21fa..533610b310c 100644
--- a/lra/coordinator/client/narayana-client/pom.xml
+++ b/lra/coordinator/client/narayana-client/pom.xml
@@ -37,12 +37,12 @@
compile
- io.helidon.reactive.webclient
- helidon-reactive-webclient
+ io.helidon.nima.webclient
+ helidon-nima-webclient
- io.helidon.reactive.fault-tolerance
- helidon-reactive-fault-tolerance
+ io.helidon.nima.fault-tolerance
+ helidon-nima-fault-tolerance
io.helidon.microprofile.config
diff --git a/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/LraStatusReader.java b/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/LraStatusReader.java
deleted file mode 100644
index d43165ec0f4..00000000000
--- a/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/LraStatusReader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2021, 2022 Oracle and/or its affiliates.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.helidon.lra.coordinator.client.narayana;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.Flow;
-import java.util.stream.Collectors;
-
-import io.helidon.common.GenericType;
-import io.helidon.common.http.DataChunk;
-import io.helidon.common.reactive.Multi;
-import io.helidon.common.reactive.Single;
-import io.helidon.reactive.media.common.MessageBodyReader;
-import io.helidon.reactive.media.common.MessageBodyReaderContext;
-
-import org.eclipse.microprofile.lra.annotation.LRAStatus;
-
-class LraStatusReader implements MessageBodyReader {
- @Override
- public PredicateResult accept(GenericType> type, MessageBodyReaderContext context) {
- return PredicateResult.supports(LRAStatus.class, type);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public Single read(Flow.Publisher pub,
- GenericType type,
- MessageBodyReaderContext context) {
- return (Single) Multi.create(pub)
- .map(DataChunk::data)
- .flatMapIterable(List::of)
- .collectStream(Collectors.reducing(ByteBuffer.allocate(0), (ident, val) ->
- ByteBuffer.allocate(val.capacity() + ident.capacity())
- .put(ident)
- .put(val.asReadOnlyBuffer())
- .rewind()))
- .map(ByteBuffer::array)
- .map(String::new)
- .map(LRAStatus::valueOf);
- }
-}
diff --git a/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/LraStatusSupport.java b/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/LraStatusSupport.java
new file mode 100644
index 00000000000..28a79f69aa3
--- /dev/null
+++ b/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/LraStatusSupport.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2021, 2023 Oracle and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.helidon.lra.coordinator.client.narayana;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+
+import io.helidon.common.GenericType;
+import io.helidon.common.http.Headers;
+import io.helidon.common.http.HttpMediaType;
+import io.helidon.nima.http.media.EntityReader;
+import io.helidon.nima.http.media.MediaSupport;
+
+import org.eclipse.microprofile.lra.annotation.LRAStatus;
+
+class LraStatusSupport implements MediaSupport {
+
+ private static final GenericType LRA_STATUS_TYPE = GenericType.create(LRAStatus.class);
+ private final EntityReader reader = new LRAStatusReader();
+
+ @Override
+ public ReaderResponse reader(GenericType type, Headers headers) {
+ if (type.equals(LRA_STATUS_TYPE)) {
+ return new ReaderResponse<>(SupportLevel.SUPPORTED, this::reader);
+ }
+ return ReaderResponse.unsupported();
+ }
+
+ @Override
+ public ReaderResponse reader(GenericType type, Headers requestHeaders, Headers responseHeaders) {
+ if (type.equals(LRA_STATUS_TYPE)) {
+ return new ReaderResponse<>(SupportLevel.SUPPORTED, this::reader);
+ }
+ return ReaderResponse.unsupported();
+ }
+
+ private EntityReader reader(){
+ return reader;
+ }
+
+ private static class LRAStatusReader implements EntityReader {
+ @Override
+ public LRAStatus read(GenericType type, InputStream stream, Headers headers) {
+ return read(stream, headers.contentType());
+ }
+
+ @Override
+ public LRAStatus read(GenericType type, InputStream stream, Headers requestHeaders, Headers responseHeaders) {
+ return read(stream, responseHeaders.contentType());
+ }
+
+ private LRAStatus read(InputStream stream, Optional contentType) {
+ Charset charset = contentType
+ .flatMap(HttpMediaType::charset)
+ .map(Charset::forName)
+ .orElse(StandardCharsets.UTF_8);
+ try (stream) {
+ String stringStatus = new String(stream.readAllBytes(), charset);
+ return LRAStatus.valueOf(stringStatus);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+}
diff --git a/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java b/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java
index f65a20ba925..e20060ca0b2 100644
--- a/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java
+++ b/lra/coordinator/client/narayana-client/src/main/java/io/helidon/lra/coordinator/client/narayana/NarayanaClient.java
@@ -27,17 +27,20 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import io.helidon.common.http.Headers;
+import io.helidon.common.http.ClientRequestHeaders;
import io.helidon.common.http.Http;
-import io.helidon.common.reactive.Single;
+import io.helidon.common.http.WritableHeaders;
+import io.helidon.common.socket.SocketOptions;
import io.helidon.lra.coordinator.client.CoordinatorClient;
import io.helidon.lra.coordinator.client.CoordinatorConnectionException;
import io.helidon.lra.coordinator.client.Participant;
import io.helidon.lra.coordinator.client.PropagatedHeaders;
-import io.helidon.reactive.faulttolerance.Retry;
-import io.helidon.reactive.webclient.WebClient;
-import io.helidon.reactive.webclient.WebClientRequestHeaders;
-import io.helidon.reactive.webclient.WebClientResponse;
+import io.helidon.nima.faulttolerance.Retry;
+import io.helidon.nima.http.media.MediaContext;
+import io.helidon.nima.webclient.WebClient;
+import io.helidon.nima.webclient.http1.Http1Client;
+import io.helidon.nima.webclient.http1.Http1ClientRequest;
+import io.helidon.nima.webclient.http1.Http1ClientResponse;
import org.eclipse.microprofile.lra.annotation.LRAStatus;
import org.eclipse.microprofile.lra.annotation.ws.rs.LRA;
@@ -62,6 +65,15 @@ public class NarayanaClient implements CoordinatorClient {
private Duration coordinatorTimeout;
private Retry retry;
+ static URI parseBaseUri(String lraUri) {
+ Matcher m = LRA_ID_PATTERN.matcher(lraUri);
+ if (!m.matches()) {
+ //LRA id uri format
+ throw new RuntimeException("Error when parsing lra uri: " + lraUri);
+ }
+ return URI.create(m.group(1));
+ }
+
@Override
public void init(Supplier coordinatorUriSupplier, Duration timeout) {
this.coordinatorUriSupplier = coordinatorUriSupplier;
@@ -75,217 +87,211 @@ public void init(Supplier coordinatorUriSupplier, Duration timeout) {
}
@Override
- public Single start(String clientID, PropagatedHeaders headers, long timeout) {
+ public URI start(String clientID, PropagatedHeaders headers, long timeout) {
return startInternal(null, clientID, headers, timeout);
}
@Override
- public Single start(URI parentLRAUri, String clientID, PropagatedHeaders headers, long timeout) {
+ public URI start(URI parentLRAUri, String clientID, PropagatedHeaders headers, long timeout) {
return startInternal(parentLRAUri, clientID, headers, timeout);
}
- private Single startInternal(URI parentLRA, String clientID, PropagatedHeaders headers, long timeout) {
+ private URI startInternal(URI parentLRA, String clientID, PropagatedHeaders headers, long timeout) {
// We need to call coordinator which knows parent LRA
URI baseUri = Optional.ofNullable(parentLRA)
.map(p -> parseBaseUri(p.toASCIIString()))
.orElse(coordinatorUriSupplier.get());
- return retry.invoke(() -> prepareWebClient(baseUri)
- .post()
- .path("start")
- .headers(copyHeaders(headers)) // header propagation
- .queryParam(QUERY_PARAM_CLIENT_ID, Optional.ofNullable(clientID).orElse(""))
- .queryParam(QUERY_PARAM_TIME_LIMIT, String.valueOf(timeout))
- .queryParam(QUERY_PARAM_PARENT_LRA, parentLRA == null ? "" : parentLRA.toASCIIString())
- .submit()
- .flatMap(res -> {
- Http.Status status = res.status();
- if (status.code() != 201) {
- return res.content().as(String.class).flatMap(cont ->
- connectionError("Unexpected response " + status + " from coordinator "
- + res.lastEndpointURI() + ": " + cont, null));
- } else {
- //propagate supported headers from coordinator
- headers.scan(res.headers().toMap());
- return Single.just(res);
- }
- })
- .map(res -> res
- .headers()
- .location()
+ return retry.invoke(() -> {
+ Http1ClientRequest req = prepareWebClient(baseUri)
+ .post()
+ .path("start")
+ .headers(copyHeaders(headers)) // header propagation
+ .queryParam(QUERY_PARAM_CLIENT_ID, Optional.ofNullable(clientID).orElse(""))
+ .queryParam(QUERY_PARAM_TIME_LIMIT, String.valueOf(timeout))
+ .queryParam(QUERY_PARAM_PARENT_LRA, parentLRA == null ? "" : parentLRA.toASCIIString());
+
+ try (Http1ClientResponse res = req.request()) {
+ Http.Status status = res.status();
+ if (status.code() != 201) {
+ throw connectionError("Unexpected response " + status + " from coordinator "
+ + req.resolvedUri() + ": " + res.as(String.class), null);
+ }
+ //propagate supported headers from coordinator
+ headers.scan(res.headers().toMap());
+ URI lraId = res.headers().first(Http.Header.LOCATION)
// TMM doesn't send lraId as LOCATION
- .or(() -> res.headers()
- .first(LRA_HTTP_CONTEXT_HEADER)
- .map(URI::create))
+ .or(() -> res.headers().first(LRA_HTTP_CONTEXT_HEADER))
+ .map(URI::create)
.orElseThrow(() ->
new IllegalArgumentException(
"Coordinator needs to return lraId either as 'Location' or "
- + "'Long-Running-Action' header."))
- )
- .onErrorResumeWith(t -> connectionError("Unable to start LRA", t))
- .peek(lraId -> logF("LRA started - LRAID: {0} parent: {1}", lraId, parentLRA))
- .first());
+ + "'Long-Running-Action' header."));
+ logF("LRA started - LRAID: {0} parent: {1}", lraId, parentLRA);
+ return lraId;
+
+ } catch (Exception e) {
+ throw connectionError("Unable to start LRA", e);
+ }
+ });
}
@Override
- public Single cancel(URI lraId, PropagatedHeaders headers) {
- return retry.invoke(() -> prepareWebClient(lraId)
- .put()
- .path("/cancel")
- .headers(copyHeaders(headers)) // header propagation
- .submit()
- .map(WebClientResponse::status)
- .flatMap(status -> {
- switch (status.family()) {
- case SUCCESSFUL:
- logF("LRA cancelled - LRAID: {0}", lraId);
- return Single.empty();
- case CLIENT_ERROR:
- logF("Unexpected client error during LRA cancel - LRAID: {0}, Status: {1}", lraId, status.code());
- return Single.empty();
- default:
- return connectionError("Unable to cancel lra " + lraId, status.code());
- }
- })
- .onErrorResumeWith(t -> connectionError("Unable to cancel LRA " + lraId, t))
- .first()
- .map(Void.TYPE::cast)
- );
+ public void cancel(URI lraId, PropagatedHeaders headers) {
+ retry.invoke(() -> {
+ var req = prepareWebClient(lraId)
+ .put()
+ .path("/cancel")
+ .headers(copyHeaders(headers)); // header propagation
+
+ try (var res = req.request()) {
+ switch (res.status().family()) {
+ case SUCCESSFUL:
+ logF("LRA cancelled - LRAID: {0}", lraId);
+ return null;
+ case CLIENT_ERROR:
+ logF("Unexpected client error during LRA cancel - LRAID: {0}, Status: {1}", lraId, res.status().code());
+ return null;
+ default:
+ throw connectionError("Unable to cancel lra " + lraId, res.status().code());
+ }
+ } catch (Exception e) {
+ throw connectionError("Unable to cancel LRA " + lraId, e);
+ }
+ });
}
@Override
- public Single close(URI lraId, PropagatedHeaders headers) {
- return retry.invoke(() -> prepareWebClient(lraId)
- .put()
- .path("/close")
- .headers(copyHeaders(headers)) // header propagation
- .submit()
- .map(WebClientResponse::status)
- .flatMap(status -> {
- switch (status.family()) {
- case SUCCESSFUL:
- logF("LRA closed - LRAID: {0}", lraId);
- return Single.empty();
- case CLIENT_ERROR:
- default:
- // 404 can happen when coordinator already cleaned terminated lra's
- if (List.of(410, 404).contains(status.code())) {
- logF("LRA already closed - LRAID: {0}", lraId);
- return Single.empty();
- }
- return connectionError("Unable to close lra - LRAID: " + lraId, status.code());
+ public void close(URI lraId, PropagatedHeaders headers) {
+ retry.invoke(() -> {
+ var req = prepareWebClient(lraId)
+ .put()
+ .path("/close")
+ .headers(copyHeaders(headers)); // header propagation
+
+ try (var res = req.request()) {
+ switch (res.status().family()) {
+ case SUCCESSFUL:
+ logF("LRA closed - LRAID: {0}", lraId);
+ return null;
+ case CLIENT_ERROR:
+ default:
+ // 404 can happen when coordinator already cleaned terminated lra's
+ if (List.of(410, 404).contains(res.status().code())) {
+ logF("LRA already closed - LRAID: {0}", lraId);
+ }
+ return connectionError("Unable to close lra - LRAID: " + lraId, res.status().code());
+ }
+ } catch (Exception e) {
+ throw connectionError("Unable to close LRA " + lraId, e);
}
- })
- .onErrorResumeWith(t -> connectionError("Unable to close LRA " + lraId, t))
- .first()
- .map(Void.TYPE::cast)
+ }
);
}
@Override
- public Single> join(URI lraId,
- PropagatedHeaders headers,
- long timeLimit,
- Participant p) {
+ public Optional join(URI lraId,
+ PropagatedHeaders headers,
+ long timeLimit,
+ Participant p) {
String links = compensatorLinks(p);
- return retry.invoke(() -> prepareWebClient(lraId)
- .put()
- .queryParam(QUERY_PARAM_TIME_LIMIT, String.valueOf(timeLimit))
- .headers(h -> {
- h.add(HEADER_LINK, links); // links are expected either in header
- headers.toMap().forEach((name, value) -> h.set(Http.Header.create(name), value)); // header propagation
- return h;
- })
- .submit(links) // or as a body
- .flatMap(res -> {
- switch (res.status().code()) {
- case 412:
- return connectionError(res.lastEndpointURI()
- + " Too late to join LRA - LRAID: " + lraId, 412);
- case 404:
- // Narayana returns 404 for already terminated lras
- case 410:
- return connectionError("Not found " + lraId, 410);
- case 200:
- return Single.just(res
- .headers()
- .first(LRA_HTTP_RECOVERY_HEADER)
- .map(URI::create));
-
- default:
- return connectionError("Unexpected coordinator response ", res.status().code());
- }
- })
- .onErrorResumeWith(t -> connectionError("Unable to join LRA", t))
- .peek(uri -> logF("Participant {0} joined - LRAID: {1}", p, lraId))
- .first());
+ return retry.invoke(() -> {
+ var req = prepareWebClient(lraId)
+ .put()
+ .queryParam(QUERY_PARAM_TIME_LIMIT, String.valueOf(timeLimit))
+ .headers(h -> {
+ h.add(Http.Header.createCached(HEADER_LINK, links)); // links are expected either in header
+ headers.toMap().forEach((name, value) -> h.set(Http.Header.create(name), value)); // header propagation
+ return h;
+ });
+
+ try (var res = req.submit(links)) {
+ switch (res.status().code()) {
+ case 412:
+ throw connectionError(req.resolvedUri()
+ + " Too late to join LRA - LRAID: " + lraId, 412);
+ case 404:
+ // Narayana returns 404 for already terminated lras
+ throw connectionError("Not found " + lraId, 404);
+ case 410:
+ throw connectionError("Not found " + lraId, 410);
+ case 200:
+ logF("Participant {0} joined - LRAID: {1}", p, lraId);
+ return res.headers()
+ .first(LRA_HTTP_RECOVERY_HEADER)
+ .map(URI::create);
+ default:
+ throw connectionError("Unexpected coordinator response ", res.status().code());
+ }
+ } catch (Exception e) {
+ throw connectionError("Unable to join LRA", e);
+ }
+ });
}
@Override
- public Single leave(URI lraId, PropagatedHeaders headers, Participant p) {
- return retry.invoke(() -> prepareWebClient(lraId)
- .put()
- .path("/remove")
- .headers(copyHeaders(headers)) // header propagation
- .submit(compensatorLinks(p))
- .flatMap(res -> {
- switch (res.status().code()) {
- case 404:
- LOGGER.log(Level.WARNING,
- "Participant {0} leaving LRA - Coordinator can't find id - LRAID: {1}",
- new Object[] {p, lraId});
- return Single.empty();
- case 200:
- logF("Participant {0} left - LRAID: {1}", p, lraId);
- return Single.empty();
- default:
- throw new IllegalStateException("Unexpected coordinator response " + res.status());
- }
- })
- .onErrorResumeWith(t -> connectionError("Unable to leave LRA " + lraId, t))
- .first()
- .map(Void.TYPE::cast)
- );
- }
+ public void leave(URI lraId, PropagatedHeaders headers, Participant p) {
+ retry.invoke(() -> {
+ var req = prepareWebClient(lraId)
+ .put()
+ .path("/remove")
+ .headers(copyHeaders(headers)); // header propagation
+ try (var res = req.submit(compensatorLinks(p))) {
+ switch (res.status().code()) {
+ case 404:
+ LOGGER.log(Level.WARNING,
+ "Participant {0} leaving LRA - Coordinator can't find id - LRAID: {1}", p, lraId);
+ return null;
+ case 200:
+ logF("Participant {0} left - LRAID: {1}", p, lraId);
+ return null;
+ default:
+ throw new IllegalStateException("Unexpected coordinator response " + res.status());
+ }
+ } catch (Exception e) {
+ throw connectionError("Unable to leave LRA " + lraId, e);
+ }
+ });
+ }
@Override
- public Single status(URI lraId, PropagatedHeaders headers) {
- return retry.invoke(() -> prepareWebClient(lraId)
- .get()
- .path("/status")
- .headers(copyHeaders(headers)) // header propagation
- .request()
- .flatMap(res -> {
- switch (res.status().code()) {
- case 404:
- LOGGER.log(Level.WARNING, "Status LRA - Coordinator can't find id - LRAID: " + lraId);
- return Single.just(LRAStatus.Closed);
- case 200:
- case 202:
- return res
- .content()
- .as(LRAStatus.class)
- .peek(status -> logF("LRA status {0} retrieved - LRAID: {1}", status, lraId));
- default:
- throw new IllegalStateException("Unexpected coordinator response " + res.status());
- }
- })
- .onErrorResumeWith(t ->
- connectionError("Unable to retrieve LRA status of " + lraId, t))
- .first()
- );
+ public LRAStatus status(URI lraId, PropagatedHeaders headers) {
+ return retry.invoke(() -> {
+ var req = prepareWebClient(lraId)
+ .get()
+ .path("/status")
+ .headers(copyHeaders(headers)); // header propagation
+ try (var res = req.request()) {
+ switch (res.status().code()) {
+ case 404:
+ LOGGER.log(Level.WARNING, "Status LRA - Coordinator can't find id - LRAID: " + lraId);
+ return LRAStatus.Closed;
+ case 200:
+ case 202:
+ var status = res.as(LRAStatus.class);
+ logF("LRA status {0} retrieved - LRAID: {1}", status, lraId);
+ return status;
+ default:
+ throw new IllegalStateException("Unexpected coordinator response " + res.status());
+ }
+ } catch (Exception e) {
+ throw connectionError("Unable to retrieve LRA status of " + lraId, e);
+ }
+ });
}
- private WebClient prepareWebClient(URI uri) {
+ private Http1Client prepareWebClient(URI uri) {
return WebClient.builder()
.baseUri(uri)
- // Workaround for #3242
- .keepAlive(false)
- .connectTimeout(coordinatorTimeout)
- .readTimeout(coordinatorTimeout)
- .addReader(new LraStatusReader())
+ .channelOptions(SocketOptions.builder()
+ .connectTimeout(coordinatorTimeout)
+ .readTimeout(coordinatorTimeout)
+ .build())
+ .mediaContext(MediaContext.builder()
+ .addMediaSupport(new LraStatusSupport())
+ .build())
.build();
}
@@ -307,13 +313,13 @@ private WebClient prepareWebClient(URI uri) {
*/
private String compensatorLinks(Participant p) {
return Map.of(
- "compensate", p.compensate(),
- "complete", p.complete(),
- "forget", p.forget(),
- "leave", p.leave(),
- "after", p.after(),
- "status", p.status()
- )
+ "compensate", p.compensate(),
+ "complete", p.complete(),
+ "forget", p.forget(),
+ "leave", p.leave(),
+ "after", p.after(),
+ "status", p.status()
+ )
.entrySet()
.stream()
.filter(e -> e.getValue().isPresent())
@@ -326,33 +332,24 @@ private String compensatorLinks(Participant p) {
.collect(Collectors.joining(","));
}
- static URI parseBaseUri(String lraUri) {
- Matcher m = LRA_ID_PATTERN.matcher(lraUri);
- if (!m.matches()) {
- //LRA id uri format
- throw new RuntimeException("Error when parsing lra uri: " + lraUri);
- }
- return URI.create(m.group(1));
- }
-
- private Function copyHeaders(PropagatedHeaders headers) {
+ private Function> copyHeaders(PropagatedHeaders headers) {
return wcHeaders -> {
headers.toMap().forEach((key, value) -> wcHeaders.set(Http.Header.create(key), value));
return wcHeaders;
};
}
- private Single connectionError(String message, int status) {
+ private CoordinatorConnectionException connectionError(String message, int status) {
LOGGER.log(Level.WARNING, message);
- return Single.error(new CoordinatorConnectionException(message, status));
+ return new CoordinatorConnectionException(message, status);
}
- private Single connectionError(String message, Throwable cause) {
+ private CoordinatorConnectionException connectionError(String message, Throwable cause) {
LOGGER.log(Level.WARNING, message, cause);
if (cause instanceof CoordinatorConnectionException) {
- return Single.error(cause);
+ return (CoordinatorConnectionException) cause;
}
- return Single.error(new CoordinatorConnectionException(message, cause, 500));
+ return new CoordinatorConnectionException(message, cause, 500);
}
private void logF(String msg, Object... params) {
diff --git a/lra/coordinator/client/narayana-client/src/main/java/module-info.java b/lra/coordinator/client/narayana-client/src/main/java/module-info.java
index acebadec155..108b8af5d4f 100644
--- a/lra/coordinator/client/narayana-client/src/main/java/module-info.java
+++ b/lra/coordinator/client/narayana-client/src/main/java/module-info.java
@@ -22,8 +22,8 @@
requires microprofile.lra.api;
requires io.helidon.microprofile.config;
requires io.helidon.lra.coordinator.client;
- requires io.helidon.reactive.webclient;
- requires io.helidon.reactive.faulttolerance;
+ requires io.helidon.nima.webclient;
+ requires io.helidon.nima.faulttolerance;
provides io.helidon.lra.coordinator.client.CoordinatorClient
with io.helidon.lra.coordinator.client.narayana.NarayanaClient;
diff --git a/lra/coordinator/client/spi/pom.xml b/lra/coordinator/client/spi/pom.xml
index d677ced4c59..2b27d378d61 100644
--- a/lra/coordinator/client/spi/pom.xml
+++ b/lra/coordinator/client/spi/pom.xml
@@ -36,13 +36,5 @@
microprofile-lra-api
compile
-
- io.helidon.common
- helidon-common-reactive
-
-
- io.helidon.common
- helidon-common-http
-
\ No newline at end of file
diff --git a/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorClient.java b/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorClient.java
index 1a1e135f514..4eab2c97c9e 100644
--- a/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorClient.java
+++ b/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorClient.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2021, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,11 +19,8 @@
import java.net.URI;
import java.time.Duration;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import io.helidon.common.reactive.Single;
-
import org.eclipse.microprofile.lra.annotation.LRAStatus;
/**
@@ -54,19 +51,6 @@ public interface CoordinatorClient {
*/
String CONF_DEFAULT_COORDINATOR_URL = "http://localhost:8070/lra-coordinator";
- /**
- * Initialization of the properties provided by LRA client.
- *
- * @param coordinatorUriSupplier url of the coordinator
- * @param timeout general timeout for coordinator calls
- * @param timeoutUnit timeout unit for coordinator calls
- * @deprecated use {@link #init(Supplier, Duration)} instead
- */
- @Deprecated(forRemoval = true, since = "4.0.0")
- default void init(Supplier coordinatorUriSupplier, long timeout, TimeUnit timeoutUnit){
- init(coordinatorUriSupplier, Duration.ofMillis(timeoutUnit.toMillis(timeout)));
- }
-
/**
* Initialization of the properties provided by LRA client.
*
@@ -83,7 +67,7 @@ default void init(Supplier coordinatorUriSupplier, long timeout, TimeUnit t
* @param timeout after what time should be LRA cancelled automatically
* @return id of the new LRA
*/
- Single start(String clientID, PropagatedHeaders headers, long timeout);
+ URI start(String clientID, PropagatedHeaders headers, long timeout);
/**
* Ask coordinator to start new LRA and return its id.
@@ -94,7 +78,7 @@ default void init(Supplier coordinatorUriSupplier, long timeout, TimeUnit t
* @param timeout after what time should be LRA cancelled automatically
* @return id of the new LRA
*/
- Single start(URI parentLRA, String clientID, PropagatedHeaders headers, long timeout);
+ URI start(URI parentLRA, String clientID, PropagatedHeaders headers, long timeout);
/**
* Join existing LRA with participant.
@@ -105,25 +89,23 @@ default void init(Supplier coordinatorUriSupplier, long timeout, TimeUnit t
* @param participant participant metadata with URLs to be called when complete/compensate ...
* @return recovery URI if supported by coordinator or empty
*/
- Single> join(URI lraId, PropagatedHeaders headers, long timeLimit, Participant participant);
+ Optional join(URI lraId, PropagatedHeaders headers, long timeLimit, Participant participant);
/**
* Cancel LRA if its active. Should cause coordinator to compensate its participants.
*
* @param lraId id of the LRA to be cancelled
* @param headers headers to be propagated to the coordinator
- * @return single future of the cancel call
*/
- Single cancel(URI lraId, PropagatedHeaders headers);
+ void cancel(URI lraId, PropagatedHeaders headers);
/**
* Close LRA if its active. Should cause coordinator to complete its participants.
*
* @param lraId id of the LRA to be closed
* @param headers headers to be propagated to the coordinator
- * @return single future of the cancel call
*/
- Single close(URI lraId, PropagatedHeaders headers);
+ void close(URI lraId, PropagatedHeaders headers);
/**
* Leave LRA. Supplied participant won't be part of specified LRA any more,
@@ -132,9 +114,8 @@ default void init(Supplier coordinatorUriSupplier, long timeout, TimeUnit t
* @param lraId id of the LRA that should be left by supplied participant
* @param headers headers to be propagated to the coordinator
* @param participant participant which will leave
- * @return single future of the cancel call
*/
- Single leave(URI lraId, PropagatedHeaders headers, Participant participant);
+ void leave(URI lraId, PropagatedHeaders headers, Participant participant);
/**
* Return status of specified LRA.
@@ -143,5 +124,5 @@ default void init(Supplier coordinatorUriSupplier, long timeout, TimeUnit t
* @param headers headers to be propagated to the coordinator
* @return {@link org.eclipse.microprofile.lra.annotation.LRAStatus} of the queried LRA
*/
- Single status(URI lraId, PropagatedHeaders headers);
+ LRAStatus status(URI lraId, PropagatedHeaders headers);
}
diff --git a/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorConnectionException.java b/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorConnectionException.java
index 8fb1b689029..ca5645eff98 100644
--- a/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorConnectionException.java
+++ b/lra/coordinator/client/spi/src/main/java/io/helidon/lra/coordinator/client/CoordinatorConnectionException.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021 Oracle and/or its affiliates.
+ * Copyright (c) 2021, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,7 @@
* Exception in communication with coordinator.
*/
public class CoordinatorConnectionException extends RuntimeException {
- private int status;
+ private final int status;
/**
* Creates exception describing an error in communication with coordinator.
diff --git a/lra/coordinator/client/spi/src/main/java/module-info.java b/lra/coordinator/client/spi/src/main/java/module-info.java
index 5c9db978c0f..55faeb85a3a 100644
--- a/lra/coordinator/client/spi/src/main/java/module-info.java
+++ b/lra/coordinator/client/spi/src/main/java/module-info.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2021, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,8 +19,6 @@
*/
module io.helidon.lra.coordinator.client {
requires microprofile.lra.api;
- requires io.helidon.common.reactive;
- requires io.helidon.common.http;
exports io.helidon.lra.coordinator.client;
}
\ No newline at end of file
diff --git a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/CoordinatorLocatorService.java b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/CoordinatorLocatorService.java
index c0c9c01d192..b35b2e1fb38 100644
--- a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/CoordinatorLocatorService.java
+++ b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/CoordinatorLocatorService.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, 2022 Oracle and/or its affiliates.
+ * Copyright (c) 2021, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.helidon.microprofile.lra;
import java.net.URI;
+import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
@@ -102,7 +103,7 @@ CoordinatorClient coordinatorClient() {
+ " not found."));
}
- client.init(() -> coordinatorUriSupplier.get(), coordinatorTimeout, coordinatorTimeoutUnit);
+ client.init(() -> coordinatorUriSupplier.get(), Duration.ofMillis(coordinatorTimeoutUnit.toMillis(coordinatorTimeout)));
return client;
}
diff --git a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraAnnotationHandler.java b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraAnnotationHandler.java
index 7a032e3f670..37664c4158a 100644
--- a/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraAnnotationHandler.java
+++ b/microprofile/lra/jax-rs/src/main/java/io/helidon/microprofile/lra/LraAnnotationHandler.java
@@ -22,9 +22,9 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionException;
+import java.util.function.Supplier;
import io.helidon.common.context.Contexts;
-import io.helidon.common.reactive.Single;
import io.helidon.lra.coordinator.client.CoordinatorClient;
import io.helidon.lra.coordinator.client.CoordinatorConnectionException;
import io.helidon.lra.coordinator.client.Participant;
@@ -171,11 +171,11 @@ public void handleJaxRsAfter(ContainerRequestContext reqCtx,
}
private URI start(String clientId, PropagatedHeaders headers, long timeOut) {
- return awaitCoordinator(coordinatorClient.start(clientId, headers, timeOut));
+ return callCoordinator(() -> coordinatorClient.start(clientId, headers, timeOut));
}
private URI start(URI parentLraId, String clientId, PropagatedHeaders headers, long timeOut) {
- return awaitCoordinator(coordinatorClient.start(parentLraId, clientId, headers, timeOut));
+ return callCoordinator(() -> coordinatorClient.start(parentLraId, clientId, headers, timeOut));
}
private void join(ContainerRequestContext reqCtx,
@@ -183,16 +183,16 @@ private void join(ContainerRequestContext reqCtx,
PropagatedHeaders propagatedHeaders,
long timeLimit,
Participant participant) {
- awaitCoordinator(coordinatorClient.join(lraId, propagatedHeaders, timeLimit, participant))
+ callCoordinator(() -> coordinatorClient.join(lraId, propagatedHeaders, timeLimit, participant))
.ifPresent(uri -> reqCtx.getHeaders().add(LRA_HTTP_RECOVERY_HEADER, uri.toASCIIString()));
}
private void close(URI lraId, PropagatedHeaders headers) {
- awaitCoordinator(coordinatorClient.close(lraId, headers));
+ callCoordinator(() -> coordinatorClient.close(lraId, headers));
}
private void cancel(URI lraId, PropagatedHeaders headers) {
- awaitCoordinator(coordinatorClient.cancel(lraId, headers));
+ callCoordinator(() -> coordinatorClient.cancel(lraId, headers));
}
private void setHeaderPropagationContext(ContainerRequestContext reqCtx, PropagatedHeaders propagatedHeaders) {
@@ -216,10 +216,17 @@ private void setParentContext(ContainerRequestContext reqCtx, URI lraId) {
.ifPresent(context -> context.register(LRA_HTTP_PARENT_CONTEXT_HEADER, lraId));
}
- private T awaitCoordinator(Single single) {
+ private void callCoordinator(Runnable supplier) {
+ callCoordinator(() -> {
+ supplier.run();
+ return null;
+ });
+ }
+
+ private T callCoordinator(Supplier supplier) {
try {
// Connection timeout should be handled by client impl separately
- return single.await(coordinatorTimeout);
+ return supplier.get();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof CoordinatorConnectionException) {
diff --git a/microprofile/lra/jax-rs/src/test/java/io/helidon/microprofile/lra/LoadBalancedCoordinatorTest.java b/microprofile/lra/jax-rs/src/test/java/io/helidon/microprofile/lra/LoadBalancedCoordinatorTest.java
index 5ab6d9398fe..75b76ccc77f 100644
--- a/microprofile/lra/jax-rs/src/test/java/io/helidon/microprofile/lra/LoadBalancedCoordinatorTest.java
+++ b/microprofile/lra/jax-rs/src/test/java/io/helidon/microprofile/lra/LoadBalancedCoordinatorTest.java
@@ -436,7 +436,7 @@ void firstNotEnding(WebTarget target) throws Exception {
.get(TIMEOUT_SEC, TimeUnit.SECONDS);
assertThat(response.getStatus(), AnyOf.anyOf(is(200), is(204)));
URI lraId = await(DontEnd.CS_START_LRA);
- assertThat(coordinatorClient.status(lraId, PropagatedHeaders.noop()).await(TIMEOUT_SEC, TimeUnit.SECONDS), is(LRAStatus.Active));
+ assertThat(coordinatorClient.status(lraId, PropagatedHeaders.noop()), is(LRAStatus.Active));
assertThat(target.path(DontEnd.PATH_BASE)
.path(DontEnd.PATH_START_SECOND_LRA)
.request()
@@ -551,7 +551,7 @@ void statusNonRecoveryTest(WebTarget target) throws ExecutionException, Interrup
private void assertClosedOrNotFound(URI lraId) {
try {
- assertThat(coordinatorClient.status(lraId, PropagatedHeaders.noop()).await(TIMEOUT_SEC, TimeUnit.SECONDS), is(LRAStatus.Closed));
+ assertThat(coordinatorClient.status(lraId, PropagatedHeaders.noop()), is(LRAStatus.Closed));
} catch (NotFoundException e) {
// in case coordinator don't retain closed lra long enough
}
diff --git a/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientRequestImpl.java b/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientRequestImpl.java
index adfa07cd8ba..648270b9a3b 100644
--- a/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientRequestImpl.java
+++ b/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientRequestImpl.java
@@ -346,6 +346,7 @@ private ClientResponseImpl invokeServices(WebClientService.Chain callChain,
serviceResponse.headers(),
serviceResponse.connection(),
serviceResponse.reader(),
+ this.clientConfig().mediaContext(),
complete);
}
diff --git a/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientResponseImpl.java b/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientResponseImpl.java
index a4958883f74..4911e2c2d19 100644
--- a/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientResponseImpl.java
+++ b/nima/webclient/webclient/src/main/java/io/helidon/nima/webclient/http1/ClientResponseImpl.java
@@ -62,7 +62,7 @@ class ClientResponseImpl implements Http1ClientResponse {
private final DataReader reader;
// todo configurable
private final ContentEncodingContext encodingSupport = ContentEncodingContext.create();
- private final MediaContext mediaContext = MediaContext.create();
+ private final MediaContext mediaContext;
private final String channelId;
private final CompletableFuture whenComplete;
private final boolean hasTrailers;
@@ -78,6 +78,7 @@ class ClientResponseImpl implements Http1ClientResponse {
ClientResponseHeaders responseHeaders,
ClientConnection connection,
DataReader reader,
+ MediaContext mediaContext,
CompletableFuture whenComplete) {
this.responseStatus = responseStatus;
this.requestHeaders = requestHeaders;
@@ -86,6 +87,7 @@ class ClientResponseImpl implements Http1ClientResponse {
this.reader = reader;
this.channelId = connection.channelId();
this.whenComplete = whenComplete;
+ this.mediaContext = mediaContext;
if (responseHeaders.contains(Header.CONTENT_LENGTH)) {
this.entityLength = Long.parseLong(responseHeaders.get(Header.CONTENT_LENGTH).value());