Skip to content

Commit

Permalink
feat: addition of rolling percentile latency's. GH-ISSUE #38
Browse files Browse the repository at this point in the history
a new option has been added: rollingPercentilesEnabled, which defaults to true.  If this is false, all latency's and means are -1
  • Loading branch information
lholmquist committed Apr 24, 2017
1 parent 4d89ae4 commit ce7b50d
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 29 deletions.
35 changes: 22 additions & 13 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ const CACHE = new WeakMap();
* 10000, and options.rollingCountBuckets is 10, then the statistical window
* will be 1000 1 second snapshots in the statistical window. Default: 10
* @param options.name the name to use for this circuit when reporting stats
* @param options.rollingPercentilesEnabled {boolean} This property indicates whether execution latencies
* should be tracked and calculated as percentiles.
* If they are disabled, all summary statistics (mean, percentiles) are returned as -1.
*/
class CircuitBreaker extends EventEmitter {
constructor (action, options) {
super();
this.options = options;
this.options.rollingCountTimeout = options.rollingCountTimeout || 10000;
this.options.rollingCountBuckets = options.rollingCountBuckets || 10;
this.options.rollingPercentilesEnabled = options.rollingPercentilesEnabled !== false;

this[STATUS] = new Status(this.options);
this[STATE] = CLOSED;
Expand All @@ -60,7 +64,7 @@ class CircuitBreaker extends EventEmitter {

if (options.maxFailures) console.error('options.maxFailures is deprecated. Please use options.errorThresholdPercentage');

const increment = property => _ => this[STATUS].increment(property);
const increment = property => (result, runTime) => this[STATUS].increment(property, runTime);

this.on('success', increment('successes'));
this.on('failure', increment('failures'));
Expand Down Expand Up @@ -263,14 +267,15 @@ class CircuitBreaker extends EventEmitter {
*/
this.emit('reject', new Error('Breaker is open'));

return fallback(this, 'Breaker is open', args) ||
return fallback(this, 'Breaker is open', args, 0) ||
Promise.reject(new Error('Breaker is open'));
}
this[PENDING_CLOSE] = false;

let timeout;
let timeoutError = false;
return new Promise((resolve, reject) => {
const latencyStartTime = Date.now();
timeout = setTimeout(
() => {
timeoutError = true;
Expand All @@ -279,8 +284,9 @@ class CircuitBreaker extends EventEmitter {
* Emitted when the circuit breaker action takes longer than `options.timeout`
* @event CircuitBreaker#timeout
*/
this.emit('timeout', error);
resolve(handleError(error, this, timeout, args, resolve, reject));
const latency = Date.now() - latencyStartTime;
this.emit('timeout', error, latency);
resolve(handleError(error, this, timeout, args, latency, resolve, reject));
}, this.options.timeout);

try {
Expand All @@ -296,17 +302,20 @@ class CircuitBreaker extends EventEmitter {
* Emitted when the circuit breaker action succeeds
* @event CircuitBreaker#success
*/
this.emit('success', result);
this.emit('success', result, (Date.now() - latencyStartTime));
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
}
}
})
.catch((error) =>
handleError(error, this, timeout, args, resolve, reject));
.catch((error) => {
const latencyEndTime = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latencyEndTime, resolve, reject);
});
} catch (error) {
handleError(error, this, timeout, args, resolve, reject);
const latency = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latency, resolve, reject);
}
});
}
Expand All @@ -319,10 +328,10 @@ class CircuitBreaker extends EventEmitter {
}
}

function handleError (error, circuit, timeout, args, resolve, reject) {
function handleError (error, circuit, timeout, args, latency, resolve, reject) {
clearTimeout(timeout);
fail(circuit, error, args);
const fb = fallback(circuit, error, args);
fail(circuit, error, args, latency);
const fb = fallback(circuit, error, args, latency);
if (fb) resolve(fb);
else reject(error);
}
Expand All @@ -341,12 +350,12 @@ function fallback (circuit, err, args) {
}
}

