Skip to content

Commit

Permalink
support dynamically adding SearchRequestOperationsListener
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Dec 12, 2023
1 parent 2798114 commit badcda9
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/
@InternalApi
abstract class SearchRequestOperationsListener {
protected boolean enabled;

abstract void onPhaseStart(SearchPhaseContext context);

Expand All @@ -32,6 +33,28 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
if (enabled) {
register();
} else {
deregister();
}
}


/**
* Handler function to register this listener to certain components
* This function will be called when the listener is enabled.
*/
protected void register() {}

/**
* Handler function to deregister this listener from certain components
* This function will be called when the listener is disabled.
*/
protected void deregister() {}

/**
* Holder of Composite Listeners
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,22 @@ void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequest
}
}

/**
* register this listener to TransportSearchAction
*/
@Override
protected void register() {
TransportSearchAction.addSearchOperationsListener(this);
}

/**
* deregister this listener to TransportSearchAction
*/
@Override
protected void deregister() {
TransportSearchAction.removeSearchOperationsListener(this);
}

/**
* Search request slow log message
*
Expand Down Expand Up @@ -233,18 +249,22 @@ private static String escapeJson(String text) {

void setWarnThreshold(TimeValue warnThreshold) {
this.warnThreshold = warnThreshold.nanos();
changeEnabledIfNeeded();
}

void setInfoThreshold(TimeValue infoThreshold) {
this.infoThreshold = infoThreshold.nanos();
changeEnabledIfNeeded();
}

void setDebugThreshold(TimeValue debugThreshold) {
this.debugThreshold = debugThreshold.nanos();
changeEnabledIfNeeded();
}

void setTraceThreshold(TimeValue traceThreshold) {
this.traceThreshold = traceThreshold.nanos();
changeEnabledIfNeeded();
}

void setLevel(SlowLogLevel level) {
Expand All @@ -270,4 +290,11 @@ protected long getTraceThreshold() {
SlowLogLevel getLevel() {
return level;
}

private void changeEnabledIfNeeded() {
super.setEnabled(this.warnThreshold >= 0
|| this.debugThreshold >= 0
|| this.infoThreshold >= 0
|| this.traceThreshold >= 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@

package org.opensearch.action.search;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.Setting;

import java.util.EnumMap;
import java.util.Map;
Expand All @@ -26,13 +28,38 @@
public final class SearchRequestStats extends SearchRequestOperationsListener {
Map<SearchPhaseName, StatsHolder> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

public static final String SEARCH_REQUEST_STATS_ENABLED_KEY = "search.request_stats_enabled";
public static final Setting<Boolean> SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting(
SEARCH_REQUEST_STATS_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

@Inject
public SearchRequestStats() {
public SearchRequestStats(ClusterService clusterService) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
}

/**
* register this listener to TransportSearchAction
*/
@Override
protected void register() {
TransportSearchAction.addSearchOperationsListener(this);
}

/**
* deregister this listener to TransportSearchAction
*/
@Override
protected void deregister() {
TransportSearchAction.removeSearchOperationsListener(this);
}

public long getPhaseCurrent(SearchPhaseName searchPhaseName) {
return phaseStatsMap.get(searchPhaseName).current.count();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,6 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Setting.Property.NodeScope
);

public static final String SEARCH_REQUEST_STATS_ENABLED_KEY = "search.request_stats_enabled";
public static final Setting<Boolean> SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting(
SEARCH_REQUEST_STATS_ENABLED_KEY,
false,
Property.Dynamic,
Property.NodeScope
);

public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled";
public static final Setting<Boolean> SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting(
Expand All @@ -170,6 +163,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Property.NodeScope
);

private static final List<SearchRequestOperationsListener> searchRequestOperationsListenersList = new ArrayList<>();

private final NodeClient client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
Expand All @@ -182,13 +177,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;

private volatile boolean isRequestStatsEnabled;

