Skip to content

Commit

Permalink
Add aggregation list to node info (#60074) (#60256)
Browse files Browse the repository at this point in the history
Adds a full list of supported aggregations to the node info API. This list
will be used in transform tests and telemetry mapping tests that will be added
as follow-up PRs.

Fixes #59774
  • Loading branch information
imotov authored Jul 28, 2020
1 parent c7bfb5d commit 0dd53b7
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"node_info test aggregations":
- skip:
version: " - 7.9.99"
reason: "aggregation info only supported in 8.0.0+"
features: [arbitrary_key]


- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.info:
metric: [ aggregations ]

- match : { nodes.$node_id.aggregations.filter: { "types": ["other"] } }
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;

Expand Down Expand Up @@ -82,12 +83,15 @@ public NodeInfo(StreamInput in) throws IOException {
addInfoIfNonNull(HttpInfo.class, in.readOptionalWriteable(HttpInfo::new));
addInfoIfNonNull(PluginsAndModules.class, in.readOptionalWriteable(PluginsAndModules::new));
addInfoIfNonNull(IngestInfo.class, in.readOptionalWriteable(IngestInfo::new));
if (in.getVersion().onOrAfter(Version.V_7_10_0)) {
addInfoIfNonNull(AggregationInfo.class, in.readOptionalWriteable(AggregationInfo::new));
}
}

public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins,
@Nullable IngestInfo ingest, @Nullable ByteSizeValue totalIndexingBuffer) {
@Nullable IngestInfo ingest, @Nullable AggregationInfo aggsInfo, @Nullable ByteSizeValue totalIndexingBuffer) {
super(node);
this.version = version;
this.build = build;
Expand All @@ -100,6 +104,7 @@ public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Sett
addInfoIfNonNull(HttpInfo.class, http);
addInfoIfNonNull(PluginsAndModules.class, plugins);
addInfoIfNonNull(IngestInfo.class, ingest);
addInfoIfNonNull(AggregationInfo.class, aggsInfo);
this.totalIndexingBuffer = totalIndexingBuffer;
}

