diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java index d4742a0d6..43b751b11 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +72,7 @@ public abstract class HystrixObservableCollapser collapserFactory; private final HystrixRequestCache requestCache; private final HystrixCollapserBridge collapserInstanceWrapper; + private final HystrixCollapserMetrics metrics; /** * The scope of request collapsing. @@ -112,20 +115,31 @@ protected HystrixObservableCollapser(HystrixCollapserKey collapserKey) { * Fluent interface for constructor arguments */ protected HystrixObservableCollapser(Setter setter) { - this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter); + this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter, null); } - /* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) { + /* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) { if (collapserKey == null || collapserKey.name().trim().equals("")) { String defaultKeyName = getDefaultNameFromClass(getClass()); collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName); } - this.collapserFactory = new RequestCollapserFactory<>(collapserKey, scope, timer, propertiesBuilder); + HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder); + this.collapserFactory = new RequestCollapserFactory<>(collapserKey, scope, timer, properties); this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy()); + if (metrics == null) { + this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties); + } else { + this.metrics = metrics; + } + final HystrixObservableCollapser self = this; + /* strategy: HystrixMetricsPublisherCollapser */ + HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties); + + /** * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class. */ @@ -133,7 +147,9 @@ protected HystrixObservableCollapser(Setter setter) { @Override public Collection>> shardRequests(Collection> requests) { - return self.shardRequests(requests); + Collection>> shards = self.shardRequests(requests); + self.metrics.markShards(shards.size()); + return shards; } @Override @@ -142,7 +158,7 @@ public Observable createObservableCommand(Collection @@ -373,17 +397,14 @@ public Observable toObservable(Scheduler observeOn) { if (getProperties().requestCacheEnabled().get()) { Observable fromCache = requestCache.get(getCacheKey()); if (fromCache != null) { - /* mark that we received this response from cache */ - // TODO Add collapser metrics so we can capture this information - // we can't add it to the command metrics because the command can change each time (dynamic key for example) - // and we don't have access to it when responding from cache - // collapserMetrics.markResponseFromCache(); + metrics.markResponseFromCache(); return fromCache; } } RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); Observable response = requestCollapser.submitRequest(getRequestArgument()); + metrics.markRequestBatched(); if (getProperties().requestCacheEnabled().get()) { /* * A race can occur here with multiple threads queuing but only one will be cached. diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java index f90689105..c6fe5bb71 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java @@ -16,12 +16,16 @@ package com.netflix.hystrix; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import java.util.Collection; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault; +import com.netflix.hystrix.util.HystrixRollingNumberEvent; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -38,11 +42,8 @@ import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; public class HystrixObservableCollapserTest { - static AtomicInteger counter = new AtomicInteger(); - @Before public void init() { - counter.set(0); // since we're going to modify properties of the same class between tests, wipe the cache each time HystrixCollapser.reset(); /* we must call this to simulate a new request lifecycle running and clearing caches */ @@ -61,57 +62,66 @@ public void cleanup() { @Test public void testTwoRequests() throws Exception { TestCollapserTimer timer = new TestCollapserTimer(); - Future response1 = new TestRequestCollapser(timer, counter, 1).observe().toBlocking().toFuture(); - Future response2 = new TestRequestCollapser(timer, counter, 2).observe().toBlocking().toFuture(); + HystrixObservableCollapser collapser1 = new TestRequestCollapser(timer, 1); + HystrixObservableCollapser collapser2 = new TestRequestCollapser(timer, 2); + Future response1 = collapser1.observe().toBlocking().toFuture(); + Future response2 = collapser2.observe().toBlocking().toFuture(); timer.incrementTime(10); // let time pass that equals the default delay/period assertEquals("1", response1.get()); assertEquals("2", response2.get()); - assertEquals(1, counter.get()); - assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size()); + + HystrixCollapserMetrics metrics = collapser1.getMetrics(); + assertSame(metrics, collapser2.getMetrics()); + assertEquals(2L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED)); + assertEquals(1L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_BATCH)); + assertEquals(0L, metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE)); } private static class TestRequestCollapser extends HystrixObservableCollapser { - private final AtomicInteger count; private final String value; private ConcurrentLinkedQueue> commandsExecuted; - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value) { - this(timer, counter, String.valueOf(value)); + public TestRequestCollapser(TestCollapserTimer timer, int value) { + this(timer, String.valueOf(value)); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value) { - this(timer, counter, value, 10000, 10); + public TestRequestCollapser(TestCollapserTimer timer, String value) { + this(timer, value, 10000, 10); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue> executionLog) { - this(timer, counter, value, 10000, 10, executionLog); + public TestRequestCollapser(TestCollapserTimer timer, String value, ConcurrentLinkedQueue> executionLog) { + this(timer, value, 10000, 10, executionLog); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { - this(timer, counter, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds); + public TestRequestCollapser(TestCollapserTimer timer, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(timer, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { - this(timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); + public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); } - public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { - this(scope, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); + public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) { + this(scope, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null); } - public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue> executionLog) { - this(Scope.REQUEST, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog); + public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue> executionLog) { + this(Scope.REQUEST, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog); } - public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue> executionLog) { + private static HystrixCollapserMetrics createMetrics() { + HystrixCollapserKey key = HystrixCollapserKey.Factory.asKey("COLLAPSER_ONE"); + return HystrixCollapserMetrics.getInstance(key, new HystrixPropertiesCollapserDefault(key, HystrixCollapserProperties.Setter())); + } + + public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue> executionLog) { // use a CollapserKey based on the CollapserTimer object reference so it's unique for each timer as we don't want caching // of properties to occur and we're using the default HystrixProperty which typically does caching - super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds)); - this.count = counter; + super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds), createMetrics()); this.value = value; this.commandsExecuted = executionLog; } @@ -133,9 +143,6 @@ public HystrixObservableCommand createCommand(final Collection getBatchReturnTypeToResponseTypeMapper() { - // count how many times a batch is executed (this method is executed once per batch) - System.out.println("increment count: " + count.incrementAndGet()); - return new Func1() { @Override