diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index daf8e1faf7bb8..19ae52cb51099 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -548,19 +548,35 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc } SearchContext context = createContext(request); + onNewContext(context); boolean success = false; try { putContext(context); - if (request.scroll() != null) { + success = true; + return context; + } finally { + if (success == false) { + freeContext(context.id()); + } + } + } + + private void onNewContext(SearchContext context) { + boolean success = false; + try { + if (context.scrollContext() != null) { openScrollContexts.incrementAndGet(); context.indexShard().getSearchOperationListener().onNewScrollContext(context); } context.indexShard().getSearchOperationListener().onNewContext(context); success = true; - return context; } finally { - if (!success) { - freeContext(context.id()); + // currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the + // right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future. + if (success == false) { + try (context) { + onFreeContext(context); + } } } } @@ -648,18 +664,23 @@ private void freeAllContextForIndex(Index index) { public boolean freeContext(long id) { try (SearchContext context = removeContext(id)) { if (context != null) { - assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); - context.indexShard().getSearchOperationListener().onFreeContext(context); - if (context.scrollContext() != null) { - openScrollContexts.decrementAndGet(); - context.indexShard().getSearchOperationListener().onFreeScrollContext(context); - } + onFreeContext(context); return true; } return false; } } + private void onFreeContext(SearchContext context) { + assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); + assert activeContexts.containsKey(context.id()) == false; + context.indexShard().getSearchOperationListener().onFreeContext(context); + if (context.scrollContext() != null) { + openScrollContexts.decrementAndGet(); + context.indexShard().getSearchOperationListener().onFreeScrollContext(context); + } + } + public void freeAllScrollContexts() { for (SearchContext searchContext : activeContexts.values()) { if (searchContext.scrollContext() != null) { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 90957c2779e8f..f8ef11abe9bf4 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.search; import com.carrotsearch.hppc.IntArrayList; - import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; @@ -51,6 +50,7 @@ import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardId; @@ -226,6 +226,7 @@ public void testSearchWhileIndexDeleted() throws InterruptedException { AtomicBoolean running = new AtomicBoolean(true); CountDownLatch startGun = new CountDownLatch(1); Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + final Thread thread = new Thread() { @Override public void run() { @@ -261,12 +262,16 @@ public void onFailure(Exception e) { try { final int rounds = scaledRandomIntBetween(100, 10000); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true) + .scroll(new Scroll(TimeValue.timeValueMinutes(1))); for (int i = 0; i < rounds; i++) { try { try { PlainActionFuture result = new PlainActionFuture<>(); + final boolean useScroll = randomBoolean(); service.executeQueryPhase( - new ShardSearchLocalRequest(searchRequest, indexShard.shardId(), 1, + new ShardSearchLocalRequest(useScroll ? scrollSearchRequest : searchRequest, + indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); @@ -276,6 +281,9 @@ public void onFailure(Exception e) { PlainActionFuture listener = new PlainActionFuture<>(); service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); + if (useScroll) { + service.freeContext(searchPhaseResult.getRequestId()); + } } catch (ExecutionException ex) { assertThat(ex.getCause(), instanceOf(RuntimeException.class)); throw ((RuntimeException)ex.getCause()); @@ -293,6 +301,13 @@ public void onFailure(Exception e) { thread.join(); semaphore.acquire(Integer.MAX_VALUE); } + + assertEquals(0, service.getActiveContexts()); + + SearchStats.Stats totalStats = indexShard.searchStats().getTotal(); + assertEquals(0, totalStats.getQueryCurrent()); + assertEquals(0, totalStats.getScrollCurrent()); + assertEquals(0, totalStats.getFetchCurrent()); } public void testTimeout() throws IOException {