Skip to content

Commit

Permalink
WIP - debug restore failures with replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhumika Saini committed Jul 6, 2023
1 parent c1c23b4 commit 53e44b6
Showing 1 changed file with 215 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
Expand All @@ -42,6 +44,9 @@
public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String INDEX_NAME_WILDCARD = "remote-store-test-*";
private static final String INDEX_NAMES = "remote-store-test-idx-1,remote-store-test-idx-2,remote-store-test-index-1,remote-store-test-index-2";
private static final String INDEX_NAMES_WILDCARD = "remote-store-test-idx-*,remote-store-test-index-*";
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";
Expand Down Expand Up @@ -69,16 +74,16 @@ private IndexResponse indexSingleDoc() {
.get();
}

private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush) {
private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush, String index) {
long totalOperations = 0;
long refreshedOrFlushedOperations = 0;
long maxSeqNo = -1;
long maxSeqNoRefreshedOrFlushed = -1;
for (int i = 0; i < numberOfIterations; i++) {
if (invokeFlush) {
flush(INDEX_NAME);
flush(index);
} else {
refresh(INDEX_NAME);
refresh(index);
}
maxSeqNoRefreshedOrFlushed = maxSeqNo;
refreshedOrFlushedOperations = totalOperations;
Expand All @@ -97,76 +102,257 @@ private Map<String, Long> indexData(int numberOfIterations, boolean invokeFlush)
return indexingStats;
}

private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal) {
private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal, String indexName) {
String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS;
String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED;
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity));
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity));
IndexResponse response = indexSingleDoc();
assertEquals(indexStats.get(maxSeqNoGranularity) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(statsGranularity) + 1);
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1);
}

private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, boolean remoteTranslogEnabled, String indices, int replicaCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
if (remoteTranslogEnabled) {
createIndex(index, remoteTranslogIndexSettings(replicaCount));
} else {
createIndex(index, remoteStoreIndexSettings(replicaCount));
}

ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
}

/**
* Helper function to test restoring an index with no replication from remote store. Only primary node is dropped.
* @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlow(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
prepareCluster(0, 3, remoteTranslog, INDEX_NAME, 0);

Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
verifyRestoredData(indexStats, remoteTranslog, INDEX_NAME);
}

if (remoteTranslog) {
verifyRestoredData(indexStats, true);
} else {
verifyRestoredData(indexStats, false);
}
/**
* Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop.
* @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
* @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
* @throws IOException IO Exception.
*/
private void testRestoreFlowBothPrimaryReplicasDown(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException, InterruptedException {
prepareCluster(1, 2, remoteTranslog, INDEX_NAME, 1);

Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(INDEX_NAME)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);

internalCluster().startDataOnlyNodes(2);
Thread.sleep(10000);

assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());

ensureYellow(INDEX_NAME);
verifyRestoredData(indexStats, remoteTranslog, INDEX_NAME);
}

// /**
// * Helper function to test restoring multiple indices from remote store when all the nodes housing the primary/replica drop.
// * @param remoteTranslog If true, Remote Translog Store is also enabled in addition to Remote Segment Store.
// * @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
// * @param invokeFlush If true, a flush is invoked. Otherwise, a refresh is invoked.
// * @throws IOException IO Exception.
// */
// private void testRestoreFlowMultipleIndices(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws IOException {
// internalCluster().startDataOnlyNodes(3);
// if (remoteTranslog) {
// createIndex(INDEX_NAME, remoteTranslogIndexSettings(1));
// } else {
// createIndex(INDEX_NAME, remoteStoreIndexSettings(1));
// }
// ensureYellowAndNoInitializingShards(INDEX_NAME);
// ensureGreen(INDEX_NAME);
//
// Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
//
// internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
// internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNodeName(INDEX_NAME)));
// assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
//
// client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
// ensureGreen(INDEX_NAME);
//
// if (remoteTranslog) {
// verifyRestoredData(indexStats, true, INDEX_NAME);
// } else {
// verifyRestoredData(indexStats, false, INDEX_NAME);
// }
// }

/**
* Simulates full data loss due to unrefreshed data, with no data restored from Remote Segment Store.
* @throws IOException IO Exception.
*/
public void testRemoteSegmentStoreRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(false, 1, true);
}

