Skip to content

Commit

Permalink
feat: add circuit.shutdown() to shut a circuit down
Browse files Browse the repository at this point in the history
Shutting down a circuit will cause it to always return a
rejected promise with an `ESHUTDOWN` `Error.code` when
`circuit.fire()` is called, and the Hystrix stats stop
accumulating. Once a circuit has been shut down, it cannot
be reset or used again.

When using the `shutdown()` method on a circuit, all listeners
are cleaned up internally, preventing memory leaks.

Fixes: #248
Fixes: #181
Fixes: #245
  • Loading branch information
lance committed Jan 9, 2019
1 parent 1ecb9ca commit e14796c
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 33 deletions.
29 changes: 28 additions & 1 deletion lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const OPEN = Symbol('open');
const CLOSED = Symbol('closed');
const HALF_OPEN = Symbol('half-open');
const PENDING_CLOSE = Symbol('pending-close');
const SHUTDOWN = Symbol('shutdown');
const FALLBACK_FUNCTION = Symbol('fallback');
const STATUS = Symbol('status');
const NAME = Symbol('name');
Expand Down Expand Up @@ -187,6 +188,25 @@ class CircuitBreaker extends EventEmitter {
}
}

/**
* Shuts down this circuit breaker. All subsequent calls to the
* circuit will fail, returning a rejected promise.
*/
shutdown () {
this.disable();
this.removeAllListeners();
this.status.shutdown();
this.hystrixStats.shutdown();
this[STATE] = SHUTDOWN;
}

/**
* Determines if the circuit has been shutdown.
*/
get isShutdown () {
return this[STATE] === SHUTDOWN;
}

/**
* Gets the name of this circuit
*/
Expand Down Expand Up @@ -239,7 +259,9 @@ class CircuitBreaker extends EventEmitter {
}

