Skip to content

Commit

Permalink
make the logic to decide enabled self-contained in each listener
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Dec 19, 2023
1 parent 3448338 commit 36b3e2a
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
package org.opensearch.action.search;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;


/**
Expand All @@ -27,32 +26,18 @@
* @opensearch.internal
*/
public class SearchRequestListenerManager {

private final ClusterService clusterService;
public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled";
public static final Setting<Boolean> SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting(
SEARCH_PHASE_TOOK_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private final List<SearchRequestOperationsListener> searchRequestListenersList;

public SearchRequestListenerManager(
ClusterService clusterService
) {
this.clusterService = clusterService;
searchRequestListenersList = new ArrayList<>();
}

/**
* Add multiple {@link SearchRequestOperationsListener} to the searchRequestListenersList.
* Create the SearchRequestListenerManager and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null or already exists in the list.
*/
public void addListeners(SearchRequestOperationsListener... listeners) {
public SearchRequestListenerManager(SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
Expand All @@ -64,22 +49,6 @@ public void addListeners(SearchRequestOperationsListener... listeners) {
}
}

/**
* Remove a {@link SearchRequestOperationsListener} from the searchRequestListenersList,
*
* @param listener A SearchRequestOperationsListener object to remove.
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
*/
public void removeListener(SearchRequestOperationsListener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (!searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener does not exist in the listeners list");
}
searchRequestListenersList.remove(listener);
}

/**
* Get searchRequestListenersList,
*
Expand All @@ -95,30 +64,18 @@ public List<SearchRequestOperationsListener> getListeners() {
* Create the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
* @param searchRequest The SearchRequest object. SearchRequestListenerManager will decide which request-level listeners to add based on states/flags of the request
* @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener}
* @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list.
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
SearchRequest searchRequest,
Logger logger,
SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = searchRequestListenersList.stream().filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList());
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(searchRequestListenersList.stream(), Arrays.stream(perRequestListeners))
.filter(SearchRequestOperationsListener::getEnabled)
.collect(Collectors.toList());

Arrays.stream(perRequestListeners).forEach((listener) -> {
if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) {
TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener;
// phase_took is enabled with request param and/or cluster setting
boolean phaseTookEnabled = (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) ||
clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED);
if (phaseTookEnabled) {
timeProvider.setPhaseTook(true);
searchListenersList.add(timeProvider);
}
}
});
return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}


boolean getEnabled() {
return enabled;
}
Expand All @@ -62,6 +61,7 @@ static final class CompositeListener extends SearchRequestOperationsListener {
CompositeListener(List<SearchRequestOperationsListener> listeners, Logger logger) {
this.listeners = listeners;
this.logger = logger;
this.setEnabled(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public SearchRequestSlowLog(
ClusterService clusterService
) {
public SearchRequestSlowLog(ClusterService clusterService) {
this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
}

Expand Down Expand Up @@ -235,22 +233,22 @@ private static String escapeJson(String text) {

void setWarnThreshold(TimeValue warnThreshold) {
this.warnThreshold = warnThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setInfoThreshold(TimeValue infoThreshold) {
this.infoThreshold = infoThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setDebugThreshold(TimeValue debugThreshold) {
this.debugThreshold = debugThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setTraceThreshold(TimeValue traceThreshold) {
this.traceThreshold = traceThreshold.nanos();
setEnabled();
setEnabledIfThresholdExceed();
}

void setLevel(SlowLogLevel level) {
Expand All @@ -277,7 +275,7 @@ SlowLogLevel getLevel() {
return level;
}

private void setEnabled() {
private void setEnabledIfThresholdExceed() {
super.setEnabled(this.warnThreshold >= 0
|| this.debugThreshold >= 0
|| this.infoThreshold >= 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {
);

@Inject
public SearchRequestStats(
ClusterService clusterService
) {
public SearchRequestStats(ClusterService clusterService) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Setting.Property.NodeScope
);

public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled";
public static final Setting<Boolean> SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting(
SEARCH_PHASE_TOOK_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final NodeClient client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
Expand Down Expand Up @@ -277,7 +285,6 @@ static final class SearchTimeProvider extends SearchRequestOperationsListener {
private final long absoluteStartMillis;
private final long relativeStartNanos;
private final LongSupplier relativeCurrentNanosProvider;
private boolean phaseTook = false;

/**
* Instantiates a new search time provider. The absolute start time is the real clock time
Expand All @@ -304,12 +311,8 @@ long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos);
}

public void setPhaseTook(boolean phaseTook) {
this.phaseTook = phaseTook;
}

SearchResponse.PhaseTook getPhaseTook() {
if (phaseTook) {
if (getEnabled()) {
Map<String, Long> phaseTookMap = new HashMap<>();
// Convert Map<SearchPhaseName, Long> to Map<String, Long> for SearchResponse()
for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) {
Expand All @@ -323,6 +326,22 @@ SearchResponse.PhaseTook getPhaseTook() {

Map<SearchPhaseName, Long> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

/**
* Set if this listener is enabled based on the cluster level setting
* and per request enable flags.
*
* @param enabledAtClusterLevel if the SearchTimeProvider listener is enabled at cluster level
* @param searchRequest the original Search Request
* @opensearch.internal
*/

void setEnabled(boolean enabledAtClusterLevel, SearchRequest searchRequest) {
// phase_took is enabled wi th request param and/or cluster setting
super.setEnabled(
enabledAtClusterLevel || (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook())
);
}

@Override
void onPhaseStart(SearchPhaseContext context) {}

Expand Down Expand Up @@ -462,8 +481,11 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
timeProvider.setEnabled(
clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED),
originalSearchRequest
);
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestListenerManager.buildCompositeListener(
originalSearchRequest,
logger,
timeProvider
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void apply(Settings value, Settings current, Settings previous) {
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING,
TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING,
SearchRequestListenerManager.SEARCH_PHASE_TOOK_ENABLED,
TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED,
SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
Expand Down
21 changes: 14 additions & 7 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.action.search.SearchExecutionStatsCollector;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchRequestListenerManager;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.SearchTransportService;
Expand Down Expand Up @@ -787,13 +788,6 @@ protected Node(
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService);
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);

// register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService);
searchRequestListenerManager.addListeners(
searchRequestStats,
searchRequestSlowLog
);

remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
final IndicesService indicesService = new IndicesService(
settings,
Expand Down Expand Up @@ -887,6 +881,19 @@ protected Node(
)
.collect(Collectors.toList());

// register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(
Stream.concat(
Stream.of(
searchRequestStats,
searchRequestSlowLog
),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
).toArray(SearchRequestOperationsListener[]::new)
);

ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
Expand Down
Loading

0 comments on commit 36b3e2a

Please sign in to comment.