private volatile boolean searchQueryMetricsEnabled;

private final SearchRequestStats searchRequestStats;
private final SearchRequestSlowLog searchRequestSlowLog;

private final MetricsRegistry metricsRegistry;

private SearchQueryCategorizer searchQueryCategorizer;
Expand All @@ -207,8 +197,6 @@ public TransportSearchAction(
IndexNameExpressionResolver indexNameExpressionResolver,
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
SearchRequestStats searchRequestStats,
SearchRequestSlowLog searchRequestSlowLog,
MetricsRegistry metricsRegistry
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
Expand All @@ -224,10 +212,6 @@ public TransportSearchAction(
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.namedWriteableRegistry = namedWriteableRegistry;
this.searchPipelineService = searchPipelineService;
this.isRequestStatsEnabled = clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setIsRequestStatsEnabled);
this.searchRequestStats = searchRequestStats;
this.searchRequestSlowLog = searchRequestSlowLog;
this.metricsRegistry = metricsRegistry;
this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
clusterService.getClusterSettings()
Expand All @@ -241,10 +225,6 @@ private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
}
}

private void setIsRequestStatsEnabled(boolean isRequestStatsEnabled) {
this.isRequestStatsEnabled = isRequestStatsEnabled;
}

private Map<String, AliasFilter> buildPerIndexAliasFilter(
SearchRequest request,
ClusterState clusterState,
Expand Down Expand Up @@ -1245,11 +1225,7 @@ AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
}

private List<SearchRequestOperationsListener> createSearchListenerList(SearchRequest searchRequest, SearchTimeProvider timeProvider) {
final List<SearchRequestOperationsListener> searchListenersList = new ArrayList<>();

if (isRequestStatsEnabled) {
searchListenersList.add(searchRequestStats);
}
final List<SearchRequestOperationsListener> searchListenersList = new ArrayList<>(searchRequestOperationsListenersList);

// phase_took is enabled with request param and/or cluster setting
Boolean phaseTookRequestParam = searchRequest.isPhaseTook();
Expand All @@ -1263,13 +1239,6 @@ private List<SearchRequestOperationsListener> createSearchListenerList(SearchReq
searchListenersList.add(timeProvider);
}

if (searchRequestSlowLog.getWarnThreshold() >= 0
|| searchRequestSlowLog.getInfoThreshold() >= 0
|| searchRequestSlowLog.getDebugThreshold() >= 0
|| searchRequestSlowLog.getTraceThreshold() >= 0) {
searchListenersList.add(searchRequestSlowLog);
}

return searchListenersList;
}

Expand Down Expand Up @@ -1540,4 +1509,38 @@ static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(
}
return iterators;
}


/**
* Add a {@link SearchRequestOperationsListener} to the searchRequestOperationsListenersList,
* which will be executed during each search request.
*
* @param listener A SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
*/
public static void addSearchOperationsListener(SearchRequestOperationsListener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestOperationsListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
searchRequestOperationsListenersList.add(listener);
}

/**
* Remove a {@link SearchRequestOperationsListener} from the searchRequestOperationsListenersList,
*
* @param listener A SearchRequestOperationsListener object to remove.
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
*/
public static void removeSearchOperationsListener(SearchRequestOperationsListener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (!searchRequestOperationsListenersList.contains(listener)) {
throw new IllegalArgumentException("listener does not exist in the listeners list");
}
searchRequestOperationsListenersList.remove(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -380,9 +381,9 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING,
TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED,
TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED,
TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING,
SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ protected Node(
threadPool
);

final SearchRequestStats searchRequestStats = new SearchRequestStats();
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService);
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);

remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2310,8 +2310,6 @@ public void onFailure(final Exception e) {
List.of(),
client
),
null,
new SearchRequestSlowLog(clusterService),
NoopMetricsRegistry.INSTANCE
)
);
Expand Down

0 comments on commit badcda9

Please sign in to comment.