Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry: fix polling gauges always reporting NaN on Prometheus and timers #1218

Merged
merged 1 commit into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions igor-core/igor-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ dependencies {

// TODO(rz): Get rid of this dependency!
implementation "com.squareup.retrofit:retrofit"

testImplementation "com.netflix.spectator:spectator-reg-micrometer"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import com.netflix.spinnaker.kork.discovery.RemoteStatusChangedEvent;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import net.logstash.logback.argument.StructuredArguments;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -44,16 +48,16 @@ public abstract class CommonPollingMonitor<I extends DeltaItem, T extends Pollin
protected final Id missedNotificationId;
private final DiscoveryStatusListener discoveryStatusListener;
private final AtomicLong lastPoll = new AtomicLong();
private final Id itemsCachedId;
private final Id itemsOverThresholdId;
private final Id pollCycleFailedId;
private final Id pollCycleTimingId;
private final Optional<LockService> lockService;
private ScheduledFuture<?> monitor;
private final CommonPollingMonitorInstrumentation instrumentation;
protected Logger log = LoggerFactory.getLogger(getClass());
protected TaskScheduler scheduler;
protected final DynamicConfigService dynamicConfigService;

private Map<String, AtomicInteger> itemsOverThresholdMap = new ConcurrentHashMap<>();
private Map<String, AtomicInteger> itemsCachedMap = new ConcurrentHashMap<>();

public CommonPollingMonitor(
IgorConfigurationProperties igorProperties,
Registry registry,
Expand All @@ -67,12 +71,9 @@ public CommonPollingMonitor(
this.discoveryStatusListener = discoveryStatusListener;
this.lockService = lockService;
this.scheduler = scheduler;
this.instrumentation = new CommonPollingMonitorInstrumentation(registry);
Copy link
Contributor Author

@massimo-pacher-tw massimo-pacher-tw Jan 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single instance per monitor, I know it's not needed, but given we just run a few, it's not a biggy, compared to fix all the tests and change the constructor of every subclass.
The key bit is the registry, which can be passed in for testing


itemsCachedId = registry.createId("pollingMonitor.newItems");
itemsOverThresholdId = registry.createId("pollingMonitor.itemsOverThreshold");
pollCycleFailedId = registry.createId("pollingMonitor.failed");
missedNotificationId = registry.createId("pollingMonitor.missedEchoNotification");
pollCycleTimingId = registry.createId("pollingMonitor.pollTiming");
}

@Override
Expand All @@ -86,20 +87,19 @@ public void onApplicationEvent(RemoteStatusChangedEvent event) {
this.monitor =
scheduler.schedule(
() ->
registry
.timer(pollCycleTimingId.withTag("monitor", getClass().getSimpleName()))
.record(
() -> {
if (isInService()) {
poll(true);
lastPoll.set(System.currentTimeMillis());
} else {
log.info(
"not in service (lastPoll: {})",
(lastPoll.get() == 0) ? "n/a" : lastPoll.toString());
lastPoll.set(0);
}
}),
instrumentation.trackPollCycleTime(
this.getName(),
() -> {
if (isInService()) {
poll(true);
lastPoll.set(System.currentTimeMillis());
} else {
log.info(
"not in service (lastPoll: {})",
(lastPoll.get() == 0) ? "n/a" : lastPoll.toString());
lastPoll.set(0);
}
}),
new PeriodicTrigger(getPollInterval(), TimeUnit.SECONDS));
}

Expand Down Expand Up @@ -163,7 +163,17 @@ protected String getLockName(String name, String partition) {
}

protected void internalPollSingle(PollContext ctx) {
String monitorName = getClass().getSimpleName();
String monitorName =
!StringUtils.isBlank(this.getName()) ? this.getName() : getClass().getSimpleName();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is was a bug, cause overridden function was totally ignored in metrics


itemsCachedMap.putIfAbsent(ctx.partitionName, new AtomicInteger(0));
itemsOverThresholdMap.putIfAbsent(ctx.partitionName, new AtomicInteger(0));

instrumentation.trackItemsCached(
itemsCachedMap.get(ctx.partitionName), monitorName, ctx.partitionName);

instrumentation.trackItemsOverThreshold(
itemsOverThresholdMap.get(ctx.partitionName), monitorName, ctx.partitionName);

try {
T delta = generateDelta(ctx);
Expand All @@ -175,58 +185,40 @@ protected void internalPollSingle(PollContext ctx) {
boolean sendEvents = !ctx.fastForward;
int deltaSize = delta.getItems().size();
if (deltaSize > upperThreshold) {
registry
.gauge(
itemsOverThresholdId.withTags(
"monitor", monitorName, "partition", ctx.partitionName))
.set(deltaSize);
itemsOverThresholdMap.get(ctx.partitionName).set(deltaSize);
if (ctx.fastForward) {
log.warn(
"Fast forwarding items ({}) in {} {}",
deltaSize,
itemsOverThresholdMap.get(ctx.partitionName).get(),
StructuredArguments.kv("monitor", monitorName),
StructuredArguments.kv("partition", ctx.partitionName));
sendEvents = false;
} else {
log.error(
"Number of items ({}) to cache exceeds upper threshold ({}) in {} {}",
deltaSize,
itemsOverThresholdMap.get(ctx.partitionName).get(),
upperThreshold,
StructuredArguments.kv("monitor", monitorName),
StructuredArguments.kv("partition", ctx.partitionName));
return;
}
} else {
registry
.gauge(
itemsOverThresholdId.withTags(
"monitor", monitorName, "partition", ctx.partitionName))
.set(0);
itemsOverThresholdMap.get(ctx.partitionName).set(0);
}

sendEvents = sendEvents && isSendEventsEnabled();

commitDelta(delta, sendEvents);
registry
.gauge(itemsCachedId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.set(deltaSize);
itemsCachedMap.get(ctx.partitionName).set(deltaSize);
} catch (Exception e) {
log.error(
"Failed to update monitor items for {}:{}",
StructuredArguments.kv("monitor", monitorName),
StructuredArguments.kv("partition", ctx.partitionName),
e);
registry
.counter(
pollCycleFailedId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.increment();
registry
.gauge(itemsCachedId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.set(0);
registry
.gauge(
itemsOverThresholdId.withTags("monitor", monitorName, "partition", ctx.partitionName))
.set(0);
instrumentation.trackPollCycleFailed(monitorName, ctx.partitionName);
itemsCachedMap.get(ctx.partitionName).set(0);
itemsOverThresholdMap.get(ctx.partitionName).set(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024 Wise, PLC.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.igor.polling;

import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.patterns.PolledMeter;
import java.util.concurrent.atomic.AtomicInteger;

public class CommonPollingMonitorInstrumentation {

private final Registry registry;
private final Id itemsCachedId;
private final Id itemsOverThresholdId;
private final Id pollCycleFailedId;
private final Id pollCycleTimingId;

public CommonPollingMonitorInstrumentation(Registry registry) {
this.registry = registry;
itemsCachedId = registry.createId("pollingMonitor.newItems");
itemsOverThresholdId = registry.createId("pollingMonitor.itemsOverThreshold");
pollCycleFailedId = registry.createId("pollingMonitor.failed");
pollCycleTimingId = registry.createId("pollingMonitor.pollTiming");
}

public void trackItemsCached(AtomicInteger numberOfItems, String monitor, String partition) {
Gauge gauge =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor... but could move the registry.get into the if check bypassing the need for a cast

(Gauge) registry.get(itemsCachedId.withTags("monitor", monitor, "partition", partition));

/*
Spectator gauges are slightly different from Micrometer ones: the polling gauge
has been deprecated in favour of PolledMeter.
Previous implementation resulted in NaN most of the times, while we want to store observations
on Prometheus. We don't need a DistributionSummary, but just a gauge which doesn't get garbage
collected. For a unique combination of (metricName, tags), PolledMeter will sum up the observed
values, so to avoid this, we set the gauge ONCE and we pass a strong reference holding the
latest value to be observed.
*/

if (gauge == null) {
PolledMeter.using(registry)
.withId(itemsCachedId.withTags("monitor", monitor, "partition", partition))
.monitorValue(numberOfItems);
}
}

public void trackItemsOverThreshold(
AtomicInteger numberOfItems, String monitor, String partition) {
Gauge gauge =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, minor thing just prefer inline checks on some of this :) PERSONAL thing vs. "required"

(Gauge)
registry.get(itemsOverThresholdId.withTags("monitor", monitor, "partition", partition));
if (gauge == null) {
PolledMeter.using(registry)
.withId(itemsOverThresholdId.withTags("monitor", monitor, "partition", partition))
.monitorValue(numberOfItems);
}
}

public void trackPollCycleTime(String monitor, Runnable lambda) {
registry.timer(pollCycleTimingId.withTags("monitor", monitor)).record(lambda);
}

public void trackPollCycleFailed(String monitor, String partition) {
registry
.counter(getPollCycleFailedId().withTags("monitor", monitor, "partition", partition))
.increment();
}

public Id getItemsCachedId() {
return itemsCachedId;
}

public Id getItemsOverThresholdId() {
return itemsOverThresholdId;
}

public Id getPollCycleFailedId() {
return pollCycleFailedId;
}

public Id getPollCycleTimingId() {
return pollCycleTimingId;
}
}
Loading