Skip to content

Commit

Permalink
Fix bugs causing red indexes with remote indexes during translog uplo…
Browse files Browse the repository at this point in the history
…ad & store recovery (#10449) (#10498)

---------


(cherry picked from commit 8bb11a6)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 6c04459 commit c5b64ee
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private String getLocalSegmentFilename(String remoteFilename) {
return remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR)[0];
}

private IndexResponse indexSingleDoc() {
protected IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.IndexService;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -33,7 +39,7 @@
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
public class RemoteStoreBackpressureAndResiliencyIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
// Here the doc size of the request remains same throughout the test. After initial indexing, all remote store interactions
// fail leading to consecutive failure limit getting exceeded and leading to rejections.
Expand Down Expand Up @@ -156,4 +162,70 @@ private String generateString(int sizeInBytes) {
sb.append("}");
return sb.toString();
}

/**
* Fixes <a href="https://github.com/opensearch-project/OpenSearch/issues/10398">Github#10398</a>
*/
public void testAsyncTrimTaskSucceeds() {
Path location = randomRepoPath().toAbsolutePath();
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

logger.info("Increasing the frequency of async trim task to ensure it runs in background while indexing");
IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();
((AbstractAsyncTask) indexService.getTrimTranslogTask()).setInterval(TimeValue.timeValueMillis(100));

logger.info("--> Indexing data");
indexData(randomIntBetween(2, 5), true);
logger.info("--> Indexing succeeded");

MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName)
.repository(TRANSLOG_REPOSITORY_NAME);
logger.info("--> Failing all remote store interaction");
translogRepo.setRandomControlIOExceptionRate(1d);

for (int i = 0; i < randomIntBetween(5, 10); i++) {
UncategorizedExecutionException exception = assertThrows(UncategorizedExecutionException.class, this::indexSingleDoc);
assertEquals("Failed execution", exception.getMessage());
}

translogRepo.setRandomControlIOExceptionRate(0d);
indexSingleDoc();
logger.info("Indexed single doc successfully");
}

/**
* Fixes <a href="https://github.com/opensearch-project/OpenSearch/issues/10400">Github#10400</a>
*/
public void testSkipLoadGlobalCheckpointToReplicationTracker() {
Path location = randomRepoPath().toAbsolutePath();
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

logger.info("--> Indexing data");
indexData(randomIntBetween(1, 2), true);
logger.info("--> Indexing succeeded");

IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNodeName).iterator().next();
IndexShard indexShard = indexService.getShard(0);
indexShard.failShard("failing shard", null);

ensureRed(INDEX_NAME);

MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName)
.repository(TRANSLOG_REPOSITORY_NAME);
logger.info("--> Failing all remote store interaction");
translogRepo.setRandomControlIOExceptionRate(1d);
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
// CLuster stays red still as the remote interactions are still failing
ensureRed(INDEX_NAME);

logger.info("Retrying to allocate failed shards");
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
// CLuster stays red still as the remote interactions are still failing
ensureRed(INDEX_NAME);

logger.info("Stop failing all remote store interactions");
translogRepo.setRandomControlIOExceptionRate(0d);
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1286,7 +1286,7 @@ AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}

AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
public AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
return trimTranslogTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,9 @@ public void flush(FlushRequest request) {
* {@link org.opensearch.index.translog.TranslogDeletionPolicy} for details
*/
public void trimTranslog() {
if (isRemoteTranslogEnabled()) {
return;
}
verifyNotClosed();
final Engine engine = getEngine();
engine.trimUnreferencedTranslogFiles();
Expand Down Expand Up @@ -2320,7 +2323,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};

// Do not load the global checkpoint if this is a remote snapshot index
if (indexSettings.isRemoteSnapshot() == false) {
if (indexSettings.isRemoteSnapshot() == false && indexSettings.isRemoteTranslogStoreEnabled() == false) {
loadGlobalCheckpointToReplicationTracker();
}

Expand Down

0 comments on commit c5b64ee

Please sign in to comment.