-
Notifications
You must be signed in to change notification settings - Fork 57
/
index.js
554 lines (467 loc) · 13.9 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
'use strict';
var SimpleCache = require("simple-lru-cache")
, parse = require('connection-parse')
, crypto = require('crypto');
/**
 * Pack four values into a single unsigned 32-bit integer.
 *
 * `a` ends up in the most significant byte and `d` in the least significant
 * one. Callers in this file pass four consecutive bytes of a hash digest.
 *
 * @param {Number} a Bits 24-31 of the result.
 * @param {Number} b Bits 16-23 of the result.
 * @param {Number} c Bits 8-15 of the result.
 * @param {Number} d Bits 0-7 of the result.
 * @returns {Number} Unsigned 32-bit integer.
 * @api private
 */
function hashValueHash(a, b, c, d) {
  var packed = a << 24;

  packed |= b << 16;
  packed |= c << 8;
  packed |= d;

  // Bitwise operators yield signed 32-bit numbers, `>>> 0` makes it unsigned.
  return packed >>> 0;
}
/**
 * Add a virtual node parser to the connection string parser.
 *
 * Stores a numeric `vnodes` count on the parsed server data. The count is
 * taken from `value.vnodes` when the value is a plain (non-array, non-null)
 * object carrying that key, and is `0` in every other case.
 *
 * @param {Object} data server data
 * @param {Mixed} value optional value
 * @api private
 */
parse.extension('vnodes', function vnode(data, value) {
  //
  // `typeof null` is also 'object' and `'vnodes' in null` throws a TypeError,
  // so null has to be ruled out before the `in` check.
  //
  var configured = value !== null
    && typeof value === 'object'
    && !Array.isArray(value)
    && 'vnodes' in value;

  // Anything that does not coerce to a usable number falls back to 0.
  data.vnodes = configured ? (+value.vnodes || 0) : 0;
});
/**
 * HashRing implements consistent hashing so adding or removing servers of one
 * slot does not significantly change the mapping of the key to slots. The
 * consistent hashing algorithm is based on ketama or libketama.
 *
 * Recognised options:
 *
 * - `vnode count`: default amount of virtual nodes per server (default 40).
 * - `default port`: port that is left out of the hashed server name.
 * - `compatibility`: `'hash_ring'` selects 3 replicas, any other truthy value 4.
 * - `replicas`: replicas per hash, only used when `compatibility` is not set.
 * - `max cache size`: maximum size of the lookup cache (default 5000).
 *
 * @constructor
 * @param {Mixed} servers Servers that need to be added to the ring.
 * @param {Mixed} algorithm Either a Crypto compatible algorithm or custom hasher.
 * @param {Object} options Optional configuration and options for the ring.
 * @api public
 */
function HashRing(servers, algorithm, options) {
  options = options || {};

  //
  // These properties can be configured
  //
  this.vnode = 'vnode count' in options ? options['vnode count'] : 40;
  this.algorithm = algorithm || 'md5';

  //
  // If the default port is set, and a host uses it, then it is excluded from
  // the hash.
  //
  this.defaultport = options['default port'] || null;

  //
  // There's a slight difference between libketama and python's hash_ring
  // module, libketama creates 160 points per server:
  //
  // 40 hashes (vnodes) and 4 replicas per hash = 160 points per server.
  //
  // The hash_ring module only uses 120 points per server:
  //
  // 40 hashes (vnodes) and 3 replicas per hash = 120 points per server.
  //
  // And that's the only difference between the original ketama hash and the
  // hash_ring package. Small, but important.
  //
  if (options.compatibility) {
    this.replicas = options.compatibility === 'hash_ring' ? 3 : 4;
  } else {
    this.replicas = 'replicas' in options ? +options.replicas : 4;
  }

  //
  // Replicas must be a positive number as anything else means we have nothing
  // to iterate over when creating the initial hash ring. The negated
  // comparison also catches NaN, which is what a non-numeric `replicas`
  // option coerces to and which a plain `<= 0` check would let through.
  //
  if (!(this.replicas > 0)) this.replicas = 1;

  // Private properties.
  var connections = parse(servers);

  this.ring = [];
  this.size = 0;
  this.vnodes = connections.vnodes;
  this.servers = connections.servers;

  // Set up a cache as we don't want to perform a hashing operation every single
  // time we lookup a key.
  this.cache = new SimpleCache({
    maxSize: 'max cache size' in options ? options['max cache size'] : 5000
  });

  // Override the hashing function if people want to use a hashing algorithm
  // that is not supported by Node, for example if you want to MurMur hashing or
  // something else exotic.
  if ('function' === typeof this.algorithm) {
    this.hash = this.algorithm;
  }

  // Generate the continuum of the HashRing.
  this.continuum();
}
/**
 * Generates the continuum of servers, a.k.a. the Hash Ring, based on their
 * weights and the virtual nodes assigned to them.
 *
 * Every server contributes `points * replicas` nodes: each point is one digest
 * of `<server name>-<point index>` and each replica is one 4-byte slice of
 * that digest packed into a ring position.
 *
 * @returns {HashRing}
 * @api private
 */
