Skip to content

Commit

Permalink
Merge pull request #691 from mattrjacobs/larger-semaphore-test
Browse files Browse the repository at this point in the history
Added a test with higher semaphore counts to detect raciness
  • Loading branch information
mattrjacobs committed Feb 20, 2015
2 parents ef5c1de + f4b9a46 commit 15b7a6f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1489,8 +1489,8 @@ C getLatentCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult
return getCommand(isolationStrategy, executionResult, executionLatency, fallbackResult, 0, new HystrixCircuitBreakerTest.TestCircuitBreaker(), null, (executionLatency * 2) + 100, CacheEnabled.NO, "foo", executionSemaphore, fallbackSemaphore, false);
}

C getFallbackLatentCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, AbstractCommand.TryableSemaphore fallbackSemaphore) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, fallbackLatency, circuitBreaker, null, 100, CacheEnabled.NO, "foo", 10, fallbackSemaphore, false);
C getFallbackLatentCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult, int fallbackLatency, HystrixCircuitBreakerTest.TestCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, AbstractCommand.TryableSemaphore executionSemaphore, AbstractCommand.TryableSemaphore fallbackSemaphore) {
return getCommand(isolationStrategy, ExecutionResult.FAILURE, 0, fallbackResult, fallbackLatency, circuitBreaker, threadPool, 10000, CacheEnabled.NO, "foo", executionSemaphore, fallbackSemaphore, true);
}

C getCircuitOpenCommand(ExecutionIsolationStrategy isolationStrategy, FallbackResult fallbackResult) {
Expand Down
125 changes: 73 additions & 52 deletions hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -44,6 +50,7 @@
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.observers.TestSubscriber;
Expand All @@ -61,6 +68,7 @@
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import rx.schedulers.Schedulers;

public class HystrixCommandTest extends CommonHystrixCommandTests<TestHystrixCommand<?>> {

Expand Down Expand Up @@ -3550,67 +3558,80 @@ public void onNext(Boolean args) {

@Test
public void testFallbackRejectionOccursWithLatentFallback() {
int numCommands = 1000;
int semaphoreSize = 600;
List<TestHystrixCommand<?>> cmds = new ArrayList<>();
final AtomicInteger exceptionsSeen = new AtomicInteger(0);
final AtomicInteger fallbacksSeen = new AtomicInteger(0);
final ConcurrentMap<HystrixRuntimeException.FailureType, AtomicInteger> exceptionTypes = new ConcurrentHashMap<>();
final CountDownLatch latch = new CountDownLatch(numCommands);

TestCircuitBreaker circuitBreaker = new TestCircuitBreaker();
TryableSemaphore fallbackSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(2));
final TestHystrixCommand<?> cmd = getFallbackLatentCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.FallbackResult.SUCCESS, 1000, circuitBreaker, fallbackSemaphore);
final TestHystrixCommand<?> cmd2 = getFallbackLatentCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.FallbackResult.SUCCESS, 1000, circuitBreaker, fallbackSemaphore);
final TestHystrixCommand<?> cmd3 = getFallbackLatentCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.FallbackResult.SUCCESS, 1000, circuitBreaker, fallbackSemaphore);
HystrixThreadPool largeThreadPool = new HystrixThreadPool.HystrixThreadPoolDefault(HystrixThreadPoolKey.Factory.asKey("LATENT_FALLBACK"), HystrixThreadPoolProperties.Setter.getUnitTestPropertiesBuilder().withCoreSize(numCommands));
TryableSemaphore executionSemaphore = new TryableSemaphoreActual(HystrixProperty.Factory.asProperty(numCommands));
TryableSemaphore fallbackSemaphore = new AbstractCommand.TryableSemaphoreActual(HystrixProperty.Factory.asProperty(semaphoreSize));

//saturate the fallback semaphore
Future<?> result = cmd.queue();
Future<?> result2 = cmd2.queue();
/**
* The goal here is for all commands to fail immediately in the run() method, and then hit the fallback path.
* The fallback path should be latent for all commands, and the fallback semaphore should saturate and
* reject some fallbacks from occurring
*
* To accomplish this, I will set
* - the threadpool that commands run in high (so commands don't get rejected by the threadpool),
* - the execution semaphore high (so commands don't get rejected by that semaphore)
* - the falback semaphore lower than the number of commands (so that some get fallback-rejected)
*/

try {
Thread.sleep(100);
} catch (InterruptedException ie) {
fail("should not get interrupted");
for (int i = 0; i < numCommands; i++) {
cmds.add(getFallbackLatentCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.FallbackResult.SUCCESS, 1000, circuitBreaker, largeThreadPool, executionSemaphore, fallbackSemaphore));
}

//this should have a rejected fallback, since 2 commands are already executing a fallback with a semaphore count of 2
Future<?> result3 = cmd3.queue();
for (final TestHystrixCommand<?> cmd: cmds) {
final Runnable cmdExecution = new Runnable() {
@Override
public void run() {
try {
cmd.execute();
fallbacksSeen.incrementAndGet();
} catch (HystrixRuntimeException hre) {
HystrixRuntimeException.FailureType ft = hre.getFailureType();
AtomicInteger found = exceptionTypes.get(ft);
if (found != null) {
found.incrementAndGet();
} else {
exceptionTypes.put(ft, new AtomicInteger(1));
}
exceptionsSeen.incrementAndGet();
} finally {
latch.countDown();
}
}
};

try {
//now wait for the third future to complete - this should be a fast-fail while the other 2 commands are executing their fallbacks
result3.get();
fail("should not get a result from a command with a rejected fallback");
} catch (InterruptedException ex) {
fail("Unexpected interruption to future, which should have fast-failed");
} catch (ExecutionException ex) {
assertFalse(result.isDone());
assertFalse(result2.isDone());
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(3, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.BAD_REQUEST));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));
new Thread(cmdExecution).start();
}

try {
//now wait for the first 2 futures to complete. They should both have successful fallbacks 1000 millis after they started
assertEquals(FlexibleTestHystrixCommand.FALLBACK_VALUE, result.get());
assertEquals(FlexibleTestHystrixCommand.FALLBACK_VALUE, result2.get());
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(3, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FAILURE));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.BAD_REQUEST));
assertEquals(1, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_REJECTION));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
assertEquals(2, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT));
assertEquals(0, circuitBreaker.metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));
} catch (InterruptedException | ExecutionException e) {
fail("Interrupted during Future.get()");
}
latch.await(5, TimeUnit.SECONDS);

System.out.println("MAP : " + exceptionTypes);
} catch (InterruptedException ie) {
fail("Interrupted!");
}

System.out.println("NUM EXCEPTIONS : " + exceptionsSeen.get());
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.SUCCESS));
assertEquals(numCommands - semaphoreSize, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
assertEquals(numCommands, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.FAILURE));
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.BAD_REQUEST));
assertEquals(numCommands - semaphoreSize, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.FALLBACK_REJECTION));
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
assertEquals(semaphoreSize, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.FALLBACK_SUCCESS));
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.TIMEOUT));
assertEquals(0, circuitBreaker.metrics.getCumulativeCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));
}

@Test
Expand Down

0 comments on commit 15b7a6f

Please sign in to comment.