Skip to content

Commit

Permalink
Add integ tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Jul 24, 2023
1 parent 3bc7f16 commit f7de81e
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
Expand All @@ -33,6 +34,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.comparesEqualTo;
Expand Down Expand Up @@ -239,6 +241,38 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo
}
}

public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

PlainActionFuture<RestoreRemoteStoreResponse> future = PlainActionFuture.newFuture();
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true), future);
try {
future.get();
} catch (ExecutionException e) {
// If the request goes to co-ordinator, e.getCause() can be RemoteTransportException
assertTrue(e.getCause() instanceof IllegalStateException || e.getCause().getCause() instanceof IllegalStateException);
}
}

public void testRestoreFlowNoRedIndex() {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(false), PlainActionFuture.newFuture());

ensureGreen(INDEX_NAME);
// This is required to get updated number from already active shards which were not restored
refresh(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
}

/**
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ public Builder initializeAsRestore(IndexMetadata indexMetadata, SnapshotRecovery
public Builder initializeAsRemoteStoreRestore(
IndexMetadata indexMetadata,
RemoteStoreRecoverySource recoverySource,
Map<ShardId, ShardRouting> activeShards
Map<ShardId, ShardRouting> activeInitializingShards
) {
final UnassignedInfo unassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.EXISTING_INDEX_RESTORED,
Expand All @@ -466,8 +466,8 @@ public Builder initializeAsRemoteStoreRestore(
for (int shardNumber = 0; shardNumber < indexMetadata.getNumberOfShards(); shardNumber++) {
ShardId shardId = new ShardId(index, shardNumber);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
if (activeShards.containsKey(shardId)) {
indexShardRoutingBuilder.addShard(activeShards.get(shardId));
if (activeInitializingShards.containsKey(shardId)) {
indexShardRoutingBuilder.addShard(activeInitializingShards.get(shardId));
} else {
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, true, recoverySource, unassignedInfo));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,10 +565,10 @@ public Builder addAsFromOpenToClose(IndexMetadata indexMetadata) {
public Builder addAsRemoteStoreRestore(
IndexMetadata indexMetadata,
RemoteStoreRecoverySource recoverySource,
Map<ShardId, ShardRouting> activeShards
Map<ShardId, ShardRouting> activeInitializingShards
) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetadata.getIndex())
.initializeAsRemoteStoreRestore(indexMetadata, recoverySource, activeShards);
.initializeAsRemoteStoreRestore(indexMetadata, recoverySource, activeInitializingShards);
add(indexRoutingBuilder);
return this;
}
Expand Down
19 changes: 12 additions & 7 deletions server/src/main/java/org/opensearch/snapshots/RestoreService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.cluster.metadata.MetadataIndexUpgradeService;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.RecoverySource.RemoteStoreRecoverySource;
Expand Down Expand Up @@ -235,6 +236,7 @@ public ClusterState execute(ClusterState currentState) {
}
if (currentIndexMetadata.getSettings().getAsBoolean(SETTING_REMOTE_STORE_ENABLED, false)) {
IndexMetadata updatedIndexMetadata = currentIndexMetadata;
Map<ShardId, ShardRouting> activeInitializingShards = new HashMap<>();
if (request.restoreAllShards()) {
if (currentIndexMetadata.getState() != IndexMetadata.State.CLOSE) {
throw new IllegalStateException(
Expand All @@ -251,6 +253,15 @@ public ClusterState execute(ClusterState currentState) {
.settingsVersion(1 + currentIndexMetadata.getSettingsVersion())
.aliasesVersion(1 + currentIndexMetadata.getAliasesVersion())
.build();
} else {
activeInitializingShards = currentState.routingTable()
.index(index)
.shards()
.values()
.stream()
.map(IndexShardRoutingTable::primaryShard)
.filter(shardRouting -> shardRouting.unassigned() == false)
.collect(Collectors.toMap(ShardRouting::shardId, Function.identity()));
}

IndexId indexId = new IndexId(index, updatedIndexMetadata.getIndexUUID());
Expand All @@ -260,13 +271,7 @@ public ClusterState execute(ClusterState currentState) {
updatedIndexMetadata.getCreationVersion(),
indexId
);
Map<ShardId, ShardRouting> activeShards = currentState.routingTable()
.index(index)
.randomAllActiveShardsIt()
.getShardRoutings()
.stream()
.collect(Collectors.toMap(ShardRouting::shardId, Function.identity()));
rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeShards);
rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, activeInitializingShards);
blocks.updateBlocks(updatedIndexMetadata);
mdBuilder.put(updatedIndexMetadata, true);
indicesToBeRestored.add(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,18 +522,21 @@ public void testAddAsRemoteStoreRestoreWithActiveShards() {
Version.CURRENT,
new IndexId(TEST_INDEX_1, "1")
);
Map<ShardId, ShardRouting> activeShards = new HashMap<>();
Map<ShardId, ShardRouting> activeInitializingShards = new HashMap<>();
for (int i = 0; i < randomIntBetween(1, this.numberOfShards); i++) {
activeShards.put(new ShardId(indexMetadata.getIndex(), i), mock(ShardRouting.class));
activeInitializingShards.put(new ShardId(indexMetadata.getIndex(), i), mock(ShardRouting.class));
}
final RoutingTable routingTable = new RoutingTable.Builder().addAsRemoteStoreRestore(
indexMetadata,
remoteStoreRecoverySource,
activeShards
activeInitializingShards
).build();
assertTrue(routingTable.hasIndex(TEST_INDEX_1));
assertEquals(this.numberOfShards, routingTable.allShards(TEST_INDEX_1).size());
assertEquals(this.numberOfShards - activeShards.size(), routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size());
assertEquals(
this.numberOfShards - activeInitializingShards.size(),
routingTable.index(TEST_INDEX_1).shardsWithState(UNASSIGNED).size()
);
}

/** reverse engineer the in sync aid based on the given indexRoutingTable **/
Expand Down

0 comments on commit f7de81e

Please sign in to comment.