HashRing.prototype.continuum = function generate() {
  var servers = this.servers
    , count = servers.length
    , cursor = 0
    , totalWeight = 0
    , server, share, vnodes, points, label, bytes, base, s, i, j;

  // No servers, bailout.
  if (!count) return this;

  // Generate the total weight of all the servers.
  for (s = 0; s < count; s++) totalWeight += servers[s].weight;

  for (s = 0; s < count; s++) {
    server = servers[s];
    share = server.weight / totalWeight;
    vnodes = this.vnodes[server.string] || this.vnode;

    // A custom vnode size is used as-is, otherwise the amount of points is
    // derived from the server's share of the total weight.
    points = vnodes !== this.vnode
      ? vnodes
      : Math.floor(share * vnodes * count);

    // Servers running on the default port are hashed by host name only.
    label = this.defaultport && server.port === this.defaultport
      ? server.host
      : server.string;

    for (i = 0; i < points; i++) {
      bytes = this.digest(label +'-'+ i);

      for (j = 0; j < this.replicas; j++) {
        base = j * 4;
        this.ring[cursor++] = new Node(
          hashValueHash(bytes[base + 3], bytes[base + 2], bytes[base + 1], bytes[base]),
          server.string
        );
      }
    }
  }

  // Sort the keys using the continuum points compare that is used in ketama
  // hashing. The sort happens in place.
  this.ring.sort(function sorted(a, b) {
    if (a.value > b.value) return 1;
    return a.value === b.value ? 0 : -1;
  });

  this.size = this.ring.length;
  return this;
};
/**
 * Find the server for a key: the node whose point is the closest one after
 * the value that the given key hashes to.
 *
 * Results are stored in the cache so repeated lookups skip the hashing.
 *
 * @param {String} key Key whose server we need to figure out.
 * @returns {String} Server address, undefined when the ring has no node.
 * @api public
 */
HashRing.prototype.get = function get(key) {
  var server = this.cache.get(key);

  // Cache hit, no need to hash anything.
  if (server) return server;

  var node = this.ring[this.find(this.hashValue(key))];

  if (node) {
    server = node.server;
    this.cache.set(key, server);
    return server;
  }

  return undefined;
};
/**
 * Returns the position of the hashValue in the hashring.
 *
 * A search over the sorted ring looks for the first node whose value is
 * greater than or equal to the hash. When there is no such node the lookup
 * wraps around to position 0.
 *
 * @param {Number} hashValue Find the nearest server close to this hash.
 * @returns {Number} Position of the server in the hash ring.
 * @api public
 */
HashRing.prototype.find = function find(hashValue) {
  var points = this.ring
    , total = this.size
    , lower = 0
    , upper = total
    , index
    , current
    , previous;

  // Perform a search on the array to find the server with the next biggest
  // point after what the given key hashes to.
  for (;;) {
    index = (lower + upper) >> 1;

    // Ran past the last node (or the ring is empty), wrap around.
    if (index === total) return 0;

    current = points[index].value;
    previous = index === 0 ? 0 : points[index - 1].value;

    // The hash falls in the (previous, current] interval, so this is our node.
    if (hashValue > previous && hashValue <= current) return index;

    if (current < hashValue) lower = index + 1;
    else upper = index - 1;

    if (lower > upper) return 0;
  }
};
/**
 * Generates a hash of the string using the configured crypto algorithm.
 *
 * The constructor replaces this method on the instance when a custom hasher
 * function is supplied as algorithm.
 *
 * @param {String} key
 * @returns {String|Buffer} Hash, depends on node version.
 * @api private
 */
HashRing.prototype.hash = function hash(key) {
  var hasher = crypto.createHash(this.algorithm);

  hasher.update(key);

  // No encoding is supplied, so we receive the raw digest.
  return hasher.digest();
};
/**
 * Digest hash so we can make a numeric representation from the hash.
 *
 * The key is coerced to a string before it is hashed. A hash that is not a
 * string is handed back untouched, a string hash is converted to an array
 * with the char code of every character.
 *
 * @param {String} key The key that needs to be hashed.
 * @returns {Array}
 * @api private
 */
