Skip to content

Commit

Permalink
Fixing ITs
Browse files Browse the repository at this point in the history
Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
  • Loading branch information
psychbot authored and Sachin Kale committed Aug 31, 2023
1 parent a227d20 commit c94713e
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)

public class PrimaryTermValidationIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
Expand All @@ -64,17 +62,11 @@ public void testPrimaryTermValidation() throws Exception {
.put(remoteStoreClusterSettings(REPOSITORY_NAME, REPOSITORY_2_NAME, true))
.build();
internalCluster().startClusterManagerOnlyNode(clusterSettings);

// Create repository
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
absolutePath2 = randomRepoPath().toAbsolutePath();
putRepository(absolutePath2, REPOSITORY_2_NAME);

// Start data nodes and create index
internalCluster().startDataOnlyNodes(2, clusterSettings);
ensureStableCluster(3);
assertRepositoryMetadataPresentInClusterState();

// Create index
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ public class RemoteSegmentStatsFromNodesStatsIT extends RemoteStoreBaseIntegTest
private static final int CLUSTER_MANAGER_NODE_COUNT = 3;

@Before
public void setup() {
public void setup() throws Exception {
setupCustomCluster();
setupRepo(false);
}

private void setupCustomCluster() {
private void setupCustomCluster() throws Exception {
internalCluster().startClusterManagerOnlyNodes(CLUSTER_MANAGER_NODE_COUNT);
internalCluster().startDataOnlyNodes(DATA_NODE_COUNT);
ensureStableCluster(DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT);
assertRepositoryMetadataPresentInClusterState();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.FileVisitResult;
Expand Down Expand Up @@ -61,8 +61,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";
protected static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed";

protected Path absolutePath;
protected Path absolutePath2;
protected Path segmentRepoPath;
protected Path translogRepoPath;
protected Settings nodeAttributesSettings;
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
Expand Down Expand Up @@ -120,7 +120,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, REPOSITORY_2_NAME, true))
.put(remoteStoreNodeAttributes(REPOSITORY_NAME, REPOSITORY_2_NAME))
.put(remoteStoreNodeAttributes(REPOSITORY_NAME, FsRepository.TYPE, REPOSITORY_2_NAME, FsRepository.TYPE))
.build();
}

Expand Down Expand Up @@ -191,36 +191,51 @@ public static Settings remoteStoreClusterSettings(String segmentRepoName, String
return settingsBuilder.build();
}

public Settings remoteStoreNodeAttributes(String segmentRepoName, String translogRepoName) {
public Settings remoteStoreNodeAttributes(
String segmentRepoName,
String segmentRepoType,
String translogRepoName,
String translogRepoType
) {
if (nodeAttributesSettings != null) {
return nodeAttributesSettings;
}
absolutePath = randomRepoPath().toAbsolutePath();
absolutePath2 = randomRepoPath().toAbsolutePath();
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
String segmentRepoKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
segmentRepoName
);
String translogRepoKey = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
translogRepoName
);
String segmentRepoSettingsPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
segmentRepoName
);
String translogRepoSettingsPrefix = String.format(
Locale.getDefault(),
"node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX,
translogRepoName
);

if (segmentRepoName.equals(translogRepoName)) {
absolutePath2 = absolutePath;
segmentRepoPath = translogRepoPath;
segmentRepoKey = translogRepoKey;
segmentRepoSettingsPrefix = translogRepoSettingsPrefix;
}

nodeAttributesSettings = Settings.builder()
.put("node.attr." + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, segmentRepoName)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, segmentRepoName),
"fs"
)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, segmentRepoName)
+ "location",
absolutePath.toString()
)
.put(segmentRepoKey, segmentRepoType)
.put(segmentRepoSettingsPrefix + "location", segmentRepoPath.toString())
.put("node.attr." + REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, translogRepoName)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, translogRepoName),
"fs"
)
.put(
String.format(Locale.getDefault(), "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, translogRepoName)
+ "location",
absolutePath2.toString()
)
.put(translogRepoKey, translogRepoType)
.put(translogRepoSettingsPrefix + "location", translogRepoPath.toString())
.build();
return nodeAttributesSettings;
}
Expand Down Expand Up @@ -264,11 +279,6 @@ protected void putRepository(Path path, String repoName) {
assertAcked(clusterAdmin().preparePutRepository(repoName).setType("fs").setSettings(Settings.builder().put("location", path)));
}

@Before
public void setup() throws Exception {
assertRepositoryMetadataPresentInClusterState();
}

