Skip to content

Commit

Permalink
Do not fail replica shard due to primary closure
Browse files Browse the repository at this point in the history
This commit prevents a replica shard from being failed in the case that
a replication action to a replica is canceled due to the primary shard
being closed.

Signed-off-by: Andrew Ross <andrross@amazon.com>
  • Loading branch information
andrross committed Aug 24, 2022
1 parent 5dd7947 commit 692e140
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Fixed
- `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289))
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
- Do not fail replica shard due to primary closure ([#4133](https://github.com/opensearch-project/OpenSearch/pull/4133))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;

import org.hamcrest.MatcherAssert;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand All @@ -48,6 +49,7 @@
import org.opensearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
Expand Down Expand Up @@ -108,6 +110,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.CollectionUtils.iterableAsArrayList;
Expand Down Expand Up @@ -698,6 +701,104 @@ public void testReplicaCorruption() throws Exception {
ensureGreen(TimeValue.timeValueSeconds(60));
}

/**
 * Verifies that a replica shard is not failed when an in-flight replication
 * request is cancelled because the primary shard was closed after corruption.
 * <p>
 * The test blocks a replication request between primary and replica, corrupts
 * a file on the primary, triggers a snapshot so the corruption is detected,
 * then releases the blocked request and expects the cluster to go green.
 */
public void testPrimaryCorruptionDuringReplicationDoesNotFailReplicaShard() throws Exception {
    internalCluster().ensureAtLeastNumDataNodes(2);
    final NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
    final List<NodeStats> dataNodeStats = nodeStats.getNodes()
        .stream()
        .filter(stat -> stat.getNode().isDataNode())
        .collect(Collectors.toUnmodifiableList());
    MatcherAssert.assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));

    final NodeStats primaryNode = dataNodeStats.get(0);
    final NodeStats replicaNode = dataNodeStats.get(1);
    assertAcked(
        prepareCreate("test").setSettings(
            Settings.builder()
                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                .put("index.routing.allocation.include._name", primaryNode.getNode().getName())
                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
                .put("index.allocation.max_retries", Integer.MAX_VALUE) // keep on retrying
        )
    );
    ensureGreen();

    // Add custom send behavior between primary and replica that will
    // count down a latch to indicate that a replication operation is
    // currently in flight, and then block on a second latch that will
    // be released once the primary shard has been corrupted.
    final CountDownLatch indexingInFlight = new CountDownLatch(1);
    final CountDownLatch corruptionHasHappened = new CountDownLatch(1);
    final MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
        TransportService.class,
        primaryNode.getNode().getName()
    ));
    mockTransportService.addSendBehavior(
        internalCluster().getInstance(TransportService.class, replicaNode.getNode().getName()),
        (connection, requestId, action, request, options) -> {
            if (request instanceof TransportReplicationAction.ConcreteShardRequest) {
                indexingInFlight.countDown();
                try {
                    corruptionHasHappened.await();
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can still observe it
                    Thread.currentThread().interrupt();
                    logger.info("Interrupted while waiting for corruption");
                }
            }
            connection.sendRequest(requestId, action, request, options);
        }
    );

    // Configure the modified data node as a replica
    final Settings build = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
        .put("index.routing.allocation.include._name", primaryNode.getNode().getName() + "," + replicaNode.getNode().getName())
        .build();
    client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
    client().admin().cluster().prepareReroute().get();
    ensureGreen();

    // Create a snapshot repository. This repo is used to take a snapshot after
    // corrupting a file, which causes the node to notice the corrupt data and
    // close the shard.
    assertAcked(
        client().admin()
            .cluster()
            .preparePutRepository("test-repo")
            .setType("fs")
            .setSettings(
                Settings.builder()
                    .put("location", randomRepoPath().toAbsolutePath())
                    .put("compress", randomBoolean())
                    .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
            )
    );

    // Fire the indexing request asynchronously; its replication request will
    // block in the send behavior installed above.
    client().prepareIndex("test").setSource("field", "value").execute();
    try {
        indexingInFlight.await();

        // Corrupt a file on the primary then take a snapshot. Snapshot should
        // finish in the PARTIAL state since the corrupted file will cause a checksum
        // validation failure.
        final ShardRouting corruptedShardRouting = corruptRandomPrimaryFile();
        logger.info("--> {} corrupted", corruptedShardRouting);
        final CreateSnapshotResponse createSnapshotResponse = client().admin()
            .cluster()
            .prepareCreateSnapshot("test-repo", "test-snap")
            .setWaitForCompletion(true)
            .setIndices("test")
            .get();
        final SnapshotState snapshotState = createSnapshotResponse.getSnapshotInfo().state();
        MatcherAssert.assertThat("Expect file corruption to cause PARTIAL snapshot state", snapshotState, equalTo(SnapshotState.PARTIAL));
    } finally {
        // Unblock the blocked indexing thread now that corruption on the primary has been
        // confirmed. Doing this in a finally block also guarantees the transport thread is
        // released if any of the steps above throws, so a failure cannot leave it parked.
        corruptionHasHappened.countDown();
    }

    // Assert the cluster returns to green status because the replica will be promoted to primary
    ensureGreen();
}

private int numShards(String... index) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(index, false);
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName;
Expand Down Expand Up @@ -1601,6 +1602,12 @@ private enum OpenSearchExceptionHandle {
org.opensearch.indices.replication.common.ReplicationFailedException::new,
161,
V_2_1_0
),
PRIMARY_SHARD_CLOSED_EXCEPTION(
org.opensearch.index.shard.PrimaryShardClosedException.class,
org.opensearch.index.shard.PrimaryShardClosedException::new,
162,
V_3_0_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.support.RetryableAction;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.shard.PrimaryShardClosedException;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.index.shard.ShardId;
Expand All @@ -45,6 +46,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Pending Replication Actions
Expand Down Expand Up @@ -121,23 +123,19 @@ synchronized void acceptNewTrackedAllocationIds(Set<String> trackedAllocationIds
}
}

cancelActions(toCancel, "Replica left ReplicationGroup");
cancelActions(toCancel, () -> new IndexShardClosedException(shardId, "Replica left ReplicationGroup"));
}

@Override
public synchronized void close() {
ArrayList<Set<RetryableAction<?>>> toCancel = new ArrayList<>(onGoingReplicationActions.values());
onGoingReplicationActions.clear();

cancelActions(toCancel, "Primary closed.");
cancelActions(toCancel, () -> new PrimaryShardClosedException(shardId));
}

private void cancelActions(ArrayList<Set<RetryableAction<?>>> toCancel, String message) {
private void cancelActions(ArrayList<Set<RetryableAction<?>>> toCancel, Supplier<IndexShardClosedException> exceptionSupplier) {
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(
() -> toCancel.stream()
.flatMap(Collection::stream)
.forEach(action -> action.cancel(new IndexShardClosedException(shardId, message)))
);
.execute(() -> toCancel.stream().flatMap(Collection::stream).forEach(action -> action.cancel(exceptionSupplier.get())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.MapperParsingException;
import org.opensearch.index.shard.PrimaryShardClosedException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -514,15 +515,20 @@ public void failShardIfNeeded(
if (TransportActions.isShardNotAvailableException(exception) == false) {
logger.warn(new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
}
shardStateAction.remoteShardFailed(
replica.shardId(),
replica.allocationId().getId(),
primaryTerm,
true,
message,
exception,
listener
);
// If a write action fails due to the closure of the primary shard
// then the replicas should not be marked as failed since they are
// still up-to-date with the (now closed) primary shard
if (exception instanceof PrimaryShardClosedException == false) {
shardStateAction.remoteShardFailed(
replica.shardId(),
replica.allocationId().getId(),
primaryTerm,
true,
message,
exception,
listener
);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.index.shard;

import java.io.IOException;

import org.opensearch.common.io.stream.StreamInput;

/**
 * Signals that an operation failed because the primary shard was closed.
 * <p>
 * This is a specialization of {@link IndexShardClosedException}, so code that
 * handles the parent type keeps working, while callers that need to tell a
 * primary closure apart can check for this subtype.
 *
 * @opensearch.internal
 */
public class PrimaryShardClosedException extends IndexShardClosedException {

    /**
     * Creates the exception for the given shard with the fixed message "Primary closed".
     *
     * @param shardId the shard whose primary was closed
     */
    public PrimaryShardClosedException(final ShardId shardId) {
        super(shardId, "Primary closed");
    }

    /**
     * Reads the exception from a stream by delegating to the parent's stream constructor.
     *
     * @param in the stream to read from
     * @throws IOException if the parent constructor fails to read from the stream
     */
    public PrimaryShardClosedException(final StreamInput in) throws IOException {
        super(in);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
import org.opensearch.index.shard.IllegalIndexShardStateException;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.PrimaryShardClosedException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.indices.IndexTemplateMissingException;
Expand Down Expand Up @@ -858,6 +859,7 @@ public void testIds() {
ids.put(159, NodeHealthCheckFailureException.class);
ids.put(160, NoSeedNodeLeftException.class);
ids.put(161, ReplicationFailedException.class);
ids.put(162, PrimaryShardClosedException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.PrimaryShardClosedException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void testAllocationIdActionWillBeCancelledOnClose() {
pendingReplication.addPendingAction(allocationId, action);
action.run();
pendingReplication.close();
expectThrows(IndexShardClosedException.class, future::actionGet);
expectThrows(PrimaryShardClosedException.class, future::actionGet);
}

private class TestAction extends RetryableAction<Void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.support.replication;

import org.hamcrest.MatcherAssert;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
Expand All @@ -57,6 +58,7 @@
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.PrimaryShardClosedException;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.index.translog.Translog;
Expand Down Expand Up @@ -91,6 +93,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.emptyArray;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -395,6 +398,48 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
}
}

/**
 * Checks that {@code failShardIfNeeded} neither sends a transport request nor
 * invokes the listener when the failure is a {@link PrimaryShardClosedException}.
 */
public void testPrimaryClosedDoesNotFailShard() {
    final String indexName = "test";
    final ShardId targetShard = new ShardId(indexName, "_na_", 0);

    // Transport that records every outgoing request so the test can inspect them
    final CapturingTransport capturingTransport = new CapturingTransport();
    final TransportService service = capturingTransport.createTransportService(
        clusterService.getSettings(),
        threadPool,
        TransportService.NOOP_TRANSPORT_INTERCEPTOR,
        x -> clusterService.localNode(),
        null,
        Collections.emptySet()
    );
    service.start();
    service.acceptIncomingRequests();

    final ShardStateAction stateAction = new ShardStateAction(clusterService, service, null, null, threadPool);
    final TestAction writeAction = new TestAction(
        Settings.EMPTY,
        "internal:testAction",
        service,
        clusterService,
        stateAction,
        threadPool
    );

    // Install the cluster state built by the helper, then pick its first replica for the shard
    final ClusterState clusterState = ClusterStateCreationUtils.stateWithActivePrimary(indexName, true, 1, 0);
    ClusterServiceUtils.setState(clusterService, clusterState);
    final long term = clusterState.metadata().index(indexName).primaryTerm(0);
    final ShardRouting replicaRouting = clusterState.routingTable().shardRoutingTable(targetShard).replicaShards().get(0);

    // Assert that failShardIfNeeded is a no-op for the PrimaryShardClosedException failure
    final AtomicInteger listenerInvocations = new AtomicInteger(0);
    final PrimaryShardClosedException primaryClosed = new PrimaryShardClosedException(targetShard);
    writeAction.newReplicasProxy()
        .failShardIfNeeded(replicaRouting, term, "test", primaryClosed, ActionListener.wrap(listenerInvocations::incrementAndGet));

    // Nothing may have gone over the wire and the listener must not have been called
    MatcherAssert.assertThat(capturingTransport.getCapturedRequestsAndClear(), emptyArray());
    MatcherAssert.assertThat(listenerInvocations.get(), equalTo(0));
}

private class TestAction extends TransportWriteAction<TestRequest, TestRequest, TestResponse> {

private final boolean withDocumentFailureOnPrimary;
Expand Down

0 comments on commit 692e140

Please sign in to comment.