diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.info/40_aggs.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.info/40_aggs.yml new file mode 100644 index 0000000000000..97bb4e86bdf86 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.info/40_aggs.yml @@ -0,0 +1,18 @@ +--- +"node_info test aggregations": + - skip: + version: " - 7.9.99" + reason: "aggregation info only supported in 8.0.0+" + features: [arbitrary_key] + + + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.info: + metric: [ aggregations ] + + - match : { nodes.$node_id.aggregations.filter: { "types": ["other"] } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java index 13dfdc0896bf3..54a50bf486ce4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfo.java @@ -34,6 +34,7 @@ import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.process.ProcessInfo; import org.elasticsearch.node.ReportingService; +import org.elasticsearch.search.aggregations.support.AggregationInfo; import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.transport.TransportInfo; @@ -82,12 +83,15 @@ public NodeInfo(StreamInput in) throws IOException { addInfoIfNonNull(HttpInfo.class, in.readOptionalWriteable(HttpInfo::new)); addInfoIfNonNull(PluginsAndModules.class, in.readOptionalWriteable(PluginsAndModules::new)); addInfoIfNonNull(IngestInfo.class, in.readOptionalWriteable(IngestInfo::new)); + if (in.getVersion().onOrAfter(Version.V_7_10_0)) { + addInfoIfNonNull(AggregationInfo.class, in.readOptionalWriteable(AggregationInfo::new)); + } } public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings, @Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool, @Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins, - @Nullable IngestInfo ingest, @Nullable ByteSizeValue totalIndexingBuffer) { + @Nullable IngestInfo ingest, @Nullable AggregationInfo aggsInfo, @Nullable ByteSizeValue totalIndexingBuffer) { super(node); this.version = version; this.build = build; @@ -100,6 +104,7 @@ public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Sett addInfoIfNonNull(HttpInfo.class, http); addInfoIfNonNull(PluginsAndModules.class, plugins); addInfoIfNonNull(IngestInfo.class, ingest); + addInfoIfNonNull(AggregationInfo.class, aggsInfo); this.totalIndexingBuffer = totalIndexingBuffer; } @@ -187,5 +192,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(getInfo(HttpInfo.class)); out.writeOptionalWriteable(getInfo(PluginsAndModules.class)); out.writeOptionalWriteable(getInfo(IngestInfo.class)); + if (out.getVersion().onOrAfter(Version.V_7_10_0)) { + out.writeOptionalWriteable(getInfo(AggregationInfo.class)); + } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java index 06f8d95591561..157ef40af31fd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoRequest.java @@ -183,6 +183,7 @@ public enum Metric { HTTP("http"), PLUGINS("plugins"), INGEST("ingest"), + AGGREGATIONS("aggregations"), INDICES("indices"); private String metricName; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java index 8251d49676cb7..56ac729566443 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/NodesInfoResponse.java @@ -35,6 +35,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.process.ProcessInfo; +import org.elasticsearch.search.aggregations.support.AggregationInfo; import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.transport.TransportInfo; @@ -126,6 +127,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (nodeInfo.getInfo(IngestInfo.class) != null) { nodeInfo.getInfo(IngestInfo.class).toXContent(builder, params); } + if (nodeInfo.getInfo(AggregationInfo.class) != null) { + nodeInfo.getInfo(AggregationInfo.class).toXContent(builder, params); + } builder.endObject(); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index b75470619cb22..efeba262da60e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -80,6 +80,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) { metrics.contains(NodesInfoRequest.Metric.HTTP.metricName()), metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()), metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()), + metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()), metrics.contains(NodesInfoRequest.Metric.INDICES.metricName())); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index bd93d8b766148..d54465ff84521 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -95,7 +95,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce @Override protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) { - NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false); + NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false); NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, true, true, true, false, true, false, false, false, false, false, true, false, false, false); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 640f8daa87f05..67fbc7dbd952f 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -585,7 +585,7 @@ protected Node(final Environment initialEnvironment, this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptService, httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, - searchTransportService, indexingLimits); + searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService()); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptService, bigArrays, searchModule.getFetchPhase(), diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index f72923dee9801..32202db162477 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -39,6 +39,7 @@ import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.aggregations.support.AggregationUsageService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -61,6 +62,7 @@ public class NodeService implements Closeable { private final ResponseCollectorService responseCollectorService; private final SearchTransportService searchTransportService; private final IndexingPressure indexingPressure; + private final AggregationUsageService aggregationUsageService; private final Discovery discovery; @@ -69,7 +71,8 @@ public class NodeService implements Closeable { CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, - SearchTransportService searchTransportService, IndexingPressure indexingPressure) { + SearchTransportService searchTransportService, IndexingPressure indexingPressure, + AggregationUsageService aggregationUsageService) { this.settings = settings; this.threadPool = threadPool; this.monitorService = monitorService; @@ -85,11 +88,12 @@ public class NodeService implements Closeable { this.responseCollectorService = responseCollectorService; this.searchTransportService = searchTransportService; this.indexingPressure = indexingPressure; + this.aggregationUsageService = aggregationUsageService; clusterService.addStateApplier(ingestService); } public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool, - boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) { + boolean transport, boolean http, boolean plugin, boolean ingest, boolean aggs, boolean indices) { return new NodeInfo(Version.CURRENT, Build.CURRENT, transportService.getLocalNode(), settings ? settingsFilter.filter(this.settings) : null, os ? monitorService.osService().info() : null, @@ -100,6 +104,7 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null, plugin ? (pluginService == null ? null : pluginService.info()) : null, ingest ? (ingestService == null ? null : ingestService.info()) : null, + aggs ? (aggregationUsageService == null ? null : aggregationUsageService.info()) : null, indices ? indicesService.getTotalIndexingBufferBytes() : null ); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInfo.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInfo.java new file mode 100644 index 0000000000000..63ca371b96b28 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationInfo.java @@ -0,0 +1,108 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.support; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.node.ReportingService; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.atomic.LongAdder; + +public class AggregationInfo implements ReportingService.Info { + + private final Map> aggs; + + AggregationInfo(Map> aggs) { + // we use a treemap/treeset here to have a test-able / predictable order + Map> aggsMap = new TreeMap<>(); + aggs.forEach((s, m) -> aggsMap.put(s, Collections.unmodifiableSet(new TreeSet<>(m.keySet())))); + this.aggs = Collections.unmodifiableMap(aggsMap); + } + + /** + * Read from a stream. + */ + public AggregationInfo(StreamInput in) throws IOException { + aggs = new TreeMap<>(); + final int size = in.readVInt(); + for (int i = 0; i < size; i++) { + String key = in.readString(); + final int keys = in.readVInt(); + final Set types = new TreeSet<>(); + for (int j = 0; j < keys; j ++) { + types.add(in.readString()); + } + aggs.put(key, types); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(aggs.size()); + for (Map.Entry> e : aggs.entrySet()) { + out.writeString(e.getKey()); + out.writeVInt(e.getValue().size()); + for (String type : e.getValue()) { + out.writeString(type); + } + } + } + + public Map> getAggregations() { + return aggs; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("aggregations"); + for (Map.Entry> e : aggs.entrySet()) { + builder.startObject(e.getKey()); + builder.startArray("types"); + for (String s : e.getValue()) { + builder.value(s); + } + builder.endArray(); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AggregationInfo that = (AggregationInfo) o; + return Objects.equals(aggs, that.aggs); + } + + @Override + public int hashCode() { + return Objects.hash(aggs); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationUsageService.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationUsageService.java index e0486208942d4..f8eaa4e1e3540 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationUsageService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationUsageService.java @@ -19,12 +19,15 @@ package org.elasticsearch.search.aggregations.support; +import org.elasticsearch.node.ReportingService; + import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.LongAdder; -public class AggregationUsageService { +public class AggregationUsageService implements ReportingService { private final Map> aggs; + private final AggregationInfo info; public static final String OTHER_SUBTYPE = "other"; @@ -54,6 +57,7 @@ public AggregationUsageService build() { private AggregationUsageService(Builder builder) { this.aggs = builder.aggs; + info = new AggregationInfo(aggs); } public void incAggregationUsage(String aggregationName, String valuesSourceType) { @@ -85,4 +89,9 @@ public Map getUsageStats() { }); return aggsUsageMap; } + + @Override + public AggregationInfo info() { + return info; + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java index 6a4aa92c81b29..1747767c3e4d0 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/info/NodeInfoTests.java @@ -57,6 +57,7 @@ public void testGetInfo() { null, null, null, + null, null); // OsInfo is absent diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 58756f016daa6..8f00edb6779c1 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -128,6 +128,6 @@ private static NodeInfo createNodeInfo(String nodeId, String transportType, Stri } return new NodeInfo(null, null, new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), null), - settings.build(), null, null, null, null, null, null, null, null, null); + settings.build(), null, null, null, null, null, null, null, null, null, null); } } diff --git a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java index 41632c5198898..fbfc0df558274 100644 --- a/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java +++ b/server/src/test/java/org/elasticsearch/nodesinfo/NodeInfoStreamingTests.java @@ -39,6 +39,8 @@ import org.elasticsearch.monitor.os.OsInfo; import org.elasticsearch.monitor.process.ProcessInfo; import org.elasticsearch.plugins.PluginInfo; +import org.elasticsearch.search.aggregations.support.AggregationInfo; +import org.elasticsearch.search.aggregations.support.AggregationUsageService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -168,12 +170,35 @@ private static NodeInfo createNodeInfo() { ingestInfo = new IngestInfo(processors); } + AggregationInfo aggregationInfo = null; + if (randomBoolean()) { + AggregationUsageService.Builder builder = new AggregationUsageService.Builder(); + int numOfAggs = randomIntBetween(0, 10); + for (int i = 0; i < numOfAggs; i++) { + String aggName = randomAlphaOfLength(10); + + try { + if (randomBoolean()) { + builder.registerAggregationUsage(aggName); + } else { + int numOfTypes = randomIntBetween(1, 10); + for (int j = 0; j < numOfTypes; j++) { + builder.registerAggregationUsage(aggName, randomAlphaOfLength(10)); + } + } + } catch (IllegalArgumentException ex) { + // Ignore duplicate strings + } + } + aggregationInfo = builder.build().info(); + } + ByteSizeValue indexingBuffer = null; if (randomBoolean()) { // pick a random long that sometimes exceeds an int: indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L<<40)-1)); } return new NodeInfo(VersionUtils.randomVersion(random()), build, node, settings, osInfo, process, jvm, - threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, indexingBuffer); + threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, aggregationInfo, indexingBuffer); } }