From 5f86ea10fd4b3adbe6262148aff23d6d73973f71 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 28 May 2019 13:29:43 +0200 Subject: [PATCH 1/6] Fix concurrent search and index delete Changed order of listener invocation so that we notify before registering search context and notify after unregistering same. This ensures that count up/down like what we do in ShardSearchStats works. Otherwise, we risk notifying onFreeScrollContext before notifying onNewScrollContext (same for onFreeContext/onNewContext, but we currently have no assertions failing in those). --- .../java/org/elasticsearch/search/SearchService.java | 10 +++++++++- .../org/elasticsearch/search/SearchServiceTests.java | 10 ++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index daf8e1faf7bb8..3f8a3bc495c3c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; @@ -550,12 +551,19 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc SearchContext context = createContext(request); boolean success = false; try { - putContext(context); if (request.scroll() != null) { openScrollContexts.incrementAndGet(); context.indexShard().getSearchOperationListener().onNewScrollContext(context); } context.indexShard().getSearchOperationListener().onNewContext(context); + putContext(context); + // ensure that if index is deleted concurrently, we free the context immediately, either here or in afterIndexRemoved + try { + indicesService.indexServiceSafe(request.shardId().getIndex()); + } catch (IndexNotFoundException e) { + freeContext(context.id()); + throw e; + } success = true; return context; } finally { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 90957c2779e8f..4e8d361c30666 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.search; import com.carrotsearch.hppc.IntArrayList; - import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; @@ -261,12 +260,16 @@ public void onFailure(Exception e) { try { final int rounds = scaledRandomIntBetween(100, 10000); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true) + .scroll(new Scroll(TimeValue.timeValueMinutes(1))); for (int i = 0; i < rounds; i++) { try { try { PlainActionFuture result = new PlainActionFuture<>(); + boolean useScroll = randomBoolean(); service.executeQueryPhase( - new ShardSearchLocalRequest(searchRequest, indexShard.shardId(), 1, + new ShardSearchLocalRequest(useScroll ? searchRequest : scrollSearchRequest, + indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); @@ -276,6 +279,9 @@ public void onFailure(Exception e) { PlainActionFuture listener = new PlainActionFuture<>(); service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); + if (useScroll) { + service.freeContext(searchPhaseResult.requestId); + } } catch (ExecutionException ex) { assertThat(ex.getCause(), instanceOf(RuntimeException.class)); throw ((RuntimeException)ex.getCause()); From 8edda7b36f7009f6d01d44fabe97e94153d7d06e Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Wed, 29 May 2019 14:46:28 +0200 Subject: [PATCH 2/6] Removed duplicate catch block. --- .../main/java/org/elasticsearch/search/SearchService.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 3f8a3bc495c3c..b931529d8905c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -44,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; @@ -558,12 +557,7 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc context.indexShard().getSearchOperationListener().onNewContext(context); putContext(context); // ensure that if index is deleted concurrently, we free the context immediately, either here or in afterIndexRemoved - try { - indicesService.indexServiceSafe(request.shardId().getIndex()); - } catch (IndexNotFoundException e) { - freeContext(context.id()); - throw e; - } + indicesService.indexServiceSafe(request.shardId().getIndex()); success = true; return context; } finally { From ac02fc0fc69915bcfd9498398f8be4ae68e5b772 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 3 Jun 2019 09:06:30 +0200 Subject: [PATCH 3/6] Always call onFree plus test fix --- .../elasticsearch/search/SearchService.java | 44 +++++++++++++------ .../search/SearchServiceTests.java | 14 ++++-- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index b931529d8905c..cfb23a29c6c16 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -548,25 +548,39 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc } SearchContext context = createContext(request); + onNewContext(context); boolean success = false; try { - if (request.scroll() != null) { - openScrollContexts.incrementAndGet(); - context.indexShard().getSearchOperationListener().onNewScrollContext(context); - } - context.indexShard().getSearchOperationListener().onNewContext(context); putContext(context); - // ensure that if index is deleted concurrently, we free the context immediately, either here or in afterIndexRemoved + // ensure that if index is deleted concurrently, we free the context immediately, either here or in freeAllContextForIndex indicesService.indexServiceSafe(request.shardId().getIndex()); success = true; return context; } finally { - if (!success) { + if (success == false) { freeContext(context.id()); } } } + private void onNewContext(SearchContext context) { + boolean success = false; + try { + if (context.scrollContext() != null) { + openScrollContexts.incrementAndGet(); + context.indexShard().getSearchOperationListener().onNewScrollContext(context); + } + context.indexShard().getSearchOperationListener().onNewContext(context); + success = true; + } finally { + // currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the + // right thing by notifying onFreeXXX in case one of the listeners fails with an exception in the future. + if (success == false) { + onFreeContext(context); + } + } + } + final SearchContext createContext(ShardSearchRequest request) throws IOException { final DefaultSearchContext context = createSearchContext(request, defaultSearchTimeout); try { @@ -650,18 +664,22 @@ private void freeAllContextForIndex(Index index) { public boolean freeContext(long id) { try (SearchContext context = removeContext(id)) { if (context != null) { - assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); - context.indexShard().getSearchOperationListener().onFreeContext(context); - if (context.scrollContext() != null) { - openScrollContexts.decrementAndGet(); - context.indexShard().getSearchOperationListener().onFreeScrollContext(context); - } + onFreeContext(context); return true; } return false; } } + private void onFreeContext(SearchContext context) { + assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); + context.indexShard().getSearchOperationListener().onFreeContext(context); + if (context.scrollContext() != null) { + openScrollContexts.decrementAndGet(); + context.indexShard().getSearchOperationListener().onFreeScrollContext(context); + } + } + public void freeAllScrollContexts() { for (SearchContext searchContext : activeContexts.values()) { if (searchContext.scrollContext() != null) { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 4e8d361c30666..42a093698ad0d 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardId; @@ -266,9 +267,9 @@ public void onFailure(Exception e) { try { try { PlainActionFuture result = new PlainActionFuture<>(); - boolean useScroll = randomBoolean(); + final boolean useScroll = randomBoolean(); service.executeQueryPhase( - new ShardSearchLocalRequest(useScroll ? searchRequest : scrollSearchRequest, + new ShardSearchLocalRequest(useScroll ? scrollSearchRequest : searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); @@ -280,7 +281,7 @@ public void onFailure(Exception e) { service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); if (useScroll) { - service.freeContext(searchPhaseResult.requestId); + service.freeContext(searchPhaseResult.getRequestId()); } } catch (ExecutionException ex) { assertThat(ex.getCause(), instanceOf(RuntimeException.class)); @@ -299,6 +300,13 @@ public void onFailure(Exception e) { thread.join(); semaphore.acquire(Integer.MAX_VALUE); } + + assertEquals(0, service.getActiveContexts()); + + SearchStats.Stats totalStats = indexShard.searchStats().getTotal(); + assertEquals(0, totalStats.getQueryCurrent()); + assertEquals(0, totalStats.getScrollCurrent()); + assertEquals(0, totalStats.getFetchCurrent()); } public void testTimeout() throws IOException { From 49dfb3ccc906f1e9db7b9e1070f14f10ca15f9f7 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Mon, 3 Jun 2019 11:34:42 +0200 Subject: [PATCH 4/6] Close search context if onNewXXX fails. --- .../main/java/org/elasticsearch/search/SearchService.java | 6 ++++-- .../java/org/elasticsearch/search/SearchServiceTests.java | 5 +++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index cfb23a29c6c16..f7b3fbe77e328 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -574,9 +574,11 @@ private void onNewContext(SearchContext context) { success = true; } finally { // currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the - // right thing by notifying onFreeXXX in case one of the listeners fails with an exception in the future. + // right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future. if (success == false) { - onFreeContext(context); + try (context) { + onFreeContext(context); + } } } } diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 42a093698ad0d..9e999a6af4214 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -226,6 +226,8 @@ public void testSearchWhileIndexDeleted() throws InterruptedException { AtomicBoolean running = new AtomicBoolean(true); CountDownLatch startGun = new CountDownLatch(1); Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + final int expectedStoreRefCount = indexShard.store().refCount(); + final Thread thread = new Thread() { @Override public void run() { @@ -307,6 +309,9 @@ public void onFailure(Exception e) { assertEquals(0, totalStats.getQueryCurrent()); assertEquals(0, totalStats.getScrollCurrent()); assertEquals(0, totalStats.getFetchCurrent()); + + // check searchers are not leaked. + assertEquals(expectedStoreRefCount, indexShard.store().refCount()); } public void testTimeout() throws IOException { From 5fe98929c29fa22aaa383742bb7fe10f6afa9c87 Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Tue, 4 Jun 2019 13:17:27 +0200 Subject: [PATCH 5/6] Removed double check for index delete. Removed double check for index delete to avoid the performance overhead. Removed validation of store ref count, since IndexService periodically does a shard refresh, which disturbs this validation. --- .../src/main/java/org/elasticsearch/search/SearchService.java | 2 -- .../java/org/elasticsearch/search/SearchServiceTests.java | 4 ---- 2 files changed, 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index f7b3fbe77e328..708aa006c6e74 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -552,8 +552,6 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc boolean success = false; try { putContext(context); - // ensure that if index is deleted concurrently, we free the context immediately, either here or in freeAllContextForIndex - indicesService.indexServiceSafe(request.shardId().getIndex()); success = true; return context; } finally { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 9e999a6af4214..f8ef11abe9bf4 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -226,7 +226,6 @@ public void testSearchWhileIndexDeleted() throws InterruptedException { AtomicBoolean running = new AtomicBoolean(true); CountDownLatch startGun = new CountDownLatch(1); Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); - final int expectedStoreRefCount = indexShard.store().refCount(); final Thread thread = new Thread() { @Override @@ -309,9 +308,6 @@ public void onFailure(Exception e) { assertEquals(0, totalStats.getQueryCurrent()); assertEquals(0, totalStats.getScrollCurrent()); assertEquals(0, totalStats.getFetchCurrent()); - - // check searchers are not leaked. - assertEquals(expectedStoreRefCount, indexShard.store().refCount()); } public void testTimeout() throws IOException { From b7a24f40ed7ea3c2f89e7d7dd3f6eb5c511379af Mon Sep 17 00:00:00 2001 From: Henning Andersen Date: Thu, 6 Jun 2019 13:17:29 +0200 Subject: [PATCH 6/6] Assert that context is inactive when freeing. --- server/src/main/java/org/elasticsearch/search/SearchService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 708aa006c6e74..19ae52cb51099 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -673,6 +673,7 @@ public boolean freeContext(long id) { private void onFreeContext(SearchContext context) { assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); + assert activeContexts.containsKey(context.id()) == false; context.indexShard().getSearchOperationListener().onFreeContext(context); if (context.scrollContext() != null) { openScrollContexts.decrementAndGet();