Skip to content

Commit

Permalink
Segment Replication - Fix ShardLockObtained error during corruption c…
Browse files Browse the repository at this point in the history
…ases

This change fixes a bug where shards could not be recreated locally after corruption.
This occured because the store was not decref'd to 0 if the commit on close would fail
with a corruption exception.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Oct 4, 2023
1 parent 5aa1863 commit 42f5728
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,10 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam
protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());
return shardId.map(indexService::getShard).orElse(null);
}

protected boolean segmentReplicationWithRemoteEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand All @@ -70,6 +73,8 @@
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
Expand All @@ -82,6 +87,7 @@
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.Before;

Expand All @@ -94,6 +100,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -1777,4 +1784,139 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {

}

public void testSendCorruptBytesToReplica() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.get() == false) {
failed.compareAndSet(false, true);
FileChunkRequest req = (FileChunkRequest) request;
logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk());
TransportRequest corrupt = new FileChunkRequest(
req.recoveryId(),
((FileChunkRequest) request).requestSeqNo(),
((FileChunkRequest) request).shardId(),
((FileChunkRequest) request).metadata(),
((FileChunkRequest) request).position(),
new BytesArray("test"),
false,
0,
0L
);
connection.sendRequest(requestId, action, corrupt, options);
latch.countDown();
} else {
connection.sendRequest(requestId, action, request, options);
}
}
);
for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
assertBusy(() -> {
// wait until the original shard is closed.
assertEquals(IndexShardState.CLOSED, indexShard.state());
assertTrue(indexShard.store().refCount() == 0);
});
waitForActiveShardOnNode(replicaNode);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
assertDocCounts(100, primaryNode, replicaNode);
}

public void testWipeSegmentBetweenSyncs() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);

final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
waitForSearchableDocs(INDEX_NAME, 100, List.of(replicaNode));
logger.info("All files {}", List.of(indexShard.store().directory().listAll()));
indexShard.store().directory().deleteFile("_0.si");
logger.info("post files {}", List.of(indexShard.store().directory().listAll()));

for (int i = 101; i < 201; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
assertBusy(() -> {
// wait until the original shard is closed.
assertEquals(IndexShardState.CLOSED, indexShard.state());
assertTrue(indexShard.store().refCount() == 0);
});
waitForActiveShardOnNode(replicaNode);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
ensureGreen(INDEX_NAME);
assertDocCounts(200, primaryNode, replicaNode);
}

private void waitForActiveShardOnNode(String replicaNode) throws Exception {
assertBusy(() -> {
// wait until the shard is created
final Index index = resolveIndex(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
IndexShard indexShard1 = getIndexShard(replicaNode, INDEX_NAME);
assertNotNull(indexShard1);
assertFalse(indexShard1.store().isMarkedCorrupted());
assertEquals(IndexShardState.STARTED, indexShard1.state());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -437,13 +437,38 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
latestSegmentInfos.changed();
}
commitSegmentInfos(latestSegmentInfos);
IOUtils.close(readerManager, translogManager, store::decRef);
try {
commitSegmentInfos(latestSegmentInfos);
} catch (IOException e) {
// mark the store corrupted unless we are closing as result of engine failure.
// in this case Engine#failShard will handle store corruption.
if (failEngineLock.isHeldByCurrentThread() == false && store.isMarkedCorrupted() == false) {
try {
store.markStoreCorrupted(e);
} catch (IOException ex) {
logger.warn("Unable to mark store corrupted", ex);
}
}
}
try {
IOUtils.close(readerManager);
} catch (Exception e) {
logger.warn("Failed to close reader manager");
}
try {
IOUtils.close(translogManager);
} catch (Exception e) {
logger.warn("Failed to close translog");
}
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
logger.debug("engine closed [{}]", reason);
closedLatch.countDown();
try {
store.decRef();
logger.warn("engine closed [{}]", reason);
} finally {
closedLatch.countDown();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ThreadInterruptedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.admin.indices.flush.FlushRequest;
Expand Down Expand Up @@ -1771,6 +1772,8 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
public Map<String, StoreFileMetadata> getSegmentMetadataMap() throws IOException {
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return store.getSegmentMetadataMap(snapshot.get());
} catch (IOException e) {
throw new OpenSearchCorruptionException("Error fetching local metadata");
}
}

Expand Down
8 changes: 7 additions & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,13 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
*/
public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
assert indexSettings.isSegRepEnabled();
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
failIfCorrupted();
try {
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
} catch (NoSuchFileException | CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.test.IndexSettingsModule;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -577,6 +578,29 @@ public void testDecrefToZeroRemovesFile() throws IOException {
}
}

public void testCommitOnCloseThrowsException_decRefStore() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS);
List<Engine.Operation> operations = generateHistoryOnReplica(
randomIntBetween(1, 10),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
indexOperations(nrtEngine, operations.subList(0, 2));
// wipe the nrt directory initially so we can sync with primary.
cleanAndCopySegmentsFromPrimary(nrtEngine);
nrtEngineStore.directory().deleteFile("_0.si");
assertEquals(2, nrtEngineStore.refCount());
nrtEngine.close();
assertEquals(1, nrtEngineStore.refCount());
assertTrue(nrtEngineStore.isMarkedCorrupted());
// store will throw when eventually closed, not handled here.
assertThrows(UncheckedIOException.class, nrtEngineStore::close);
}

private void copySegments(Collection<String> latestPrimaryFiles, Engine nrtEngine) throws IOException {
final Store store = nrtEngine.store;
final List<String> replicaFiles = List.of(store.directory().listAll());
Expand Down

0 comments on commit 42f5728

Please sign in to comment.