Skip to content

Commit

Permalink
feat: semaphore added (#72)
Browse files Browse the repository at this point in the history
Add semaphore concurrency rate limiting.

Closes #60
  • Loading branch information
helio-frota authored and lance committed Jun 20, 2017
1 parent ff84026 commit 8c0a46b
Show file tree
Hide file tree
Showing 9 changed files with 5,512 additions and 44 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ Here are the events you can listen for.
* `close` - emitted when the breaker state changes to `closed`
* `halfOpen` - emitted when the breaker state changes to `halfOpen`
* `fallback` - emitted when the breaker has a fallback function and executes it
* `semaphore-locked` - emitted when the breaker is at capacity and cannot execute the request

Handling events gives a greater level of control over your application behavior.

Expand Down
93 changes: 56 additions & 37 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const EventEmitter = require('events');
const Status = require('./status');
const HystrixStats = require('./hystrix-stats');
const Semaphore = require('./semaphore');

const STATE = Symbol('state');
const OPEN = Symbol('open');
Expand Down Expand Up @@ -42,6 +43,9 @@ const CACHE = new WeakMap();
* @param options.rollingPercentilesEnabled {boolean} This property indicates whether execution latencies
* should be tracked and calculated as percentiles.
* If they are disabled, all summary statistics (mean, percentiles) are returned as -1.
* @param options.capacity the number of concurrent requests allowed. If the number of
* currently executing function calls is equal to options.capacity, further calls to `fire()`
* are rejected until at least one of the current requests completes.
*/
class CircuitBreaker extends EventEmitter {
constructor (action, options) {
Expand All @@ -50,6 +54,8 @@ class CircuitBreaker extends EventEmitter {
this.options.rollingCountTimeout = options.rollingCountTimeout || 10000;
this.options.rollingCountBuckets = options.rollingCountBuckets || 10;
this.options.rollingPercentilesEnabled = options.rollingPercentilesEnabled !== false;
this.options.capacity = typeof options.capacity === 'number' ? options.capacity : 10;
this.semaphore = new Semaphore(this.options.capacity);

this[STATUS] = new Status(this.options);
this[STATE] = CLOSED;
Expand All @@ -76,6 +82,7 @@ class CircuitBreaker extends EventEmitter {
this.on('cacheMiss', increment('cacheMisses'));
this.on('open', _ => this[STATUS].open());
this.on('close', _ => this[STATUS].close());
this.on('semaphore-locked', increment('semaphoreRejections'));

/**
* Emitted after `options.resetTimeout` has elapsed, allowing for
Expand Down Expand Up @@ -276,47 +283,59 @@ class CircuitBreaker extends EventEmitter {
let timeoutError = false;
return new Promise((resolve, reject) => {
const latencyStartTime = Date.now();
timeout = setTimeout(
() => {
timeoutError = true;
const error = new Error(`Timed out after ${this.options.timeout}ms`);
error.code = 'ETIMEDOUT';
/**
* Emitted when the circuit breaker action takes longer than `options.timeout`
* @event CircuitBreaker#timeout
*/
const latency = Date.now() - latencyStartTime;
this.emit('timeout', error, latency);
resolve(handleError(error, this, timeout, args, latency, resolve, reject));
}, this.options.timeout);

try {
const result = this.action.apply(this.action, args);
const promise = (typeof result.then === 'function')
? result
: Promise.resolve(result);

promise.then((result) => {
if (!timeoutError) {
clearTimeout(timeout);
if (this.semaphore.test()) {
timeout = setTimeout(
() => {
timeoutError = true;
const error = new Error(`Timed out after ${this.options.timeout}ms`);
error.code = 'ETIMEDOUT';
/**
* Emitted when the circuit breaker action succeeds
* @event CircuitBreaker#success
* Emitted when the circuit breaker action takes longer than `options.timeout`
* @event CircuitBreaker#timeout
*/
this.emit('success', result, (Date.now() - latencyStartTime));
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
const latency = Date.now() - latencyStartTime;
this.semaphore.release();
this.emit('timeout', error, latency);
resolve(handleError(error, this, timeout, args, latency, resolve, reject));
}, this.options.timeout);

try {
const result = this.action.apply(this.action, args);
const promise = (typeof result.then === 'function')
? result
: Promise.resolve(result);

promise.then((result) => {
if (!timeoutError) {
clearTimeout(timeout);
/**
* Emitted when the circuit breaker action succeeds
* @event CircuitBreaker#success
*/
this.emit('success', result, (Date.now() - latencyStartTime));
this.semaphore.release();
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
}
}
}
})
.catch((error) => {
const latencyEndTime = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latencyEndTime, resolve, reject);
});
} catch (error) {
})
.catch((error) => {
this.semaphore.release();
const latencyEndTime = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latencyEndTime, resolve, reject);
});
} catch (error) {
this.semaphore.release();
const latency = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latency, resolve, reject);
}
} else {
const latency = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latency, resolve, reject);
const err = new Error('Semaphore locked');
err.code = 'ESEMLOCKED';
this.emit('semaphore-locked', err, latency);
handleError(err, this, timeout, args, latency, resolve, reject);
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions lib/hystrix-formatter.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function hystrixFormatter (stats) {
json.rollingCountFallbackRejection = 0;
json.rollingCountFallbackSuccess = 0;
json.rollingCountResponsesFromCache = stats.cacheHits;
json.rollingCountSemaphoreRejected = stats.rejects;
json.rollingCountSemaphoreRejected = stats.semaphoreRejections;
json.rollingCountShortCircuited = stats.rejects;
json.rollingCountSuccess = stats.successes;
json.rollingCountThreadPoolRejected = 0;
Expand Down Expand Up @@ -67,8 +67,8 @@ function hystrixFormatter (stats) {
json.propertyValue_executionTimeoutInMilliseconds = stats.options.timeout;
json.propertyValue_executionIsolationThreadInterruptOnTimeout = true;
json.propertyValue_executionIsolationThreadPoolKeyOverride = null;
json.propertyValue_executionIsolationSemaphoreMaxConcurrentRequests = 10;
json.propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests = 10;
json.propertyValue_executionIsolationSemaphoreMaxConcurrentRequests = stats.options.capacity;
json.propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests = stats.options.capacity;
json.propertyValue_metricsRollingStatisticalWindowInMilliseconds = 10000;
json.propertyValue_requestCacheEnabled = stats.options.cache || false;
json.propertyValue_requestLogEnabled = true;
Expand Down
54 changes: 54 additions & 0 deletions lib/semaphore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

module.exports = exports = semaphore;

function semaphore (count) {
const resolvers = [];
let counter = count;

let sem = {
take,
release,
test
};

Object.defineProperty(sem, 'count', {
get: _ => counter,
enumerable: true
});

return sem;

function take (timeout) {
if (counter > 0) {
--counter;
return Promise.resolve(release);
}
return new Promise((resolve, reject) => {
resolvers.push(_ => {
--counter;
resolve(release);
});
if (timeout) {
setTimeout(_ => {
resolvers.shift();
const err = new Error(`Timed out after ${timeout}ms`);
err.code = 'ETIMEDOUT';
reject(err);
}, timeout);
}
});
}

function release () {
counter++;
if (resolvers.length > 0) {
resolvers.shift()();
}
}

function test () {
if (counter < 1) return false;
return take() && true;
}
}
1 change: 1 addition & 0 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ const bucket = _ => ({
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
semaphoreRejections: 0,
percentiles: {},
latencyTimes: []
});
Expand Down
Loading

0 comments on commit 8c0a46b

Please sign in to comment.