Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aggregation list to node info #60074

Merged
merged 3 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
"node_info test aggregations":
- skip:
version: " - 7.99.99"
reason: "aggregation info only supported in 8.0.0+"
features: [arbitrary_key]


- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.info:
metric: [ aggregations ]

# if this test failed because a new aggregation was added, please open an issues in the elastic/telemetry repository
# so they can update the mapping accordingly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment seems abit out of place, and probably shouldn't reference a private repo.

also, I guess I don't understand the output. is it aggregrations.??.types = ?? . Is there anything that prevents checking for known types

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was sure I removed it. This comment is out of place. Good catch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The output looks like this:

      "aggregations" : {
        "adjacency_matrix" : {
          "types" : [
            "other"
          ]
        },
        "auto_date_histogram" : {
          "types" : [
            "boolean",
            "date",
            "numeric"
          ]
        },
        "avg" : {
          "types" : [
            "boolean",
            "date",
            "histogram",
            "numeric"
          ]
        },
        "boxplot" : {
          "types" : [
            "histogram",
            "numeric"
          ]
        },
        "cardinality" : {
          "types" : [
            "boolean",
            "bytes",
            "date",
            "geopoint",
            "geoshape",
            "ip",
            "numeric",
            "range"
          ]
        },
        ....

and both types and aggs change depending on license that this is ran under.

- match : { nodes.$node_id.aggregations.filter: { "types": ["other"] } }
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.search.aggregations.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;

Expand Down Expand Up @@ -82,12 +83,15 @@ public NodeInfo(StreamInput in) throws IOException {
addInfoIfNonNull(HttpInfo.class, in.readOptionalWriteable(HttpInfo::new));
addInfoIfNonNull(PluginsAndModules.class, in.readOptionalWriteable(PluginsAndModules::new));
addInfoIfNonNull(IngestInfo.class, in.readOptionalWriteable(IngestInfo::new));
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
addInfoIfNonNull(AggregationInfo.class, in.readOptionalWriteable(AggregationInfo::new));
}
}

public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins,
@Nullable IngestInfo ingest, @Nullable ByteSizeValue totalIndexingBuffer) {
@Nullable IngestInfo ingest, @Nullable AggregationInfo aggsInfo, @Nullable ByteSizeValue totalIndexingBuffer) {
super(node);
this.version = version;
this.build = build;
Expand All @@ -100,6 +104,7 @@ public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Sett
addInfoIfNonNull(HttpInfo.class, http);
addInfoIfNonNull(PluginsAndModules.class, plugins);
addInfoIfNonNull(IngestInfo.class, ingest);
addInfoIfNonNull(AggregationInfo.class, aggsInfo);
this.totalIndexingBuffer = totalIndexingBuffer;
}

