Skip to content

Commit

Permalink
Dynamically add search request operation listeners with SearchRequest…
Browse files Browse the repository at this point in the history
…ListenerManager

Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Dec 12, 2023
1 parent badcda9 commit 2dfbd76
Show file tree
Hide file tree
Showing 15 changed files with 421 additions and 156 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;


/**
* SearchRequestListenerManager manages listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public class SearchRequestListenerManager {

private final ClusterService clusterService;
public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled";
public static final Setting<Boolean> SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting(
SEARCH_PHASE_TOOK_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private final List<SearchRequestOperationsListener> searchRequestListenersList;

@Inject
public SearchRequestListenerManager(
ClusterService clusterService
) {
this.clusterService = clusterService;
searchRequestListenersList = new ArrayList<>();
}

/**
* Add a {@link SearchRequestOperationsListener} to the searchRequestListenersList,
* which will be executed during each search request.
*
* @param listener A SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
*/
public void addListener(SearchRequestOperationsListener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
searchRequestListenersList.add(listener);
}

/**
* Remove a {@link SearchRequestOperationsListener} from the searchRequestListenersList,
*
* @param listener A SearchRequestOperationsListener object to remove.
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
*/
public void removeListener(SearchRequestOperationsListener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (!searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener does not exist in the listeners list");
}
searchRequestListenersList.remove(listener);
}

/**
* Get searchRequestListenersList,
*
* @return List of SearchRequestOperationsListener
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
*/
public List<SearchRequestOperationsListener> getListeners() {
return searchRequestListenersList;
}


/**
* Create the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
* @param searchRequest The SearchRequest object. SearchRequestListenerManager will decide which request-level listeners to add based on states/flags of the request
* @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener}
* @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list.
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
SearchRequest searchRequest,
Logger logger,
SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = new ArrayList<>(searchRequestListenersList);

Arrays.stream(perRequestListeners).parallel().forEach((listener) -> {
if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) {
TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener;
// phase_took is enabled with request param and/or cluster setting
boolean phaseTookEnabled = (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) ||
clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED);
if (phaseTookEnabled) {
timeProvider.setPhaseTook(true);
searchListenersList.add(timeProvider);
}
}
});
return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*/
@InternalApi
abstract class SearchRequestOperationsListener {
protected boolean enabled;
protected SearchRequestListenerManager searchRequestListenerManager;

abstract void onPhaseStart(SearchPhaseContext context);

Expand All @@ -34,7 +34,6 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}
void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
if (enabled) {
register();
} else {
Expand All @@ -47,13 +46,21 @@ public void setEnabled(boolean enabled) {
* Handler function to register this listener to certain components
* This function will be called when the listener is enabled.
*/
protected void register() {}
protected void register() {
if (this.searchRequestListenerManager != null) {
this.searchRequestListenerManager.addListener(this);
}
}

/**
* Handler function to deregister this listener from certain components
* This function will be called when the listener is disabled.
*/
protected void deregister() {}
protected void deregister() {
if (this.searchRequestListenerManager != null) {
this.searchRequestListenerManager.removeListener(this);
}
}

/**
* Holder of Composite Listeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.OpenSearchLogMessage;
import org.opensearch.common.logging.SlowLogLevel;
Expand Down Expand Up @@ -108,12 +109,18 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public SearchRequestSlowLog(ClusterService clusterService) {
this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
@Inject
public SearchRequestSlowLog(
ClusterService clusterService,
SearchRequestListenerManager searchRequestListenerManager
) {
this(clusterService, searchRequestListenerManager, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
}

SearchRequestSlowLog(ClusterService clusterService, Logger logger) {
@Inject
SearchRequestSlowLog(ClusterService clusterService, SearchRequestListenerManager searchRequestListenerManager, Logger logger) {
this.logger = logger;
this.searchRequestListenerManager = searchRequestListenerManager;
Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name());

this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos();
Expand Down Expand Up @@ -160,22 +167,6 @@ void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequest
}
}

/**
* register this listener to TransportSearchAction
*/
@Override
protected void register() {
TransportSearchAction.addSearchOperationsListener(this);
}

/**
* deregister this listener to TransportSearchAction
*/
@Override
protected void deregister() {
TransportSearchAction.removeSearchOperationsListener(this);
}

/**
* Search request slow log message
*
Expand Down Expand Up @@ -249,22 +240,22 @@ private static String escapeJson(String text) {

void setWarnThreshold(TimeValue warnThreshold) {
this.warnThreshold = warnThreshold.nanos();
changeEnabledIfNeeded();
setEnabled();
}

void setInfoThreshold(TimeValue infoThreshold) {
this.infoThreshold = infoThreshold.nanos();
changeEnabledIfNeeded();
setEnabled();
}

void setDebugThreshold(TimeValue debugThreshold) {
this.debugThreshold = debugThreshold.nanos();
changeEnabledIfNeeded();
setEnabled();
}

void setTraceThreshold(TimeValue traceThreshold) {
this.traceThreshold = traceThreshold.nanos();
changeEnabledIfNeeded();
setEnabled();
}

void setLevel(SlowLogLevel level) {
Expand All @@ -291,7 +282,7 @@ SlowLogLevel getLevel() {
return level;
}

private void changeEnabledIfNeeded() {
private void setEnabled() {
super.setEnabled(this.warnThreshold >= 0
|| this.debugThreshold >= 0
|| this.infoThreshold >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,15 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {
);

@Inject
public SearchRequestStats(ClusterService clusterService) {
public SearchRequestStats(
ClusterService clusterService,
SearchRequestListenerManager searchRequestListenerManager
) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
}

/**
* register this listener to TransportSearchAction
*/
@Override
protected void register() {
TransportSearchAction.addSearchOperationsListener(this);
}

/**
* deregister this listener to TransportSearchAction
*/
@Override
protected void deregister() {
TransportSearchAction.removeSearchOperationsListener(this);
this.searchRequestListenerManager = searchRequestListenerManager;
}

public long getPhaseCurrent(SearchPhaseName searchPhaseName) {
Expand Down
Loading

0 comments on commit 2dfbd76

Please sign in to comment.