Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miscellaneous tweaks to transport handling and auth tests #296

Closed
wants to merge 8 commits into from
30 changes: 13 additions & 17 deletions common/lib/transport/connectionmanager.js
Original file line number Diff line number Diff line change
Expand Up @@ -510,10 +510,10 @@ var ConnectionManager = (function() {

/* Terminate any other pending transport(s), and
* abort any not-yet-pending transport attempts */
Utils.arrForEach(this.pendingTransports, function(transport) {
Utils.safeArrForEach(this.pendingTransports, function(transport) {
transport.disconnect();
});
Utils.arrForEach(this.proposedTransports, function(transport) {
Utils.safeArrForEach(this.proposedTransports, function(transport) {
transport.dispose();
});

Expand All @@ -526,15 +526,16 @@ var ConnectionManager = (function() {
* @param transport
*/
ConnectionManager.prototype.deactivateTransport = function(transport, state, error) {
var currentProtocol = this.activeProtocol,
wasActive = currentProtocol && currentProtocol.getTransport() === transport,
wasPending = Utils.arrDeleteValue(this.pendingTransports, transport),
wasProposed = Utils.arrDeleteValue(this.proposedTransports, transport);

Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.deactivateTransport()', 'transport = ' + transport);
Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.deactivateTransport()', 'state = ' + state);
Logger.logAction(Logger.LOG_MINOR, 'ConnectionManager.deactivateTransport()', 'state = ' + state + (wasActive ? '; was active' : wasPending ? '; was pending' : wasProposed ? '; was proposed' : ''));
if(error && error.message)
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.deactivateTransport()', 'reason = ' + error.message);

var currentProtocol = this.activeProtocol,
wasActive = currentProtocol && currentProtocol.getTransport() === transport,
wasPending = Utils.arrDeleteValue(this.pendingTransports, transport);

if(wasActive) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.deactivateTransport()', 'Getting, clearing, and requeuing ' + this.activeProtocol.messageQueue.count() + ' pending messages');
this.queuePendingMessages(currentProtocol.getPendingMessages());
Expand Down Expand Up @@ -954,15 +955,12 @@ var ConnectionManager = (function() {
transport.disconnect();
Utils.arrDeleteValue(this.pendingTransports, transport);
} else {
clearTimeout(preferenceTimeout);
if(err) {
clearTimeout(preferenceTimeout);
clearTransportPreference();
self.failConnectionIfFatal(err);
self.connectImpl(transportParams);
}
/* If no err then transport is viable (=> pending), so allow
* preferenceTimeout to keep ticking while transport waits to be
* connected */
}
});
};
Expand Down Expand Up @@ -1078,17 +1076,15 @@ var ConnectionManager = (function() {
}
}

Utils.arrForEach(this.pendingTransports, function(transport) {
Utils.safeArrForEach(this.pendingTransports, function(transport) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.closeImpl()', 'Closing pending transport: ' + transport);
closeTransport(transport);
});
this.pendingTransports = [];

Utils.arrForEach(this.proposedTransports, function(transport) {
Utils.safeArrForEach(this.proposedTransports, function(transport) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.closeImpl()', 'Disposing of proposed transport: ' + transport);
transport.dispose();
});
this.proposedTransports = [];

if(this.activeProtocol) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.closeImpl()', 'Closing active transport: ' + this.activeProtocol.getTransport());
Expand Down Expand Up @@ -1131,13 +1127,13 @@ var ConnectionManager = (function() {
}
}

Utils.arrForEach(this.pendingTransports, function(transport) {
Utils.safeArrForEach(this.pendingTransports, function(transport) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.disconnectAllTransports()', 'Disconnecting pending transport: ' + transport);
disconnectTransport(transport);
});
this.pendingTransports = [];

Utils.arrForEach(this.proposedTransports, function(transport) {
Utils.safeArrForEach(this.proposedTransports, function(transport) {
Logger.logAction(Logger.LOG_MICRO, 'ConnectionManager.disconnectAllTransports()', 'Disposing of proposed transport: ' + transport);
transport.dispose();
});
Expand Down
11 changes: 11 additions & 0 deletions common/lib/util/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ var Utils = (function() {
return res;
};

Utils.arrWithoutValue = function(arr, val) {
var newArr = arr.slice();
Utils.arrDeleteValue(newArr, val);
return newArr;
};

/*
* Construct an array of the keys of the enumerable
* properties of a given object, optionally limited
Expand Down Expand Up @@ -239,6 +245,11 @@ var Utils = (function() {
}
};

/* Useful when the function may mutate the array */
Utils.safeArrForEach = function(arr, fn) {
return Utils.arrForEach(arr.slice(), fn);
};

Utils.arrMap = Array.prototype.map ?
function(arr, fn) {
return arr.map(fn);
Expand Down
2 changes: 1 addition & 1 deletion spec/common/globals/environment.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
define(function(require) {
var defaultLogLevel = 2,
environment = isBrowser ? (window.__env__ || {}) : process.env,
ablyEnvironment = environment.ABLY_ENV || 'sandbox',
ablyEnvironment = environment.ABLY_ENV || 'staging',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to commit this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, was just to change the ci run. (Hence the commit message of TEMP NOTFORMERGING :) )

realtimeHost = environment.ABLY_REALTIME_HOST,
restHost = environment.ABLY_REST_HOST,
port = environment.ABLY_PORT || 80,
Expand Down
10 changes: 9 additions & 1 deletion spec/common/modules/shared_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,15 @@ define(['spec/common/modules/testapp_module', 'spec/common/modules/client_module

/* testFn is assumed to be a function of realtimeOptions that returns a nodeunit test */
function testOnAllTransports(exports, name, testFn, excludeUpgrade) {
utils.arrForEach(availableTransports, function(transport) {
/* Don't include jsonp if possible. Why? Because you can't abort requests. So recv's
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when will jsonp get tested?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In spec/browser/simple.test.js (same as before I added the testOnAllTransports helper)

* stick around for 90s till realtime ends them. So in a test, the
* browsers max-connections-per-host limit fills up quickly, which messes
* up other comet transports too */
var transports = utils.arrIn(availableTransports, 'xhr_polling') ?
utils.arrWithoutValue(availableTransports, 'jsonp') :
availableTransports;

utils.arrForEach(transports, function(transport) {
exports[name + '_with_' + transport + '_binary_transport'] = testFn({transports: [transport], useBinaryProtocol: true});
exports[name + '_with_' + transport + '_text_transport'] = testFn({transports: [transport], useBinaryProtocol: false});
});
Expand Down
55 changes: 30 additions & 25 deletions spec/realtime/auth.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -398,17 +398,20 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
test.done();
return;
}
clientRealtime = helper.AblyRealtime(mixin(realtimeOpts, { tokenDetails: tokenDetails, authCallback: function(){}, queryTime: true }));
monitorConnection(test, clientRealtime);
clientRealtime = helper.AblyRealtime(mixin(realtimeOpts, { tokenDetails: tokenDetails, queryTime: true }));

clientRealtime.connection.on('failed', function(){
test.ok(false, 'Failed to connect before token expired');
closeAndFinish(test, clientRealtime);
});
clientRealtime.connection.once('connected', function(){
clientRealtime.connection.off('failed');
test.ok(true, 'Verify connection connected');
clientRealtime.connection.once('disconnected', function(stateChange){
test.ok(true, 'Verify connection disconnected');
test.equal(stateChange.reason.statusCode, 401, 'Verify correct disconnect statusCode');
test.equal(stateChange.reason.code, 40142, 'Verify correct disconnect code');
clientRealtime.close();
test.done();
closeAndFinish(test, clientRealtime);
});
});
});
Expand Down Expand Up @@ -668,12 +671,12 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
* use authorise({force: true}) to force a reauth using an existing authCallback
*/
testOnAllTransports(exports, 'reauth_authCallback', function(realtimeOpts) { return function(test) {
test.expect(5);
test.expect(8);
var realtime, rest = helper.AblyRest();
var firstTime = true;
var clientId = "testclientid";
var authCallback = function(tokenParams, callback) {
tokenParams.ttl = firstTime ? 5000 : 60000;
tokenParams.clientId = '*';
tokenParams.capability = firstTime ? {'wrong': ['*']} : {'right': ['*']};
firstTime = false;
rest.auth.requestToken(tokenParams, null, function(err, tokenDetails) {
if(err) {
Expand All @@ -685,30 +688,32 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
});
};

realtime = helper.AblyRealtime(mixin(realtimeOpts, { authCallback: authCallback, clientId: clientId }));
realtime = helper.AblyRealtime(mixin(realtimeOpts, { authCallback: authCallback }));
realtime.connection.once('connected', function(){
/* soon after connected, reauth */
setTimeout(function(){
test.ok(true, 'Verify connection connected');
var channel = realtime.channels.get('right');
channel.attach(function(err) {
test.ok(err, 'Check using first token, without channel attach capability');
test.equal(err.code, 40160, 'Check expected error code');

/* soon after connected, reauth */
realtime.auth.authorise(null, {force: true}, function(err) {
test.ok(!err, err && displayError(err));
});
}, 200);
test.ok(true, 'Verify connection connected');

/* statechanges due to the reauth */
realtime.connection.once('disconnected', function(stateChange){
test.ok(true, 'Verify connection disconnected');
test.equal(stateChange.reason.code, 80003, 'Verify disconnect was client-initiated, not server-initiated (ie 40142)');
realtime.connection.once('connected', function(){
test.ok(true, 'Verify connection reconnected');
/* after another 5s (after would be disconnected under the original token), we're done */
setTimeout(function(){
closeAndFinish(test, realtime);
}, 5000);
realtime.connection.once('disconnected', function(){
test.ok(false, 'should not be disconnected a second time, as new token should last till test ends');
/* statechanges due to the reauth */
realtime.connection.once('disconnected', function(stateChange){
test.ok(true, 'Verify connection disconnected');
test.equal(stateChange.reason.code, 80003, 'Verify disconnect was client-initiated, not server-initiated (ie 40142)');
realtime.connection.once('connected', function(){
test.ok(true, 'Verify connection reconnected');

channel.attach(function(err) {
test.ok(!err, 'Check using second token, with channel attach capability');
closeAndFinish(test, realtime);
});
});
});
})
});
});

Expand Down
37 changes: 15 additions & 22 deletions spec/realtime/message.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,39 +250,29 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {

exports.publishVariations = function(test) {
var testData = 'Some data';
var errorCallback = function(err){
if(err) {
test.ok(false, 'Error received by publish callback ' + displayError(err));
closeAndFinish(test, realtime);
return;
}
};
var testArguments = [
[{name: 'objectWithName'}],
[{name: 'objectWithNameAndCallback'}, errorCallback],
[{name: 'objectWithNameAndNullData', data: null}],
[{name: 'objectWithNameAndUndefinedData', data: undefined}],
[{name: 'objectWithNameAndEmptyStringData', data: ''}],
['nameAndNullData', null],
['nameAndUndefinedData', undefined],
['nameAndEmptyStringData', ''],
['nameAndData', testData],
['nameAndDataAndCallback', testData, errorCallback],
[{name: 'objectWithNameAndData', data: testData}],
[{name: 'objectWithNameAndDataAndCallback', data: testData}, errorCallback],
// 6 messages with null name,
// 5 messages with null name,
[null, testData],
[null, testData, errorCallback],
[{name: null, data: testData}],
[null, null],
[{name: null}],
[{name: null, data: null}]
];
var realtime;

test.expect(testArguments.length * 2);
try {
/* set up realtime */
var realtime = helper.AblyRealtime();
realtime = helper.AblyRealtime();
var rest = helper.AblyRest();

/* connect and attach */
Expand Down Expand Up @@ -338,21 +328,24 @@ define(['ably', 'shared_helper', 'async'], function(Ably, helper, async) {
}

if (messagesReceived == testArguments.length) {
closeAndFinish(test, realtime);
setTimeout(function() {
closeAndFinish(test, realtime);
}, 2000);
}
});

/* publish events */
var restChannel = rest.channels.get('publishVariations');
async.eachSeries(testArguments, function iterator(item, callback) {
try {
restChannel.publish(item, function(err) {
callback(err);
});
} catch (e) {
test.ok(false, "Failed to publish");
async.eachSeries(testArguments, function iterator(args, callback) {
args.push(callback);
restChannel.publish.apply(restChannel, args);
}, function(err) {
if(err) {
test.ok(false, 'Error received by publish callback ' + displayError(err));
closeAndFinish(test, realtime);
return;
}
}, function() {});
});
});
});
monitorConnection(test, realtime);
Expand Down