Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compute: turn operations into event emitters #939

Merged
merged 1 commit into from
Nov 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 80 additions & 61 deletions lib/compute/operation.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@

'use strict';

var extend = require('extend');
var is = require('is');
var nodeutil = require('util');
var events = require('events');
var modelo = require('modelo');

/**
* @type {module:common/serviceObject}
Expand Down Expand Up @@ -79,6 +78,30 @@ var util = require('../common/util.js');
* //-
* var zone = gce.zone('us-central1-a');
* var operation = zone.operation('operation-id');
*
* //-
* // All operations are event emitters. The status of each operation is polled
* // continuously, starting only after you register a "complete" listener.
* //-
* operation.on('complete', function(metadata) {
* // The operation is complete.
* });
*
* //-
* // Be sure to register an error handler as well to catch any issues which
* // impeded the operation.
* //-
* operation.on('error', function(err) {
* // An error occurred during the operation.
* });
*
* //-
* // To force the Operation object to stop polling for updates, simply remove
* // any "complete" listeners you've registered.
* //
* // The easiest way to do this is with `removeAllListeners()`.
* //-
* operation.removeAllListeners();
*/
function Operation(scope, name) {
var isCompute = scope.constructor.name === 'Compute';
Expand Down Expand Up @@ -132,10 +155,16 @@ function Operation(scope, name) {
methods: methods
});

events.EventEmitter.call(this);

this.completeListeners = 0;
this.hasActiveListeners = false;
this.name = name;

this.listenForEvents_();
}

nodeutil.inherits(Operation, ServiceObject);
modelo.inherits(Operation, ServiceObject, events.EventEmitter);

