Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.6] Reduce memory required for search responses when many shards are unavailable (#91365) #92907

Merged
merged 1 commit into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/91365.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91365
summary: Reduce memory required for search responses when many shards are unavailable
area: Search
type: bug
issues:
- 90622
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -67,6 +68,9 @@ public void testClusterAllowPartialsWithRedState() throws Exception {
assertThat("Expected total shards", searchResponse.getTotalShards(), equalTo(numShards));
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
assertThat(failure.getCause(), instanceOf(NoShardAvailableActionException.class));
assertThat(failure.getCause().getStackTrace(), emptyArray());
// We don't write out the entire, repetitive stacktrace in the reason
assertThat(failure.reason(), equalTo("org.elasticsearch.action.NoShardAvailableActionException\n"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,36 @@
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.io.PrintWriter;

public class NoShardAvailableActionException extends ElasticsearchException {

private static final StackTraceElement[] EMPTY_STACK_TRACE = new StackTraceElement[0];

// This is set so that no StackTrace is serialized in the scenario when we wrap other shard failures.
// It isn't necessary to serialize this field over the wire as the empty stack trace is serialized instead.
private final boolean onShardFailureWrapper;

public static NoShardAvailableActionException forOnShardFailureWrapper(String msg) {
return new NoShardAvailableActionException(null, msg, null, true);
}

public NoShardAvailableActionException(ShardId shardId) {
this(shardId, null);
this(shardId, null, null, false);
}

public NoShardAvailableActionException(ShardId shardId, String msg) {
this(shardId, msg, null);
this(shardId, msg, null, false);
}

public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) {
this(shardId, msg, cause, false);
}

private NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause, boolean onShardFailureWrapper) {
super(msg, cause);
setShard(shardId);
this.onShardFailureWrapper = onShardFailureWrapper;
}

@Override
Expand All @@ -37,5 +53,22 @@ public RestStatus status() {

public NoShardAvailableActionException(StreamInput in) throws IOException {
super(in);
onShardFailureWrapper = false;
}

@Override
public StackTraceElement[] getStackTrace() {
return onShardFailureWrapper ? EMPTY_STACK_TRACE : super.getStackTrace();
}

@Override
public void printStackTrace(PrintWriter s) {
if (onShardFailureWrapper == false) {
super.printStackTrace(s);
} else {
// Override to simply print the first line of the trace, which is the current exception.
// Since we aren't serializing the repetitive stacktrace onShardFailureWrapper, we shouldn't print it out either
s.println(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ public final void onShardFailure(final int shardIndex, SearchShardTarget shardTa
if (TransportActions.isShardNotAvailableException(e)) {
// Groups shard not available exceptions under a generic exception that returns a SERVICE_UNAVAILABLE(503)
// temporary error.
e = new NoShardAvailableActionException(shardTarget.getShardId(), e.getMessage());
e = NoShardAvailableActionException.forOnShardFailureWrapper(e.getMessage());
}
// we don't aggregate shard on failures due to the internal cancellation,
// but do keep the header counts right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public ShardSearchFailure(Exception e, @Nullable SearchShardTarget shardTarget)

/**
* The search shard target the failure occurred on.
* @return The shardTarget, may be null
*/
@Nullable
public SearchShardTarget shard() {
Expand All @@ -95,7 +96,6 @@ public String toString() {

public static ShardSearchFailure readShardSearchFailure(StreamInput in) throws IOException {
return new ShardSearchFailure(in);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.search;

import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -28,17 +29,17 @@

public class ShardSearchFailureTests extends ESTestCase {

private static SearchShardTarget randomShardTarget(String indexUuid) {
String nodeId = randomAlphaOfLengthBetween(5, 10);
String indexName = randomAlphaOfLengthBetween(5, 10);
String clusterAlias = randomBoolean() ? randomAlphaOfLengthBetween(5, 10) : null;
return new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), randomInt()), clusterAlias);
}

public static ShardSearchFailure createTestItem(String indexUuid) {
String randomMessage = randomAlphaOfLengthBetween(3, 20);
Exception ex = new ParsingException(0, 0, randomMessage, new IllegalArgumentException("some bad argument"));
SearchShardTarget searchShardTarget = null;
if (randomBoolean()) {
String nodeId = randomAlphaOfLengthBetween(5, 10);
String indexName = randomAlphaOfLengthBetween(5, 10);
String clusterAlias = randomBoolean() ? randomAlphaOfLengthBetween(5, 10) : null;
searchShardTarget = new SearchShardTarget(nodeId, new ShardId(new Index(indexName, indexUuid), randomInt()), clusterAlias);
}
return new ShardSearchFailure(ex, searchShardTarget);
return new ShardSearchFailure(ex, randomBoolean() ? randomShardTarget(indexUuid) : null);
}

public void testFromXContent() throws IOException {
Expand Down Expand Up @@ -110,6 +111,22 @@ public void testToXContent() throws IOException {
}"""), xContent.utf8ToString());
}

public void testToXContentForNoShardAvailable() throws IOException {
ShardId shardId = new ShardId(new Index("indexName", "indexUuid"), 123);
ShardSearchFailure failure = new ShardSearchFailure(
NoShardAvailableActionException.forOnShardFailureWrapper("shard unassigned"),
new SearchShardTarget("nodeId", shardId, null)
);
BytesReference xContent = toXContent(failure, XContentType.JSON, randomBoolean());
assertEquals(XContentHelper.stripWhitespace("""
{
"shard": 123,
"index": "indexName",
"node": "nodeId",
"reason":{"type":"no_shard_available_action_exception","reason":"shard unassigned"}
}"""), xContent.utf8ToString());
}

public void testToXContentWithClusterAlias() throws IOException {
ShardSearchFailure failure = new ShardSearchFailure(
new ParsingException(0, 0, "some message", null),
Expand All @@ -131,17 +148,25 @@ public void testToXContentWithClusterAlias() throws IOException {
}

public void testSerialization() throws IOException {
ShardSearchFailure testItem = createTestItem(randomAlphaOfLength(12));
ShardSearchFailure deserializedInstance = copyWriteable(
testItem,
writableRegistry(),
ShardSearchFailure::new,
VersionUtils.randomVersion(random())
);
assertEquals(testItem.index(), deserializedInstance.index());
assertEquals(testItem.shard(), deserializedInstance.shard());
assertEquals(testItem.shardId(), deserializedInstance.shardId());
assertEquals(testItem.reason(), deserializedInstance.reason());
assertEquals(testItem.status(), deserializedInstance.status());
for (int runs = 0; runs < 25; runs++) {
final ShardSearchFailure testItem;
if (randomBoolean()) {
testItem = createTestItem(randomAlphaOfLength(12));
} else {
SearchShardTarget target = randomShardTarget(randomAlphaOfLength(12));
testItem = new ShardSearchFailure(NoShardAvailableActionException.forOnShardFailureWrapper("unavailable"), target);
}
ShardSearchFailure deserializedInstance = copyWriteable(
testItem,
writableRegistry(),
ShardSearchFailure::new,
VersionUtils.randomVersion(random())
);
assertEquals(testItem.index(), deserializedInstance.index());
assertEquals(testItem.shard(), deserializedInstance.shard());
assertEquals(testItem.shardId(), deserializedInstance.shardId());
assertEquals(testItem.reason(), deserializedInstance.reason());
assertEquals(testItem.status(), deserializedInstance.status());
}
}
}