Skip to content

Commit

Permalink
Refactor constructors to take sync/async explicitly.
Browse files Browse the repository at this point in the history
Signed-off-by: dblock <dblock@amazon.com>
  • Loading branch information
dblock committed Jan 18, 2023
1 parent 27bffc9 commit 170d383
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ public class AwsSdk2Transport implements OpenSearchTransport {
private final JsonpMapper defaultMapper;
private final AwsSdk2TransportOptions transportOptions;

/**
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client.
* <p>
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
* *synchronously* on the calling thread.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkAsyncHttpClient httpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, host, "es", signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
* <p>
Expand All @@ -92,15 +113,15 @@ public class AwsSdk2Transport implements OpenSearchTransport {
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkAutoCloseable httpClient,
@CheckForNull SdkHttpClient httpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, host, "es", signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients.
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP clients.
* <p>
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
Expand All @@ -114,6 +135,38 @@ public AwsSdk2Transport(
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkAsyncHttpClient httpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this((SdkAutoCloseable) httpClient, host, signingServiceName, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP clients.
* <p>
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkHttpClient httpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this((SdkAutoCloseable) httpClient, host, signingServiceName, signingRegion, options);
}

private AwsSdk2Transport(
@CheckForNull SdkAutoCloseable httpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@

package org.opensearch.client.opensearch.integTest.aws;


import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.OpenSearchClient;
Expand Down Expand Up @@ -42,7 +43,6 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;


public abstract class AwsSdk2TransportTestCase {
public static final String TEST_INDEX = "opensearch-java-integtest";

Expand Down Expand Up @@ -85,16 +85,14 @@ protected OpenSearchClient getClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchClient(transport);
}
Expand All @@ -111,16 +109,14 @@ protected OpenSearchAsyncClient getAsyncClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchAsyncClient(transport);
}
Expand All @@ -137,16 +133,14 @@ protected OpenSearchIndicesClient getIndexesClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchIndicesClient(transport);
}
Expand Down Expand Up @@ -207,8 +201,7 @@ public void resetTestIndex(boolean async) throws Exception {
if (indexInfo != null) {
indexExists = true;
}
} catch (
OpenSearchException e) {
} catch (OpenSearchException e) {
if (e.status() != 404) {
throw e;
}
Expand Down Expand Up @@ -238,17 +231,14 @@ protected SearchResponse<SimplePojo> query(OpenSearchClient client, String title
.ignoreThrottled(false)
.sort(
new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(),
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()
)
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build())
.query(query);


return client.search(req.build(), SimplePojo.class);
}

protected CompletableFuture<SearchResponse<SimplePojo>> query(
OpenSearchAsyncClient client, String title, String text
) {
OpenSearchAsyncClient client, String title, String text) {
var query = Query.of(qb -> {
if (title != null) {
qb.match(mb -> mb.field("title").query(vb -> vb.stringValue(title)));
Expand All @@ -265,8 +255,7 @@ protected CompletableFuture<SearchResponse<SimplePojo>> query(
.ignoreThrottled(false)
.sort(
new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(),
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()
)
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build())
.query(query);

try {
Expand Down

0 comments on commit 170d383

Please sign in to comment.