Skip to content

Commit

Permalink
fix flaky tests in SegmentReplicationPressureIT (#6868)
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj authored Mar 29, 2023
1 parent bd9b00d commit 7b708c4
Showing 1 changed file with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.replication.SegmentReplicationBaseIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -52,12 +53,24 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

@Override
public Settings indexSettings() {
// we want to control refreshes
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.refresh_interval", -1)
.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671")
public void testWritesRejected() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand All @@ -76,6 +89,10 @@ public void testWritesRejected() throws Exception {
indexingThread.start();
indexingThread.join();
latch.await();

indexDoc();
totalDocs.incrementAndGet();
refresh(INDEX_NAME);
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
Expand All @@ -90,6 +107,7 @@ public void testWritesRejected() throws Exception {

// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}
Expand All @@ -98,7 +116,6 @@ public void testWritesRejected() throws Exception {
* This test ensures that a replica can be added while the index is under write block.
* Ensuring that only write requests are blocked.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6671")
public void testAddReplicaWhileWritesBlocked() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand All @@ -118,6 +135,9 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
indexingThread.start();
indexingThread.join();
latch.await();
indexDoc();
totalDocs.incrementAndGet();
refresh(INDEX_NAME);
// index again while we are stale.
assertBusy(() -> {
expectThrows(OpenSearchRejectedExecutionException.class, () -> {
Expand All @@ -142,6 +162,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {

// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
waitForSearchableDocs(totalDocs.incrementAndGet(), replicaNodes.toArray(new String[] {}));
verifyStoreContent();
}
Expand Down Expand Up @@ -258,7 +279,7 @@ private void assertFailedRequests(BulkResponse response) {
}

private void indexDoc() {
client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").get();
client().prepareIndex(INDEX_NAME).setId(UUIDs.base64UUID()).setSource("{}", "{}").execute().actionGet();
}

private void assertEqualSegmentInfosVersion(List<String> replicaNames, IndexShard primaryShard) {
Expand Down

0 comments on commit 7b708c4

Please sign in to comment.