Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full support for websocket reconnection/resubscription #1966

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions packages/web3-core-requestmanager/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,16 @@ RequestManager.prototype.setProvider = function (p, net) {
_this.subscriptions[result.params.subscription].callback(null, result.params.result);
}
});
// TODO add error, end, timeout, connect??
// this.provider.on('error', function requestManagerNotification(result){
// Object.keys(_this.subscriptions).forEach(function(id){
// if(_this.subscriptions[id].callback)
// _this.subscriptions[id].callback(err);
// });
// }

// notify all subscriptions about the error condition
this.provider.on('error', function (event) {
Object.keys(_this.subscriptions).forEach(function(id){
if(_this.subscriptions[id] && _this.subscriptions[id].callback)
_this.subscriptions[id].callback(event.code || new Error('Provider error'));
});
});

// TODO add end, timeout, connect??
}
};

Expand Down Expand Up @@ -205,17 +208,20 @@ RequestManager.prototype.addSubscription = function (id, name, type, callback) {
* @param {Function} callback fired once the subscription is removed
*/
RequestManager.prototype.removeSubscription = function (id, callback) {
var _this = this;

if(this.subscriptions[id]) {
var type = this.subscriptions[id].type;

// remove subscription first to avoid reentry
delete this.subscriptions[id];

// then, try to actually unsubscribe
this.send({
method: this.subscriptions[id].type + '_unsubscribe',
method: type + '_unsubscribe',
params: [id]
}, callback);

// remove subscription
delete _this.subscriptions[id];
} else if (typeof callback === 'function') {
// call the callback if the subscription was already removed
callback(null);
}
};

Expand Down
57 changes: 34 additions & 23 deletions packages/web3-core-subscriptions/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,37 +271,48 @@ Subscription.prototype.subscribe = function() {
_this.callback(null, output, _this);
});
} else {
// unsubscribe, but keep listeners
_this.options.requestManager.removeSubscription(_this.id);

// re-subscribe, if connection fails
if(_this.options.requestManager.provider.once) {
_this._reconnectIntervalId = setInterval(function () {
// TODO check if that makes sense!
if (_this.options.requestManager.provider.reconnect) {
_this.options.requestManager.provider.reconnect();
}
}, 500);

_this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this.subscribe(_this.callback);
});
}
_this.emit('error', err);

// call the callback, last so that unsubscribe there won't affect the emit above
_this.callback(err, null, _this);
_this._resubscribe(err);
}
});
} else {
_this.callback(err, null, _this);
_this.emit('error', err);
_this._resubscribe(err);
}
});

// return an object to cancel the subscription
return this;
};

Subscription.prototype._resubscribe = function (err) {
var _this = this;

// unsubscribe
this.options.requestManager.removeSubscription(this.id);

// re-subscribe, if connection fails
if(this.options.requestManager.provider.once && !_this._reconnectIntervalId) {
this._reconnectIntervalId = setInterval(function () {
// TODO check if that makes sense!
if (_this.options.requestManager.provider.reconnect) {
_this.options.requestManager.provider.reconnect();
}
}, 500);

this.options.requestManager.provider.once('connect', function () {
clearInterval(_this._reconnectIntervalId);
_this._reconnectIntervalId = null;

// delete id to keep the listeners on subscribe
_this.id = null;

_this.subscribe(_this.callback);
});
}

this.emit('error', err);

// call the callback, last so that unsubscribe there won't affect the emit above
this.callback(err, null, this);
};

module.exports = Subscription;
10 changes: 9 additions & 1 deletion packages/web3-providers-ws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,15 @@ This will expose the `Web3WsProvider` object on the window object.
// in node.js
var Web3WsProvider = require('web3-providers-ws');

var options = { timeout: 30000, headers: {authorization: 'Basic username:password'} } // set a custom timeout at 30 seconds, and credentials (you can also add the credentials to the URL: ws://username:password@localhost:8546)
var options = {
// set credentials (you can also add the credentials to the URL:
// ws://username:password@localhost:8546)
headers: {authorization: 'Basic username:password'},
// set a custom timeout at 30 seconds
timeout: 30000,
// enable WebSocket auto-reconnection
autoReconnect: true
}
var ws = new Web3WsProvider('ws://localhost:8546', options);
```

Expand Down
5 changes: 5 additions & 0 deletions packages/web3-providers-ws/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/web3-providers-ws/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"dependencies": {
"underscore": "1.8.3",
"web3-core-helpers": "1.0.0-beta.36",
"websocket": "git://github.com/frozeman/WebSocket-Node.git#browserifyCompatible"
"websocket": "git://github.com/frozeman/WebSocket-Node.git#browserifyCompatible",
"websocket-reconnector": "1.1.1"
}
}
50 changes: 45 additions & 5 deletions packages/web3-providers-ws/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
var _ = require('underscore');
var errors = require('web3-core-helpers').errors;

var WsReconnector = require('websocket-reconnector');

var Ws = null;
var _btoa = null;
var parseURL = null;
Expand Down Expand Up @@ -80,6 +82,11 @@ var WebsocketProvider = function WebsocketProvider(url, options) {
// Allow a custom client configuration
var clientConfig = options.clientConfig || undefined;

// Enable automatic reconnection wrapping `Ws` with reconnector
if (options.autoReconnect) {
Ws = WsReconnector(Ws);
}

// When all node core implementations that do not have the
// WHATWG compatible URL parser go out of service this line can be removed.
if (parsedURL.auth) {
Expand Down Expand Up @@ -232,7 +239,13 @@ WebsocketProvider.prototype._addResponseCallback = function(payload, callback) {
setTimeout(function () {
if (_this.responseCallbacks[id]) {
_this.responseCallbacks[id](errors.ConnectionTimeout(_this._customTimeout));

delete _this.responseCallbacks[id];

// try to reconnect
if (_this.connection.reconnect) {
_this.connection.reconnect();
}
}
}, this._customTimeout);
}
Expand Down Expand Up @@ -299,15 +312,15 @@ WebsocketProvider.prototype.on = function (type, callback) {
break;

case 'connect':
this.connection.onopen = callback;
this.connection.addEventListener('open', callback);
break;

case 'end':
this.connection.onclose = callback;
this.connection.addEventListener('close', callback);
break;

case 'error':
this.connection.onerror = callback;
this.connection.addEventListener('error', callback);
break;

// default:
Expand All @@ -316,7 +329,24 @@ WebsocketProvider.prototype.on = function (type, callback) {
}
};

// TODO add once
/**
Subscribes to provider only once

@method once
@param {String} type 'notifcation', 'connect', 'error', 'end' or 'data'
@param {Function} callback the callback to call
*/
WebsocketProvider.prototype.once = function (type, callback) {
var _this = this;

function onceCallback(event) {
_this.removeListener(type, onceCallback);

callback(event);
}

this.on(type, onceCallback);
};

/**
Removes event listener
Expand All @@ -336,7 +366,17 @@ WebsocketProvider.prototype.removeListener = function (type, callback) {
});
break;

// TODO remvoving connect missing
case 'connect':
this.connection.removeEventListener('open', callback);
break;

case 'end':
this.connection.removeEventListener('close', callback);
break;

case 'error':
this.connection.removeEventListener('error', callback);
break;

// default:
// this.connection.removeListener(type, callback);
Expand Down