/**
A convenience function that returns the hystrixStats
A convenience function that returns the hystrixStats. If the circuit has
been shutdown, returns undefined and any existing external references to
the HystrixStats instance will stop emitting data.
*/
get hystrixStats () {
return this[HYSTRIX_STATS];
Expand Down Expand Up @@ -295,6 +317,11 @@ class CircuitBreaker extends EventEmitter {
* @fires CircuitBreaker#semaphore-locked
*/
fire () {
if (this.isShutdown) {
const err = new Error('The circuit has been shutdown.');
err.code = 'ESHUTDOWN';
return Promise.reject(err);
}
const args = Array.prototype.slice.call(arguments);

/**
Expand Down
27 changes: 15 additions & 12 deletions lib/hystrix-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ const hystrixStream = new Transform({
}
});

// TODO: This number is somewhat arbitrary. However, we need to allow
// a potentially large number of listeners on this transform stream
// because all circuits are connected to it. In an application with
// a large number of circuits (or tests using circuits), if left to
// the default MAX_LISTENERS, the user will see errors.
hystrixStream.setMaxListeners(100);
hystrixStream.resume();

/**
Expand All @@ -41,34 +35,43 @@ hystrixStream.resume();
*/
class HystrixStats {
constructor (circuit) {
const _readableStream = new Readable({
this._readableStream = new Readable({
objectMode: true,
read () {}
});

// Listen for the stats's snapshot event
circuit.status.on('snapshot', function snapshotListener (stats) {
// when we get a snapshot push it onto the stream
_readableStream.push(
this._readableStream.push(
Object.assign({},
{
name: circuit.name,
closed: circuit.closed,
group: circuit.group,
options: circuit.options
}, stats));
});
}.bind(this));

_readableStream.resume();
_readableStream.pipe(hystrixStream);
this._readableStream.resume();
this._readableStream.pipe(hystrixStream);
}

/**
A convenience function that returns the hystrxStream
A convenience function that returns the hystrixStream
*/
getHystrixStream () {
return hystrixStream;
}

/**
* Shuts down this instance, freeing memory.
* When a circuit is shutdown, it should call shutdown() on
* its HystrixStats instance to avoid memory leaks.
*/
shutdown () {
this._readableStream.unpipe(hystrixStream);
}
}

HystrixStats.stream = hystrixStream;
Expand Down
4 changes: 4 additions & 0 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ class Status extends EventEmitter {
close () {
this[WINDOW][0].isCircuitBreakerOpen = false;
}

shutdown () {
this.removeAllListeners();
}
}

const nextBucket = window => _ => {
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"test:headless": "node test/browser/webpack-test.js",
"test:browser": "opener http://localhost:9007/test/browser/index.html && http-server . -p 9007",
"test:coverage": "nyc tape test/*.js | tap-spec",
"prepare": "nsp check && npm run build:browser",
"prepare": "npm audit && npm run build:browser",
"postpublish": "./publish-docs.sh",
"prerelease": "npm test",
"release": "standard-version -s",
Expand Down Expand Up @@ -42,7 +42,6 @@
"jsdoc": "3.5.5",
"marked": "~0.5.0",
"moment": "~2.22.0",
"nsp": "~3.2.1",
"nyc": "~12.0.2",
"opener": "1.5.1",
"semistandard": "~12.0.1",
Expand Down
27 changes: 27 additions & 0 deletions test/circuit-shutdown-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';

const test = require('tape');
const circuit = require('..');
const passFail = require('./common').passFail;

// tests that we are not leaving listeners open to
// chew up memory
test('EventEmitter max listeners', t => {
let i = 100;
while (--i >= 0) {
const breaker = circuit(passFail);
breaker.fire(1);
breaker.shutdown(); // required for cleanup
}
t.end();
});

test('Circuit shuts down properly', t => {
t.plan(3);
const breaker = circuit(passFail);
t.ok(breaker.fire(1), 'breaker is active');
breaker.shutdown();
t.ok(breaker.isShutdown, 'breaker is shutdown');
t.notOk(breaker.enabled, 'breaker has been disabled');
t.end();
});
6 changes: 5 additions & 1 deletion test/enable-disable-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ test('Defaults to enabled', t => {
t.plan(1);
const breaker = opossum(passFail);
t.equals(breaker.enabled, true);
breaker.shutdown();
t.end();
});

test('Accepts options.enabled', t => {
t.plan(1);
const breaker = opossum(passFail, { enabled: false });
t.equals(breaker.enabled, false);
breaker.shutdown();
t.end();
});

Expand Down Expand Up @@ -50,6 +52,8 @@ test('When disabled the circuit should always be closed', t => {
.catch(e => t.equals(e, 'Error: -1 is < 0'))
.then(() => {
t.ok(breaker.opened, 'should be closed');
});
})
.then(_ => breaker.shutdown())
.then(t.end);
});
});
14 changes: 9 additions & 5 deletions test/half-open-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,14 @@ test('When half-open, the circuit only allows one request through', t => {
'should not be halfOpen after long failing function');
t.notOk(breaker.pendingClose,
'should not be pending close after long failing func');
});
// fire the breaker again, and be sure it fails as expected
breaker
.fire(1)
.catch(e => t.equals(e.message, 'Breaker is open'));
})
.then(_ => {
// fire the breaker again, and be sure it fails as expected
breaker
.fire(1)
.catch(e => t.equals(e.message, 'Breaker is open'));
})
.then(_ => breaker.shutdown())
.then(t.end);
}, options.resetTimeout * 1.5);
});
6 changes: 6 additions & 0 deletions test/health-check-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ test('Circuits accept a health check function', t => {
const circuit = opossum(common.passFail);
circuit.healthCheck(healthChecker(_ => {
t.ok(true, 'function called');
circuit.shutdown();
t.end();
return Promise.resolve();
}), 10000);
Expand All @@ -19,6 +20,7 @@ test('health-check-failed is emitted on failure', t => {
const circuit = opossum(common.passFail);
circuit.on('health-check-failed', e => {
t.equals(e.message, 'Too many tacos', 'health-check-failed emitted');
circuit.shutdown();
t.end();
});
circuit.healthCheck(
Expand All @@ -30,6 +32,7 @@ test('circuit opens on health check failure', t => {
const circuit = opossum(common.passFail);
circuit.on('open', e => {
t.ok(circuit.opened, 'circuit opened');
circuit.shutdown();
t.end();
});
circuit.healthCheck(
Expand All @@ -43,6 +46,7 @@ test('Health check function executes in the circuit breaker context', t => {
circuit.healthCheck(function healthCheck () {
if (!called) {
t.equal(this, circuit, 'health check executes in circuit context');
circuit.shutdown();
t.end();
}
called = true;
Expand All @@ -60,6 +64,7 @@ test('healthCheck() throws TypeError if interval duration is NaN', t => {
t.equals(e.constructor, TypeError, 'throws TypeError');
t.equals(e.message, 'Health check interval must be a number',
'include correct message');
circuit.shutdown();
t.end();
}
});
Expand All @@ -74,6 +79,7 @@ test('healthCheck() throws TypeError if parameter is not a function', t => {
t.equals(e.constructor, TypeError, 'throws TypeError');
t.equals(e.message, 'Health check function must be a function',
'include correct message');
circuit.shutdown();
t.end();
}
});
Expand Down
2 changes: 1 addition & 1 deletion test/hystrix-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ test('A circuit should provide stats to a hystrix compatible stream', t => {
});
const stream = circuitOne.hystrixStats.getHystrixStream();
let circuitOneStatsSeen = false;
let circuitTwoStatsSeen = true;
let circuitTwoStatsSeen = false;
stream.on('data', blob => {
const obj = JSON.parse(blob.substring(6));
if (obj.name === 'circuit one') circuitOneStatsSeen = true;
Expand Down
Loading

0 comments on commit e14796c

Please sign in to comment.