Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-subscribing moved to RequestManager #3187

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions packages/web3-core-requestmanager/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var RequestManager = function RequestManager(provider) {
this.providers = RequestManager.providers;

this.setProvider(provider);
this.subscriptions = {};
this.subscriptions = new Map();
};


Expand Down Expand Up @@ -87,9 +87,9 @@ RequestManager.prototype.setProvider = function (p, net) {
}

// reset the old one before changing, if still connected
if(this.provider && this.provider.connected)
if(this.provider && this.provider.connected) {
this.clearSubscriptions();

}

this.provider = p || null;

Expand All @@ -99,20 +99,25 @@ RequestManager.prototype.setProvider = function (p, net) {
result = result || deprecatedResult; // this is for possible old providers, which may had the error first handler

// check for result.method, to prevent old providers errors to pass as result
if(result.method && _this.subscriptions[result.params.subscription] && _this.subscriptions[result.params.subscription].callback) {
_this.subscriptions[result.params.subscription].callback(null, result.params.result);
if(result.method && _this.subscriptions.has(result.params.subscription)) {
_this.subscriptions.get(result.params.subscription).callback(null, result.params.result);
}
});

this.provider.on('connect', function () {
_this.subscriptions.forEach(function(subscription) {
subscription.resubscribe();
});
});

// notify all subscriptions about the error condition
this.provider.on('error', function (event) {
Object.keys(_this.subscriptions).forEach(function(id){
if(_this.subscriptions[id] && _this.subscriptions[id].callback)
_this.subscriptions[id].callback(event.code || new Error('Provider error'));
_this.subscriptions.forEach(function(subscription) {
subscription.callback(event.code || new Error('Provider error'));
});
});

// TODO add end, timeout, connect??
// TODO add end, timeout??
}
};

Expand All @@ -133,7 +138,9 @@ RequestManager.prototype.send = function (data, callback) {

var payload = Jsonrpc.toPayload(data.method, data.params);
this.provider[this.provider.sendAsync ? 'sendAsync' : 'send'](payload, function (err, result) {
if(result && result.id && payload.id !== result.id) return callback(new Error('Wrong response id "'+ result.id +'" (expected: "'+ payload.id +'") in '+ JSON.stringify(payload)));
if(result && result.id && payload.id !== result.id) {
return callback(new Error('Wrong response id "'+ result.id +'" (expected: "'+ payload.id +'") in '+ JSON.stringify(payload)));
}

if (err) {
return callback(err);
Expand All @@ -155,7 +162,7 @@ RequestManager.prototype.send = function (data, callback) {
* Should be called to asynchronously send batch request
*
* @method sendBatch
* @param {Array} batch data
* @param {Array} data - array of payload objects
* @param {Function} callback
*/
RequestManager.prototype.sendBatch = function (data, callback) {
Expand All @@ -182,18 +189,19 @@ RequestManager.prototype.sendBatch = function (data, callback) {
* Waits for notifications
*
* @method addSubscription
* @param {String} id the subscription id
* @param {String} name the subscription name
* @param {Subscription} subscription the subscription
* @param {String} type the subscription namespace (eth, personal, etc)
* @param {Function} callback the callback to call for incoming notifications
*/
RequestManager.prototype.addSubscription = function (id, name, type, callback) {
RequestManager.prototype.addSubscription = function (subscription, callback) {
if(this.provider.on) {
this.subscriptions[id] = {
this.subscriptions.set(
subscription.id,
{
callback: callback,
type: type,
name: name
};
subscription: subscription
}
);

} else {
throw new Error('The provider doesn\'t support subscriptions: '+ this.provider.constructor.name);
Expand All @@ -208,18 +216,20 @@ RequestManager.prototype.addSubscription = function (id, name, type, callback) {
* @param {Function} callback fired once the subscription is removed
*/
RequestManager.prototype.removeSubscription = function (id, callback) {
if(this.subscriptions[id]) {
var type = this.subscriptions[id].type;

if(this.subscriptions.has(id)) {
// remove subscription first to avoid reentry
delete this.subscriptions[id];
this.subscriptions.delete(id);

// then, try to actually unsubscribe
this.send({
method: type + '_unsubscribe',
method: this.subscriptions.get(id).subscription.type + '_unsubscribe',
params: [id]
}, callback);
} else if (typeof callback === 'function') {

return;
}

if (typeof callback === 'function') {
// call the callback if the subscription was already removed
callback(null);
}
Expand All @@ -235,8 +245,8 @@ RequestManager.prototype.clearSubscriptions = function (keepIsSyncing) {


// uninstall all subscriptions
Object.keys(this.subscriptions).forEach(function(id){
if(!keepIsSyncing || _this.subscriptions[id].name !== 'syncing')
this.subscriptions.forEach(function(value, id){
if(!keepIsSyncing || _this.subscriptions.get(id).name !== 'syncing')
_this.removeSubscription(id);
});

Expand Down
50 changes: 9 additions & 41 deletions packages/web3-core-subscriptions/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ Subscription.prototype.subscribe = function() {
this.options.requestManager.send(payload, function (err, result) {
if(!err && result) {
_this.id = result;
_this.method = payload.params[0];
_this.emit('connected', result);

// call callback on notifications
_this.options.requestManager.addSubscription(_this.id, payload.params[0] , _this.options.type, function(err, result) {

if (!err) {
_this.options.requestManager.addSubscription(_this, function(error, result) {
if (!error) {
if (!_.isArray(result)) {
result = [result];
}
Expand All @@ -272,57 +272,25 @@ Subscription.prototype.subscribe = function() {
_this.callback(null, output, _this);
});
} else {
_this._resubscribe(err);
_this.callback(error);
_this.emit('error', error);
}
});

// just in case the provider reconnects silently, resubscribe over the new connection
if (_this.options.requestManager.provider.once) {
_this.options.requestManager.provider.once('connect', function () {
_this._resubscribe();
});
}
} else {
_this._resubscribe(err);
}
});

// return an object to cancel the subscription
return this;
};

Subscription.prototype._resubscribe = function (err) {
var _this = this;

Subscription.prototype.resubscribe = function () {
// unsubscribe
this.options.requestManager.removeSubscription(this.id);

// re-subscribe, if connection fails
if(this.options.requestManager.provider.once && !_this._reconnectIntervalId) {
this._reconnectIntervalId = setInterval(function () {
// TODO check if that makes sense!
if (_this.options.requestManager.provider.reconnect) {
_this.options.requestManager.provider.reconnect();
}
}, 500);

this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this._reconnectIntervalId = null;

// delete id to keep the listeners on subscribe
_this.id = null;

_this.subscribe(_this.callback);
});
}

if (err) {
this.emit('error', err);
}
// delete id to keep the listeners on subscribe
this.id = null;

// call the callback, last so that unsubscribe there won't affect the emit above
this.callback(err, null, this);
this.subscribe(this.callback);
};

module.exports = Subscription;