Skip to content

Commit

Permalink
Refactor two-client usage.
Browse files Browse the repository at this point in the history
Signed-off-by: dblock <dblock@amazon.com>
  • Loading branch information
dblock committed Jan 17, 2023
1 parent 37d4800 commit a6d4617
Showing 1 changed file with 15 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.SdkAutoCloseable;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -70,8 +71,7 @@ public class AwsSdk2Transport implements OpenSearchTransport {
public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192;

private static final byte[] NO_BYTES = new byte[0];
private final SdkHttpClient httpClient;
private final SdkAsyncHttpClient asyncHttpClient;
private final SdkAutoCloseable httpClient;
private final String host;
private final Region signingRegion;
private final JsonpMapper defaultMapper;
Expand All @@ -92,33 +92,11 @@ public class AwsSdk2Transport implements OpenSearchTransport {
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkHttpClient httpClient,
@Nonnull SdkAutoCloseable httpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, null, host, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with an ASYNCHRONOUS AWS Http client
* <p>
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
* using the asynchronous client, but the calling thread will block until they are complete.
*
* @param asyncHttpClient HTTP client to use for OpenSearch requests
* @param host The target host
* @param signingRegion The AWS region for which requests will be signed. This should typically match
* the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(null, asyncHttpClient, host, signingRegion, options);
this(httpClient, host, "es", signingRegion, options);
}

/**
Expand All @@ -127,28 +105,22 @@ public AwsSdk2Transport(
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
*
* @param httpClient HTTP client to use for OpenSearch requests
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests
* @param host The fully qualified domain name to connect to
* @param signingRegion The AWS region for which requests will be signed. This should typically match
* the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* @param httpClient HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@CheckForNull SdkAutoCloseable httpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
if (httpClient == null && asyncHttpClient == null)
{
throw new IllegalArgumentException("At least one SdkHttpClient or SdkAsyncHttpClient must be provided");
}
Objects.requireNonNull(host, "Target OpenSearch service host must not be null");
this.httpClient = httpClient;
this.asyncHttpClient = asyncHttpClient;
this.host = host;
this.signingRegion = signingRegion;
this.transportOptions = options != null ? options : AwsSdk2TransportOptions.builder().build();
Expand Down Expand Up @@ -199,7 +171,7 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
try {
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
if (asyncHttpClient != null) {
if (this.httpClient instanceof SdkAsyncHttpClient) {
return executeAsync(clientReq, requestBody, endpoint, options);
} else {
ResponseT result = executeSync(clientReq, endpoint, options);
Expand Down Expand Up @@ -357,7 +329,8 @@ private <ResponseT> ResponseT executeSync(
if (httpRequest.contentStreamProvider().isPresent()) {
executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get());
}
HttpExecuteResponse executeResponse = httpClient.prepareRequest(executeRequest.build()).call();
SdkHttpClient syncHttpClient = (SdkHttpClient) httpClient;
HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call();
AbortableInputStream bodyStream = null;
try {
bodyStream = executeResponse.responseBody().orElse(null);
Expand All @@ -383,6 +356,7 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
.request(httpRequest)
.requestContentPublisher(new AsyncByteArrayContentPublisher(requestBodyArray))
.responseHandler(responseHandler);
SdkAsyncHttpClient asyncHttpClient = (SdkAsyncHttpClient) httpClient;
CompletableFuture<Void> executeFuture = asyncHttpClient.execute(executeRequest.build());
return executeFuture
.thenCompose(_v -> responseHandler.getHeaderPromise())
Expand Down

0 comments on commit a6d4617

Please sign in to comment.