diff --git a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java index 2269b8e13b..62578b6f8a 100644 --- a/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java +++ b/java-client/src/main/java/org/opensearch/client/transport/aws/AwsSdk2Transport.java @@ -17,6 +17,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLEncoder; +import java.util.AbstractMap; import java.util.Collection; import java.util.Map; import java.util.Objects; @@ -24,10 +25,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import org.opensearch.client.json.JsonpDeserializer; import org.opensearch.client.json.JsonpMapper; import org.opensearch.client.json.jackson.JacksonJsonpMapper; @@ -35,6 +38,7 @@ import org.opensearch.client.opensearch._types.ErrorResponse; import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.GenericEndpoint; import org.opensearch.client.transport.JsonEndpoint; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.TransportException; @@ -47,6 +51,7 @@ import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.http.Header; import software.amazon.awssdk.http.HttpExecuteRequest; import software.amazon.awssdk.http.HttpExecuteResponse; import software.amazon.awssdk.http.SdkHttpClient; @@ -393,7 +398,15 @@ private ResponseT executeSync( try { bodyStream = executeResponse.responseBody().orElse(null); SdkHttpResponse httpResponse = executeResponse.httpResponse(); - return parseResponse(httpResponse, bodyStream, endpoint, options); + return parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + httpResponse, + bodyStream, + endpoint, + options + ); } finally { if (bodyStream != null) { bodyStream.close(); @@ -421,7 +434,17 @@ private CompletableFuture executeAsync( CompletableFuture ret = new CompletableFuture<>(); try { InputStream bodyStream = new ByteArrayInputStream(responseBody); - ret.complete(parseResponse(response, bodyStream, endpoint, options)); + ret.complete( + parseResponse( + httpRequest.getUri(), + httpRequest.method(), + httpRequest.protocol(), + response, + bodyStream, + endpoint, + options + ) + ); } catch (Throwable e) { ret.completeExceptionally(e); } @@ -430,6 +453,9 @@ private CompletableFuture executeAsync( } private ResponseT parseResponse( + URI uri, + @Nonnull SdkHttpMethod method, + String protocol, @Nonnull SdkHttpResponse httpResponse, @CheckForNull InputStream bodyStream, @Nonnull Endpoint endpoint, @@ -523,6 +549,31 @@ private ResponseT parseResponse( ; } return response; + } else if (endpoint instanceof GenericEndpoint) { + @SuppressWarnings("unchecked") + final GenericEndpoint rawEndpoint = (GenericEndpoint) endpoint; + + String contentType = null; + if (bodyStream != null) { + contentType = httpResponse + .firstMatchingHeader(Header.CONTENT_TYPE) + .orElse(null); + } + + return rawEndpoint.responseDeserializer( + uri.toString(), + method.name(), + protocol, + httpResponse.statusCode(), + httpResponse.statusText().orElse(null), + httpResponse.headers() + .entrySet() + .stream() + .map(h -> new AbstractMap.SimpleEntry(h.getKey(), Objects.toString(h.getValue()))) + .collect(Collectors.toList()), + contentType, + bodyStream + ); } else { throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); }