From 3cc23f5a381d7bbefbc4ca7153b8a7daf3d27ccd Mon Sep 17 00:00:00 2001 From: nivida Date: Thu, 7 Nov 2019 17:05:12 +0100 Subject: [PATCH] subscriptions are now hold in a Map in the RequestManager, and resubscribing after the connection got lost will now get triggered from the RequestManager instead of the subscriptions --- .../web3-core-requestmanager/src/index.js | 62 +++++++++++-------- .../src/subscription.js | 50 +++------------ 2 files changed, 45 insertions(+), 67 deletions(-) diff --git a/packages/web3-core-requestmanager/src/index.js b/packages/web3-core-requestmanager/src/index.js index d07c0cd9fb0..6c92a141431 100644 --- a/packages/web3-core-requestmanager/src/index.js +++ b/packages/web3-core-requestmanager/src/index.js @@ -42,7 +42,7 @@ var RequestManager = function RequestManager(provider) { this.providers = RequestManager.providers; this.setProvider(provider); - this.subscriptions = {}; + this.subscriptions = new Map(); }; @@ -87,9 +87,9 @@ RequestManager.prototype.setProvider = function (p, net) { } // reset the old one before changing, if still connected - if(this.provider && this.provider.connected) + if(this.provider && this.provider.connected) { this.clearSubscriptions(); - + } this.provider = p || null; @@ -99,20 +99,25 @@ RequestManager.prototype.setProvider = function (p, net) { result = result || deprecatedResult; // this is for possible old providers, which may had the error first handler // check for result.method, to prevent old providers errors to pass as result - if(result.method && _this.subscriptions[result.params.subscription] && _this.subscriptions[result.params.subscription].callback) { - _this.subscriptions[result.params.subscription].callback(null, result.params.result); + if(result.method && _this.subscriptions.has(result.params.subscription)) { + _this.subscriptions.get(result.params.subscription).callback(null, result.params.result); } }); + this.provider.on('connect', function () { + _this.subscriptions.forEach(function(subscription) { + subscription.resubscribe(); + }); + }); + // notify all subscriptions about the error condition this.provider.on('error', function (event) { - Object.keys(_this.subscriptions).forEach(function(id){ - if(_this.subscriptions[id] && _this.subscriptions[id].callback) - _this.subscriptions[id].callback(event.code || new Error('Provider error')); + _this.subscriptions.forEach(function(subscription) { + subscription.callback(event.code || new Error('Provider error')); }); }); - // TODO add end, timeout, connect?? + // TODO add end, timeout?? } }; @@ -133,7 +138,9 @@ RequestManager.prototype.send = function (data, callback) { var payload = Jsonrpc.toPayload(data.method, data.params); this.provider[this.provider.sendAsync ? 'sendAsync' : 'send'](payload, function (err, result) { - if(result && result.id && payload.id !== result.id) return callback(new Error('Wrong response id "'+ result.id +'" (expected: "'+ payload.id +'") in '+ JSON.stringify(payload))); + if(result && result.id && payload.id !== result.id) { + return callback(new Error('Wrong response id "'+ result.id +'" (expected: "'+ payload.id +'") in '+ JSON.stringify(payload))); + } if (err) { return callback(err); @@ -155,7 +162,7 @@ RequestManager.prototype.send = function (data, callback) { * Should be called to asynchronously send batch request * * @method sendBatch - * @param {Array} batch data + * @param {Array} data - array of payload objects * @param {Function} callback */ RequestManager.prototype.sendBatch = function (data, callback) { @@ -182,18 +189,19 @@ RequestManager.prototype.sendBatch = function (data, callback) { * Waits for notifications * * @method addSubscription - * @param {String} id the subscription id - * @param {String} name the subscription name + * @param {Subscription} subscription the subscription * @param {String} type the subscription namespace (eth, personal, etc) * @param {Function} callback the callback to call for incoming notifications */ -RequestManager.prototype.addSubscription = function (id, name, type, callback) { +RequestManager.prototype.addSubscription = function (subscription, callback) { if(this.provider.on) { - this.subscriptions[id] = { + this.subscriptions.set( + subscription.id, + { callback: callback, - type: type, - name: name - }; + subscription: subscription + } + ); } else { throw new Error('The provider doesn\'t support subscriptions: '+ this.provider.constructor.name); @@ -208,18 +216,20 @@ RequestManager.prototype.addSubscription = function (id, name, type, callback) { * @param {Function} callback fired once the subscription is removed */ RequestManager.prototype.removeSubscription = function (id, callback) { - if(this.subscriptions[id]) { - var type = this.subscriptions[id].type; - + if(this.subscriptions.has(id)) { // remove subscription first to avoid reentry - delete this.subscriptions[id]; + this.subscriptions.delete(id); // then, try to actually unsubscribe this.send({ - method: type + '_unsubscribe', + method: this.subscriptions.get(id).subscription.type + '_unsubscribe', params: [id] }, callback); - } else if (typeof callback === 'function') { + + return; + } + + if (typeof callback === 'function') { // call the callback if the subscription was already removed callback(null); } @@ -235,8 +245,8 @@ RequestManager.prototype.clearSubscriptions = function (keepIsSyncing) { // uninstall all subscriptions - Object.keys(this.subscriptions).forEach(function(id){ - if(!keepIsSyncing || _this.subscriptions[id].name !== 'syncing') + this.subscriptions.forEach(function(value, id){ + if(!keepIsSyncing || _this.subscriptions.get(id).name !== 'syncing') _this.removeSubscription(id); }); diff --git a/packages/web3-core-subscriptions/src/subscription.js b/packages/web3-core-subscriptions/src/subscription.js index 9ec301f1615..21a0f297d7c 100644 --- a/packages/web3-core-subscriptions/src/subscription.js +++ b/packages/web3-core-subscriptions/src/subscription.js @@ -249,12 +249,12 @@ Subscription.prototype.subscribe = function() { this.options.requestManager.send(payload, function (err, result) { if(!err && result) { _this.id = result; + _this.method = payload.params[0]; _this.emit('connected', result); // call callback on notifications - _this.options.requestManager.addSubscription(_this.id, payload.params[0] , _this.options.type, function(err, result) { - - if (!err) { + _this.options.requestManager.addSubscription(_this, function(error, result) { + if (!error) { if (!_.isArray(result)) { result = [result]; } @@ -272,18 +272,10 @@ Subscription.prototype.subscribe = function() { _this.callback(null, output, _this); }); } else { - _this._resubscribe(err); + _this.callback(error); + _this.emit('error', error); } }); - - // just in case the provider reconnects silently, resubscribe over the new connection - if (_this.options.requestManager.provider.once) { - _this.options.requestManager.provider.once('connect', function () { - _this._resubscribe(); - }); - } - } else { - _this._resubscribe(err); } }); @@ -291,38 +283,14 @@ Subscription.prototype.subscribe = function() { return this; }; -Subscription.prototype._resubscribe = function (err) { - var _this = this; - +Subscription.prototype.resubscribe = function () { // unsubscribe this.options.requestManager.removeSubscription(this.id); - // re-subscribe, if connection fails - if(this.options.requestManager.provider.once && !_this._reconnectIntervalId) { - this._reconnectIntervalId = setInterval(function () { - // TODO check if that makes sense! - if (_this.options.requestManager.provider.reconnect) { - _this.options.requestManager.provider.reconnect(); - } - }, 500); - - this.options.requestManager.provider.once('connect', function () { - clearInterval(_this._reconnectIntervalId); - _this._reconnectIntervalId = null; - - // delete id to keep the listeners on subscribe - _this.id = null; - - _this.subscribe(_this.callback); - }); - } - - if (err) { - this.emit('error', err); - } + // delete id to keep the listeners on subscribe + this.id = null; - // call the callback, last so that unsubscribe there won't affect the emit above - this.callback(err, null, this); + this.subscribe(this.callback); }; module.exports = Subscription;