Skip to content

Commit

Permalink
fix: ensure that promise is rejected on fallback
Browse files Browse the repository at this point in the history
When a circuit times out, we need to be sure that the result is not
eventually resolved successfully.
  • Loading branch information
lance committed Dec 22, 2016
1 parent a15e6ab commit d4496d8
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 62 deletions.
24 changes: 23 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ const defaults = {
Promise: Fidelity
};

/** Detect free variable `global` from Node.js. */
const freeGlobal = typeof global === 'object' && global && global.Object === Object && global;

/* eslint no-new-func: 0 */
/** Used as a reference to the global object. */
const root = freeGlobal || Function('return this')();

/**
* @module opossum
*/
Expand Down Expand Up @@ -49,4 +56,19 @@ function circuitBreaker (action, options) {
*/
circuitBreaker.promisify = require('./lib/promisify');

module.exports = exports = circuitBreaker;
function exportModule (exported) {
console.log('Exporting', exported);
console.log('Module', module);
if (typeof document === 'object') {
// in a browser environment
root[exported.name] = exported;
} else if (typeof module === 'object' && module.exports) {
// we're in a node.js environment
module.exports = exports = exported;
} else {
// ??
root[exported.name] = exported;
}
}

exportModule(circuitBreaker);
73 changes: 45 additions & 28 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class CircuitBreaker extends EventEmitter {
return () => {
const timer = setTimeout(() => {
circuit[STATE] = HALF_OPEN;
circuit.emit('halfOpen');
circuit.emit('halfOpen', circuit.options.resetTimeout);
}, circuit.options.resetTimeout);
if (typeof timer.unref === 'function') {
timer.unref();
Expand All @@ -63,7 +63,6 @@ class CircuitBreaker extends EventEmitter {

this.on('open', _startTimer(this));
this.on('success', () => this.close());
this.on('failure', () => this.open());
}

/**
Expand All @@ -83,9 +82,7 @@ class CircuitBreaker extends EventEmitter {
* that has been provided.
*/
open () {
this[NUM_FAILURES] += 1;
if (this[NUM_FAILURES] >= this.options.maxFailures &&
this[STATE] !== OPEN) {
if (this[STATE] !== OPEN) {
this[STATE] = OPEN;
this.emit('open');
}
Expand Down Expand Up @@ -136,42 +133,62 @@ class CircuitBreaker extends EventEmitter {
* fallback action that is executed, the Promise will resolve.
*/
fire () {
this.emit('fire');
const args = Array.prototype.slice.call(arguments);
this.emit('fire', args);

if (this.opened || (this.halfOpen && this[PENDING_CLOSE])) {
this.emit('reject');
this.emit('reject', new Error('Breaker is open'));
return failFast(this, 'Breaker is open', args);
}

this[PENDING_CLOSE] = this.halfOpen;
return new this.Promise((resolve, reject) => {
const timeout = setTimeout(
() => {
const timeOutFailure = failFast(this, `Time out after ${this.options.timeout}ms`, args);
reject(timeOutFailure.value);
}, this.options.timeout);

this.Promise.resolve(this.action.apply(this.action, args))
.then((result) => {
this.emit('success');
resolve(result);
clearTimeout(timeout);
})
.catch((e) => {
resolve(failFast(this, e, args));
clearTimeout(timeout);
});
});

let timeout;
let timeoutError = false;
try {
return new this.Promise((resolve, reject) => {
const timeout = setTimeout(
() => {
timeoutError = true;
const error = new Error(`Timed out after ${this.options.timeout}ms`);
this.emit('timeout', error);
failFast(this, error, args);
reject(error);
}, this.options.timeout);

Promise.resolve(this.action.apply(this.action, args))
.then((result) => {
if (!timeoutError) {
this.emit('success', result);
resolve(result);
clearTimeout(timeout);
}
})
.catch((error) => {
failFast(this, error, args);
reject(error);
clearTimeout(timeout);
});
});
} catch (error) {
clearTimeout(timeout);
return failFast(this, error, args);
}
}
}

function failFast (circuit, err, args) {
circuit.emit('failure', err);
circuit[NUM_FAILURES] += 1;

if (circuit[NUM_FAILURES] >= circuit.options.maxFailures) {
circuit.open();
}

if (circuit[FALLBACK_FUNCTION]) {
return new circuit.Promise((resolve, reject) => {
circuit.emit('fallback');
resolve(circuit[FALLBACK_FUNCTION].apply(circuit[FALLBACK_FUNCTION], args));
const result = circuit[FALLBACK_FUNCTION].apply(circuit[FALLBACK_FUNCTION], args);
circuit.emit('fallback', result);
resolve(result);
});
}
return circuit.Promise.reject.apply(null, [err]);
Expand Down
62 changes: 29 additions & 33 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ test('Fails when the circuit function fails', (t) => {
});

test('Fails when the circuit function times out', (t) => {
const expected = 'Time out after 10ms';
const expected = 'Timed out after 10ms';
const breaker = circuitBreaker(slowFunction, { timeout: 10 });

breaker.fire()
.then(t.fail)
.catch((e) => t.equals(e, expected))
.catch((e) => t.equals(e.message, expected))
.then(t.end);
});

Expand Down Expand Up @@ -127,52 +127,48 @@ test('Breaker resets for circuits with a fallback function', (t) => {
const breaker = circuitBreaker(passFail, { maxFailures: 1, resetTimeout });
breaker.fallback((x) => x * 2);

breaker.fire(fails)
.then((result) => {
t.deepEqual(result, -2);
// Now the breaker should be open. Wait for reset and
// fire again.
setTimeout(() => {
breaker.fire(100)
.then((arg) => t.equals(arg, 100))
.then(t.end)
.catch(t.fail);
}, resetTimeout * 1.25);
})
.catch(t.fail);
breaker.on('fallback', (result) => {
t.equal(result, -2);
// Now the breaker should be open. Wait for reset and
// fire again.
setTimeout(() => {
breaker.fire(100)
.then((arg) => {
t.equals(arg, 100);
})
.then(t.end)
.catch(t.fail);
}, resetTimeout * 1.25);
});

breaker.fire(fails);
});

test('Executes fallback action, if one exists, when breaker is open', (t) => {
const fails = -1;
const expected = 100;
const breaker = circuitBreaker(passFail, { maxFailures: 1 });
breaker.fallback(() => 'fallback');
breaker.fallback(() => expected);
breaker.on('fallback', (result) => {
t.equals(result, expected);
t.end();
});
breaker.fire(fails)
.then(() => {
// Now the breaker should be open. See if fallback fires.
breaker.fire()
.then((arg) => {
t.equals(arg, 'fallback');
})
.then(t.end)
.catch(t.fail);
breaker.fire().then(t.fail);
});
});

test('Passes arguments to the fallback function', (t) => {
const fails = -1;
const expected = 100;
const breaker = circuitBreaker(passFail, { maxFailures: 1 });
breaker.on('fallback', (result) => {
t.equals(result, fails);
t.end();
});
breaker.fallback((x) => x);
breaker.fire(fails)
.then(() => {
// Now the breaker should be open. See if fallback fires.
breaker.fire(expected)
.then((arg) => {
t.equals(arg, expected);
})
.then(t.end)
.catch(t.fail);
});
breaker.fire(fails).then(t.fail);
});

test('Returns self from fallback()', (t) => {
Expand Down

0 comments on commit d4496d8

Please sign in to comment.