HashRing.prototype.digest = function digest(key) {
  var hashed = this.hash(key +'')
    , codes
    , i;

  // Only strings need converting. Node 0.10 and later return buffers which
  // can already be indexed numerically.
  if ('string' === typeof hashed) {
    codes = [];

    for (i = 0; i < hashed.length; i++) {
      codes.push(hashed.charCodeAt(i));
    }

    return codes;
  }

  return hashed;
};
/**
 * Get the hashed value for the given key: the first four bytes of the key's
 * digest packed into one number, the fourth byte being the most significant.
 *
 * @param {String} key
 * @returns {Number}
 * @api private
 */
HashRing.prototype.hashValue = function hasher(key) {
  var bytes = this.digest(key)
    , highest = bytes[3]
    , high = bytes[2]
    , low = bytes[1]
    , lowest = bytes[0];

  return hashValueHash(highest, high, low, lowest);
};
/**
* Non-ketama:
*
* The following changes are not ported from the ketama algorithm and are
* HashRing specific. Add, remove or replace servers with as little disruption
* as possible.
*/
/**
 * Get a range of different servers, starting at the node the key maps to and
 * walking the ring from there.
 *
 * @param {String} key
 * @param {Number} size Amount of servers it should return, defaults to the
 * amount of servers in the ring.
 * @param {Boolean} unique Return only unique keys, defaults to true.
 * @return {Array}
 * @api public
 */
HashRing.prototype.range = function range(key, size, unique) {
  if (!this.size) return [];

  var wanted = size || this.servers.length
    , distinct = unique || 'undefined' === typeof unique
    , start = this.find(this.hashValue(key))
    , total = this.ring.length
    , picked = []
    , candidate
    , step;

  // Walk the ring once: from the position of the key to the end of the ring
  // and then from the start of the ring back to the position, so we reach
  // full circle again.
  for (step = 0; step < total; step++) {
    candidate = this.ring[(start + step) % total].server;

    // Do we need to make sure that we retrieve a unique list of servers?
    if (!distinct || picked.indexOf(candidate) === -1) picked.push(candidate);

    if (picked.length === wanted) return picked;
  }

  return picked;
};
/**
 * Returns the points per server.
 *
 * The result only has entries for the requested servers. A requested server
 * without any node in the ring is listed with an empty array.
 *
 * @param {String|Array} servers Optional server, or list of servers, to filter
 * down. Every server listed in `this.vnodes` is returned when omitted.
 * @returns {Object} server -> Array(points).
 * @api public
 */
HashRing.prototype.points = function points(servers) {
  //
  // A single server name used to be ignored (only arrays were honoured), which
  // silently returned the points of every server instead of the requested one.
  //
  if ('string' === typeof servers && servers) servers = [servers];
  else if (!Array.isArray(servers)) servers = Object.keys(this.vnodes);

  // No prototype, so server names can never clash with inherited properties.
  var nodes = Object.create(null)
    , node;

  servers.forEach(function each(server) {
    nodes[server] = [];
  });

  for (var i = 0; i < this.size; i++) {
    node = this.ring[i];

    if (node.server in nodes) {
      nodes[node.server].push(node.value);
    }
  }

  return nodes;
};
/**
 * Hotswap identical servers with each other. This doesn't require the cache to
 * be completely nuked and the hash ring distribution to be re-calculated.
 *
 * Please note that removing the server and adding a new server could
 * potentially create a different distribution.
 *
 * Swapping a server with itself, or swapping a server that is not part of the
 * ring, leaves the virtual node configuration untouched.
 *
 * @param {String} from The server that needs to be replaced.
 * @param {String} to The server that replaces the server.
 * @returns {HashRing}
 * @api public
 */
HashRing.prototype.swap = function swap(from, to) {
  //
  // Nothing to swap. Going through the motions would assign the vnodes entry
  // to itself and then delete it, losing the configuration of the server.
  //
  if (from === to) return this;

  var connection = parse(to).servers.pop()
    , self = this;

  // Re-label the existing points, their positions in the ring stay the same.
  this.ring.forEach(function forEach(node) {
    if (node.server === from) node.server = to;
  });

  // Point the cached lookups to the new server.
  this.cache.forEach(function forEach(value, key) {
    if (value === from) self.cache.set(key, to);
  }, this);

  //
  // Update the virtual nodes, but only when `from` is actually known. An
  // unknown `from` would otherwise overwrite the entry of `to` with undefined.
  //
  if (Object.prototype.hasOwnProperty.call(this.vnodes, from)) {
    this.vnodes[to] = this.vnodes[from];
    delete this.vnodes[from];
  }

  // Update the servers
  this.servers = this.servers.map(function mapswap(server) {
    if (server.string === from) {
      server.string = to;
      server.host = connection.host;
      server.port = connection.port;
    }

    return server;
  });

  return this;
};
/**
 * Add a new server to ring without having to re-initialize the hashring. It
 * accepts the same arguments as you can use in the constructor.
 *
 * Servers that are already part of the ring keep their current
 * configuration. The ring and the cache are rebuilt afterwards.
 *
 * @param {Mixed} servers Servers that need to be added to the ring.
 * @returns {HashRing}
 * @api public
 */