@After
public void teardown() {
nodeAttributesSettings = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
Expand Down Expand Up @@ -54,6 +51,8 @@ public Settings indexSettings() {

private void testPeerRecovery(int numberOfIterations, boolean invokeFlush) throws Exception {
internalCluster().startNodes(3);
ensureStableCluster(3);
assertRepositoryMetadataPresentInClusterState();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -115,6 +114,8 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exc

public void verifyRemoteStoreCleanup() throws Exception {
internalCluster().startNodes(3);
ensureStableCluster(3);
assertRepositoryMetadataPresentInClusterState();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));

indexData(5, randomBoolean(), INDEX_NAME);
Expand All @@ -123,7 +124,7 @@ public void verifyRemoteStoreCleanup() throws Exception {
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID);
assertTrue(getFileCount(indexPath) > 0);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
// Delete is async. Give time for it
Expand All @@ -141,6 +142,8 @@ public void testRemoteTranslogCleanup() throws Exception {

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startNode();
ensureStableCluster(1);
assertRepositoryMetadataPresentInClusterState();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
Expand All @@ -149,7 +152,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
Expand All @@ -168,6 +171,8 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {

public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
internalCluster().startNode();
ensureStableCluster(1);
assertRepositoryMetadataPresentInClusterState();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
Expand All @@ -176,7 +181,7 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
}

private void assertRemoteStoreRepositoryOnAllNodes() throws Exception {
assertRepositoryMetadataPresentInClusterState();
RepositoriesMetadata repositories = internalCluster().getInstance(ClusterService.class, internalCluster().getNodeNames()[0])
.state()
.metadata()
Expand All @@ -71,18 +72,20 @@ private void assertRemoteStoreRepositoryOnAllNodes() throws Exception {

public void testSingleNodeClusterRepositoryRegistration() throws Exception {
internalCluster().startNode();
ensureStableCluster(1);
assertRemoteStoreRepositoryOnAllNodes();
}

public void testMultiNodeClusterRepositoryRegistration() throws Exception {
internalCluster().startNodes(3);
ensureStableCluster(3);
assertRemoteStoreRepositoryOnAllNodes();
}

public void testMultiNodeClusterRepositoryRegistrationWithMultipleMasters() throws Exception {
internalCluster().startClusterManagerOnlyNodes(3);
internalCluster().startNodes(3);

ensureStableCluster(6);
assertRemoteStoreRepositoryOnAllNodes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -53,11 +52,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

@Before
public void setup() {
setupRepo();
}

private void restore(String... indices) {
boolean restoreAllShards = randomBoolean();
if (restoreAllShards) {
Expand Down Expand Up @@ -93,14 +87,16 @@ private void verifyRestoredData(Map<String, Long> indexStats, String indexName)
);
}

private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount)
throws Exception {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
assertRepositoryMetadataPresentInClusterState();
}

/**
Expand Down Expand Up @@ -187,7 +183,7 @@ private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long
* @throws IOException IO Exception.
*/
private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int shardCount) throws Exception {
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
prepareCluster(1, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

Expand Down Expand Up @@ -267,9 +263,9 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo
}
}

public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException {
public void testRestoreFlowAllShardsNoRedIndex() throws Exception {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
prepareCluster(1, 3, INDEX_NAME, 0, shardCount);
indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

Expand All @@ -285,7 +281,7 @@ public void testRestoreFlowAllShardsNoRedIndex() throws InterruptedException {

public void testRestoreFlowNoRedIndex() throws Exception {
int shardCount = randomIntBetween(1, 5);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
prepareCluster(1, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

Expand Down Expand Up @@ -474,7 +470,7 @@ public void testRateLimitedRemoteDownloads() throws Exception {
)
);
int shardCount = randomIntBetween(1, 3);
prepareCluster(0, 3, INDEX_NAME, 0, shardCount);
prepareCluster(1, 3, INDEX_NAME, 0, shardCount);
Map<String, Long> indexStats = indexData(5, false, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
Expand All @@ -38,11 +37,6 @@
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIntegTestCase {
private int shard_count = 5;

@Before
public void setup() {
setupRepo();
}

@Override
public Settings indexSettings() {
return Settings.builder().put(super.indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shard_count).build();
Expand All @@ -51,6 +45,8 @@ public Settings indexSettings() {
public void testPromoteReplicaToPrimary() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
ensureStableCluster(2);
assertRepositoryMetadataPresentInClusterState();
final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
shard_count = scaledRandomIntBetween(1, 5);
createIndex(indexName);
Expand Down Expand Up @@ -126,6 +122,8 @@ public void testPromoteReplicaToPrimary() throws Exception {
public void testFailoverWhileIndexing() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
ensureStableCluster(2);
assertRepositoryMetadataPresentInClusterState();
final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
shard_count = scaledRandomIntBetween(1, 5);
createIndex(indexName);
Expand Down
Loading

0 comments on commit c94713e

Please sign in to comment.