Skip to content

Commit

Permalink
Fix flaky test BulkIntegrationIT.testDeleteIndexWhileIndexing
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
  • Loading branch information
Poojita-Raj committed Dec 8, 2022
1 parent 2416d37 commit b6f841e
Showing 1 changed file with 17 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,34 +193,29 @@ public void testDeleteIndexWhileIndexing() throws Exception {
String index = "deleted_while_indexing";
createIndex(index);
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 4)];
AtomicInteger docID = new AtomicInteger();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false && docID.get() < 5000) {
String id = Integer.toString(docID.incrementAndGet());
try {
IndexResponse response = client().prepareIndex(index)
.setId(id)
.setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
.get();
assertThat(response.getResult(), is(oneOf(CREATED, UPDATED)));
logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
} catch (OpenSearchException ignore) {
logger.info("--> fail to index id={}", id);
}
Thread thread = new Thread(() -> {
while (stopped.get() == false && docID.get() < 5000) {
String id = Integer.toString(docID.incrementAndGet());
try {
IndexResponse response = client().prepareIndex(index)
.setId(id)
.setSource(Collections.singletonMap("f" + randomIntBetween(1, 10), randomNonNegativeLong()), XContentType.JSON)
.get();
assertThat(response.getResult(), is(oneOf(CREATED, UPDATED)));
logger.info("--> index id={} seq_no={}", response.getId(), response.getSeqNo());
} catch (OpenSearchException ignore) {
logger.info("--> fail to index id={}", id);
}
});
threads[i].start();
}
}
});
thread.start();
ensureGreen(index);
assertBusy(() -> assertThat(docID.get(), greaterThanOrEqualTo(1)));
assertAcked(client().admin().indices().prepareDelete(index));
stopped.set(true);
for (Thread thread : threads) {
thread.join(ReplicationRequest.DEFAULT_TIMEOUT.millis() / 2);
assertFalse(thread.isAlive());
}
thread.join(ReplicationRequest.DEFAULT_TIMEOUT.millis() / 2);
assertFalse(thread.isAlive());
}

}

0 comments on commit b6f841e

Please sign in to comment.