From ff9548d3378fd9f15a7ad053ba83802b951d1582 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 4 Mar 2024 13:10:55 -0800 Subject: [PATCH 1/4] Add user info in top queries Signed-off-by: Chenyang Ji --- CHANGELOG.md | 1 + gradle/code-coverage.gradle | 1 + plugins/query-insights/build.gradle | 19 ++++++++ .../plugin/insights/QueryInsightsPlugin.java | 2 +- .../core/listener/QueryInsightsListener.java | 22 +++++++++- .../insights/rules/model/Attribute.java | 10 ++++- .../settings/QueryInsightsSettings.java | 9 ++++ .../listener/QueryInsightsListenerTests.java | 44 ++++++++++++++++--- 8 files changed, 98 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 33d1f6ee02027..cee1ad17c2f6c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,6 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) - The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) +- [Query insights] Add user info in top queries ([#12529](https://github.com/opensearch-project/OpenSearch/pull/12529)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/gradle/code-coverage.gradle b/gradle/code-coverage.gradle index 3ca6b1fe84ea7..4ad1965b212ca 100644 --- a/gradle/code-coverage.gradle +++ b/gradle/code-coverage.gradle @@ -15,6 +15,7 @@ repositories { maven { url = "https://ci.opensearch.org/ci/dbc/snapshots/lucene/" } + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } } allprojects { diff --git a/plugins/query-insights/build.gradle b/plugins/query-insights/build.gradle index eabbd395bd3bd..99dfaea44e6e0 100644 --- a/plugins/query-insights/build.gradle +++ b/plugins/query-insights/build.gradle @@ -14,5 +14,24 @@ opensearchplugin { classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' } +dependencyLicenses.enabled = false + +ext { + opensearch_version = versions.opensearch + // 3.0.0-SNAPSHOT -> 3.0.0.0 + version_tokens = opensearch_version.tokenize('-') + common_utils_version = version_tokens[0] + '.0' + if (version_tokens.size() > 1) { + common_utils_version = common_utils_version + "-" + version_tokens[1] + } +} + +repositories { + mavenLocal() + mavenCentral() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } +} + dependencies { + implementation "org.opensearch:common-utils:${common_utils_version}" } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 4d7e0d486068a..69b6adbe1efcf 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -71,7 +71,7 @@ public Collection createComponents( ) { // create top n queries service final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); - return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); + return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, threadPool)); } @Override diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 9ec8673147c38..dcf92616ca5b6 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -16,11 +16,14 @@ import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; +import org.opensearch.commons.authuser.User; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.ThreadPool; import java.util.Collections; import java.util.HashMap; @@ -45,16 +48,23 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener private static final Logger log = LogManager.getLogger(QueryInsightsListener.class); private final QueryInsightsService queryInsightsService; + private final ThreadPool threadPool; /** * Constructor for QueryInsightsListener * * @param clusterService The Node's cluster service. * @param queryInsightsService The topQueriesByLatencyService associated with this listener + * @param threadPool The OpenSearch thread pool to run async tasks */ @Inject - public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { + public QueryInsightsListener( + final ClusterService clusterService, + final QueryInsightsService queryInsightsService, + final ThreadPool threadPool + ) { this.queryInsightsService = queryInsightsService; + this.threadPool = threadPool; clusterService.getClusterSettings() .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v)); clusterService.getClusterSettings() @@ -138,6 +148,16 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); + // add user related information + Object userInfo = threadPool.getThreadContext().getTransient(QueryInsightsSettings.REQUEST_HEADER_USER_INFO); + if (userInfo != null) { + attributes.put(Attribute.USER, User.parse(userInfo.toString())); + } + // add remote ip address + Object remoteAddress = threadPool.getThreadContext().getTransient(QueryInsightsSettings.REQUEST_HEADER_REMOTE_ADDRESS); + if (remoteAddress != null) { + attributes.put(Attribute.REMOTE_ADDRESS, remoteAddress.toString()); + } SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index c1d17edf9ff14..fd16285d203f6 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -43,7 +43,15 @@ public enum Attribute { /** * The node id for this request */ - NODE_ID; + NODE_ID, + /** + * The remote address of this request + */ + REMOTE_ADDRESS, + /** + * Information related to the user who sent this request + */ + USER; /** * Read an Attribute from a StreamInput diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 52cc1fbde790f..890f99e72dfb5 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -23,6 +23,15 @@ * @opensearch.experimental */ public class QueryInsightsSettings { + /** + * Constant setting for user info header key that are injected during authentication + */ + public static final String REQUEST_HEADER_USER_INFO = "_opendistro_security_user_info"; + /** + * Constant setting for remote address info header key that are injected during authentication + */ + public static final String REQUEST_HEADER_REMOTE_ADDRESS = "_opendistro_security_remote_address"; + /** * Executors settings */ diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index f340950017a5c..ad173734798c1 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -15,23 +15,32 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.commons.InjectSecurity; +import org.opensearch.commons.authuser.User; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.core.service.TopQueriesService; +import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; import org.junit.Before; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; +import org.mockito.ArgumentCaptor; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -47,12 +56,15 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = mock(TopQueriesService.class); + private final ThreadPool threadPool = mock(ThreadPool.class); + private final Settings.Builder settingsBuilder = Settings.builder(); + private final Settings settings = settingsBuilder.build(); + private final String remoteAddress = "1.2.3.4"; + private User user; private ClusterService clusterService; @Before public void setup() { - Settings.Builder settingsBuilder = Settings.builder(); - Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); @@ -60,10 +72,18 @@ public void setup() { clusterService = new ClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); + + // inject user info + ThreadContext threadContext = new ThreadContext(settings); + user = new User("user-1", List.of("role1", "role2"), List.of("role3", "role4"), List.of()); + InjectSecurity injector = new InjectSecurity("id", settings, threadContext); + injector.injectUserInfo(user); + threadContext.putTransient(QueryInsightsSettings.REQUEST_HEADER_REMOTE_ADDRESS, remoteAddress); + when(threadPool.getThreadContext()).thenReturn(threadContext); } - public void testOnRequestEnd() throws InterruptedException { - Long timestamp = System.currentTimeMillis() - 100L; + public void testOnRequestEnd() { + long timestamp = System.currentTimeMillis() - 100L; SearchType searchType = SearchType.QUERY_THEN_FETCH; SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); @@ -79,7 +99,7 @@ public void testOnRequestEnd() throws InterruptedException { int numberOfShards = 10; - QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService, threadPool); when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); when(searchRequest.searchType()).thenReturn(searchType); @@ -92,6 +112,16 @@ public void testOnRequestEnd() throws InterruptedException { queryInsightsListener.onRequestEnd(searchPhaseContext, searchRequestContext); verify(queryInsightsService, times(1)).addRecord(any()); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(SearchQueryRecord.class); + verify(queryInsightsService).addRecord(argumentCaptor.capture()); + assertEquals(timestamp, argumentCaptor.getValue().getTimestamp()); + Map attrs = argumentCaptor.getValue().getAttributes(); + assertEquals(searchType.toString().toLowerCase(Locale.ROOT), attrs.get(Attribute.SEARCH_TYPE)); + assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS)); + assertEquals(indices, attrs.get(Attribute.INDICES)); + assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP)); + assertEquals(user, attrs.get(Attribute.USER)); + assertEquals(remoteAddress, attrs.get(Attribute.REMOTE_ADDRESS)); } public void testConcurrentOnRequestEnd() throws InterruptedException { @@ -127,7 +157,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(numRequests); for (int i = 0; i < numRequests; i++) { - searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService)); + searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService, threadPool)); } for (int i = 0; i < numRequests; i++) { @@ -148,7 +178,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { public void testSetEnabled() { when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); - QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService, threadPool); queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); assertTrue(queryInsightsListener.isEnabled()); From f4a5c34a056c37680e73841eac5762baa8b351ef Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 5 Mar 2024 21:49:20 -0800 Subject: [PATCH 2/4] remove common-utils as a dependency Signed-off-by: Chenyang Ji --- gradle/code-coverage.gradle | 1 - plugins/query-insights/build.gradle | 19 ------ .../core/listener/QueryInsightsListener.java | 13 +--- .../insights/rules/model/Attribute.java | 16 ++++- .../insights/utils/ThreadContextParser.java | 64 +++++++++++++++++++ .../plugin/insights/utils/package-info.java | 12 ++++ .../listener/QueryInsightsListenerTests.java | 19 ++++-- 7 files changed, 104 insertions(+), 40 deletions(-) create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java create mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java diff --git a/gradle/code-coverage.gradle b/gradle/code-coverage.gradle index 4ad1965b212ca..3ca6b1fe84ea7 100644 --- a/gradle/code-coverage.gradle +++ b/gradle/code-coverage.gradle @@ -15,7 +15,6 @@ repositories { maven { url = "https://ci.opensearch.org/ci/dbc/snapshots/lucene/" } - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } } allprojects { diff --git a/plugins/query-insights/build.gradle b/plugins/query-insights/build.gradle index 99dfaea44e6e0..eabbd395bd3bd 100644 --- a/plugins/query-insights/build.gradle +++ b/plugins/query-insights/build.gradle @@ -14,24 +14,5 @@ opensearchplugin { classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' } -dependencyLicenses.enabled = false - -ext { - opensearch_version = versions.opensearch - // 3.0.0-SNAPSHOT -> 3.0.0.0 - version_tokens = opensearch_version.tokenize('-') - common_utils_version = version_tokens[0] + '.0' - if (version_tokens.size() > 1) { - common_utils_version = common_utils_version + "-" + version_tokens[1] - } -} - -repositories { - mavenLocal() - mavenCentral() - maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } -} - dependencies { - implementation "org.opensearch:common-utils:${common_utils_version}" } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index dcf92616ca5b6..052595d1ade8a 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -16,13 +16,12 @@ import org.opensearch.action.search.SearchRequestOperationsListener; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; -import org.opensearch.commons.authuser.User; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; -import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.plugin.insights.utils.ThreadContextParser; import org.opensearch.threadpool.ThreadPool; import java.util.Collections; @@ -149,15 +148,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); // add user related information - Object userInfo = threadPool.getThreadContext().getTransient(QueryInsightsSettings.REQUEST_HEADER_USER_INFO); - if (userInfo != null) { - attributes.put(Attribute.USER, User.parse(userInfo.toString())); - } - // add remote ip address - Object remoteAddress = threadPool.getThreadContext().getTransient(QueryInsightsSettings.REQUEST_HEADER_REMOTE_ADDRESS); - if (remoteAddress != null) { - attributes.put(Attribute.REMOTE_ADDRESS, remoteAddress.toString()); - } + attributes.putAll(ThreadContextParser.getUserInfoFromThreadContext(threadPool.getThreadContext())); SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index fd16285d203f6..9b98f84afc4b4 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -49,9 +49,21 @@ public enum Attribute { */ REMOTE_ADDRESS, /** - * Information related to the user who sent this request + * Username of the user who sent this request */ - USER; + USER_NAME, + /** + * Backend roles of the user who sent this request + */ + USER_BACKEND_ROLES, + /** + * Roles of the user who sent this request + */ + USER_ROLES, + /** + * Tenant info of the user who sent this request + */ + USER_TENANT; /** * Read an Attribute from a StreamInput diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java new file mode 100644 index 0000000000000..ca1bba1d07553 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.utils; + +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.Strings; +import org.opensearch.plugin.insights.rules.model.Attribute; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Helper class to parse information from the thread context + */ +public final class ThreadContextParser { + + private ThreadContextParser() {} + + /** + * Get User info from the thread context + * + * @param threadContext context of the thread + * @return Map of {@link Attribute} and the corresponding values + */ + public static Map getUserInfoFromThreadContext(ThreadContext threadContext) { + Map userInfoMap = new HashMap<>(); + if (threadContext == null) { + return userInfoMap; + } + Object userInfoObj = threadContext.getTransient(QueryInsightsSettings.REQUEST_HEADER_USER_INFO); + if (userInfoObj == null) { + return userInfoMap; + } + String userInfoStr = userInfoObj.toString(); + Object remoteAddressObj = threadContext.getTransient(QueryInsightsSettings.REQUEST_HEADER_REMOTE_ADDRESS); + if (remoteAddressObj != null) { + userInfoMap.put(Attribute.REMOTE_ADDRESS, remoteAddressObj.toString()); + } + + String[] userInfo = userInfoStr.split("\\|"); + if ((userInfo.length == 0) || (Strings.isNullOrEmpty(userInfo[0]))) { + return userInfoMap; + } + userInfoMap.put(Attribute.USER_NAME, userInfo[0].trim()); + if ((userInfo.length > 1) && !Strings.isNullOrEmpty(userInfo[1])) { + userInfoMap.put(Attribute.USER_BACKEND_ROLES, Arrays.asList(userInfo[1].split(","))); + } + if ((userInfo.length > 2) && !Strings.isNullOrEmpty(userInfo[2])) { + userInfoMap.put(Attribute.USER_ROLES, Arrays.asList(userInfo[2].split(","))); + } + if ((userInfo.length > 3) && !Strings.isNullOrEmpty(userInfo[3])) { + userInfoMap.put(Attribute.USER_TENANT, userInfo[3].trim()); + } + return userInfoMap; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java new file mode 100644 index 0000000000000..220329a8d4bf4 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Utils for Query Insights Plugin + */ +package org.opensearch.plugin.insights.utils; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index ad173734798c1..74fd959050c68 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -16,8 +16,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.commons.InjectSecurity; -import org.opensearch.commons.authuser.User; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.Attribute; @@ -60,7 +58,10 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final Settings.Builder settingsBuilder = Settings.builder(); private final Settings settings = settingsBuilder.build(); private final String remoteAddress = "1.2.3.4"; - private User user; + private final String userName = "user1"; + private final List userBackendRoles = List.of("bk-role1", "bk-role2"); + private final List userRoles = List.of("role1", "role2"); + private final String userTenant = "tenant1"; private ClusterService clusterService; @Before @@ -75,9 +76,10 @@ public void setup() { // inject user info ThreadContext threadContext = new ThreadContext(settings); - user = new User("user-1", List.of("role1", "role2"), List.of("role3", "role4"), List.of()); - InjectSecurity injector = new InjectSecurity("id", settings, threadContext); - injector.injectUserInfo(user); + threadContext.putTransient( + QueryInsightsSettings.REQUEST_HEADER_USER_INFO, + userName + '|' + String.join(",", userBackendRoles) + "|" + String.join(",", userRoles) + "|" + userTenant + ); threadContext.putTransient(QueryInsightsSettings.REQUEST_HEADER_REMOTE_ADDRESS, remoteAddress); when(threadPool.getThreadContext()).thenReturn(threadContext); } @@ -120,7 +122,10 @@ public void testOnRequestEnd() { assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS)); assertEquals(indices, attrs.get(Attribute.INDICES)); assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP)); - assertEquals(user, attrs.get(Attribute.USER)); + assertEquals(userName, attrs.get(Attribute.USER_NAME)); + assertEquals(userBackendRoles, attrs.get(Attribute.USER_BACKEND_ROLES)); + assertEquals(userRoles, attrs.get(Attribute.USER_ROLES)); + assertEquals(userTenant, attrs.get(Attribute.USER_TENANT)); assertEquals(remoteAddress, attrs.get(Attribute.REMOTE_ADDRESS)); } From ec91f1557c12d76ac5db1817a6f549b79b210572 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 7 Mar 2024 13:34:53 -0800 Subject: [PATCH 3/4] remove user info and read remote address from RestRequest Signed-off-by: Chenyang Ji --- CHANGELOG.md | 2 +- .../plugin/insights/QueryInsightsPlugin.java | 2 +- .../core/listener/QueryInsightsListener.java | 14 +--- .../insights/rules/model/Attribute.java | 18 +----- .../settings/QueryInsightsSettings.java | 9 --- .../insights/utils/ThreadContextParser.java | 64 ------------------- .../plugin/insights/utils/package-info.java | 12 ---- .../listener/QueryInsightsListenerTests.java | 27 ++------ .../action/search/SearchRequestContext.java | 4 ++ .../rest/action/search/RestSearchAction.java | 4 ++ 10 files changed, 17 insertions(+), 139 deletions(-) delete mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java delete mode 100644 plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java diff --git a/CHANGELOG.md b/CHANGELOG.md index cee1ad17c2f6c..5c13c9a1d2718 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,7 +113,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) - The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) -- [Query insights] Add user info in top queries ([#12529](https://github.com/opensearch-project/OpenSearch/pull/12529)) +- [Query insights] Add remote address info in top queries ([#12529](https://github.com/opensearch-project/OpenSearch/pull/12529)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index 69b6adbe1efcf..4d7e0d486068a 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -71,7 +71,7 @@ public Collection createComponents( ) { // create top n queries service final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); - return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, threadPool)); + return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); } @Override diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 052595d1ade8a..ca9cd279191b2 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -21,8 +21,6 @@ import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; -import org.opensearch.plugin.insights.utils.ThreadContextParser; -import org.opensearch.threadpool.ThreadPool; import java.util.Collections; import java.util.HashMap; @@ -47,23 +45,16 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener private static final Logger log = LogManager.getLogger(QueryInsightsListener.class); private final QueryInsightsService queryInsightsService; - private final ThreadPool threadPool; /** * Constructor for QueryInsightsListener * * @param clusterService The Node's cluster service. * @param queryInsightsService The topQueriesByLatencyService associated with this listener - * @param threadPool The OpenSearch thread pool to run async tasks */ @Inject - public QueryInsightsListener( - final ClusterService clusterService, - final QueryInsightsService queryInsightsService, - final ThreadPool threadPool - ) { + public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { this.queryInsightsService = queryInsightsService; - this.threadPool = threadPool; clusterService.getClusterSettings() .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v)); clusterService.getClusterSettings() @@ -147,8 +138,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); - // add user related information - attributes.putAll(ThreadContextParser.getUserInfoFromThreadContext(threadPool.getThreadContext())); + attributes.put(Attribute.REMOTE_ADDRESS, searchRequestContext.getRequestRemoteAddress()); SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index 9b98f84afc4b4..3a29415618b97 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -47,23 +47,7 @@ public enum Attribute { /** * The remote address of this request */ - REMOTE_ADDRESS, - /** - * Username of the user who sent this request - */ - USER_NAME, - /** - * Backend roles of the user who sent this request - */ - USER_BACKEND_ROLES, - /** - * Roles of the user who sent this request - */ - USER_ROLES, - /** - * Tenant info of the user who sent this request - */ - USER_TENANT; + REMOTE_ADDRESS; /** * Read an Attribute from a StreamInput diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 890f99e72dfb5..52cc1fbde790f 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -23,15 +23,6 @@ * @opensearch.experimental */ public class QueryInsightsSettings { - /** - * Constant setting for user info header key that are injected during authentication - */ - public static final String REQUEST_HEADER_USER_INFO = "_opendistro_security_user_info"; - /** - * Constant setting for remote address info header key that are injected during authentication - */ - public static final String REQUEST_HEADER_REMOTE_ADDRESS = "_opendistro_security_remote_address"; - /** * Executors settings */ diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java deleted file mode 100644 index ca1bba1d07553..0000000000000 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/ThreadContextParser.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.insights.utils; - -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.core.common.Strings; -import org.opensearch.plugin.insights.rules.model.Attribute; -import org.opensearch.plugin.insights.settings.QueryInsightsSettings; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * Helper class to parse information from the thread context - */ -public final class ThreadContextParser { - - private ThreadContextParser() {} - - /** - * Get User info from the thread context - * - * @param threadContext context of the thread - * @return Map of {@link Attribute} and the corresponding values - */ - public static Map getUserInfoFromThreadContext(ThreadContext threadContext) { - Map userInfoMap = new HashMap<>(); - if (threadContext == null) { - return userInfoMap; - } - Object userInfoObj = threadContext.getTransient(QueryInsightsSettings.REQUEST_HEADER_USER_INFO); - if (userInfoObj == null) { - return userInfoMap; - } - String userInfoStr = userInfoObj.toString(); - Object remoteAddressObj = threadContext.getTransient(QueryInsightsSettings.REQUEST_HEADER_REMOTE_ADDRESS); - if (remoteAddressObj != null) { - userInfoMap.put(Attribute.REMOTE_ADDRESS, remoteAddressObj.toString()); - } - - String[] userInfo = userInfoStr.split("\\|"); - if ((userInfo.length == 0) || (Strings.isNullOrEmpty(userInfo[0]))) { - return userInfoMap; - } - userInfoMap.put(Attribute.USER_NAME, userInfo[0].trim()); - if ((userInfo.length > 1) && !Strings.isNullOrEmpty(userInfo[1])) { - userInfoMap.put(Attribute.USER_BACKEND_ROLES, Arrays.asList(userInfo[1].split(","))); - } - if ((userInfo.length > 2) && !Strings.isNullOrEmpty(userInfo[2])) { - userInfoMap.put(Attribute.USER_ROLES, Arrays.asList(userInfo[2].split(","))); - } - if ((userInfo.length > 3) && !Strings.isNullOrEmpty(userInfo[3])) { - userInfoMap.put(Attribute.USER_TENANT, userInfo[3].trim()); - } - return userInfoMap; - } -} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java deleted file mode 100644 index 220329a8d4bf4..0000000000000 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/utils/package-info.java +++ /dev/null @@ -1,12 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/** - * Utils for Query Insights Plugin - */ -package org.opensearch.plugin.insights.utils; diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index 74fd959050c68..c8e110b1fed9b 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -15,7 +15,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.Attribute; @@ -26,7 +25,6 @@ import org.opensearch.search.aggregations.support.ValueType; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.ThreadPool; import org.junit.Before; import java.util.ArrayList; @@ -54,14 +52,9 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); private final TopQueriesService topQueriesService = mock(TopQueriesService.class); - private final ThreadPool threadPool = mock(ThreadPool.class); private final Settings.Builder settingsBuilder = Settings.builder(); private final Settings settings = settingsBuilder.build(); private final String remoteAddress = "1.2.3.4"; - private final String userName = "user1"; - private final List userBackendRoles = List.of("bk-role1", "bk-role2"); - private final List userRoles = List.of("role1", "role2"); - private final String userTenant = "tenant1"; private ClusterService clusterService; @Before @@ -73,15 +66,7 @@ public void setup() { clusterService = new ClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); - - // inject user info - ThreadContext threadContext = new ThreadContext(settings); - threadContext.putTransient( - QueryInsightsSettings.REQUEST_HEADER_USER_INFO, - userName + '|' + String.join(",", userBackendRoles) + "|" + String.join(",", userRoles) + "|" + userTenant - ); - threadContext.putTransient(QueryInsightsSettings.REQUEST_HEADER_REMOTE_ADDRESS, remoteAddress); - when(threadPool.getThreadContext()).thenReturn(threadContext); + when(searchRequestContext.getRequestRemoteAddress()).thenReturn(remoteAddress); } public void testOnRequestEnd() { @@ -101,7 +86,7 @@ public void testOnRequestEnd() { int numberOfShards = 10; - QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService, threadPool); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); when(searchRequest.searchType()).thenReturn(searchType); @@ -122,10 +107,6 @@ public void testOnRequestEnd() { assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS)); assertEquals(indices, attrs.get(Attribute.INDICES)); assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP)); - assertEquals(userName, attrs.get(Attribute.USER_NAME)); - assertEquals(userBackendRoles, attrs.get(Attribute.USER_BACKEND_ROLES)); - assertEquals(userRoles, attrs.get(Attribute.USER_ROLES)); - assertEquals(userTenant, attrs.get(Attribute.USER_TENANT)); assertEquals(remoteAddress, attrs.get(Attribute.REMOTE_ADDRESS)); } @@ -162,7 +143,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(numRequests); for (int i = 0; i < numRequests; i++) { - searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService, threadPool)); + searchListenersList.add(new QueryInsightsListener(clusterService, queryInsightsService)); } for (int i = 0; i < numRequests; i++) { @@ -183,7 +164,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { public void testSetEnabled() { when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); - QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService, threadPool); + QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); assertTrue(queryInsightsListener.isEnabled()); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index b8bbde65ca6bc..be5239ca57448 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -51,6 +51,10 @@ public Map phaseTookMap() { return phaseTookMap; } + public String getRequestRemoteAddress() { + return searchRequest.remoteAddress().toString(); + } + SearchResponse.PhaseTook getPhaseTook() { if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) { return new SearchResponse.PhaseTook(phaseTookMap); diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 80dc34c4d5d68..f96fec932694f 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -42,6 +42,7 @@ import org.opensearch.common.Booleans; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.QueryBuilder; import org.opensearch.rest.BaseRestHandler; @@ -223,6 +224,9 @@ public static void parseSearchRequest( } searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null)); + + // set remote address for searchRequest + searchRequest.remoteAddress(new TransportAddress(request.getHttpChannel().getRemoteAddress())); } /** From 41ea2019db44440341a6ef8885237de6cb222b89 Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Thu, 14 Mar 2024 14:40:22 -0700 Subject: [PATCH 4/4] add an integration test for remote address Signed-off-by: Chenyang Ji --- .../QueryInsightsPluginTransportIT.java | 23 +++++++++++++++---- .../core/listener/QueryInsightsListener.java | 2 +- .../listener/QueryInsightsListenerTests.java | 10 +++++--- .../action/search/SearchRequestContext.java | 5 ++-- 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java index 04e715444f50a..93e03143a9c17 100644 --- a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -14,19 +14,26 @@ import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; +import org.opensearch.plugin.insights.rules.model.Attribute; import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginInfo; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.Assert; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -49,6 +56,8 @@ public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { private final int TOTAL_NUMBER_OF_NODES = 2; private final int TOTAL_SEARCH_REQUESTS = 5; + private final String remoteAddress = "1.2.3.4"; + private final int remotePort = 1234; @Override protected Collection> nodePlugins() { @@ -143,7 +152,7 @@ public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionExc /** * Test get top queries when feature enabled */ - public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { + public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException, UnknownHostException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") @@ -168,10 +177,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { } // making search requests to get top queries for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { - SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) + SearchRequestBuilder requestBuilder = internalCluster().client(randomFrom(nodes)) .prepareSearch() - .setQuery(QueryBuilders.matchAllQuery()) - .get(); + .setQuery(QueryBuilders.matchAllQuery()); + requestBuilder.request().remoteAddress(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort)); + SearchResponse searchResponse = requestBuilder.get(); assertEquals(searchResponse.getFailedShards(), 0); } // Sleep to wait for queue drained to top queries store @@ -181,6 +191,11 @@ public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { Assert.assertEquals(0, response.failures().size()); Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum()); + for (TopQueries nodeRecords : response.getNodes()) { + for (SearchQueryRecord record : nodeRecords.getTopQueriesRecord()) { + Assert.assertEquals(remoteAddress + ":" + remotePort, record.getAttributes().get(Attribute.REMOTE_ADDRESS)); + } + } internalCluster().stopAllNodes(); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index ca9cd279191b2..672e32fefce2b 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -138,7 +138,7 @@ public void onRequestEnd(final SearchPhaseContext context, final SearchRequestCo attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards()); attributes.put(Attribute.INDICES, request.indices()); attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap()); - attributes.put(Attribute.REMOTE_ADDRESS, searchRequestContext.getRequestRemoteAddress()); + attributes.put(Attribute.REMOTE_ADDRESS, String.valueOf(searchRequestContext.getRequestRemoteAddress())); SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes); queryInsightsService.addRecord(record); } catch (Exception e) { diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index c8e110b1fed9b..591313440c221 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.Attribute; @@ -27,6 +28,8 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -55,10 +58,11 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final Settings.Builder settingsBuilder = Settings.builder(); private final Settings settings = settingsBuilder.build(); private final String remoteAddress = "1.2.3.4"; + private final int remotePort = 1234; private ClusterService clusterService; @Before - public void setup() { + public void setup() throws UnknownHostException { ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED); clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE); @@ -66,7 +70,7 @@ public void setup() { clusterService = new ClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); - when(searchRequestContext.getRequestRemoteAddress()).thenReturn(remoteAddress); + when(searchRequestContext.getRequestRemoteAddress()).thenReturn(new TransportAddress(InetAddress.getByName(remoteAddress), remotePort)); } public void testOnRequestEnd() { @@ -107,7 +111,7 @@ public void testOnRequestEnd() { assertEquals(numberOfShards, attrs.get(Attribute.TOTAL_SHARDS)); assertEquals(indices, attrs.get(Attribute.INDICES)); assertEquals(phaseLatencyMap, attrs.get(Attribute.PHASE_LATENCY_MAP)); - assertEquals(remoteAddress, attrs.get(Attribute.REMOTE_ADDRESS)); + assertEquals(remoteAddress + ":" + remotePort, attrs.get(Attribute.REMOTE_ADDRESS)); } public void testConcurrentOnRequestEnd() throws InterruptedException { diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java index be5239ca57448..4a89c548f97ca 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestContext.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.TotalHits; import org.opensearch.common.annotation.InternalApi; +import org.opensearch.core.common.transport.TransportAddress; import java.util.EnumMap; import java.util.HashMap; @@ -51,8 +52,8 @@ public Map phaseTookMap() { return phaseTookMap; } - public String getRequestRemoteAddress() { - return searchRequest.remoteAddress().toString(); + public TransportAddress getRequestRemoteAddress() { + return searchRequest.remoteAddress(); } SearchResponse.PhaseTook getPhaseTook() {