Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross Cluster Search: preserve remote status code #30976

Merged
merged 4 commits into from
Jun 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
"Search with missing remote index pattern":
- do:
catch: "request"
catch: "missing"
search:
index: "my_remote_cluster:foo"

Expand Down Expand Up @@ -34,7 +34,7 @@
- match: { aggregations.cluster.buckets.0.doc_count: 6 }

- do:
catch: "request"
catch: "missing"
search:
index: "my_remote_cluster:test_index,my_remote_cluster:foo"
body:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<TransportException> transportException = new AtomicReference<>();
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
Copy link
Member Author

@javanna javanna May 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoteTransportException implements ElasticsearchWrapperException, then unwrapCause will unwrap the inner cause and status() will return the inner status see https://github.com/elastic/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/ElasticsearchException.java#L235

This behaviour is tricky. I think how ElasticsearchException#status works may cause problems when for instance it is thrown in the high-level REST client, I will have to dig deeper on that and probably open a follow-up PR, but I think this change is enough for cross-cluster search.

for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterName = entry.getKey();
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
Expand All @@ -232,7 +232,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
if (responsesCountDown.countDown()) {
TransportException exception = transportException.get();
RemoteTransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
Expand All @@ -243,8 +243,8 @@ public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse)

@Override
public void onFailure(Exception e) {
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
clusterName + "]", e);
RemoteTransportException exception = new RemoteTransportException("error while communicating with remote cluster ["
+ clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,9 @@
package org.elasticsearch.transport;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
Expand All @@ -42,17 +36,16 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
Expand Down Expand Up @@ -121,8 +114,12 @@ public static MockTransportService startTransport(
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
(request, channel) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
if ("index_not_found".equals(request.preference())) {
channel.sendResponse(new IndexNotFoundException("index"));
} else {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
}
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -469,7 +470,6 @@ public void onFailure(Exception e) {
assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage());
}
{

logger.info("closing all source nodes");
// close all targets and check for the transport level failure path
IOUtils.close(c1N1, c1N2, c2N1, c2N2);
Expand Down Expand Up @@ -559,7 +559,20 @@ public void testCollectSearchShards() throws Exception {
assertEquals(1, shardsResponse.getNodes().length);
}
}

{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found",
null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(response.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
Expand Down Expand Up @@ -593,8 +606,9 @@ public void onNodeDisconnected(DiscoveryNode node) {
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(response.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(TransportException.class));
assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster"));
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
}

//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ teardown:
- match: { hits.total: 0 }

- do:
catch: "request"
catch: "forbidden"
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
index: "*:foo-bar"

- do:
catch: "request"
catch: "forbidden"
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
index: "my_remote_cluster:foo-bar"