Expand Down Expand Up @@ -187,5 +192,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(getInfo(HttpInfo.class));
out.writeOptionalWriteable(getInfo(PluginsAndModules.class));
out.writeOptionalWriteable(getInfo(IngestInfo.class));
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeOptionalWriteable(getInfo(AggregationInfo.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public enum Metric {
HTTP("http"),
PLUGINS("plugins"),
INGEST("ingest"),
AGGREGATIONS("aggregations"),
INDICES("indices");

private String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;

Expand Down Expand Up @@ -126,6 +127,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (nodeInfo.getInfo(IngestInfo.class) != null) {
nodeInfo.getInfo(IngestInfo.class).toXContent(builder, params);
}
if (nodeInfo.getInfo(AggregationInfo.class) != null) {
nodeInfo.getInfo(AggregationInfo.class).toXContent(builder, params);
}

builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
metrics.contains(NodesInfoRequest.Metric.HTTP.metricName()),
metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()),
metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INDICES.metricName()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce

@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ protected Node(final Environment initialEnvironment,
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService, indexingLimits);
searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());

final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/java/org/elasticsearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -61,6 +62,7 @@ public class NodeService implements Closeable {
private final ResponseCollectorService responseCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressure indexingPressure;
private final AggregationUsageService aggregationUsageService;

private final Discovery discovery;

Expand All @@ -69,7 +71,8 @@ public class NodeService implements Closeable {
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
SearchTransportService searchTransportService, IndexingPressure indexingPressure) {
SearchTransportService searchTransportService, IndexingPressure indexingPressure,
AggregationUsageService aggregationUsageService) {
this.settings = settings;
this.threadPool = threadPool;
this.monitorService = monitorService;
Expand All @@ -85,11 +88,12 @@ public class NodeService implements Closeable {
this.responseCollectorService = responseCollectorService;
this.searchTransportService = searchTransportService;
this.indexingPressure = indexingPressure;
this.aggregationUsageService = aggregationUsageService;
clusterService.addStateApplier(ingestService);
}

public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) {
boolean transport, boolean http, boolean plugin, boolean ingest, boolean aggs, boolean indices) {
return new NodeInfo(Version.CURRENT, Build.CURRENT, transportService.getLocalNode(),
settings ? settingsFilter.filter(this.settings) : null,
os ? monitorService.osService().info() : null,
Expand All @@ -100,6 +104,7 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm,
http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null,
aggs ? (aggregationUsageService == null ? null : aggregationUsageService.info()) : null,
indices ? indicesService.getTotalIndexingBufferBytes() : null
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.support;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.ReportingService;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.LongAdder;

public class AggregationInfo implements ReportingService.Info {

private final Map<String, Set<String>> aggs;

AggregationInfo(Map<String, Map<String, LongAdder>> aggs) {
// we use a treemap/treeset here to have a test-able / predictable order
Map<String, Set<String>> aggsMap = new TreeMap<>();
aggs.forEach((s, m) -> aggsMap.put(s, Collections.unmodifiableSet(new TreeSet<>(m.keySet()))));
this.aggs = Collections.unmodifiableMap(aggsMap);
}

/**
* Read from a stream.
*/
public AggregationInfo(StreamInput in) throws IOException {
aggs = new TreeMap<>();
final int size = in.readVInt();
for (int i = 0; i < size; i++) {
String key = in.readString();
final int keys = in.readVInt();
final Set<String> types = new TreeSet<>();
for (int j = 0; j < keys; j ++) {
types.add(in.readString());
}
aggs.put(key, types);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(aggs.size());
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
out.writeString(e.getKey());
out.writeVInt(e.getValue().size());
for (String type : e.getValue()) {
out.writeString(type);
}
}
}

public Map<String, Set<String>> getAggregations() {
return aggs;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("aggregations");
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
builder.startObject(e.getKey());
builder.startArray("types");
for (String s : e.getValue()) {
builder.value(s);
}
builder.endArray();
builder.endObject();
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AggregationInfo that = (AggregationInfo) o;
return Objects.equals(aggs, that.aggs);
}

@Override
public int hashCode() {
return Objects.hash(aggs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

package org.elasticsearch.search.aggregations.support;

import org.elasticsearch.node.ReportingService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class AggregationUsageService {
public class AggregationUsageService implements ReportingService<AggregationInfo> {
private final Map<String, Map<String, LongAdder>> aggs;
private final AggregationInfo info;

public static final String OTHER_SUBTYPE = "other";

Expand Down Expand Up @@ -54,6 +57,7 @@ public AggregationUsageService build() {

private AggregationUsageService(Builder builder) {
this.aggs = builder.aggs;
info = new AggregationInfo(aggs);
}

public void incAggregationUsage(String aggregationName, String valuesSourceType) {
Expand Down Expand Up @@ -85,4 +89,9 @@ public Map<String, Object> getUsageStats() {
});
return aggsUsageMap;
}

@Override
public AggregationInfo info() {
return info;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void testGetInfo() {
null,
null,
null,
null,
null);

// OsInfo is absent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,6 @@ private static NodeInfo createNodeInfo(String nodeId, String transportType, Stri
}
return new NodeInfo(null, null,
new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), null),
settings.build(), null, null, null, null, null, null, null, null, null);
settings.build(), null, null, null, null, null, null, null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.plugins.PluginInfo;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -168,12 +170,35 @@ private static NodeInfo createNodeInfo() {
ingestInfo = new IngestInfo(processors);
}

AggregationInfo aggregationInfo = null;
if (randomBoolean()) {
AggregationUsageService.Builder builder = new AggregationUsageService.Builder();
int numOfAggs = randomIntBetween(0, 10);
for (int i = 0; i < numOfAggs; i++) {
String aggName = randomAlphaOfLength(10);

try {
if (randomBoolean()) {
builder.registerAggregationUsage(aggName);
} else {
int numOfTypes = randomIntBetween(1, 10);
for (int j = 0; j < numOfTypes; j++) {
builder.registerAggregationUsage(aggName, randomAlphaOfLength(10));
}
}
} catch (IllegalArgumentException ex) {
// Ignore duplicate strings
}
}
aggregationInfo = builder.build().info();
}

ByteSizeValue indexingBuffer = null;
if (randomBoolean()) {
// pick a random long that sometimes exceeds an int:
indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L<<40)-1));
}
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, settings, osInfo, process, jvm,
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, indexingBuffer);
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, aggregationInfo, indexingBuffer);
}
}

0 comments on commit 0dd53b7

Please sign in to comment.