Skip to content

Commit

Permalink
Revert "[Remote Store] Fix sleep time bug during remote store sync (o…
Browse files Browse the repository at this point in the history
…pensearch-project#14037)"

This reverts commit afeddc2.

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Jun 13, 2024
1 parent 18c5bb6 commit 30aac19
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,18 @@
package org.opensearch.remotemigration;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.common.Priority;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
Expand All @@ -45,7 +39,6 @@
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.repositories.fs.ReloadableFsRepository.REPOSITORIES_FAILRATE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
Expand Down Expand Up @@ -121,10 +114,6 @@ public void initDocRepToRemoteMigration() {
);
}

public ClusterHealthStatus ensureGreen(String... indices) {
return ensureGreen(TimeValue.timeValueSeconds(60), indices);
}

public BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -192,12 +181,14 @@ private Thread getIndexingThread() {
long currentDocCount = indexedDocs.incrementAndGet();
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
if (rarely()) {
logger.info("--> [iteration {}] flushing index", currentDocCount);
client().admin().indices().prepareFlush(indexName).get();
logger.info("Completed ingestion of {} docs. Flushing now", currentDocCount);
} else {
logger.info("--> [iteration {}] refreshing index", currentDocCount);
client().admin().indices().prepareRefresh(indexName).get();
}
}
logger.info("Completed ingestion of {} docs", currentDocCount);
}
});
}
Expand Down Expand Up @@ -227,21 +218,4 @@ public void stopShardRebalancing() {
.get()
);
}

public ClusterHealthStatus waitForRelocation() {
ClusterHealthRequest request = Requests.clusterHealthRequest()
.waitForNoRelocatingShards(true)
.timeout(TimeValue.timeValueSeconds(60))
.waitForEvents(Priority.LANGUID);
ClusterHealthResponse actionGet = client().admin().cluster().health(request).actionGet();
if (actionGet.isTimedOut()) {
logger.info(
"waitForRelocation timed out, cluster state:\n{}\n{}",
client().admin().cluster().prepareState().get().getState(),
client().admin().cluster().preparePendingClusterTasks().get()
);
assertThat("timed out waiting for relocation", actionGet.isTimedOut(), equalTo(false));
}
return actionGet.getStatus();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,16 @@ public void testRemotePrimaryRelocation() throws Exception {
.add(new MoveAllocationCommand("test", 0, primaryNodeName("test"), remoteNode))
.execute()
.actionGet();
waitForRelocation();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
assertEquals(remoteNode, primaryNodeName("test"));
logger.info("--> relocation from docrep to remote complete");

Expand All @@ -114,7 +123,16 @@ public void testRemotePrimaryRelocation() throws Exception {
.add(new MoveAllocationCommand("test", 0, remoteNode, remoteNode2))
.execute()
.actionGet();
waitForRelocation();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setTimeout(TimeValue.timeValueSeconds(60))
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.execute()
.actionGet();

assertEquals(0, clusterHealthResponse.getRelocatingShards());
assertEquals(remoteNode2, primaryNodeName("test"));

logger.info("--> relocation from remote to remote complete");
Expand All @@ -137,6 +155,7 @@ public void testRemotePrimaryRelocation() throws Exception {

public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
String docRepNode = internalCluster().startNode();
Client client = internalCluster().client(docRepNode);
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2146,7 +2146,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size();
}
try {
Thread.sleep(TimeValue.timeValueSeconds(30).millis());
Thread.sleep(TimeValue.timeValueSeconds(30).seconds());
} catch (InterruptedException ie) {
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
}
Expand Down

0 comments on commit 30aac19

Please sign in to comment.