Skip to content

Commit

Permalink
Polishing #1995
Browse files Browse the repository at this point in the history
Lazily create NoPauseDetector and ensure immediate shutdown of the pause thread. Add since tags.

Original pull request: #2005.
  • Loading branch information
mp911de committed Feb 25, 2022
1 parent 705857e commit 32abb73
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ static CommandLatencyCollectorOptions.Builder builder() {
boolean isEnabled();

/**
* Returns whether PauseDetector is enabled.
* Returns whether PauseDetector is enabled. Defaults to no pause detector.
*
* @return {@code true} if the PauseDetector is enabled
* @since 6.1.7
*/
boolean usePauseDetector();

Expand All @@ -132,22 +133,24 @@ interface Builder {
Builder enable();

/**
* Use LatencyUtils.SimplePauseDetector to detects pauses.
* See {@link org.LatencyUtils.SimplePauseDetector}
* Use {@code LatencyUtils.SimplePauseDetector} to detect pauses. Defaults to no pause detector.
*
* @return this {@link DefaultCommandLatencyCollectorOptions.Builder}.
* @since 6.1.7
* @see org.LatencyUtils.SimplePauseDetector
*/
Builder usePauseDetector();

/**
* Do not detects pauses.
* Do not detect pauses. Defaults to no pause detector.
*
* @return this {@link DefaultCommandLatencyCollectorOptions.Builder}.
* @since 6.1.7
*/
Builder useNoPauseDetector();

/**
* Enables per connection metrics tracking insead of per host/port. If {@code true}, multiple connections to the same
* Enables per connection metrics tracking instead of per host/port. If {@code true}, multiple connections to the same
* host/connection point will be recorded separately which allows to inspect every connection individually. If
* {@code false}, multiple connections to the same host/connection point will be recorded together. This allows a
* consolidated view on one particular service. Defaults to {@code false}. See
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public class DefaultCommandLatencyCollector implements CommandLatencyCollector {

private static final boolean HDR_UTILS_AVAILABLE = isPresent("org.HdrHistogram.Histogram");

private static final PauseDetectorWrapper GLOBAL_PAUSE_DETECTOR = PauseDetectorWrapper.create(true);
private static final PauseDetectorWrapper GLOBAL_PAUSE_DETECTOR = PauseDetectorWrapper.create();

private static final PauseDetectorWrapper GLOBAL_NO_PAUSE_DETECTOR = PauseDetectorWrapper.create(false);
private static final PauseDetectorWrapper GLOBAL_NO_PAUSE_DETECTOR = PauseDetectorWrapper.noop();

private static final long MIN_LATENCY = 1000;

Expand Down Expand Up @@ -326,7 +326,12 @@ public Histogram getCompletionHistogram() {
* No-operation {@link PauseDetector} implementation.
*/
static class NoPauseDetector extends PauseDetector {
protected NoPauseDetector() {

protected static final NoPauseDetector INSTANCE = new NoPauseDetector();

private NoPauseDetector() {
super();
super.shutdown();
}

@Override
Expand Down Expand Up @@ -359,7 +364,6 @@ interface PauseDetectorWrapper {
* No-operation {@link PauseDetectorWrapper} implementation.
*/
PauseDetectorWrapper NO_OP = new PauseDetectorWrapper() {
private final PauseDetector pauseDetector = new NoPauseDetector();

@Override
public void release() {
Expand All @@ -371,20 +375,24 @@ public void retain() {

@Override
public PauseDetector getPauseDetector() {
return pauseDetector;
return NoPauseDetector.INSTANCE;
}

};

static PauseDetectorWrapper create(boolean usePauseDetector) {
static PauseDetectorWrapper create() {

if (HDR_UTILS_AVAILABLE && LATENCY_UTILS_AVAILABLE && usePauseDetector) {
if (HDR_UTILS_AVAILABLE && LATENCY_UTILS_AVAILABLE) {
return new DefaultPauseDetectorWrapper();
}

return NO_OP;
}

static PauseDetectorWrapper noop() {
return NO_OP;
}

/**
* Retain reference to {@link PauseDetectorWrapper} and increment reference counter.
*/
Expand Down Expand Up @@ -421,6 +429,7 @@ static class DefaultPauseDetectorWrapper implements PauseDetectorWrapper {
*
* @return
*/
@Override
public PauseDetector getPauseDetector() {
return pauseDetector;
}
Expand All @@ -429,6 +438,7 @@ public PauseDetector getPauseDetector() {
* Creates or initializes a {@link PauseDetector} instance after incrementing the usage counter to one. Should be
* {@link #release() released} once it is no longer in use.
*/
@Override
public void retain() {

if (counter.incrementAndGet() == 1) {
Expand Down Expand Up @@ -476,6 +486,7 @@ public void run() {
/**
* Decrements the usage counter. When reaching {@code 0}, the {@link PauseDetector} instance is released.
*/
@Override
public void release() {

if (counter.decrementAndGet() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ public class DefaultCommandLatencyCollectorOptions implements CommandLatencyColl

private final boolean enabled;

private final boolean usePauseDetector;

private final Builder builder;

private boolean usePauseDetector;

protected DefaultCommandLatencyCollectorOptions(Builder builder) {
this.targetUnit = builder.targetUnit;
Expand Down Expand Up @@ -151,11 +152,11 @@ public Builder enable() {
}

/**
* Use LatencyUtils.SimplePauseDetector to detects pauses.
* See {@link org.LatencyUtils.SimplePauseDetector}.
* Defaults to useNoPauseDetector.See {@link DefaultCommandLatencyCollectorOptions#DEFAULT_USE_NO_PAUSE_DETECTOR}.
* Use {@code LatencyUtils.SimplePauseDetector} to detect pauses. Defaults to no pause detector.
*
* @return this {@link Builder}.
* @return this {@link DefaultCommandLatencyCollectorOptions.Builder}.
* @since 6.1.7
* @see org.LatencyUtils.SimplePauseDetector
*/
@Override
public Builder usePauseDetector() {
Expand All @@ -164,10 +165,10 @@ public Builder usePauseDetector() {
}

/**
* Do not detects pauses.
* Defaults to useNoPauseDetector.See {@link DefaultCommandLatencyCollectorOptions#DEFAULT_USE_NO_PAUSE_DETECTOR}.
* Do not detect pauses. Defaults to no pause detector.
*
* @return this {@link Builder}.
* @return this {@link DefaultCommandLatencyCollectorOptions.Builder}.
* @since 6.1.7
*/
@Override
public Builder useNoPauseDetector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@
*/
package io.lettuce.core.metrics;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static java.util.concurrent.TimeUnit.*;
import static org.assertj.core.api.Assertions.*;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import io.lettuce.test.ReflectionTestUtils;

import io.lettuce.core.metrics.DefaultCommandLatencyCollector.PauseDetectorWrapper;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.test.ReflectionTestUtils;
import io.netty.channel.local.LocalAddress;

/**
Expand Down Expand Up @@ -151,65 +147,4 @@ private void setupData() {
sut.recordCommandLatency(LocalAddress.ANY, LocalAddress.ANY, CommandType.BGSAVE, MILLISECONDS.toNanos(300),
MILLISECONDS.toNanos(1000));
}

@Test
void verifyNoPauseDetector() {
int loop = 100000;

sut = new DefaultCommandLatencyCollector(DefaultCommandLatencyCollectorOptions
.builder()
// PauseDetection will not work as expected
// .usePauseDetector()
.build()
);

setupLoopData(loop);

Map<CommandLatencyId, CommandMetrics> latencies = sut.retrieveMetrics();
assertThat(latencies).hasSize(1);

Map.Entry<CommandLatencyId, CommandMetrics> entry = latencies.entrySet().iterator().next();

assertThat(entry.getKey().commandType()).isSameAs(CommandType.BGSAVE);

CommandMetrics metrics = entry.getValue();

assertThat(metrics.getCount()).isEqualTo(loop);
assertThat(metrics.getCompletion().getMin()).isBetween(990000L, 1100000L);
assertThat(metrics.getCompletion().getMax()).isBetween(990000L, 1100000L);
assertThat(metrics.getCompletion().getPercentiles()).hasSize(5);

assertThat(metrics.getFirstResponse().getMin()).isBetween(90000L, 110000L);
assertThat(metrics.getFirstResponse().getMax()).isBetween(90000L, 110000L);
assertThat(metrics.getCompletion().getPercentiles()).containsKey(50.0d);

assertThat(metrics.getFirstResponse().getPercentiles().get(50d)).isLessThanOrEqualTo(
metrics.getCompletion().getPercentiles().get(50d));

assertThat(metrics.getTimeUnit()).isEqualTo(MICROSECONDS);

assertThat(sut.retrieveMetrics()).isEmpty();

sut.shutdown();
}

private void setupLoopData(int loop) {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final Runner runner = new Runner();

for (int ndx = 0; ndx < loop; ndx++) executorService.submit(runner);
try {
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
}
}

class Runner implements Runnable {
@Override
public void run() {
sut.recordCommandLatency(LocalAddress.ANY, LocalAddress.ANY, CommandType.BGSAVE, MILLISECONDS.toNanos(100),
MILLISECONDS.toNanos(1000));
}
}
}

0 comments on commit 32abb73

Please sign in to comment.