Expand Down Expand Up @@ -187,5 +192,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(getInfo(HttpInfo.class));
out.writeOptionalWriteable(getInfo(PluginsAndModules.class));
out.writeOptionalWriteable(getInfo(IngestInfo.class));
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(getInfo(AggregationInfo.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public enum Metric {
HTTP("http"),
PLUGINS("plugins"),
INGEST("ingest"),
AGGREGATIONS("aggregations"),
INDICES("indices");

private String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.search.aggregations.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;

Expand Down Expand Up @@ -126,6 +127,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (nodeInfo.getInfo(IngestInfo.class) != null) {
nodeInfo.getInfo(IngestInfo.class).toXContent(builder, params);
}
if (nodeInfo.getInfo(AggregationInfo.class) != null) {
nodeInfo.getInfo(AggregationInfo.class).toXContent(builder, params);
}

builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest, Task task) {
metrics.contains(NodesInfoRequest.Metric.HTTP.metricName()),
metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()),
metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INDICES.metricName()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce

@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ protected Node(final Environment initialEnvironment,
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService, indexingLimits);
searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());

final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/java/org/elasticsearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -61,6 +62,7 @@ public class NodeService implements Closeable {
private final ResponseCollectorService responseCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressure indexingPressure;
private final AggregationUsageService aggregationUsageService;

private final Discovery discovery;

Expand All @@ -69,7 +71,8 @@ public class NodeService implements Closeable {
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
SearchTransportService searchTransportService, IndexingPressure indexingPressure) {
SearchTransportService searchTransportService, IndexingPressure indexingPressure,
AggregationUsageService aggregationUsageService) {
this.settings = settings;
this.threadPool = threadPool;
this.monitorService = monitorService;
Expand All @@ -85,11 +88,12 @@ public class NodeService implements Closeable {
this.responseCollectorService = responseCollectorService;
this.searchTransportService = searchTransportService;
this.indexingPressure = indexingPressure;
this.aggregationUsageService = aggregationUsageService;
clusterService.addStateApplier(ingestService);
}

public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) {
boolean transport, boolean http, boolean plugin, boolean ingest, boolean aggs, boolean indices) {
return new NodeInfo(Version.CURRENT, Build.CURRENT, transportService.getLocalNode(),
settings ? settingsFilter.filter(this.settings) : null,
os ? monitorService.osService().info() : null,
Expand All @@ -100,6 +104,7 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm,
http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null,
aggs ? (aggregationUsageService == null ? null : aggregationUsageService.info()) : null,
indices ? indicesService.getTotalIndexingBufferBytes() : null
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.ReportingService;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class AggregationInfo implements ReportingService.Info {

private final Map<String, Set<String>> aggs;

public AggregationInfo(Map<String, Set<String>> aggs) {
this.aggs = aggs;
}

/**
* Read from a stream.
*/
public AggregationInfo(StreamInput in) throws IOException {
aggs = new TreeMap<>();
final int size = in.readVInt();
for (int i = 0; i < size; i++) {
String key = in.readString();
final int keys = in.readVInt();
final Set<String> types = new TreeSet<>();
for (int j = 0; j < keys; j ++) {
types.add(in.readString());
}
aggs.put(key, types);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(aggs.size());
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
out.writeString(e.getKey());
out.writeVInt(e.getValue().size());
for (String type : e.getValue()) {
out.writeString(type);
}
}
}

public Map<String, Set<String>> getProcessors() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getProcessors() ? I assume the name is off, but is this even needed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy and paste error. Will fix.

return aggs;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're leaking a mutable reference here. It doesn't look like we mutate it anywhere, might be worth wrapping it to be immutable, if the object creation cost for the wrapper isn't too much.

}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("aggregations");
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
builder.startObject(e.getKey());
builder.startArray("types");
for (String s : e.getValue()) {
builder.value(s);
}
builder.endArray();
builder.endObject();
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AggregationInfo that = (AggregationInfo) o;
return Objects.equals(aggs, that.aggs);
}

@Override
public int hashCode() {
return Objects.hash(aggs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@

package org.elasticsearch.search.aggregations.support;

import org.elasticsearch.node.ReportingService;
import org.elasticsearch.search.aggregations.AggregationInfo;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.LongAdder;

public class AggregationUsageService {
public class AggregationUsageService implements ReportingService<AggregationInfo> {
private final Map<String, Map<String, LongAdder>> aggs;
private final AggregationInfo info;

public static final String OTHER_SUBTYPE = "other";

Expand Down Expand Up @@ -54,6 +61,10 @@ public AggregationUsageService build() {

private AggregationUsageService(Builder builder) {
this.aggs = builder.aggs;
// we use a treemap/treeset here to have a test-able / predictable order
Map<String, Set<String>> aggsInfo = new TreeMap<>();
aggs.forEach((s, m) -> aggsInfo.put(s, new TreeSet<>(m.keySet())));
info = new AggregationInfo(aggsInfo);
}

public void incAggregationUsage(String aggregationName, String valuesSourceType) {
Expand Down Expand Up @@ -85,4 +96,9 @@ public Map<String, Object> getUsageStats() {
});
return aggsUsageMap;
}

@Override
public AggregationInfo info() {
return info;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void testGetInfo() {
null,
null,
null,
null,
null);

// OsInfo is absent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,6 @@ private static NodeInfo createNodeInfo(String nodeId, String transportType, Stri
}
return new NodeInfo(null, null,
new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), null),
settings.build(), null, null, null, null, null, null, null, null, null);
settings.build(), null, null, null, null, null, null, null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.plugins.PluginInfo;
import org.elasticsearch.search.aggregations.AggregationInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -47,10 +48,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -168,12 +173,22 @@ private static NodeInfo createNodeInfo() {
ingestInfo = new IngestInfo(processors);
}

AggregationInfo aggregationInfo = null;
if (randomBoolean()) {
int numOfAggs = randomIntBetween(0, 10);
Map<String, Set<String>> aggs = new TreeMap<>();
for (int i=0; i<numOfAggs; i++) {
aggs.put(randomAlphaOfLength(10), new TreeSet<>(Arrays.asList(generateRandomStringArray(10,10, false))));
}
aggregationInfo = new AggregationInfo(aggs);
}

ByteSizeValue indexingBuffer = null;
if (randomBoolean()) {
// pick a random long that sometimes exceeds an int:
indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L<<40)-1));
}
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, settings, osInfo, process, jvm,
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, indexingBuffer);
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, aggregationInfo, indexingBuffer);
}
}