Skip to content

Commit

Permalink
Use Long instead of Double for allocation disk usage APM metrics
Browse files: browse the repository at this point in the history
  • Loading branch information
pxsalehi committed Nov 13, 2024
1 parent 6325e46 commit ea592e3
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
Expand Down Expand Up @@ -56,8 +60,15 @@ public void testDesiredBalanceGaugeMetricsAreOnlyPublishedByCurrentMaster() thro
public void testDesiredBalanceMetrics() {
internalCluster().startNodes(2);
prepareCreate("test").setSettings(indexSettings(2, 1)).get();
indexRandom(randomBoolean(), "test", between(50, 100));
ensureGreen();

indexRandom(randomBoolean(), "test", between(50, 100));
flush("test");
// Make sure new cluster info is available
final var infoService = (InternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class);
ClusterInfoServiceUtils.setUpdateFrequency(infoService, TimeValue.timeValueMillis(200));
assertNotNull("info should not be null", ClusterInfoServiceUtils.refresh(infoService));

final var telemetryPlugin = getTelemetryPlugin(internalCluster().getMasterName());
telemetryPlugin.collect();
assertThat(telemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.UNASSIGNED_SHARDS_METRIC_NAME), not(empty()));
Expand All @@ -73,7 +84,7 @@ public void testDesiredBalanceMetrics() {
);
assertThat(desiredBalanceNodeWeightsMetrics.size(), equalTo(2));
for (var nodeStat : desiredBalanceNodeWeightsMetrics) {
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
assertTrue(nodeStat.isDouble());
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
Expand Down Expand Up @@ -104,6 +115,7 @@ public void testDesiredBalanceMetrics() {
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
assertTrue(desiredBalanceNodeDiskUsageMetrics.stream().anyMatch(m -> m.getDouble() > 0.0));
final var currentNodeShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME
);
Expand All @@ -122,15 +134,16 @@ public void testDesiredBalanceMetrics() {
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
final var currentNodeDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
final var currentNodeDiskUsageMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME
);
assertThat(currentNodeDiskUsageMetrics.size(), equalTo(2));
for (var nodeStat : currentNodeDiskUsageMetrics) {
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
assertThat(nodeStat.value().longValue(), greaterThanOrEqualTo(0L));
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
assertTrue(currentNodeDiskUsageMetrics.stream().anyMatch(m -> m.getLong() > 0L));
final var currentNodeUndesiredShardCountMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_UNDESIRED_SHARD_COUNT_METRIC_NAME
);
Expand All @@ -140,15 +153,16 @@ public void testDesiredBalanceMetrics() {
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
final var currentNodeForecastedDiskUsageMetrics = telemetryPlugin.getDoubleGaugeMeasurement(
final var currentNodeForecastedDiskUsageMetrics = telemetryPlugin.getLongGaugeMeasurement(
DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME
);
assertThat(currentNodeForecastedDiskUsageMetrics.size(), equalTo(2));
for (var nodeStat : currentNodeForecastedDiskUsageMetrics) {
assertThat(nodeStat.value().doubleValue(), greaterThanOrEqualTo(0.0));
assertThat(nodeStat.value().longValue(), greaterThanOrEqualTo(0L));
assertThat((String) nodeStat.attributes().get("node_id"), is(in(nodeIds)));
assertThat((String) nodeStat.attributes().get("node_name"), is(in(nodeNames)));
}
assertTrue(currentNodeForecastedDiskUsageMetrics.stream().anyMatch(m -> m.getLong() > 0L));
}

private static void assertOnlyMasterIsPublishingMetrics() {
Expand Down Expand Up @@ -182,10 +196,10 @@ private static void assertMetricsAreBeingPublished(String nodeName, boolean shou
matcher
);
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_WRITE_LOAD_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_DISK_USAGE_METRIC_NAME), matcher);
assertThat(testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_SHARD_COUNT_METRIC_NAME), matcher);
assertThat(
testTelemetryPlugin.getDoubleGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME),
testTelemetryPlugin.getLongGaugeMeasurement(DesiredBalanceMetrics.CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME),
matcher
);
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
"threads",
this::getCurrentNodeWriteLoadMetrics
);
meterRegistry.registerDoublesGauge(
meterRegistry.registerLongsGauge(
CURRENT_NODE_DISK_USAGE_METRIC_NAME,
"The current disk usage of nodes",
"bytes",
Expand All @@ -148,7 +148,7 @@ public DesiredBalanceMetrics(MeterRegistry meterRegistry) {
"unit",
this::getCurrentNodeShardCountMetrics
);
meterRegistry.registerDoublesGauge(
meterRegistry.registerLongsGauge(
CURRENT_NODE_FORECASTED_DISK_USAGE_METRIC_NAME,
"The current forecasted disk usage of nodes",
"bytes",
Expand Down Expand Up @@ -231,16 +231,16 @@ private List<LongWithAttributes> getDesiredBalanceNodeShardCountMetrics() {
return values;
}

/**
 * Produces the per-node "current disk usage" samples.
 *
 * <p>When {@code nodeIsMaster} is {@code false} an empty list is returned. Otherwise the
 * snapshot obtained from {@code allocationStatsPerNodeRef} is walked and one entry is emitted
 * per node key, pairing that node's {@code currentDiskUsage()} with the attributes returned
 * by {@code getNodeAttributes(node)}.
 *
 * <p>NOTE(review): this is a rendered diff hunk whose +/- markers appear to have been lost;
 * each {@code Double}-based line is immediately followed by its {@code Long}-based
 * replacement, so the two variants must not be read as a single compilable method.
 *
 * @return one sample per node in the stats snapshot, or an empty list on a non-master node
 */
private List<DoubleWithAttributes> getCurrentNodeDiskUsageMetrics() {
private List<LongWithAttributes> getCurrentNodeDiskUsageMetrics() {
// Nothing is reported unless this node is flagged as master.
if (nodeIsMaster == false) {
return List.of();
}
// Read the reference once so the size hint and the loop see the same snapshot.
var stats = allocationStatsPerNodeRef.get();
List<DoubleWithAttributes> doubles = new ArrayList<>(stats.size());
List<LongWithAttributes> values = new ArrayList<>(stats.size());
for (var node : stats.keySet()) {
doubles.add(new DoubleWithAttributes(stats.get(node).currentDiskUsage(), getNodeAttributes(node)));
values.add(new LongWithAttributes(stats.get(node).currentDiskUsage(), getNodeAttributes(node)));
}
return doubles;
return values;
}

private List<DoubleWithAttributes> getCurrentNodeWriteLoadMetrics() {
Expand All @@ -267,16 +267,16 @@ private List<LongWithAttributes> getCurrentNodeShardCountMetrics() {
return values;
}

/**
 * Produces the per-node "forecasted disk usage" samples.
 *
 * <p>When {@code nodeIsMaster} is {@code false} an empty list is returned. Otherwise the
 * snapshot obtained from {@code allocationStatsPerNodeRef} is walked and one entry is emitted
 * per node key, pairing that node's {@code forecastedDiskUsage()} with the attributes
 * returned by {@code getNodeAttributes(node)}.
 *
 * <p>NOTE(review): this is a rendered diff hunk whose +/- markers appear to have been lost;
 * each {@code Double}-based line is immediately followed by its {@code Long}-based
 * replacement, so the two variants must not be read as a single compilable method.
 *
 * @return one sample per node in the stats snapshot, or an empty list on a non-master node
 */
private List<DoubleWithAttributes> getCurrentNodeForecastedDiskUsageMetrics() {
private List<LongWithAttributes> getCurrentNodeForecastedDiskUsageMetrics() {
// Nothing is reported unless this node is flagged as master.
if (nodeIsMaster == false) {
return List.of();
}
// Read the reference once so the size hint and the loop see the same snapshot.
var stats = allocationStatsPerNodeRef.get();
List<DoubleWithAttributes> doubles = new ArrayList<>(stats.size());
List<LongWithAttributes> values = new ArrayList<>(stats.size());
for (var node : stats.keySet()) {
doubles.add(new DoubleWithAttributes(stats.get(node).forecastedDiskUsage(), getNodeAttributes(node)));
values.add(new LongWithAttributes(stats.get(node).forecastedDiskUsage(), getNodeAttributes(node)));
}
return doubles;
return values;
}

private List<LongWithAttributes> getCurrentNodeUndesiredShardCountMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.core.TimeValue;

import java.util.concurrent.TimeUnit;

Expand All @@ -37,4 +38,8 @@ protected boolean blockingAllowed() {
throw new AssertionError(e);
}
}

/**
 * Static pass-through that sets the update frequency of the given cluster info service.
 *
 * <p>NOTE(review): presumably this exists so that callers outside this package can reach a
 * setter that is not public on {@code InternalClusterInfoService} — confirm its visibility.
 *
 * @param internalClusterInfoService the service whose update frequency is changed
 * @param updateFrequency the new frequency, handed to the service unchanged
 */
public static void setUpdateFrequency(InternalClusterInfoService internalClusterInfoService, TimeValue updateFrequency) {
internalClusterInfoService.setUpdateFrequency(updateFrequency);
}
}

0 comments on commit ea592e3

Please sign in to comment.