/**
* Get the operation's metadata. For a detailed description of metadata see
Expand Down Expand Up @@ -167,9 +196,9 @@ Operation.prototype.getMetadata = function(callback) {
// this callback. We have to make sure this isn't a false error by seeing if
// the response body contains a property that wouldn't exist on a failed API
// request (`name`).
var isActualError = err && (!apiResponse || apiResponse.name !== self.name);
var requestFailed = err && (!apiResponse || apiResponse.name !== self.name);

if (isActualError) {
if (requestFailed) {
callback(err, null, apiResponse);
return;
}
Expand All @@ -181,80 +210,70 @@ Operation.prototype.getMetadata = function(callback) {
};

/**
* Register a callback for when the operation is complete.
* Begin listening for events on the operation. This method keeps track of how
* many "complete" listeners are registered and removed, making sure polling is
* handled automatically.
*
* If the operation doesn't complete after the maximum number of attempts have
* been made (see `options.maxAttempts` and `options.interval`), an error will
* be provided to your callback with code: `OPERATION_INCOMPLETE`.
* As long as there is one active "complete" listener, the connection is open.
* When there are no more listeners, the polling stops.
*
* @param {object=} options - Configuration object.
* @param {number} options.maxAttempts - Maximum number of attempts to make an
* API request to check if the operation is complete. (Default: `10`)
* @param {number} options.interval - Amount of time in milliseconds between
* each request. (Default: `3000`)
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request.
* @param {object} callback.metadata - The operation's metadata.
*
* @example
* operation.onComplete(function(err, metadata) {
* if (err.code === 'OPERATION_INCOMPLETE') {
* // The operation is not complete yet. You may want to register another
* // `onComplete` listener or queue for later.
* }
*
* if (!err) {
* // Operation complete!
* }
* });
* @private
*/
Operation.prototype.onComplete = function(options, callback) {
Operation.prototype.listenForEvents_ = function() {
var self = this;

if (is.fn(options)) {
callback = options;
options = {};
}

options = extend({
maxAttempts: 10,
interval: 3000
}, options);

var didNotCompleteError = new Error('Operation did not complete.');
didNotCompleteError.code = 'OPERATION_INCOMPLETE';

var numAttempts = 0;
this.on('newListener', function(event) {
if (event === 'complete') {
self.completeListeners++;

function checkMetadata() {
numAttempts++;
if (!self.hasActiveListeners) {
self.hasActiveListeners = true;
self.startPolling_();
}
}
});

if (numAttempts > options.maxAttempts) {
callback(didNotCompleteError, self.metadata);
return;
this.on('removeListener', function(event) {
if (event === 'complete' && --self.completeListeners === 0) {
self.hasActiveListeners = false;
}
});
};

setTimeout(function() {
self.getMetadata(onMetadata);
}, options.interval);
/**
* Poll `getMetadata` to check the operation's status. This runs a loop to ping
* the API on an interval.
*
* Note: This method is automatically called once a "complete" event handler is
* registered on the operation.
*
* @private
*/
Operation.prototype.startPolling_ = function() {
var self = this;

if (!this.hasActiveListeners) {
return;
}

function onMetadata(err, metadata) {
this.getMetadata(function(err, metadata, apiResponse) {
// Parsing the response body will automatically create an ApiError object if
// the operation failed.
var parsedHttpRespBody = util.parseHttpRespBody(apiResponse);
err = err || parsedHttpRespBody.err;

if (err) {
callback(err, metadata);
self.emit('error', err);
return;
}

if (metadata.status !== 'DONE') {
checkMetadata();
setTimeout(self.startPolling_.bind(self), 500);
return;
}

// The operation is complete.
callback(null, metadata);
}

checkMetadata();
self.emit('complete', metadata);
});
};

module.exports = Operation;
87 changes: 61 additions & 26 deletions system-test/compute.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ describe('Compute', function() {
before(function(done) {
address.create(function(err, disk, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -110,7 +115,12 @@ describe('Compute', function() {

disk.create(config, function(err, disk, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -154,7 +164,12 @@ describe('Compute', function() {

disk.snapshot(generateName()).create(function(err, snapshot, operation) {
assert.ifError(err);
operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});
});
Expand Down Expand Up @@ -192,7 +207,12 @@ describe('Compute', function() {

firewall.create(CONFIG, function(err, firewall, operation) {
assert.ifError(err);
operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -239,7 +259,12 @@ describe('Compute', function() {
before(function(done) {
network.create(CONFIG, function(err, network, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -402,7 +427,12 @@ describe('Compute', function() {

vm.create(config, function(err, vm, operation) {
assert.ifError(err);
operation.onComplete(done);

operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand All @@ -420,7 +450,11 @@ describe('Compute', function() {
return;
}

operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done);
operation
.on('error', done)
.on('complete', function() {
done();
});
});
});

Expand Down Expand Up @@ -460,6 +494,7 @@ describe('Compute', function() {

it('should attach and detach a disk', function(done) {
var name = generateName();
var disk = zone.disk(name);

// This test waits on a lot of operations.
this.timeout(90000);
Expand All @@ -468,22 +503,29 @@ describe('Compute', function() {
createDisk,
attachDisk,
detachDisk
], done);
], function(err) {
if (err) {
done(err);
return;
}

disk.delete(execAfterOperationComplete(done));
});

function createDisk(callback) {
var config = {
os: 'ubuntu'
};

zone.createDisk(name, config, execAfterOperationComplete(callback));
disk.create(config, execAfterOperationComplete(callback));
}

function attachDisk(callback) {
vm.attachDisk(zone.disk(name), execAfterOperationComplete(callback));
vm.attachDisk(disk, execAfterOperationComplete(callback));
}

function detachDisk(callback) {
vm.detachDisk(zone.disk(name), execAfterOperationComplete(callback));
vm.detachDisk(disk, execAfterOperationComplete(callback));
}
});

Expand Down Expand Up @@ -523,8 +565,7 @@ describe('Compute', function() {
var MAX_TIME_ALLOWED = 90000 * 2;
this.timeout(MAX_TIME_ALLOWED);

var options = getOperationOptions(MAX_TIME_ALLOWED);
vm.stop(execAfterOperationComplete(options, done));
vm.stop(execAfterOperationComplete(done));
});
});

Expand Down Expand Up @@ -624,26 +665,20 @@ describe('Compute', function() {
});
}

function getOperationOptions(maxTimeAllowed) {
var interval = 10000;

return {
maxAttempts: maxTimeAllowed / interval,
interval: interval
};
}

function execAfterOperationComplete(options, callback) {
callback = callback || options;

function execAfterOperationComplete(callback) {
return function(err) {
if (err) {
callback(err);
return;
}

var operation = arguments[arguments.length - 2]; // [..., op, apiResponse]
operation.onComplete(options || {}, callback);

operation
.on('error', callback)
.on('complete', function() {
callback();
});
};
}
});
Loading