Skip to content

Commit

Permalink
Fix flaky SearchCancellationIT tests to avoid race condition (#5656)
Browse files Browse the repository at this point in the history
* Add waiting time to account for Thread.sleep inaccuracy

Signed-off-by: Daniel Widdis <widdis@gmail.com>
  • Loading branch information
dbwiddis authored Dec 29, 2022
1 parent f2b5044 commit d248643
Showing 1 changed file with 41 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
Expand All @@ -88,6 +87,10 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class SearchCancellationIT extends OpenSearchIntegTestCase {

private TimeValue requestCancellationTimeout = TimeValue.timeValueSeconds(1);
private TimeValue clusterCancellationTimeout = TimeValue.timeValueMillis(1500);
private TimeValue keepAlive = TimeValue.timeValueSeconds(5);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(ScriptedBlockPlugin.class);
Expand Down Expand Up @@ -233,15 +236,13 @@ public void testCancellationDuringQueryPhaseUsingRequestParameter() throws Excep
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setCancelAfterTimeInterval(cancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setAllowPartialSearchResults(randomBoolean())
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
Expand All @@ -251,19 +252,19 @@ public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Excepti
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build())
.setPersistentSettings(
Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build()
)
.get();
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setAllowPartialSearchResults(randomBoolean())
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
Expand All @@ -288,14 +289,12 @@ public void testCancellationDuringFetchPhase() throws Exception {
public void testCancellationDuringFetchPhaseUsingRequestParameter() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setCancelAfterTimeInterval(cancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.addScriptField("test_field", new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))
.execute();
awaitForBlock(plugins);
// sleep for request cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
ensureSearchWasCancelled(searchResponse);
Expand All @@ -307,7 +306,7 @@ public void testCancellationOfScrollSearches() throws Exception {

logger.info("Executing search");
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setScroll(TimeValue.timeValueSeconds(10))
.setScroll(keepAlive)
.setSize(5)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.execute();
Expand All @@ -326,16 +325,16 @@ public void testCancellationOfScrollSearches() throws Exception {
public void testCancellationOfFirstScrollSearchRequestUsingRequestParameter() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setScroll(TimeValue.timeValueSeconds(10))
.setCancelAfterTimeInterval(cancellationTimeout)
.setScroll(keepAlive)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setSize(5)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.execute();

awaitForBlock(plugins);
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
SearchResponse response = ensureSearchWasCancelled(searchResponse);
if (response != null) {
Expand All @@ -354,7 +353,6 @@ public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exceptio
disableBlocks(plugins);

logger.info("Executing search");
TimeValue keepAlive = TimeValue.timeValueSeconds(5);
SearchResponse searchResponse = client().prepareSearch("test")
.setScroll(keepAlive)
.setSize(2)
Expand Down Expand Up @@ -394,11 +392,9 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception

// Disable block so the first request would pass
disableBlocks(plugins);
TimeValue keepAlive = TimeValue.timeValueSeconds(5);
TimeValue cancellationTimeout = TimeValue.timeValueSeconds(2);
SearchResponse searchResponse = client().prepareSearch("test")
.setScroll(keepAlive)
.setCancelAfterTimeInterval(cancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setSize(2)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.get();
Expand All @@ -418,8 +414,8 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception
.execute();

awaitForBlock(plugins);
// sleep for cancellation timeout to ensure there is no scheduled task for cancellation
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(requestCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);

// wait for response and ensure there is no failure
Expand All @@ -432,20 +428,20 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception
public void testDisableCancellationAtRequestLevel() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build())
.setPersistentSettings(
Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build()
)
.get();
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setAllowPartialSearchResults(randomBoolean())
.setCancelAfterTimeInterval(NO_TIMEOUT)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cancellation timeout to ensure there is no scheduled task for cancellation
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
// ensure search was successful since cancellation was disabled at request level
Expand All @@ -455,7 +451,6 @@ public void testDisableCancellationAtRequestLevel() throws Exception {
public void testDisableCancellationAtClusterLevel() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
Expand All @@ -466,8 +461,7 @@ public void testDisableCancellationAtClusterLevel() throws Exception {
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute();
awaitForBlock(plugins);
// sleep for cancellation timeout to ensure there is no scheduled task for cancellation
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
// ensure search was successful since cancellation was disabled at request level
Expand Down Expand Up @@ -501,11 +495,12 @@ public void testCancelMultiSearch() throws Exception {
public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build())
.setPersistentSettings(
Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build()
)
.get();
ActionFuture<MultiSearchResponse> mSearchResponse = client().prepareMultiSearch()
.setMaxConcurrentSearchRequests(2)
Expand All @@ -526,8 +521,7 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws
)
.execute();
awaitForBlock(plugins);
// sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(cancellationTimeout.getMillis());
sleepForAtLeast(clusterCancellationTimeout.getMillis());
// unblock the search thread
disableBlocks(plugins);
// both child requests are expected to fail
Expand All @@ -544,8 +538,6 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws
public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception {
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
TimeValue reqCancellationTimeout = new TimeValue(2, TimeUnit.SECONDS);
TimeValue clusterCancellationTimeout = new TimeValue(3, TimeUnit.SECONDS);
client().admin()
.cluster()
.prepareUpdateSettings()
Expand All @@ -558,7 +550,7 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception
.add(
client().prepareSearch("test")
.setAllowPartialSearchResults(randomBoolean())
.setCancelAfterTimeInterval(reqCancellationTimeout)
.setCancelAfterTimeInterval(requestCancellationTimeout)
.setQuery(
scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))
)
Expand All @@ -581,8 +573,7 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception
)
.execute();
awaitForBlock(plugins);
// sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed
Thread.sleep(Math.max(reqCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis()));
sleepForAtLeast(Math.max(requestCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis()));
// unblock the search thread
disableBlocks(plugins);
// only first and last child request are expected to fail
Expand All @@ -592,6 +583,16 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception
ensureMSearchWasCancelled(mSearchResponse, expectedFailedRequests);
}

/**
* Sleeps for the specified number of milliseconds plus a 100ms buffer to account for system timer/scheduler inaccuracies.
*
* @param milliseconds The minimum time to sleep
* @throws InterruptedException if interrupted during sleep
*/
private static void sleepForAtLeast(long milliseconds) throws InterruptedException {
Thread.sleep(milliseconds + 100L);
}

public static class ScriptedBlockPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_block";

Expand Down

0 comments on commit d248643

Please sign in to comment.