Skip to content

Commit

Permalink
Cross Cluster Search: preserve remote status code (#30976)
Browse files Browse the repository at this point in the history
In case an error is returned when calling search_shards on a remote
cluster, which will lead to throwing an exception in the coordinating
 node, we should make sure that the status code returned by the
 coordinating node is the same as the one returned by the remote
 cluster. Up until now a 500 - Internal Server Error was always
 returned. This commit changes this behaviour so that for instance if an
 index is not found, which causes an 404, a 404 is also returned by the
 coordinating node to the client.

 Closes #27461
  • Loading branch information
javanna authored Jun 1, 2018
1 parent 31351ab commit 70749e0
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
"Search with missing remote index pattern":
- do:
catch: "request"
catch: "missing"
search:
index: "my_remote_cluster:foo"

Expand Down Expand Up @@ -34,7 +34,7 @@
- match: { aggregations.cluster.buckets.0.doc_count: 6 }

- do:
catch: "request"
catch: "missing"
search:
index: "my_remote_cluster:test_index,my_remote_cluster:foo"
body:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
final AtomicReference<TransportException> transportException = new AtomicReference<>();
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
final String clusterName = entry.getKey();
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
Expand All @@ -232,7 +232,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
if (responsesCountDown.countDown()) {
TransportException exception = transportException.get();
RemoteTransportException exception = transportException.get();
if (exception == null) {
listener.onResponse(searchShardsResponses);
} else {
Expand All @@ -243,8 +243,8 @@ public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse)

@Override
public void onFailure(Exception e) {
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
clusterName + "]", e);
RemoteTransportException exception = new RemoteTransportException("error while communicating with remote cluster ["
+ clusterName + "]", e);
if (transportException.compareAndSet(null, exception) == false) {
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
current.addSuppressed(previous);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,9 @@
package org.elasticsearch.transport;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
Expand All @@ -42,17 +36,16 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
Expand Down Expand Up @@ -121,8 +114,12 @@ public static MockTransportService startTransport(
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
(request, channel) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
if ("index_not_found".equals(request.preference())) {
channel.sendResponse(new IndexNotFoundException("index"));
} else {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
}
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -469,7 +470,6 @@ public void onFailure(Exception e) {
assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage());
}
{

logger.info("closing all source nodes");
// close all targets and check for the transport level failure path
IOUtils.close(c1N1, c1N2, c2N1, c2N2);
Expand Down Expand Up @@ -559,7 +559,20 @@ public void testCollectSearchShards() throws Exception {
assertEquals(1, shardsResponse.getNodes().length);
}
}

{
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
AtomicReference<Exception> failure = new AtomicReference<>();
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found",
null, remoteIndicesByCluster,
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(response.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
}
int numDisconnectedClusters = randomIntBetween(1, numClusters);
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
Expand Down Expand Up @@ -593,8 +606,9 @@ public void onNodeDisconnected(DiscoveryNode node) {
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertNull(response.get());
assertNotNull(failure.get());
assertThat(failure.get(), instanceOf(TransportException.class));
assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster"));
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
}

//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ teardown:
- match: { hits.total: 0 }

- do:
catch: "request"
catch: "forbidden"
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
index: "*:foo-bar"

- do:
catch: "request"
catch: "forbidden"
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
search:
index: "my_remote_cluster:foo-bar"

0 comments on commit 70749e0

Please sign in to comment.