Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aggregation list to node info #60074

Merged
merged 3 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"node_info test aggregations":
- skip:
version: " - 7.99.99"
reason: "aggregation info only supported in 8.0.0+"
features: [arbitrary_key]


- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.info:
metric: [ aggregations ]

- match : { nodes.$node_id.aggregations.filter: { "types": ["other"] } }
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;

Expand Down Expand Up @@ -82,12 +83,15 @@ public NodeInfo(StreamInput in) throws IOException {
addInfoIfNonNull(HttpInfo.class, in.readOptionalWriteable(HttpInfo::new));
addInfoIfNonNull(PluginsAndModules.class, in.readOptionalWriteable(PluginsAndModules::new));
addInfoIfNonNull(IngestInfo.class, in.readOptionalWriteable(IngestInfo::new));
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
addInfoIfNonNull(AggregationInfo.class, in.readOptionalWriteable(AggregationInfo::new));
}
}

public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Settings settings,
@Nullable OsInfo os, @Nullable ProcessInfo process, @Nullable JvmInfo jvm, @Nullable ThreadPoolInfo threadPool,
@Nullable TransportInfo transport, @Nullable HttpInfo http, @Nullable PluginsAndModules plugins,
@Nullable IngestInfo ingest, @Nullable ByteSizeValue totalIndexingBuffer) {
@Nullable IngestInfo ingest, @Nullable AggregationInfo aggsInfo, @Nullable ByteSizeValue totalIndexingBuffer) {
super(node);
this.version = version;
this.build = build;
Expand All @@ -100,6 +104,7 @@ public NodeInfo(Version version, Build build, DiscoveryNode node, @Nullable Sett
addInfoIfNonNull(HttpInfo.class, http);
addInfoIfNonNull(PluginsAndModules.class, plugins);
addInfoIfNonNull(IngestInfo.class, ingest);
addInfoIfNonNull(AggregationInfo.class, aggsInfo);
this.totalIndexingBuffer = totalIndexingBuffer;
}

Expand Down Expand Up @@ -187,5 +192,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(getInfo(HttpInfo.class));
out.writeOptionalWriteable(getInfo(PluginsAndModules.class));
out.writeOptionalWriteable(getInfo(IngestInfo.class));
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalWriteable(getInfo(AggregationInfo.class));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public enum Metric {
HTTP("http"),
PLUGINS("plugins"),
INGEST("ingest"),
AGGREGATIONS("aggregations"),
INDICES("indices");

private String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.transport.TransportInfo;

Expand Down Expand Up @@ -126,6 +127,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (nodeInfo.getInfo(IngestInfo.class) != null) {
nodeInfo.getInfo(IngestInfo.class).toXContent(builder, params);
}
if (nodeInfo.getInfo(AggregationInfo.class) != null) {
nodeInfo.getInfo(AggregationInfo.class).toXContent(builder, params);
}

builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest, Task task) {
metrics.contains(NodesInfoRequest.Metric.HTTP.metricName()),
metrics.contains(NodesInfoRequest.Metric.PLUGINS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INGEST.metricName()),
metrics.contains(NodesInfoRequest.Metric.AGGREGATIONS.metricName()),
metrics.contains(NodesInfoRequest.Metric.INDICES.metricName()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOExce

@Override
protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) {
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false);
NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, true, false, false, false);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE,
true, true, true, false, true, false, false, false, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ protected Node(final Environment initialEnvironment,
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptService,
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService, indexingLimits);
searchTransportService, indexingLimits, searchModule.getValuesSourceRegistry().getUsageService());

