From d248643fc51ba9f263cdc2c6fa90405e63107328 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Thu, 29 Dec 2022 12:41:38 -0800 Subject: [PATCH] Fix flaky SearchCancellationIT tests to avoid race condition (#5656) * Add waiting time to account for Thread.sleep inaccuracy Signed-off-by: Daniel Widdis --- .../search/SearchCancellationIT.java | 81 ++++++++++--------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java index da5698918cf99..7f500b4e25cea 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java @@ -69,7 +69,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -88,6 +87,10 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE) public class SearchCancellationIT extends OpenSearchIntegTestCase { + private TimeValue requestCancellationTimeout = TimeValue.timeValueSeconds(1); + private TimeValue clusterCancellationTimeout = TimeValue.timeValueMillis(1500); + private TimeValue keepAlive = TimeValue.timeValueSeconds(5); + @Override protected Collection> nodePlugins() { return Collections.singleton(ScriptedBlockPlugin.class); @@ -233,15 +236,13 @@ public void testCancellationDuringQueryPhaseUsingRequestParameter() throws Excep List plugins = initBlockFactory(); indexTestData(); - TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); ActionFuture searchResponse = client().prepareSearch("test") - .setCancelAfterTimeInterval(cancellationTimeout) + .setCancelAfterTimeInterval(requestCancellationTimeout) .setAllowPartialSearchResults(randomBoolean()) .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); - // sleep for cancellation timeout to ensure scheduled cancellation task is actually executed - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(requestCancellationTimeout.getMillis()); // unblock the search thread disableBlocks(plugins); ensureSearchWasCancelled(searchResponse); @@ -251,19 +252,19 @@ public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Excepti List plugins = initBlockFactory(); indexTestData(); - TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); client().admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build()) + .setPersistentSettings( + Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build() + ) .get(); ActionFuture searchResponse = client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); - // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(clusterCancellationTimeout.getMillis()); // unblock the search thread disableBlocks(plugins); ensureSearchWasCancelled(searchResponse); @@ -288,14 +289,12 @@ public void testCancellationDuringFetchPhase() throws Exception { public void testCancellationDuringFetchPhaseUsingRequestParameter() throws Exception { List plugins = initBlockFactory(); indexTestData(); - TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); ActionFuture searchResponse = client().prepareSearch("test") - .setCancelAfterTimeInterval(cancellationTimeout) + .setCancelAfterTimeInterval(requestCancellationTimeout) .addScriptField("test_field", new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())) .execute(); awaitForBlock(plugins); - // sleep for request cancellation timeout to ensure scheduled cancellation task is actually executed - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(requestCancellationTimeout.getMillis()); // unblock the search thread disableBlocks(plugins); ensureSearchWasCancelled(searchResponse); @@ -307,7 +306,7 @@ public void testCancellationOfScrollSearches() throws Exception { logger.info("Executing search"); ActionFuture searchResponse = client().prepareSearch("test") - .setScroll(TimeValue.timeValueSeconds(10)) + .setScroll(keepAlive) .setSize(5) .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .execute(); @@ -326,16 +325,16 @@ public void testCancellationOfScrollSearches() throws Exception { public void testCancellationOfFirstScrollSearchRequestUsingRequestParameter() throws Exception { List plugins = initBlockFactory(); indexTestData(); - TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); ActionFuture searchResponse = client().prepareSearch("test") - .setScroll(TimeValue.timeValueSeconds(10)) - .setCancelAfterTimeInterval(cancellationTimeout) + .setScroll(keepAlive) + .setCancelAfterTimeInterval(requestCancellationTimeout) .setSize(5) .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(requestCancellationTimeout.getMillis()); + // unblock the search thread disableBlocks(plugins); SearchResponse response = ensureSearchWasCancelled(searchResponse); if (response != null) { @@ -354,7 +353,6 @@ public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exceptio disableBlocks(plugins); logger.info("Executing search"); - TimeValue keepAlive = TimeValue.timeValueSeconds(5); SearchResponse searchResponse = client().prepareSearch("test") .setScroll(keepAlive) .setSize(2) @@ -394,11 +392,9 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception // Disable block so the first request would pass disableBlocks(plugins); - TimeValue keepAlive = TimeValue.timeValueSeconds(5); - TimeValue cancellationTimeout = TimeValue.timeValueSeconds(2); SearchResponse searchResponse = client().prepareSearch("test") .setScroll(keepAlive) - .setCancelAfterTimeInterval(cancellationTimeout) + .setCancelAfterTimeInterval(requestCancellationTimeout) .setSize(2) .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) .get(); @@ -418,8 +414,8 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception .execute(); awaitForBlock(plugins); - // sleep for cancellation timeout to ensure there is no scheduled task for cancellation - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(requestCancellationTimeout.getMillis()); + // unblock the search thread disableBlocks(plugins); // wait for response and ensure there is no failure @@ -432,11 +428,12 @@ public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception public void testDisableCancellationAtRequestLevel() throws Exception { List plugins = initBlockFactory(); indexTestData(); - TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); client().admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build()) + .setPersistentSettings( + Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build() + ) .get(); ActionFuture searchResponse = client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) @@ -444,8 +441,7 @@ public void testDisableCancellationAtRequestLevel() throws Exception { .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); - // sleep for cancellation timeout to ensure there is no scheduled task for cancellation - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(clusterCancellationTimeout.getMillis()); // unblock the search thread disableBlocks(plugins); // ensure search was successful since cancellation was disabled at request level @@ -455,7 +451,6 @@ public void testDisableCancellationAtRequestLevel() throws Exception { public void testDisableCancellationAtClusterLevel() throws Exception { List plugins = initBlockFactory(); indexTestData(); - TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); client().admin() .cluster() .prepareUpdateSettings() @@ -466,8 +461,7 @@ public void testDisableCancellationAtClusterLevel() throws Exception { .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) .execute(); awaitForBlock(plugins); - // sleep for cancellation timeout to ensure there is no scheduled task for cancellation - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(clusterCancellationTimeout.getMillis()); // unblock the search thread disableBlocks(plugins); // ensure search was successful since cancellation was disabled at request level @@ -501,11 +495,12 @@ public void testCancelMultiSearch() throws Exception { public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws Exception { List plugins = initBlockFactory(); indexTestData(); - TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); client().admin() .cluster() .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout).build()) + .setPersistentSettings( + Settings.builder().put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout).build() + ) .get(); ActionFuture mSearchResponse = client().prepareMultiSearch() .setMaxConcurrentSearchRequests(2) @@ -526,8 +521,7 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws ) .execute(); awaitForBlock(plugins); - // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed - Thread.sleep(cancellationTimeout.getMillis()); + sleepForAtLeast(clusterCancellationTimeout.getMillis()); // unblock the search thread disableBlocks(plugins); // both child requests are expected to fail @@ -544,8 +538,6 @@ public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception { List plugins = initBlockFactory(); indexTestData(); - TimeValue reqCancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); - TimeValue clusterCancellationTimeout = new TimeValue(3, TimeUnit.SECONDS); client().admin() .cluster() .prepareUpdateSettings() @@ -558,7 +550,7 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception .add( client().prepareSearch("test") .setAllowPartialSearchResults(randomBoolean()) - .setCancelAfterTimeInterval(reqCancellationTimeout) + .setCancelAfterTimeInterval(requestCancellationTimeout) .setQuery( scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())) ) @@ -581,8 +573,7 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception ) .execute(); awaitForBlock(plugins); - // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed - Thread.sleep(Math.max(reqCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis())); + sleepForAtLeast(Math.max(requestCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis())); // unblock the search thread disableBlocks(plugins); // only first and last child request are expected to fail @@ -592,6 +583,16 @@ public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception ensureMSearchWasCancelled(mSearchResponse, expectedFailedRequests); } + /** + * Sleeps for the specified number of milliseconds plus a 100ms buffer to account for system timer/scheduler inaccuracies. + * + * @param milliseconds The minimum time to sleep + * @throws InterruptedException if interrupted during sleep + */ + private static void sleepForAtLeast(long milliseconds) throws InterruptedException { + Thread.sleep(milliseconds + 100L); + } + public static class ScriptedBlockPlugin extends MockScriptPlugin { static final String SCRIPT_NAME = "search_block";