From 96c8388ddabe9d8b08b1844a1a13eeae4f99aa7c Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 9 Jan 2024 18:17:17 -0800 Subject: [PATCH] include SearchRequest in SearchContext and rename to SearchRequestOperationsCompositeListenerFactory Signed-off-by: Chenyang Ji --- CHANGELOG.md | 2 +- .../action/search/SearchRequestContext.java | 21 +--- ...stOperationsCompositeListenerFactory.java} | 15 ++- .../SearchRequestOperationsListener.java | 4 +- .../action/search/TransportSearchAction.java | 15 ++- .../main/java/org/opensearch/node/Node.java | 23 ++--- .../AbstractSearchAsyncActionTests.java | 13 ++- .../CanMatchPreFilterSearchPhaseTests.java | 31 ++++-- .../action/search/SearchAsyncActionTests.java | 12 ++- .../SearchQueryThenFetchAsyncActionTests.java | 7 +- ...rationsCompositeListenerFactoryTests.java} | 23 +++-- ...earchRequestOperationsListenerSupport.java | 12 ++- .../search/SearchRequestSlowLogTests.java | 25 +++-- .../search/SearchRequestStatsTests.java | 18 +++- .../search/SearchResponseMergerTests.java | 98 ++++++++++++++++--- .../search/TransportSearchActionTests.java | 31 ++++-- .../snapshots/SnapshotResiliencyTests.java | 7 +- 17 files changed, 257 insertions(+), 100 deletions(-) rename server/src/main/java/org/opensearch/action/search/{SearchRequestOperationsListeners.java => SearchRequestOperationsCompositeListenerFactory.java} (83%) rename server/src/test/java/org/opensearch/action/search/{SearchRequestOperationsListenersTests.java => SearchRequestOperationsCompositeListenerFactoryTests.java} (85%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64f71157dbfde..1f64dcf82bd4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -193,7 +193,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678)) - Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582)) - Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732)) -- Added Support for dynamically adding SearchRequestOperationsListeners ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526)) +- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index 7f2e71afec5ce..eceac7204b196 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -8,13 +8,11 @@ package org.opensearch.action.search; -import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.InternalApi; import java.util.EnumMap; import java.util.HashMap; -import java.util.List; import java.util.Locale; import java.util.Map; @@ -31,25 +29,14 @@ class SearchRequestContext { private TotalHits totalHits; private final EnumMap shardStats; - private final boolean phaseTookEnabled; + private final SearchRequest searchRequest; - /** - * This constructor is for testing only - */ - SearchRequestContext() { - this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), false); - } - - SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, boolean phaseTookEnabled) { + SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) { this.searchRequestOperationsListener = searchRequestOperationsListener; this.absoluteStartNanos = System.nanoTime(); this.phaseTookMap = new HashMap<>(); this.shardStats = new EnumMap<>(ShardStatsFieldNames.class); - this.phaseTookEnabled = phaseTookEnabled; - } - - SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { - this(searchRequestOperationsListener, false); + this.searchRequest = searchRequest; } SearchRequestOperationsListener getSearchRequestOperationsListener() { @@ -65,7 +52,7 @@ Map phaseTookMap() { } SearchResponse.PhaseTook getPhaseTook() { - if (phaseTookEnabled) { + if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) { return new SearchResponse.PhaseTook(phaseTookMap); } else { return null; diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListeners.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java similarity index 83% rename from server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListeners.java rename to server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java index 237d25169bdd2..db487bf945889 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListeners.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactory.java @@ -17,25 +17,25 @@ import java.util.stream.Stream; /** - * SearchRequestOperationsListeners contains listeners registered to search requests, + * SearchRequestOperationsCompositeListenerFactory contains listeners registered to search requests, * and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener} * with the all listeners enabled at cluster-level and request-level. * * * @opensearch.internal */ -public class SearchRequestOperationsListeners { +public final class SearchRequestOperationsCompositeListenerFactory { private final List searchRequestListenersList; /** - * Create the SearchRequestOperationsListeners and add multiple {@link SearchRequestOperationsListener} + * Create the SearchRequestOperationsCompositeListenerFactory and add multiple {@link SearchRequestOperationsListener} * to the searchRequestListenersList. * Those enabled listeners will be executed during each search request. * * @param listeners Multiple SearchRequestOperationsListener object to add. * @throws IllegalArgumentException if any input listener is null. */ - public SearchRequestOperationsListeners(SearchRequestOperationsListener... listeners) { + public SearchRequestOperationsCompositeListenerFactory(final SearchRequestOperationsListener... listeners) { searchRequestListenersList = new ArrayList<>(); for (SearchRequestOperationsListener listener : listeners) { if (listener == null) { @@ -49,7 +49,6 @@ public SearchRequestOperationsListeners(SearchRequestOperationsListener... liste * Get searchRequestListenersList, * * @return List of SearchRequestOperationsListener - * @throws IllegalArgumentException if the input listener is null or already exists in the list. */ public List getListeners() { return searchRequestListenersList; @@ -65,9 +64,9 @@ public List getListeners() { * @return SearchRequestOperationsListener.CompositeListener */ public SearchRequestOperationsListener.CompositeListener buildCompositeListener( - SearchRequest searchRequest, - Logger logger, - SearchRequestOperationsListener... perRequestListeners + final SearchRequest searchRequest, + final Logger logger, + final SearchRequestOperationsListener... perRequestListeners ) { final List searchListenersList = Stream.concat( searchRequestListenersList.stream(), diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 99619d9f9c900..2a09cc084f79f 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -27,7 +27,7 @@ protected SearchRequestOperationsListener() { this.enabled = true; } - protected SearchRequestOperationsListener(boolean enabled) { + protected SearchRequestOperationsListener(final boolean enabled) { this.enabled = enabled; } @@ -49,7 +49,7 @@ boolean isEnabled() { return enabled; } - protected void setEnabled(boolean enabled) { + protected void setEnabled(final boolean enabled) { this.enabled = enabled; } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 0738ce0c5ca68..842c10b700d24 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -172,7 +172,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -212,7 +212,7 @@ public TransportSearchAction( this.searchPipelineService = searchPipelineService; this.metricsRegistry = metricsRegistry; this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING); - this.searchRequestOperationsListeners = searchRequestOperationsListeners; + this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory; clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); } @@ -428,15 +428,12 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); - final boolean phaseTookEnabled; if (originalSearchRequest.isPhaseTook() == null) { - phaseTookEnabled = clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED); - } else { - phaseTookEnabled = originalSearchRequest.isPhaseTook(); + originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED)); } - SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsListeners + SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory .buildCompositeListener(originalSearchRequest, logger); - SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, phaseTookEnabled); + SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest); searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); PipelinedRequest searchRequest; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 0a08885006809..8510122c39fcb 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,8 +46,8 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchRequestOperationsCompositeListenerFactory; import org.opensearch.action.search.SearchRequestOperationsListener; -import org.opensearch.action.search.SearchRequestOperationsListeners; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; @@ -881,15 +881,16 @@ protected Node( ) .collect(Collectors.toList()); - // register all standard SearchRequestOperationsListeners to the SearchRequestOperationsListeners - final SearchRequestOperationsListeners searchRequestOperationsListeners = new SearchRequestOperationsListeners( - Stream.concat( - Stream.of(searchRequestStats, searchRequestSlowLog), - pluginComponents.stream() - .filter(p -> p instanceof SearchRequestOperationsListener) - .map(p -> (SearchRequestOperationsListener) p) - ).toArray(SearchRequestOperationsListener[]::new) - ); + // register all standard SearchRequestOperationsCompositeListenerFactory to the SearchRequestOperationsCompositeListenerFactory + final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = + new SearchRequestOperationsCompositeListenerFactory( + Stream.concat( + Stream.of(searchRequestStats, searchRequestSlowLog), + pluginComponents.stream() + .filter(p -> p instanceof SearchRequestOperationsListener) + .map(p -> (SearchRequestOperationsListener) p) + ).toArray(SearchRequestOperationsListener[]::new) + ); ActionModule actionModule = new ActionModule( settings, @@ -1287,7 +1288,7 @@ protected Node( b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); - b.bind(SearchRequestOperationsListeners.class).toInstance(searchRequestOperationsListeners); + b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); }); injector = modules.createInjector(); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index e7d3f4108a4b9..76129341fc9a2 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterState; @@ -177,7 +178,7 @@ private AbstractSearchAsyncAction createAction( results, request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -715,7 +716,10 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), + searchRequest + ) ); } @@ -765,7 +769,10 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger), + searchRequest + ) ) { @Override ShardSearchFailure[] buildShardFailures() { diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 4ed4797efe604..56dcf66d5607d 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -31,6 +31,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.util.BytesRef; import org.opensearch.Version; import org.opensearch.action.OriginalIndices; @@ -137,7 +138,10 @@ public void run() throws IOException { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -229,7 +233,10 @@ public void run() throws IOException { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -320,7 +327,10 @@ public void sendCanMatch( new ArraySearchPhaseResults<>(iter.size()), randomIntBetween(1, 32), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ) { @Override @@ -348,7 +358,10 @@ protected void executePhaseOnShard( } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -433,7 +446,10 @@ public void run() { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); @@ -533,7 +549,10 @@ public void run() { } }, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); canMatchPhase.start(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index 7b4fa1d8387df..af7adc4e58fb8 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -31,6 +31,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.cluster.ClusterState; @@ -61,6 +62,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -136,7 +138,7 @@ public void testSkipSearchShards() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override @@ -255,7 +257,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override @@ -373,7 +375,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { TestSearchResponse response = new TestSearchResponse(); @@ -496,7 +498,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { TestSearchResponse response = new TestSearchResponse(); @@ -610,7 +612,7 @@ public void testAllowPartialResults() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), request) ) { @Override protected void executePhaseOnShard( diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index a8c0c43ac5080..faf6f86c69c27 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopFieldDocs; @@ -63,6 +64,7 @@ import org.opensearch.transport.Transport; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -215,7 +217,10 @@ public void sendExecuteQuery( null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenersTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java similarity index 85% rename from server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenersTests.java rename to server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java index e762e795d88d2..78c5ba4412c68 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenersTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsCompositeListenerFactoryTests.java @@ -14,10 +14,12 @@ import java.util.List; -public class SearchRequestOperationsListenersTests extends OpenSearchTestCase { +public class SearchRequestOperationsCompositeListenerFactoryTests extends OpenSearchTestCase { public void testAddAndGetListeners() { SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener(); - SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener + ); assertEquals(1, requestListeners.getListeners().size()); assertEquals(testListener, requestListeners.getListeners().get(0)); } @@ -26,7 +28,10 @@ public void testStandardListenersEnabled() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); testListener1.setEnabled(false); - SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1, testListener2); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1, + testListener2 + ); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener( @@ -43,7 +48,9 @@ public void testStandardListenersEnabled() { public void testStandardListenersAndPerRequestListener() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); - SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1 + ); SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); testListener1.setEnabled(true); testListener2.setEnabled(true); @@ -66,7 +73,9 @@ public void testStandardListenersAndPerRequestListener() { public void testStandardListenersDisabledAndPerRequestListener() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); testListener1.setEnabled(false); - SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1 + ); SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); @@ -85,7 +94,9 @@ public void testStandardListenersDisabledAndPerRequestListener() { public void testStandardListenerAndPerRequestListenerDisabled() { SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); - SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1); + SearchRequestOperationsCompositeListenerFactory requestListeners = new SearchRequestOperationsCompositeListenerFactory( + testListener1 + ); testListener1.setEnabled(true); SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); testListener2.setEnabled(false); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java index 58a4c4a4e555d..0f737e00478cb 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerSupport.java @@ -8,6 +8,10 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; + +import java.util.List; + /** * Helper interface to access package protected {@link SearchRequestOperationsListener} from test cases. */ @@ -17,6 +21,12 @@ default void onPhaseStart(SearchRequestOperationsListener listener, SearchPhaseC } default void onPhaseEnd(SearchRequestOperationsListener listener, SearchPhaseContext context) { - listener.onPhaseEnd(context, new SearchRequestContext()); + listener.onPhaseEnd( + context, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); } } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java index d2e5c95862fd9..f009988ffae17 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -104,7 +104,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestOperationsListeners searchRequestListeners = new SearchRequestOperationsListeners(); + SearchRequestOperationsCompositeListenerFactory searchRequestListeners = new SearchRequestOperationsCompositeListenerFactory(); SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2); int numberOfLoggersAfter = context.getLoggers().size(); @@ -176,7 +176,8 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { ArrayList searchRequestContexts = new ArrayList<>(); for (int i = 0; i < numRequests; i++) { SearchRequestContext searchRequestContext = new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger), + searchRequest ); searchRequestContext.setAbsoluteStartNanos((i < numRequestsLogged) ? 0 : System.nanoTime()); searchRequestContexts.add(searchRequestContext); @@ -205,7 +206,10 @@ public void testSearchRequestSlowLogHasJsonFields_EmptySearchRequestContext() th SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( searchPhaseContext, 10, @@ -226,7 +230,10 @@ public void testSearchRequestSlowLogHasJsonFields_NotEmptySearchRequestContext() SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); @@ -252,7 +259,10 @@ public void testSearchRequestSlowLogHasJsonFields_PartialContext() throws IOExce SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); @@ -278,7 +288,10 @@ public void testSearchRequestSlowLogSearchContextPrinterToLog() throws IOExcepti SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); - SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ); searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 25559482d095c..377ccebbfd418 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -8,11 +8,13 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; @@ -54,7 +56,13 @@ public void testSearchRequestStats() { long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); - testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); + testRequestStats.onPhaseEnd( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); assertEquals(1, testRequestStats.getPhaseTotal(searchPhaseName)); assertThat(testRequestStats.getPhaseMetric(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); @@ -108,7 +116,13 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); + testRequestStats.onPhaseEnd( + ctx, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); countDownLatch.countDown(); }); threads[i].start(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java b/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java index 71e4236de8510..ce4d5ca4f7091 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchResponseMergerTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TotalHits; import org.opensearch.OpenSearchException; @@ -132,7 +133,13 @@ public void testMergeTookInMillis() throws InterruptedException { addResponse(merger, searchResponse); } awaitResponsesAdded(); - SearchResponse searchResponse = merger.getMergedResponse(SearchResponse.Clusters.EMPTY, new SearchRequestContext()); + SearchResponse searchResponse = merger.getMergedResponse( + SearchResponse.Clusters.EMPTY, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); } @@ -184,7 +191,13 @@ public void testMergeShardFailures() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -235,7 +248,13 @@ public void testMergeShardFailuresNullShardTarget() throws InterruptedException awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -281,8 +300,13 @@ public void testMergeShardFailuresNullShardId() throws InterruptedException { } awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); - ShardSearchFailure[] shardFailures = merger.getMergedResponse(SearchResponse.Clusters.EMPTY, new SearchRequestContext()) - .getShardFailures(); + ShardSearchFailure[] shardFailures = merger.getMergedResponse( + SearchResponse.Clusters.EMPTY, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ).getShardFailures(); assertThat(Arrays.asList(shardFailures), containsInAnyOrder(expectedFailures.toArray(ShardSearchFailure.EMPTY_ARRAY))); } @@ -316,7 +340,13 @@ public void testMergeProfileResults() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, merger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -378,7 +408,13 @@ public void testMergeCompletionSuggestions() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -450,7 +486,13 @@ public void testMergeCompletionSuggestionsTieBreak() throws InterruptedException awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -524,7 +566,13 @@ public void testMergeAggs() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse mergedResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, mergedResponse.getClusters()); assertEquals(numResponses, mergedResponse.getTotalShards()); assertEquals(numResponses, mergedResponse.getSuccessfulShards()); @@ -681,7 +729,13 @@ public void testMergeSearchHits() throws InterruptedException { awaitResponsesAdded(); assertEquals(numResponses, searchResponseMerger.numResponses()); final SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); - SearchResponse searchResponse = searchResponseMerger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse searchResponse = searchResponseMerger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), searchResponse.getTook().millis()); assertEquals(expectedTotal, searchResponse.getTotalShards()); @@ -741,7 +795,13 @@ public void testMergeNoResponsesAdded() { SearchResponseMerger merger = new SearchResponseMerger(0, 10, Integer.MAX_VALUE, timeProvider, emptyReduceContextBuilder()); SearchResponse.Clusters clusters = SearchResponseTests.randomClusters(); assertEquals(0, merger.numResponses()); - SearchResponse response = merger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse response = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertSame(clusters, response.getClusters()); assertEquals(TimeUnit.NANOSECONDS.toMillis(currentRelativeTime), response.getTook().millis()); assertEquals(0, response.getTotalShards()); @@ -814,7 +874,13 @@ public void testMergeEmptySearchHitsWithNonEmpty() { merger.add(searchResponse); } assertEquals(2, merger.numResponses()); - SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(10, mergedResponse.getHits().getTotalHits().value); assertEquals(10, mergedResponse.getHits().getHits().length); assertEquals(2, mergedResponse.getTotalShards()); @@ -856,7 +922,13 @@ public void testMergeOnlyEmptyHits() { ); merger.add(searchResponse); } - SearchResponse mergedResponse = merger.getMergedResponse(clusters, new SearchRequestContext()); + SearchResponse mergedResponse = merger.getMergedResponse( + clusters, + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + new SearchRequest() + ) + ); assertEquals(expectedTotalHits, mergedResponse.getHits().getTotalHits()); } diff --git a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java index bed9bd37c49b4..da19c839f3826 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.lucene.search.TotalHits; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; @@ -484,7 +485,10 @@ public void testCCSRemoteReduceMergeFails() throws Exception { threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -543,7 +547,10 @@ public void testCCSRemoteReduce() throws Exception { threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -581,7 +588,10 @@ public void testCCSRemoteReduce() throws Exception { threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -640,7 +650,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -681,7 +694,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); @@ -733,7 +749,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti threadPool, listener, (r, l) -> setOnce.set(Tuple.tuple(r, l)), - new SearchRequestContext() + new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), + searchRequest + ) ); if (localIndices == null) { assertNull(setOnce.get()); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 752d88b677a9c..9bb1f51c51cf6 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -90,7 +90,7 @@ import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchRequestOperationsListeners; +import org.opensearch.action.search.SearchRequestOperationsCompositeListenerFactory; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.search.TransportSearchAction; @@ -2285,7 +2285,8 @@ public void onFailure(final Exception e) { writableRegistry(), searchService::aggReduceContextBuilder ); - SearchRequestOperationsListeners searchRequestOperationsListeners = new SearchRequestOperationsListeners(); + SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory = + new SearchRequestOperationsCompositeListenerFactory(); actions.put( SearchAction.INSTANCE, new TransportSearchAction( @@ -2312,7 +2313,7 @@ public void onFailure(final Exception e) { client ), NoopMetricsRegistry.INSTANCE, - searchRequestOperationsListeners + searchRequestOperationsCompositeListenerFactory ) ); actions.put(