final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptService, bigArrays, searchModule.getFetchPhase(),
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/java/org/elasticsearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -61,6 +62,7 @@ public class NodeService implements Closeable {
private final ResponseCollectorService responseCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressure indexingPressure;
private final AggregationUsageService aggregationUsageService;

private final Discovery discovery;

Expand All @@ -69,7 +71,8 @@ public class NodeService implements Closeable {
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService,
SearchTransportService searchTransportService, IndexingPressure indexingPressure) {
SearchTransportService searchTransportService, IndexingPressure indexingPressure,
AggregationUsageService aggregationUsageService) {
this.settings = settings;
this.threadPool = threadPool;
this.monitorService = monitorService;
Expand All @@ -85,11 +88,12 @@ public class NodeService implements Closeable {
this.responseCollectorService = responseCollectorService;
this.searchTransportService = searchTransportService;
this.indexingPressure = indexingPressure;
this.aggregationUsageService = aggregationUsageService;
clusterService.addStateApplier(ingestService);
}

public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool,
boolean transport, boolean http, boolean plugin, boolean ingest, boolean indices) {
boolean transport, boolean http, boolean plugin, boolean ingest, boolean aggs, boolean indices) {
return new NodeInfo(Version.CURRENT, Build.CURRENT, transportService.getLocalNode(),
settings ? settingsFilter.filter(this.settings) : null,
os ? monitorService.osService().info() : null,
Expand All @@ -100,6 +104,7 @@ public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm,
http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null,
aggs ? (aggregationUsageService == null ? null : aggregationUsageService.info()) : null,
indices ? indicesService.getTotalIndexingBufferBytes() : null
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.support;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.node.ReportingService;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.LongAdder;

public class AggregationInfo implements ReportingService.Info {

private final Map<String, Set<String>> aggs;

AggregationInfo(Map<String, Map<String, LongAdder>> aggs) {
// we use a treemap/treeset here to have a test-able / predictable order
Map<String, Set<String>> aggsMap = new TreeMap<>();
aggs.forEach((s, m) -> aggsMap.put(s, Collections.unmodifiableSet(new TreeSet<>(m.keySet()))));
this.aggs = Collections.unmodifiableMap(aggsMap);
}

/**
* Read from a stream.
*/
public AggregationInfo(StreamInput in) throws IOException {
aggs = new TreeMap<>();
final int size = in.readVInt();
for (int i = 0; i < size; i++) {
String key = in.readString();
final int keys = in.readVInt();
final Set<String> types = new TreeSet<>();
for (int j = 0; j < keys; j ++) {
types.add(in.readString());
}
aggs.put(key, types);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(aggs.size());
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
out.writeString(e.getKey());
out.writeVInt(e.getValue().size());
for (String type : e.getValue()) {
out.writeString(type);
}
}
}

public Map<String, Set<String>> getAggregations() {
return aggs;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("aggregations");
for (Map.Entry<String, Set<String>> e : aggs.entrySet()) {
builder.startObject(e.getKey());
builder.startArray("types");
for (String s : e.getValue()) {
builder.value(s);
}
builder.endArray();
builder.endObject();
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AggregationInfo that = (AggregationInfo) o;
return Objects.equals(aggs, that.aggs);
}

@Override
public int hashCode() {
return Objects.hash(aggs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

package org.elasticsearch.search.aggregations.support;

import org.elasticsearch.node.ReportingService;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class AggregationUsageService {
public class AggregationUsageService implements ReportingService<AggregationInfo> {
private final Map<String, Map<String, LongAdder>> aggs;
private final AggregationInfo info;

public static final String OTHER_SUBTYPE = "other";

Expand Down Expand Up @@ -54,6 +57,7 @@ public AggregationUsageService build() {

private AggregationUsageService(Builder builder) {
this.aggs = builder.aggs;
info = new AggregationInfo(aggs);
}

public void incAggregationUsage(String aggregationName, String valuesSourceType) {
Expand Down Expand Up @@ -85,4 +89,9 @@ public Map<String, Object> getUsageStats() {
});
return aggsUsageMap;
}

@Override
public AggregationInfo info() {
return info;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void testGetInfo() {
null,
null,
null,
null,
null);

// OsInfo is absent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,6 @@ private static NodeInfo createNodeInfo(String nodeId, String transportType, Stri
}
return new NodeInfo(null, null,
new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), null),
settings.build(), null, null, null, null, null, null, null, null, null);
settings.build(), null, null, null, null, null, null, null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.elasticsearch.monitor.os.OsInfo;
import org.elasticsearch.monitor.process.ProcessInfo;
import org.elasticsearch.plugins.PluginInfo;
import org.elasticsearch.search.aggregations.support.AggregationInfo;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -168,12 +170,35 @@ private static NodeInfo createNodeInfo() {
ingestInfo = new IngestInfo(processors);
}

AggregationInfo aggregationInfo = null;
if (randomBoolean()) {
AggregationUsageService.Builder builder = new AggregationUsageService.Builder();
int numOfAggs = randomIntBetween(0, 10);
for (int i = 0; i < numOfAggs; i++) {
String aggName = randomAlphaOfLength(10);

try {
if (randomBoolean()) {
builder.registerAggregationUsage(aggName);
} else {
int numOfTypes = randomIntBetween(1, 10);
for (int j = 0; j < numOfTypes; j++) {
builder.registerAggregationUsage(aggName, randomAlphaOfLength(10));
}
}
} catch (IllegalArgumentException ex) {
// Ignore duplicate strings
}
}
aggregationInfo = builder.build().info();
}

ByteSizeValue indexingBuffer = null;
if (randomBoolean()) {
// pick a random long that sometimes exceeds an int:
indexingBuffer = new ByteSizeValue(random().nextLong() & ((1L<<40)-1));
}
return new NodeInfo(VersionUtils.randomVersion(random()), build, node, settings, osInfo, process, jvm,
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, indexingBuffer);
threadPoolInfo, transport, httpInfo, pluginsAndModules, ingestInfo, aggregationInfo, indexingBuffer);
}
}