function fail (circuit, err, args) {
function fail (circuit, err, args, latency) {
/**
* Emitted when the circuit breaker action fails
* @event CircuitBreaker#failure
*/
circuit.emit('failure', err);
circuit.emit('failure', err, latency);

// check stats to see if the circuit should be opened
const stats = circuit.stats;
Expand Down
35 changes: 23 additions & 12 deletions lib/hystrix-formatter.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,31 @@ function hystrixFormatter (stats) {
json.currentConcurrentExecutionCount = 0;
json.rollingMaxConcurrentExecutionCount = 0;
// TODO: caluclate these latency values
json.latencyExecute_mean = 0;
json.latencyExecute_mean = stats.latencyMean || 0;
json.latencyExecute = {
'0': 0,
'25': 0,
'50': 0,
'75': 0,
'90': 0,
'95': 0,
'99': 0,
'99.5': 0,
'100': 0
0: stats.percentiles['0'],
25: stats.percentiles['0.25'],
50: stats.percentiles['0.5'],
75: stats.percentiles['0.75'],
90: stats.percentiles['0.9'],
95: stats.percentiles['0.95'],
99: stats.percentiles['0.99'],
99.5: stats.percentiles['0.995'],
100: stats.percentiles['1']
};
// Whats the difference between execute and total?
json.latencyTotal_mean = stats.latencyMean;
json.latencyTotal = {
0: stats.percentiles['0'],
25: stats.percentiles['0.25'],
50: stats.percentiles['0.5'],
75: stats.percentiles['0.75'],
90: stats.percentiles['0.9'],
95: stats.percentiles['0.95'],
99: stats.percentiles['0.99'],
99.5: stats.percentiles['0.995'],
100: stats.percentiles['1']
};
json.latencyTotal_mean = 0;
json.latencyTotal = { '0': 0, '25': 0, '50': 0, '75': 0, '90': 0, '95': 0, '99': 0, '99.5': 0, '100': 0 };
json.propertyValue_circuitBreakerRequestVolumeThreshold = 5;
json.propertyValue_circuitBreakerSleepWindowInMilliseconds = stats.options.resetTimeout;
json.propertyValue_circuitBreakerErrorThresholdPercentage = stats.options.errorThresholdPercentage;
Expand Down
59 changes: 55 additions & 4 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const WINDOW = Symbol('window');
const BUCKETS = Symbol('buckets');
const TIMEOUT = Symbol('timeout');
const PERCENTILES = Symbol('percentiles');

const EventEmitter = require('events').EventEmitter;

Expand Down Expand Up @@ -60,6 +61,10 @@ class Status extends EventEmitter {
this[BUCKETS] = options.rollingCountBuckets;
this[TIMEOUT] = options.rollingCountTimeout;
this[WINDOW] = new Array(this[BUCKETS]);
this[PERCENTILES] = [0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995, 1];

// Default this value to true
this.rollingPercentilesEnabled = options.rollingPercentilesEnabled;

// prime the window with buckets
for (let i = 0; i < this[BUCKETS]; i++) this[WINDOW][i] = bucket();
Expand All @@ -81,12 +86,45 @@ class Status extends EventEmitter {
* Get the cumulative stats for the current window
*/
get stats () {
return this[WINDOW].reduce((acc, val) => {
const totals = this[WINDOW].reduce((acc, val) => {
// the window starts with all but one bucket undefined
if (!val) return acc;
Object.keys(acc).forEach(key => (acc[key] += val[key] || 0));
Object.keys(acc).forEach(key => {
if (key !== 'latencyTimes' && key !== 'percentiles') {
(acc[key] += val[key] || 0);
}
});

if (this.rollingPercentilesEnabled) {
acc.latencyTimes.push.apply(acc.latencyTimes, val.latencyTimes || []);
}
return acc;
}, bucket());

if (this.rollingPercentilesEnabled) {
// Sort the latencyTimess
totals.latencyTimes.sort((a, b) => a - b);

// Get the mean latency
// Mean = sum of all values in the array/length of array
if (totals.latencyTimes.length) {
totals.latencyMean = (totals.latencyTimes.reduce((a, b) => a + b, 0)) / totals.latencyTimes.length;
} else {
totals.latencyMean = 0;
}

// Calculate Percentiles
this[PERCENTILES].forEach(percentile => {
totals.percentiles[percentile] = calculatePercentile(percentile, totals.latencyTimes);
});
} else {
totals.latencyMean = -1;
this[PERCENTILES].forEach(percentile => {
totals.percentiles[percentile] = -1;
});
}

return totals;
}

/**
Expand All @@ -96,8 +134,11 @@ class Status extends EventEmitter {
return this[WINDOW].slice();
}

increment (property) {
increment (property, latencyRunTime) {
this[WINDOW][0][property]++;
if (property === 'successes' || property === 'failures' || property === 'timeouts') {
this[WINDOW][0].latencyTimes.push(latencyRunTime || 0);
}
}

open () {
Expand All @@ -122,7 +163,17 @@ const bucket = _ => ({
fires: 0,
timeouts: 0,
cacheHits: 0,
cacheMisses: 0
cacheMisses: 0,
percentiles: {},
latencyTimes: []
});

function calculatePercentile (percentile, arr) {
if (percentile === 0) {
return arr[0] || 0;
}
const idx = Math.ceil(percentile * arr.length);
return arr[idx - 1] || 0;
}

module.exports = exports = Status;
55 changes: 55 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,61 @@ test('options.maxFailures should be deprecated', (t) => {
cb(passFail, options);
});

test('rolling percentile enabled option defaults to true', (t) => {
const breaker = cb(passFail);
t.equals(breaker.status.rollingPercentilesEnabled, true, 'rollingPercentilesEnabled should default to true');
t.equals(breaker.status.stats.latencyMean, 0, 'latencyMean is starts at 0 when rollingPercentilesEnabled is true');
[0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995, 1].forEach((p) => {
t.equals(breaker.status.stats.percentiles[p], 0, `${p} percentile should be 0 at the start`);
});
t.end();
});

test('rolling percentile enabled option set to false', (t) => {
const options = { rollingPercentilesEnabled: false };
const breaker = cb(passFail, options);
t.equals(breaker.status.rollingPercentilesEnabled, false, 'rollingPercentilesEnabled set to false');
t.equals(breaker.status.stats.latencyMean, -1, 'latencyMean is -1 when rollingPercentilesEnabled is false');
[0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 0.995, 1].forEach((p) => {
t.equals(breaker.status.stats.percentiles[p], -1, `${p} percentile should be -1 when rollingPercentilesEnabled is false`);
});
t.end();
});

test('Circuit Breaker success event emits latency', (t) => {
t.plan(1);
const breaker = cb(passFail);
breaker.on('success', (result, latencyTime) => {
t.ok(latencyTime, 'second argument is the latency');
t.end();
});

breaker.fire(1);
});

test('Circuit Breaker timeout event emits latency', (t) => {
t.plan(1);
const breaker = cb(passFail);
breaker.on('failure', (result, latencyTime) => {
t.ok(latencyTime, 'second argument is the latency');
t.end();
});

breaker.fire(-1).catch(() => {});
});

test('Circuit Breaker failure event emits latency', (t) => {
t.plan(1);
const breaker = cb(slowFunction, { timeout: 10 });

breaker.on('timeout', (result, latencyTime) => {
t.ok(latencyTime, 'second argument is the latency');
t.end();
});

breaker.fire(-1).catch(() => {});
});

/**
* Returns a promise that resolves if the parameter
* 'x' evaluates to >= 0. Otherwise the returned promise fails.
Expand Down

0 comments on commit ce7b50d

Please sign in to comment.