/**
* Simulates full data loss due to unrefreshed data, with no data restored from Remote Segment Store.
* @throws IOException IO Exception.
*/
public void testRemoteSegmentStoreRestoreWithNoDataPostRefresh() throws IOException {
testRestoreFlow(false, 1, false);
}

/**
* Simulates data restored until the refreshed data in Remote Segment Store
* and data loss for the unrefreshed data.
* @throws IOException IO Exception.
*/
public void testRemoteSegmentStoreRestoreWithRefreshedData() throws IOException {
testRestoreFlow(false, randomIntBetween(2, 5), false);
}

/**
* Simulates data restored until the refreshed data in Remote Segment Store
* and data loss for the unrefreshed data.
* @throws IOException IO Exception.
*/
public void testRemoteSegmentStoreRestoreWithCommittedData() throws IOException {
testRestoreFlow(false, randomIntBetween(2, 5), true);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(true, 1, true);
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithNoDataPostRefresh() throws IOException {
testRestoreFlow(true, 1, false);
}

/**
* Simulates refreshed data restored using Remote Segment Store
* and unrefreshed data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithRefreshedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), false);
}

/**
* Simulates refreshed data restored using Remote Segment Store
* and unrefreshed data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), true);
}

// /**
// * Simulates full data loss due to unrefreshed data, with no data restored from Remote Segment Store.
// * @throws IOException IO Exception.
// */
// public void testRSSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException {
// testRestoreFlowBothPrimaryReplicasDown(false, 1, true);
// }
//
// /**
// * Simulates full data loss due to unrefreshed data, with no data restored from Remote Segment Store.
// * @throws IOException IO Exception.
// */
// public void testRSSRestoreWithNoDataPostRefreshPrimaryReplicaDown() throws IOException {
// testRestoreFlowBothPrimaryReplicasDown(false, 1, false);
// }

/**
* Simulates data restored until the refreshed data in Remote Segment Store
* and data loss for the unrefreshed data.
* @throws IOException IO Exception.
*/
public void testRSSRestoreWithRefreshedDataPrimaryReplicaDown() throws IOException, InterruptedException {
testRestoreFlowBothPrimaryReplicasDown(false, randomIntBetween(2, 5), false);
}

// /**
// * Simulates data restored until the refreshed data in Remote Segment Store
// * and data loss for the unrefreshed data.
// * @throws IOException IO Exception.
// */
// public void testRSSRestoreWithCommittedDataPrimaryReplicaDown() throws IOException {
// testRestoreFlowBothPrimaryReplicasDown(false, randomIntBetween(2, 5), true);
// }
//
//// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
// /**
// * Simulates all data restored using Remote Translog Store.
// * @throws IOException IO Exception.
// */
// public void testRTSWithNoDataPostCommitPrimaryReplicaDown() throws IOException {
// testRestoreFlowBothPrimaryReplicasDown(true, 1, true);
// }
//
// /**
// * Simulates all data restored using Remote Translog Store.
// * @throws IOException IO Exception.
// */
// public void testRTSWithNoDataPostRefreshPrimaryReplicaDown() throws IOException {
// testRestoreFlowBothPrimaryReplicasDown(true, 1, false);
// }
//
// /**
// * Simulates refreshed data restored using Remote Segment Store
// * and unrefreshed data restored using Remote Translog Store.
// * @throws IOException IO Exception.
// */
// public void testRTSWithRefreshedDataPrimaryReplicaDown() throws IOException {
// testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), false);
// }
//
// /**
// * Simulates refreshed data restored using Remote Segment Store
// * and unrefreshed data restored using Remote Translog Store.
// * @throws IOException IO Exception.
// */
// public void testRTSWithCommittedDataPrimaryReplicaDown() throws IOException {
// testRestoreFlowBothPrimaryReplicasDown(true, randomIntBetween(2, 5), true);
// }



// public void testRemoteTranslogRestoreWithCommittedDataIndexPattern() throws IOException {
// testRestoreFlow(true, randomIntBetween(2, 5), true);
// }

private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws Exception {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
Expand All @@ -177,7 +363,7 @@ private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, bo
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);

client().admin()
.indices()
Expand Down Expand Up @@ -260,7 +446,7 @@ private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));
}

indexData(5, randomBoolean());
indexData(5, randomBoolean(), INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
Expand Down Expand Up @@ -289,7 +475,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
Expand All @@ -313,7 +499,7 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
Expand Down

0 comments on commit 53e44b6

Please sign in to comment.