From 7a823565d5d500abd027d362fc66e38c1c020908 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 18 Dec 2023 14:20:09 -0800 Subject: [PATCH] make the logic to decide enabled self-contained in each listener Signed-off-by: Chenyang Ji --- .../search/SearchRequestListenerManager.java | 59 ++------------ .../SearchRequestOperationsListener.java | 1 - .../action/search/SearchRequestSlowLog.java | 14 ++-- .../action/search/SearchRequestStats.java | 4 +- .../action/search/TransportSearchAction.java | 36 +++++++-- .../common/settings/ClusterSettings.java | 2 +- .../main/java/org/opensearch/node/Node.java | 21 +++-- .../SearchRequestListenerManagerTests.java | 77 ++++--------------- .../search/SearchRequestSlowLogTests.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 2 +- 10 files changed, 76 insertions(+), 142 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java b/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java index 25e44a0a562e8..90a7084518c46 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java @@ -9,13 +9,12 @@ package org.opensearch.action.search; import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Setting; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; /** @@ -27,32 +26,18 @@ * @opensearch.internal */ public class SearchRequestListenerManager { - - private final ClusterService clusterService; - public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled"; - public static final Setting SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting( - SEARCH_PHASE_TOOK_ENABLED_KEY, - false, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); private final List searchRequestListenersList; - public SearchRequestListenerManager( - ClusterService clusterService - ) { - this.clusterService = clusterService; - searchRequestListenersList = new ArrayList<>(); - } - /** - * Add multiple {@link SearchRequestOperationsListener} to the searchRequestListenersList. + * Create the SearchRequestListenerManager and add multiple {@link SearchRequestOperationsListener} + * to the searchRequestListenersList. * Those enabled listeners will be executed during each search request. * * @param listeners Multiple SearchRequestOperationsListener object to add. * @throws IllegalArgumentException if any input listener is null or already exists in the list. */ - public void addListeners(SearchRequestOperationsListener... listeners) { + public SearchRequestListenerManager(SearchRequestOperationsListener... listeners) { + searchRequestListenersList = new ArrayList<>(); for (SearchRequestOperationsListener listener : listeners) { if (listener == null) { throw new IllegalArgumentException("listener must not be null"); @@ -64,22 +49,6 @@ public void addListeners(SearchRequestOperationsListener... listeners) { } } - /** - * Remove a {@link SearchRequestOperationsListener} from the searchRequestListenersList, - * - * @param listener A SearchRequestOperationsListener object to remove. - * @throws IllegalArgumentException if the input listener is null or already exists in the list. - */ - public void removeListener(SearchRequestOperationsListener listener) { - if (listener == null) { - throw new IllegalArgumentException("listener must not be null"); - } - if (!searchRequestListenersList.contains(listener)) { - throw new IllegalArgumentException("listener does not exist in the listeners list"); - } - searchRequestListenersList.remove(listener); - } - /** * Get searchRequestListenersList, * @@ -95,30 +64,18 @@ public List getListeners() { * Create the {@link SearchRequestOperationsListener.CompositeListener} * with the all listeners enabled at cluster-level and request-level. * - * @param searchRequest The SearchRequest object. SearchRequestListenerManager will decide which request-level listeners to add based on states/flags of the request * @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener} * @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list. * @return SearchRequestOperationsListener.CompositeListener */ public SearchRequestOperationsListener.CompositeListener buildCompositeListener( - SearchRequest searchRequest, Logger logger, SearchRequestOperationsListener... perRequestListeners ) { - final List searchListenersList = searchRequestListenersList.stream().filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList()); + final List searchListenersList = Stream.concat(searchRequestListenersList.stream(), Arrays.stream(perRequestListeners)) + .filter(SearchRequestOperationsListener::getEnabled) + .collect(Collectors.toList()); - Arrays.stream(perRequestListeners).forEach((listener) -> { - if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) { - TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener; - // phase_took is enabled with request param and/or cluster setting - boolean phaseTookEnabled = (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) || - clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED); - if (phaseTookEnabled) { - timeProvider.setPhaseTook(true); - searchListenersList.add(timeProvider); - } - } - }); return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 7e318a542c812..121d7cd2a518c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -40,7 +40,6 @@ void onRequestStart(SearchRequestContext searchRequestContext) {} void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - boolean getEnabled() { return enabled; } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 0fadbda6b62a9..c15ef38789e65 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -108,9 +108,7 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); - public SearchRequestSlowLog( - ClusterService clusterService - ) { + public SearchRequestSlowLog(ClusterService clusterService) { this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties } @@ -235,22 +233,22 @@ private static String escapeJson(String text) { void setWarnThreshold(TimeValue warnThreshold) { this.warnThreshold = warnThreshold.nanos(); - setEnabled(); + setEnabledIfThresholdExceed(); } void setInfoThreshold(TimeValue infoThreshold) { this.infoThreshold = infoThreshold.nanos(); - setEnabled(); + setEnabledIfThresholdExceed(); } void setDebugThreshold(TimeValue debugThreshold) { this.debugThreshold = debugThreshold.nanos(); - setEnabled(); + setEnabledIfThresholdExceed(); } void setTraceThreshold(TimeValue traceThreshold) { this.traceThreshold = traceThreshold.nanos(); - setEnabled(); + setEnabledIfThresholdExceed(); } void setLevel(SlowLogLevel level) { @@ -277,7 +275,7 @@ SlowLogLevel getLevel() { return level; } - private void setEnabled() { + private void setEnabledIfThresholdExceed() { super.setEnabled(this.warnThreshold >= 0 || this.debugThreshold >= 0 || this.infoThreshold >= 0 diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 19cb9dc4b73d5..58dbbb1a31e97 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -37,9 +37,7 @@ public final class SearchRequestStats extends SearchRequestOperationsListener { ); @Inject - public SearchRequestStats( - ClusterService clusterService - ) { + public SearchRequestStats(ClusterService clusterService) { clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { phaseStatsMap.put(searchPhaseName, new StatsHolder()); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 17b220ae324a4..e1cd7d075c355 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -154,6 +154,14 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting( + SEARCH_PHASE_TOOK_ENABLED_KEY, + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private final NodeClient client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -277,7 +285,6 @@ static final class SearchTimeProvider extends SearchRequestOperationsListener { private final long absoluteStartMillis; private final long relativeStartNanos; private final LongSupplier relativeCurrentNanosProvider; - private boolean phaseTook = false; /** * Instantiates a new search time provider. The absolute start time is the real clock time @@ -304,12 +311,8 @@ long buildTookInMillis() { return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos); } - public void setPhaseTook(boolean phaseTook) { - this.phaseTook = phaseTook; - } - SearchResponse.PhaseTook getPhaseTook() { - if (phaseTook) { + if (getEnabled()) { Map phaseTookMap = new HashMap<>(); // Convert Map to Map for SearchResponse() for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) { @@ -323,6 +326,22 @@ SearchResponse.PhaseTook getPhaseTook() { Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); + /** + * Set if this listener is enabled based on the cluster level setting + * and per request enable flags. + * + * @param enabledAtClusterLevel if the SearchTimeProvider listener is enabled at cluster level + * @param searchRequest the original Search Request + * @opensearch.internal + */ + + void setEnabled(boolean enabledAtClusterLevel, SearchRequest searchRequest) { + // phase_took is enabled wi th request param and/or cluster setting + super.setEnabled( + enabledAtClusterLevel || (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) + ); + } + @Override void onPhaseStart(SearchPhaseContext context) {} @@ -462,8 +481,11 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); + timeProvider.setEnabled( + clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED), + originalSearchRequest + ); SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestListenerManager.buildCompositeListener( - originalSearchRequest, logger, timeProvider ); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index af211661d42a4..814e7b6619106 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -383,7 +383,7 @@ public void apply(Settings value, Settings current, Settings previous) { TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING, - SearchRequestListenerManager.SEARCH_PHASE_TOOK_ENABLED, + TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED, SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6acc5105b7b29..4e167b3e7d068 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -47,6 +47,7 @@ import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchRequestListenerManager; +import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; @@ -787,13 +788,6 @@ protected Node( final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService); final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); - // register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager - final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService); - searchRequestListenerManager.addListeners( - searchRequestStats, - searchRequestSlowLog - ); - remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( settings, @@ -887,6 +881,19 @@ protected Node( ) .collect(Collectors.toList()); + // register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager + final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager( + Stream.concat( + Stream.of( + searchRequestStats, + searchRequestSlowLog + ), + Stream.of(pluginComponents) + .filter(p -> p instanceof SearchRequestOperationsListener) + .map(p -> (SearchRequestOperationsListener) p) + ).toArray(SearchRequestOperationsListener[]::new) + ); + ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java index 348049dacf667..f8098610268f5 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java @@ -21,51 +21,21 @@ public class SearchRequestListenerManagerTests extends OpenSearchTestCase { public void testAddAndGetListeners() { - ClusterService clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null - ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener(); - listenerManager.addListeners(testListener); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener); assertEquals(1, listenerManager.getListeners().size()); assertEquals(testListener, listenerManager.getListeners().get(0)); } - public void testRemoveListeners() { - ClusterService clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null - ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); - SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); - listenerManager.addListeners(testListener1, testListener2); - assertEquals(2, listenerManager.getListeners().size()); - listenerManager.removeListener(testListener2); - assertEquals(1, listenerManager.getListeners().size()); - assertEquals(testListener1, listenerManager.getListeners().get(0)); - } - public void testStandardListenersEnabled() throws NoSuchFieldException, IllegalAccessException { - ClusterService clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null - ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); - testListener2.setEnabled(true); - listenerManager.addListeners(testListener1, testListener2); - SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); - SearchRequest searchRequest = new SearchRequest().source(source); - SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( - searchRequest, - logger + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager( + testListener1, + testListener2 ); + testListener2.setEnabled(true); + SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(logger); Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners"); listenersField.setAccessible(true); List listeners = (List) listenersField.get(compositeListener); @@ -77,25 +47,20 @@ public void testStandardListenersEnabled() throws NoSuchFieldException, IllegalA } public void testStandardListenersAndTimeProvider() throws NoSuchFieldException, IllegalAccessException { - ClusterService clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null - ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1); + testListener1.setEnabled(true); - SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( + TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider( 0, System.nanoTime(), System::nanoTime ); - listenerManager.addListeners(testListener1); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); searchRequest.setPhaseTook(true); + timeProviderListener.setEnabled(false, searchRequest); SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( - searchRequest, logger, timeProviderListener ); @@ -110,24 +75,18 @@ public void testStandardListenersAndTimeProvider() throws NoSuchFieldException, } public void testStandardListenersDisabledAndTimeProvider() throws NoSuchFieldException, IllegalAccessException { - ClusterService clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null - ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); - SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1); + TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider( 0, System.nanoTime(), System::nanoTime ); - listenerManager.addListeners(testListener1); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); searchRequest.setPhaseTook(true); + timeProviderListener.setEnabled(false, searchRequest); SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( - searchRequest, logger, timeProviderListener ); @@ -142,25 +101,19 @@ public void testStandardListenersDisabledAndTimeProvider() throws NoSuchFieldExc } public void testStandardListenerAndTimeProviderDisabled() throws NoSuchFieldException, IllegalAccessException { - ClusterService clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null - ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1); + testListener1.setEnabled(true); SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( 0, System.nanoTime(), System::nanoTime ); - listenerManager.addListeners(testListener1); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); searchRequest.setPhaseTook(false); SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( - searchRequest, logger, timeProviderListener ); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java index 5456ef02b9b8e..e70bba36445e0 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -104,7 +104,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager2 = new SearchRequestListenerManager(clusterService2); + SearchRequestListenerManager listenerManager2 = new SearchRequestListenerManager(); SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2); int numberOfLoggersAfter = context.getLoggers().size(); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index bf1a6e1593f4c..e631d3e67a484 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2285,7 +2285,7 @@ public void onFailure(final Exception e) { writableRegistry(), searchService::aggReduceContextBuilder ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(); actions.put( SearchAction.INSTANCE, new TransportSearchAction(