Skip to content

Commit

Permalink
storage: implement channels
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Nov 25, 2015
1 parent 90a0f09 commit cabee48
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 3 deletions.
62 changes: 59 additions & 3 deletions lib/storage/bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ var RESUMABLE_THRESHOLD = 5000000;
* @constructor
* @alias module:storage/bucket
*
* @throws {Error} if a bucket name isn't provided.
*
* @param {object} options - Configuration options.
* @param {string} options.bucketName - Name of the bucket.
* @param {string=} options.keyFilename - Full path to the JSON key downloaded
Expand All @@ -94,7 +92,6 @@ var RESUMABLE_THRESHOLD = 5000000;
*
* var bucket = gcs.bucket('albums');
*/

function Bucket(storage, name) {
var methods = {
/**
Expand Down Expand Up @@ -429,6 +426,65 @@ Bucket.prototype.combine = function(sources, destination, callback) {
}
};

/**
* Create a channel that will be notified when objects in this bucket changes.
*
* @throws {Error} If an ID is not provided.
* @throws {Error} If an address is not provided.
*
* @resource [Objects: watchAll API Documentation]{@link https://cloud.google.com/storage/docs/json_api/v1/objects/watchAll}
*
* @param {string} id - The ID of the channel to create.
* @param {object} config - See a
* [Objects: watchAll request body](https://cloud.google.com/storage/docs/json_api/v1/objects/watchAll).
* @param {string} config.address - The address where notifications are
* delivered for this channel.
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request
* @param {module:storage/channel} callback.channel - The created Channel
* object.
* @param {object} callback.apiResponse - The full API response.
*
* @example
* bucket.createChannel(function(err, channel, apiResponse) {
* if (!err) {
* // Channel created successfully.
* }
* });
*/
Bucket.prototype.createChannel = function(id, config, callback) {
var self = this;

if (!is.string(id)) {
throw new Error('An ID is required to create a channel.');
}

if (!is.string(config.address)) {
throw new Error('An address is required to create a channel.');
}

this.request({
method: 'POST',
uri: '/o/watch',
json: extend({
id: id,
type: 'web_hook'
}, config)
}, function(err, apiResponse) {
if (err) {
callback(err, null, apiResponse);
return;
}

var resourceId = apiResponse.resourceId;
var channel = self.storage.channel(id, resourceId);

channel.metadata = apiResponse;

callback(null, channel, apiResponse);
});
};

/**
* Iterate over the bucket's files, calling `file.delete()` on each.
*
Expand Down
106 changes: 106 additions & 0 deletions lib/storage/channel.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*!
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*!
* @module storage/channel
*/

'use strict';

var nodeutil = require('util');

/**
* @type {module:common/serviceObject}
* @private
*/
var ServiceObject = require('../common/service-object.js');

/**
* @type {module:common/util}
* @private
*/
var util = require('../common/util.js');

/*! Developer Documenation
*
* @param {module:storage} storage - The Storage instance.
*/
/**
* Create a channel object to interact with a Google Cloud Storage channel.
*
* @resource [Object Change Notification]{@link https://cloud.google.com/storage/docs/object-change-notification}
*
* @constructor
* @alias module:storage/bucket
*
* @param {string} id - The ID of the channel.
* @param {string} resourceId - The resource ID of the channel.
*
* @example
* var gcloud = require('gcloud');
*
* var gcs = gcloud.storage({
* keyFilename: '/path/to/keyfile.json',
* projectId: 'grape-spaceship-123'
* });
*
* var channel = gcs.channel('id', 'resource-id');
*/
function Channel(storage, id, resourceId) {
var config = {
parent: storage,
baseUrl: '/channels',
id: id,
methods: {
// Only need `request`.
}
};

ServiceObject.call(this, config);

this.metadata.id = id;
this.metadata.resourceId = resourceId;
}

nodeutil.inherits(Channel, ServiceObject);

/**
* Stop this channel.
*
* @param {function} callback - The callback function.
* @param {?error} callback.err - An error returned while making this request.
* @param {object} callback.apiResponse - The full API response.
*
* @example
* channel.stop(function(err, apiResponse) {
* if (!err) {
* // Channel stopped successfully.
* }
* });
*/
Channel.prototype.stop = function(callback) {
callback = callback || util.noop;

this.request({
method: 'POST',
uri: '/stop',
json: this.metadata
}, function(err, apiResponse) {
callback(err, apiResponse);
});
};

module.exports = Channel;
20 changes: 20 additions & 0 deletions lib/storage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ var nodeutil = require('util');
*/
var Bucket = require('./bucket.js');

/**
* @type {module:storage/channel}
* @private
*/
var Channel = require('./channel.js');

/**
* @type {module:common/service}
* @private
Expand Down Expand Up @@ -177,6 +183,20 @@ Storage.prototype.bucket = function(name) {
return new Bucket(this, name);
};

/**
* Reference a channel to receive notifications about changes to your bucket.
*
* @param {string} id - The ID of the channel.
* @param {string} resourceId - The resource ID of the channel.
* @return {module:storage/channel}
*
* @example
* var channel = gcs.channel('id', 'resource-id');
*/
Storage.prototype.channel = function(id, resourceId) {
return new Channel(this, id, resourceId);
};

/**
* Create a bucket.
*
Expand Down
19 changes: 19 additions & 0 deletions system-test/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,25 @@ describe('storage', function() {
});
});

describe('channels', function() {
it('should create a channel', function(done) {
var config = {
address: 'https://yahoo.com'
};

bucket.createChannel('new-channel', config, function(err) {
// Actually creating a channel is pretty complicated. This will at least
// let us know we hit the right endpoint and it received "yahoo.com".
assert.strictEqual(
err.message,
'Unauthorized WebHook callback channel: ' + config.address
);

done();
});
});
});

describe('combine files', function() {
it('should combine multiple files into one', function(done) {
var files = [
Expand Down
98 changes: 98 additions & 0 deletions test/storage/bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,104 @@ describe('Bucket', function() {
});
});

describe('createChannel', function() {
var ID = 'id';
var CONFIG = {
address: 'https://...'
};

it('should throw if an ID is not provided', function() {
assert.throws(function() {
bucket.createChannel();
}, 'An ID is required to create a channel.');
});

it('should throw if an address is not provided', function() {
assert.throws(function() {
bucket.createChannel(ID, {});
}, 'An address is required to create a channel.');
});

it('should make the correct request', function(done) {
var config = extend({}, CONFIG, {
a: 'b',
c: 'd'
});
var originalConfig = extend({}, config);

bucket.request = function(reqOpts) {
assert.strictEqual(reqOpts.method, 'POST');
assert.strictEqual(reqOpts.uri, '/o/watch');

var expectedJson = extend({}, config, {
id: ID,
type: 'web_hook'
});
assert.deepEqual(reqOpts.json, expectedJson);
assert.deepEqual(config, originalConfig);

done();
};

bucket.createChannel(ID, config, assert.ifError);
});

describe('error', function() {
var error = new Error('Error.');
var apiResponse = {};

beforeEach(function() {
bucket.request = function(reqOpts, callback) {
callback(error, apiResponse);
};
});

it('should execute callback with error & API response', function(done) {
bucket.createChannel(ID, CONFIG, function(err, channel, apiResponse_) {
assert.strictEqual(err, error);
assert.strictEqual(channel, null);
assert.strictEqual(apiResponse_, apiResponse);

done();
});
});
});

describe('success', function() {
var apiResponse = {
resourceId: 'resource-id'
};

beforeEach(function() {
bucket.request = function(reqOpts, callback) {
callback(null, apiResponse);
};
});

it('should exec a callback with Channel & API response', function(done) {
var channel = {};

bucket.storage.channel = function(id, resourceId) {
assert.strictEqual(id, ID);
assert.strictEqual(resourceId, apiResponse.resourceId);

return channel;
};

bucket.createChannel(ID, CONFIG, function(err, channel_, apiResponse_) {
assert.ifError(err);

assert.strictEqual(channel_, channel);
assert.strictEqual(channel_.metadata, apiResponse);

assert.strictEqual(apiResponse_, apiResponse);

done();
});
});
});
});

describe('deleteFiles', function() {
it('should get files from the bucket', function(done) {
var query = { a: 'b', c: 'd' };
Expand Down
Loading

0 comments on commit cabee48

Please sign in to comment.