From 7dffcea79b6058930aceb596b0389b1ea2d23442 Mon Sep 17 00:00:00 2001 From: Stephen Sawchuk Date: Wed, 11 Nov 2015 16:07:10 -0500 Subject: [PATCH] compute: turn operations into event emitters --- lib/compute/operation.js | 141 +++++++++++--------- system-test/compute.js | 87 ++++++++---- test/compute/operation.js | 273 +++++++++++++++++++++++++------------- 3 files changed, 325 insertions(+), 176 deletions(-) diff --git a/lib/compute/operation.js b/lib/compute/operation.js index 6c143834c1f..a594c92af3f 100644 --- a/lib/compute/operation.js +++ b/lib/compute/operation.js @@ -20,9 +20,8 @@ 'use strict'; -var extend = require('extend'); -var is = require('is'); -var nodeutil = require('util'); +var events = require('events'); +var modelo = require('modelo'); /** * @type {module:common/serviceObject} @@ -79,6 +78,30 @@ var util = require('../common/util.js'); * //- * var zone = gce.zone('us-central1-a'); * var operation = zone.operation('operation-id'); + * + * //- + * // All operations are event emitters. The status of each operation is polled + * // continuously, starting only after you register a "complete" listener. + * //- + * operation.on('complete', function(metadata) { + * // The operation is complete. + * }); + * + * //- + * // Be sure to register an error handler as well to catch any issues which + * // impeded the operation. + * //- + * operation.on('error', function(err) { + * // An error occurred during the operation. + * }); + * + * //- + * // To force the Operation object to stop polling for updates, simply remove + * // any "complete" listeners you've registered. + * // + * // The easiest way to do this is with `removeAllListeners()`. + * //- + * operation.removeAllListeners(); */ function Operation(scope, name) { var isCompute = scope.constructor.name === 'Compute'; @@ -132,10 +155,16 @@ function Operation(scope, name) { methods: methods }); + events.EventEmitter.call(this); + + this.completeListeners = 0; + this.hasActiveListeners = false; this.name = name; + + this.listenForEvents_(); } -nodeutil.inherits(Operation, ServiceObject); +modelo.inherits(Operation, ServiceObject, events.EventEmitter); /** * Get the operation's metadata. For a detailed description of metadata see @@ -167,9 +196,9 @@ Operation.prototype.getMetadata = function(callback) { // this callback. We have to make sure this isn't a false error by seeing if // the response body contains a property that wouldn't exist on a failed API // request (`name`). - var isActualError = err && (!apiResponse || apiResponse.name !== self.name); + var requestFailed = err && (!apiResponse || apiResponse.name !== self.name); - if (isActualError) { + if (requestFailed) { callback(err, null, apiResponse); return; } @@ -181,80 +210,70 @@ Operation.prototype.getMetadata = function(callback) { }; /** - * Register a callback for when the operation is complete. + * Begin listening for events on the operation. This method keeps track of how + * many "complete" listeners are registered and removed, making sure polling is + * handled automatically. * - * If the operation doesn't complete after the maximum number of attempts have - * been made (see `options.maxAttempts` and `options.interval`), an error will - * be provided to your callback with code: `OPERATION_INCOMPLETE`. + * As long as there is one active "complete" listener, the connection is open. + * When there are no more listeners, the polling stops. * - * @param {object=} options - Configuration object. - * @param {number} options.maxAttempts - Maximum number of attempts to make an - * API request to check if the operation is complete. (Default: `10`) - * @param {number} options.interval - Amount of time in milliseconds between - * each request. (Default: `3000`) - * @param {function} callback - The callback function. - * @param {?error} callback.err - An error returned while making this request. - * @param {object} callback.metadata - The operation's metadata. - * - * @example - * operation.onComplete(function(err, metadata) { - * if (err.code === 'OPERATION_INCOMPLETE') { - * // The operation is not complete yet. You may want to register another - * // `onComplete` listener or queue for later. - * } - * - * if (!err) { - * // Operation complete! - * } - * }); + * @private */ -Operation.prototype.onComplete = function(options, callback) { +Operation.prototype.listenForEvents_ = function() { var self = this; - if (is.fn(options)) { - callback = options; - options = {}; - } - - options = extend({ - maxAttempts: 10, - interval: 3000 - }, options); - - var didNotCompleteError = new Error('Operation did not complete.'); - didNotCompleteError.code = 'OPERATION_INCOMPLETE'; - - var numAttempts = 0; + this.on('newListener', function(event) { + if (event === 'complete') { + self.completeListeners++; - function checkMetadata() { - numAttempts++; + if (!self.hasActiveListeners) { + self.hasActiveListeners = true; + self.startPolling_(); + } + } + }); - if (numAttempts > options.maxAttempts) { - callback(didNotCompleteError, self.metadata); - return; + this.on('removeListener', function(event) { + if (event === 'complete' && --self.completeListeners === 0) { + self.hasActiveListeners = false; } + }); +}; - setTimeout(function() { - self.getMetadata(onMetadata); - }, options.interval); +/** + * Poll `getMetadata` to check the operation's status. This runs a loop to ping + * the API on an interval. + * + * Note: This method is automatically called once a "complete" event handler is + * registered on the operation. + * + * @private + */ +Operation.prototype.startPolling_ = function() { + var self = this; + + if (!this.hasActiveListeners) { + return; } - function onMetadata(err, metadata) { + this.getMetadata(function(err, metadata, apiResponse) { + // Parsing the response body will automatically create an ApiError object if + // the operation failed. + var parsedHttpRespBody = util.parseHttpRespBody(apiResponse); + err = err || parsedHttpRespBody.err; + if (err) { - callback(err, metadata); + self.emit('error', err); return; } if (metadata.status !== 'DONE') { - checkMetadata(); + setTimeout(self.startPolling_.bind(self), 500); return; } - // The operation is complete. - callback(null, metadata); - } - - checkMetadata(); + self.emit('complete', metadata); + }); }; module.exports = Operation; diff --git a/system-test/compute.js b/system-test/compute.js index 45196a28d75..ddcba9e9c26 100644 --- a/system-test/compute.js +++ b/system-test/compute.js @@ -60,7 +60,12 @@ describe('Compute', function() { before(function(done) { address.create(function(err, disk, operation) { assert.ifError(err); - operation.onComplete(done); + + operation + .on('error', done) + .on('complete', function() { + done(); + }); }); }); @@ -110,7 +115,12 @@ describe('Compute', function() { disk.create(config, function(err, disk, operation) { assert.ifError(err); - operation.onComplete(done); + + operation + .on('error', done) + .on('complete', function() { + done(); + }); }); }); @@ -154,7 +164,12 @@ describe('Compute', function() { disk.snapshot(generateName()).create(function(err, snapshot, operation) { assert.ifError(err); - operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done); + + operation + .on('error', done) + .on('complete', function() { + done(); + }); }); }); }); @@ -192,7 +207,12 @@ describe('Compute', function() { firewall.create(CONFIG, function(err, firewall, operation) { assert.ifError(err); - operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done); + + operation + .on('error', done) + .on('complete', function() { + done(); + }); }); }); @@ -239,7 +259,12 @@ describe('Compute', function() { before(function(done) { network.create(CONFIG, function(err, network, operation) { assert.ifError(err); - operation.onComplete(done); + + operation + .on('error', done) + .on('complete', function() { + done(); + }); }); }); @@ -402,7 +427,12 @@ describe('Compute', function() { vm.create(config, function(err, vm, operation) { assert.ifError(err); - operation.onComplete(done); + + operation + .on('error', done) + .on('complete', function() { + done(); + }); }); }); @@ -420,7 +450,11 @@ describe('Compute', function() { return; } - operation.onComplete(getOperationOptions(MAX_TIME_ALLOWED), done); + operation + .on('error', done) + .on('complete', function() { + done(); + }); }); }); @@ -460,6 +494,7 @@ describe('Compute', function() { it('should attach and detach a disk', function(done) { var name = generateName(); + var disk = zone.disk(name); // This test waits on a lot of operations. this.timeout(90000); @@ -468,22 +503,29 @@ describe('Compute', function() { createDisk, attachDisk, detachDisk - ], done); + ], function(err) { + if (err) { + done(err); + return; + } + + disk.delete(execAfterOperationComplete(done)); + }); function createDisk(callback) { var config = { os: 'ubuntu' }; - zone.createDisk(name, config, execAfterOperationComplete(callback)); + disk.create(config, execAfterOperationComplete(callback)); } function attachDisk(callback) { - vm.attachDisk(zone.disk(name), execAfterOperationComplete(callback)); + vm.attachDisk(disk, execAfterOperationComplete(callback)); } function detachDisk(callback) { - vm.detachDisk(zone.disk(name), execAfterOperationComplete(callback)); + vm.detachDisk(disk, execAfterOperationComplete(callback)); } }); @@ -523,8 +565,7 @@ describe('Compute', function() { var MAX_TIME_ALLOWED = 90000 * 2; this.timeout(MAX_TIME_ALLOWED); - var options = getOperationOptions(MAX_TIME_ALLOWED); - vm.stop(execAfterOperationComplete(options, done)); + vm.stop(execAfterOperationComplete(done)); }); }); @@ -624,18 +665,7 @@ describe('Compute', function() { }); } - function getOperationOptions(maxTimeAllowed) { - var interval = 10000; - - return { - maxAttempts: maxTimeAllowed / interval, - interval: interval - }; - } - - function execAfterOperationComplete(options, callback) { - callback = callback || options; - + function execAfterOperationComplete(callback) { return function(err) { if (err) { callback(err); @@ -643,7 +673,12 @@ describe('Compute', function() { } var operation = arguments[arguments.length - 2]; // [..., op, apiResponse] - operation.onComplete(options || {}, callback); + + operation + .on('error', callback) + .on('complete', function() { + callback(); + }); }; } }); diff --git a/test/compute/operation.js b/test/compute/operation.js index 02312256349..1bcf5e6f155 100644 --- a/test/compute/operation.js +++ b/test/compute/operation.js @@ -17,6 +17,7 @@ 'use strict'; var assert = require('assert'); +var extend = require('extend'); var mockery = require('mockery'); var nodeutil = require('util'); @@ -30,6 +31,17 @@ function FakeServiceObject() { nodeutil.inherits(FakeServiceObject, ServiceObject); +var parseHttpRespBodyOverride = null; +var fakeUtil = extend({}, util, { + parseHttpRespBody: function() { + if (parseHttpRespBodyOverride) { + return parseHttpRespBodyOverride.apply(null, arguments); + } else { + return util.parseHttpRespBody.apply(this, arguments); + } + } +}); + describe('Operation', function() { var Operation; var operation; @@ -39,6 +51,7 @@ describe('Operation', function() { before(function() { mockery.registerMock('../common/service-object.js', FakeServiceObject); + mockery.registerMock('../common/util.js', fakeUtil); mockery.enable({ useCleanCache: true, warnOnUnregistered: false @@ -53,16 +66,20 @@ describe('Operation', function() { }); beforeEach(function() { + parseHttpRespBodyOverride = null; operation = new Operation(SCOPE, OPERATION_NAME); }); + afterEach(function() { + operation.removeAllListeners(); + }); + describe('instantiation', function() { it('should localize the name', function() { assert.strictEqual(operation.name, OPERATION_NAME); }); it('should inherit from ServiceObject', function() { - var operation = new Operation(SCOPE, OPERATION_NAME); assert(operation instanceof ServiceObject); var calledWith = operation.calledWith_[0]; @@ -87,6 +104,11 @@ describe('Operation', function() { var calledWith = operation.calledWith_[0]; assert.strictEqual(calledWith.baseUrl, '/global/operations'); }); + + it('should correctly initialize variables', function() { + assert.strictEqual(operation.completeListeners, 0); + assert.strictEqual(operation.hasActiveListeners, false); + }); }); describe('getMetadata', function() { @@ -179,136 +201,209 @@ describe('Operation', function() { }); }); - describe('onComplete', function() { - // Set interval to 0 so our tests don't waste time. - var OPTIONS = { interval: 0 }; + describe('listenForEvents_', function() { + beforeEach(function() { + operation.startPolling_ = util.noop; + }); - var error = new Error('Error.'); - var apiResponse = { a: 'b', c: 'd' }; - var apiResponseWithIncompleteStatus = { status: 'INCOMPLETE' }; - var apiResponseWithCompleteStatus = { status: 'DONE' }; + it('should start polling when complete listener is bound', function(done) { + operation.startPolling_ = function() { + done(); + }; - function getMetadataRespondsWithError(callback) { - callback(error, apiResponse); - } + operation.on('complete', util.noop); + }); - function getMetadataRespondsWithIncompleteStatus(callback) { - callback(null, apiResponseWithIncompleteStatus); - } + it('should track the number of listeners', function() { + assert.strictEqual(operation.completeListeners, 0); - function getMetadataRespondsWithCompleteStatus(callback) { - callback(null, apiResponseWithCompleteStatus); - } + operation.on('complete', util.noop); + assert.strictEqual(operation.completeListeners, 1); - describe('options.maxAttempts', function() { - it('should default to 10', function(done) { - var numAttemptsMade = 0; + operation.removeListener('complete', util.noop); + assert.strictEqual(operation.completeListeners, 0); + }); - operation.getMetadata = function(callback) { - numAttemptsMade++; - getMetadataRespondsWithIncompleteStatus(callback); - }; + it('should only run a single pulling loop', function() { + var startPollingCallCount = 0; - operation.onComplete(OPTIONS, function() { - assert.strictEqual(numAttemptsMade, 10); - done(); - }); - }); + operation.startPolling_ = function() { + startPollingCallCount++; + }; + + operation.on('complete', util.noop); + operation.on('complete', util.noop); + + assert.strictEqual(startPollingCallCount, 1); + }); + + it('should close when no more message listeners are bound', function() { + operation.on('complete', util.noop); + operation.on('complete', util.noop); + assert.strictEqual(operation.hasActiveListeners, true); + + operation.removeListener('complete', util.noop); + assert.strictEqual(operation.hasActiveListeners, true); + + operation.removeListener('complete', util.noop); + assert.strictEqual(operation.hasActiveListeners, false); + }); + }); + + describe('startPolling_', function() { + var listenForEvents_; + var operation; + + before(function() { + listenForEvents_ = Operation.prototype.listenForEvents_; + }); + + after(function() { + Operation.prototype.listenForEvents_ = listenForEvents_; + }); + + beforeEach(function() { + Operation.prototype.listenForEvents_ = util.noop; + operation = new Operation(SCOPE, OPERATION_NAME); + operation.hasActiveListeners = true; + }); + + afterEach(function() { + operation.hasActiveListeners = false; + }); + + it('should not call getMetadata if no listeners', function(done) { + operation.hasActiveListeners = false; + + operation.getMetadata = done; // if called, test will fail. + + operation.startPolling_(); + done(); + }); + + it('should call getMetadata if listeners are registered', function(done) { + operation.hasActiveListeners = true; + + operation.getMetadata = function() { + done(); + }; - it('should allow overriding', function(done) { - var options = { maxAttempts: 3, interval: 0 }; - var numAttemptsMade = 0; + operation.startPolling_(); + }); + + describe('API error', function() { + var error = new Error('Error.'); + beforeEach(function() { operation.getMetadata = function(callback) { - numAttemptsMade++; - getMetadataRespondsWithIncompleteStatus(callback); + callback(error); }; + }); - operation.onComplete(options, function() { - assert.strictEqual(numAttemptsMade, options.maxAttempts); + it('should emit the error', function(done) { + operation.on('error', function(err) { + assert.strictEqual(err, error); done(); }); + + operation.startPolling_(); }); }); - describe('options.interval', function() { - it('should default to 3000ms', function(done) { - this.timeout(3100); + describe('operation failure', function() { + var error = new Error('Error.'); + var apiResponse = { error: error }; - operation.getMetadata = getMetadataRespondsWithIncompleteStatus; + beforeEach(function() { + operation.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); - var started = Date.now(); - operation.onComplete({ maxAttempts: 1 }, function() { - var ended = Date.now(); + it('should parse the response body', function(done) { + parseHttpRespBodyOverride = function(body) { + assert.strictEqual(body, apiResponse); + setImmediate(done); + return {}; + }; - assert(ended - started > 2900 && ended - started < 3100); - done(); - }); + operation.startPolling_(); }); - it('should allow overriding', function(done) { - operation.getMetadata = getMetadataRespondsWithIncompleteStatus; + it('should detect and emit the error', function(done) { + parseHttpRespBodyOverride = function(body) { + assert.strictEqual(body, apiResponse); - var started = Date.now(); - operation.onComplete({ maxAttempts: 1, interval: 1000 }, function() { - var ended = Date.now(); + return { + err: error + }; + }; - assert(ended - started > 900 && ended - started < 1100); + operation.on('error', function(err) { + assert.strictEqual(err, error); done(); }); + + operation.startPolling_(); }); }); - it('should put the interval on the leading side', function(done) { - // (It should wait interval before making first request) - var started = Date.now(); - operation.getMetadata = function() { - var ended = Date.now(); + describe('operation pending', function() { + var apiResponse = { status: 'PENDING' }; + var setTimeoutCached = global.setTimeout; - assert(ended - started > 900 && ended - started < 1100); - done(); - }; + beforeEach(function() { + operation.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; + }); - operation.onComplete({ maxAttempts: 1, interval: 1000 }, util.noop); - }); + after(function() { + global.setTimeout = setTimeoutCached; + }); - it('should return an error if maxAttempts is exceeded', function(done) { - var options = { maxAttempts: 1, interval: 0 }; + it('should call startPolling_ after 500 ms', function(done) { + var startPolling_ = operation.startPolling_; + var startPollingCalled = false; - operation.getMetadata = getMetadataRespondsWithIncompleteStatus; + global.setTimeout = function(fn, timeoutMs) { + fn(); // should call startPolling_ + assert.strictEqual(timeoutMs, 500); + }; - operation.onComplete(options, function(err, metadata) { - assert.strictEqual(err.code, 'OPERATION_INCOMPLETE'); - assert.strictEqual(err.message, 'Operation did not complete.'); + operation.startPolling_ = function() { + if (!startPollingCalled) { + // Call #1. + startPollingCalled = true; + startPolling_.apply(this, arguments); + return; + } - assert.strictEqual(metadata, operation.metadata); - done(); + // This is from the setTimeout call. + assert.strictEqual(this, operation); + done(); + }; + + operation.startPolling_(); }); }); - describe('getMetadata', function() { - describe('error', function() { - it('should execute callback with error & API response', function(done) { - operation.getMetadata = getMetadataRespondsWithError; + describe('operation complete', function() { + var apiResponse = { status: 'DONE' }; - operation.onComplete(OPTIONS, function(err, metadata) { - assert.strictEqual(err, error); - assert.strictEqual(metadata, apiResponse); - done(); - }); - }); + beforeEach(function() { + operation.getMetadata = function(callback) { + callback(null, apiResponse, apiResponse); + }; }); - describe('success', function() { - it('should exec callback with metadata when done', function(done) { - operation.getMetadata = getMetadataRespondsWithCompleteStatus; - - operation.onComplete(OPTIONS, function(err, metadata) { - assert.ifError(err); - assert.strictEqual(metadata, apiResponseWithCompleteStatus); - done(); - }); + it('should emit complete with metadata', function(done) { + operation.on('complete', function(metadata) { + assert.strictEqual(metadata, apiResponse); + done(); }); + + operation.startPolling_(); }); }); });