Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [BUG] Generic HTTP Actions in Java Client does not work with AwsSdk2Transport #984

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

### Fixed
- Fix the deserialization of SortOptions ([#981](https://github.com/opensearch-project/opensearch-java/pull/981))
- Generic HTTP Actions in Java Client does not work with AwsSdk2Transport ([#978](https://github.com/opensearch-project/opensearch-java/pull/978))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,27 +95,17 @@ public GenericResponse responseDeserializer(
@Nullable String contentType,
@Nullable InputStream body
) {
if (isError(status)) {
// Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed
try (Body b = Body.from(body, contentType)) {
if (b != null) {
return new GenericResponse(
uri,
protocol,
method,
status,
reason,
headers,
Body.from(b.bodyAsBytes(), b.contentType())
);
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers);
}
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
try (Body b = Body.from(body, contentType)) {
if (b != null) {
// Fully consume the response body:
// - if it will be propagated as an exception with possible no chance to be closed
// - the entity stream will be consumed and become unavailable
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(b.bodyAsBytes(), b.contentType()));
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers);
}
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType));
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand All @@ -35,6 +37,7 @@
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.GenericEndpoint;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportException;
Expand All @@ -47,6 +50,7 @@
import software.amazon.awssdk.auth.signer.Aws4Signer;
import software.amazon.awssdk.auth.signer.params.Aws4SignerParams;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.SdkHttpClient;
Expand Down Expand Up @@ -393,7 +397,15 @@ private <ResponseT> ResponseT executeSync(
try {
bodyStream = executeResponse.responseBody().orElse(null);
SdkHttpResponse httpResponse = executeResponse.httpResponse();
return parseResponse(httpResponse, bodyStream, endpoint, options);
return parseResponse(
httpRequest.getUri(),
httpRequest.method(),
httpRequest.protocol(),
httpResponse,
bodyStream,
endpoint,
options
);
} finally {
if (bodyStream != null) {
bodyStream.close();
Expand Down Expand Up @@ -421,7 +433,17 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
CompletableFuture<ResponseT> ret = new CompletableFuture<>();
try {
InputStream bodyStream = new ByteArrayInputStream(responseBody);
ret.complete(parseResponse(response, bodyStream, endpoint, options));
ret.complete(
parseResponse(
httpRequest.getUri(),
httpRequest.method(),
httpRequest.protocol(),
response,
bodyStream,
endpoint,
options
)
);
} catch (Throwable e) {
ret.completeExceptionally(e);
}
Expand All @@ -430,6 +452,9 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
}

private <ResponseT, ErrorT> ResponseT parseResponse(
URI uri,
@Nonnull SdkHttpMethod method,
String protocol,
@Nonnull SdkHttpResponse httpResponse,
@CheckForNull InputStream bodyStream,
@Nonnull Endpoint<?, ResponseT, ErrorT> endpoint,
Expand Down Expand Up @@ -478,24 +503,51 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
}

if (endpoint.isError(statusCode)) {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null || bodyStream == null) {
throw new TransportException("Request failed with status code '" + statusCode + "'");
}
try {
try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
throw new OpenSearchException((ErrorResponse) error);
if (endpoint instanceof GenericEndpoint) {
@SuppressWarnings("unchecked")
final GenericEndpoint<?, ResponseT> rawEndpoint = (GenericEndpoint<?, ResponseT>) endpoint;

String contentType = null;
if (bodyStream != null) {
contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null);
}

final ResponseT error = rawEndpoint.responseDeserializer(
uri.toString(),
method.name(),
protocol,
httpResponse.statusCode(),
httpResponse.statusText().orElse(null),
httpResponse.headers()
.entrySet()
.stream()
.map(h -> new AbstractMap.SimpleEntry<String, String>(h.getKey(), Objects.toString(h.getValue())))
.collect(Collectors.toList()),
contentType,
bodyStream
);

throw rawEndpoint.exceptionConverter(statusCode, error);
} else {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null || bodyStream == null) {
throw new TransportException("Request failed with status code '" + statusCode + "'");
}
try {
try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
throw new OpenSearchException((ErrorResponse) error);
}
} catch (OpenSearchException e) {
throw e;
} catch (Exception e) {
// can't parse the error - use a general exception
ErrorCause.Builder cause = new ErrorCause.Builder();
cause.type("http_exception");
cause.reason("server returned " + statusCode);
ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build()));
throw new OpenSearchException(error);
}
} catch (OpenSearchException e) {
throw e;
} catch (Exception e) {
// can't parse the error - use a general exception
ErrorCause.Builder cause = new ErrorCause.Builder();
cause.type("http_exception");
cause.reason("server returned " + statusCode);
ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build()));
throw new OpenSearchException(error);
}
} else {
if (endpoint instanceof BooleanEndpoint) {
Expand Down Expand Up @@ -523,6 +575,29 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
;
}
return response;
} else if (endpoint instanceof GenericEndpoint) {
@SuppressWarnings("unchecked")
final GenericEndpoint<?, ResponseT> rawEndpoint = (GenericEndpoint<?, ResponseT>) endpoint;

String contentType = null;
if (bodyStream != null) {
contentType = httpResponse.firstMatchingHeader(Header.CONTENT_TYPE).orElse(null);
}

return rawEndpoint.responseDeserializer(
uri.toString(),
method.name(),
protocol,
httpResponse.statusCode(),
httpResponse.statusText().orElse(null),
httpResponse.headers()
.entrySet()
.stream()
.map(h -> new AbstractMap.SimpleEntry<String, String>(h.getKey(), Objects.toString(h.getValue())))
.collect(Collectors.toList()),
contentType,
bodyStream
);
} else {
throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ private <ResponseT> ResponseT decodeResponse(
InputStream content = null;
if (entity != null) {
// We may have to replay it.
entity = new BufferedHttpEntity(entity);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (entity.getContentType() != null) {
contentType = entity.getContentType().getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.zip.GZIPOutputStream;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import org.apache.hc.core5.http.ContentType;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.transport.GenericSerializable;
import org.opensearch.client.transport.OpenSearchTransport;

/**
Expand Down Expand Up @@ -71,6 +73,11 @@ public void addContent(Object content) throws IOException {
if (content instanceof NdJsonpSerializable) {
isMulti = true;
addNdJson(((NdJsonpSerializable) content));
} else if (content instanceof GenericSerializable) {
ContentType.parse(((GenericSerializable) content).serialize(captureBuffer));
if (isMulti) {
captureBuffer.write((byte) '\n');
}
} else {
mapper.serialize(content, jsonGenerator);
jsonGenerator.flush();
Expand Down
Loading