HashRing.prototype.add = function add(servers) {
  var known = Object.create(null)
    , current = this.servers
    , incoming
    , entry
    , parsed
    , i;

  // Start out with the servers that are currently in the set.
  for (i = 0; i < current.length; i++) {
    known[current[i].string] = current[i];
  }

  incoming = parse(servers).servers;

  // Only add the servers we don't know yet, duplicates are ignored.
  for (i = 0; i < incoming.length; i++) {
    entry = incoming[i];
    if (!(entry.string in known)) known[entry.string] = entry;
  }

  // Now that we have a complete set of servers, re-parse the set so the
  // virtual nodes and the server list are derived from it again.
  parsed = parse(known);
  this.vnodes = parsed.vnodes;
  this.servers = parsed.servers;

  // Rebuild the hash ring.
  this.reset();
  return this.continuum();
};
/**
 * Remove a server from the hashring. The ring and the cache are rebuilt
 * afterwards.
 *
 * @param {Mixed} server The server we want to remove.
 * @returns {HashRing}
 * @api public
 */
HashRing.prototype.remove = function remove(server) {
  // When the argument parses to multiple servers only the last one is used.
  var target = parse(server).servers.pop().string;

  delete this.vnodes[target];

  this.servers = this.servers.filter(function keep(candidate) {
    return candidate.string !== target;
  });

  // Rebuild the hash ring
  this.reset();
  return this.continuum();
};
/**
 * Checks if a given server exists in the hash ring, which is the case when
 * at least one node of the ring belongs to it.
 *
 * @param {String} server Server for whose existence we're checking
 * @returns {Boolean} Indication if we have that server.
 * @api public
 */
HashRing.prototype.has = function has(server) {
  return this.ring.some(function owns(node) {
    return node.server === server;
  });
};
/**
 * Reset the HashRing to clean up all references: the ring is emptied in
 * place, the size is set back to zero and the cache is reset.
 *
 * @returns {HashRing}
 * @api public
 */
HashRing.prototype.reset = function reset() {
  var ring = this.ring;

  // Remove every node, but keep using the same array.
  ring.splice(0, ring.length);

  this.size = 0;
  this.cache.reset();

  return this;
};
/**
 * End the hashring and clean up all of its references: everything `reset`
 * clears, plus the virtual node configuration and the list of servers.
 *
 * @returns {HashRing}
 * @api public
 */
HashRing.prototype.end = function end() {
  this.reset();
  this.vnodes = {};

  // Empty the list of servers in place.
  var servers = this.servers;
  servers.splice(0, servers.length);

  return this;
};
/**
 * A single Node in our hash ring: a point on the ring together with the
 * server that owns the point.
 *
 * @constructor
 * @param {Number} hashvalue Position of the node, exposed as `value`.
 * @param {String} server Server the node belongs to, exposed as `server`.
 * @api private
 */
function Node(hashvalue, server) {
  var node = this;

  node.value = hashvalue;
  node.server = server;
}
//
// Set up the legacy API aliases. These will be deprecated in the next release.
//
// Every entry is a `[legacy name, replacement]` pair. Legacy methods without a
// replacement only warn, the others also forward the call to the replacement.
//
[
  ['replaceServer'],
  ['replace'],
  ['removeServer', 'remove'],
  ['addServer', 'add'],
  ['getNode', 'get'],
  ['getNodePosition', 'find'],
  ['position', 'find']
].forEach(function depricate(pair) {
  var legacy = pair[0]
    , replacement = pair[1]
    , warned = false;

  HashRing.prototype[legacy] = function depricating() {
    // The warning is only printed the first time the legacy method is called.
    if (!warned) {
      console.warn();
      console.warn('[depricated] HashRing#'+ legacy +' is removed.');

      // Not every API has a replacement API that should be used
      console.warn(replacement
        ? '[depricated] use HashRing#'+ replacement +' as replacement.'
        : '[depricated] the API has no replacement'
      );

      console.warn();
      warned = true;
    }

    // The replacement is looked up at call time and receives all arguments.
    if (replacement) return HashRing.prototype[replacement].apply(this, arguments);
  };
});
/**
* Expose the current version number, as read from the `version` field of the
* `package.json` file that sits next to this file.
*
* @type {String}
* @public
*/
HashRing.version = require('./package.json').version;
/**
* Expose the module. The `HashRing` constructor is the module's export.
*
* @api public
*/
module.exports = HashRing;