Skip to content

Commit

Permalink
storage: download integrity check
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Nov 24, 2014
1 parent 7b4e843 commit b4f3d8d
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 42 deletions.
79 changes: 58 additions & 21 deletions lib/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var crypto = require('crypto');
var duplexify = require('duplexify');
var request = require('request');
var streamEvents = require('stream-events');
var through = require('through2');

/**
* @type module:common/util
Expand Down Expand Up @@ -172,7 +173,6 @@ File.prototype.copy = function(destination, callback) {
});
};


/**
* Create a readable stream to read the contents of the remote file. It can be
* piped to a writable stream or listened to for 'data' events to read a file's
Expand All @@ -194,34 +194,71 @@ File.prototype.copy = function(destination, callback) {
* .on('error', function(err) {});
*/
File.prototype.createReadStream = function() {
var storage = this.bucket.storage;
var dup = duplexify();
var that = this;
var throughStream = through();

this.getMetadata(function(err, metadata) {
if (err) {
throughStream.emit('error', err);
throughStream.end();
return;
}

createAuthorizedReq(metadata.mediaLink);
});

return throughStream;

// Authenticate the request, then pipe the remote API request to the stream
// returned to the user.
function createAuthorizedReq(uri) {
var reqOpts = { uri: uri };
storage.makeAuthorizedRequest_(reqOpts, {
var reqOpts = {
uri: uri
};

that.bucket.storage.makeAuthorizedRequest_(reqOpts, {
onAuthorized: function(err, authorizedReqOpts) {
if (err) {
dup.emit('error', err);
dup.end();
throughStream.emit('error', err);
throughStream.end();
return;
}
dup.setReadable(request(authorizedReqOpts));
}
});
}
if (this.metadata.mediaLink) {
createAuthorizedReq(this.metadata.mediaLink);
} else {
this.getMetadata(function(err, metadata) {
if (err) {
dup.emit('error', err);
dup.end();
return;

// For data integrity, hash the contents of the stream as we receive it
// from the server.
var localMd5Hash = crypto.createHash('md5');

request(authorizedReqOpts)
.on('error', function(err) {
throughStream.emit('error', err);
})

.on('data', function(chunk) {
localMd5Hash.update(chunk);
})

.on('complete', function() {
localMd5Hash = localMd5Hash.digest('base64');

if (that.metadata.md5Hash === localMd5Hash) {
throughStream.emit('complete');
} else {
var error = new Error([
'The downloaded data did not match the data from the server. ',
'To be sure the content is the same, you should download the',
'file again.'
].join(''));

throughStream.emit('error', error);
}

throughStream.end();
})

.pipe(throughStream);
}
createAuthorizedReq(metadata.mediaLink);
});
}
return dup;
};

/**
Expand Down
13 changes: 9 additions & 4 deletions regression/storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ describe('storage', function() {
});
});

it('should read/write from/to a file in a directory', function(done) {
it.only('should read/write from/to a file in a directory', function(done) {
var file = bucket.file('directory/file');
var contents = 'test';

Expand All @@ -152,12 +152,17 @@ describe('storage', function() {

writeStream.on('error', done);
writeStream.on('complete', function() {
var data = new Buffer('');

file.createReadStream()
.on('error', done)
.on('data', function(chunk) {
assert.equal(String(chunk), contents);
data = Buffer.concat([data, chunk]);
})
.on('error', done)
.on('end', done);
.on('complete', function() {
assert.equal(data.toString(), contents);
done();
});
});
});

Expand Down
36 changes: 19 additions & 17 deletions test/storage/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var duplexify = require('duplexify');
var extend = require('extend');
var nodeutil = require('util');
var request = require('request');
var stream = require('stream');
var url = require('url');
var util = require('../../lib/common/util');

Expand Down Expand Up @@ -243,13 +244,15 @@ describe('File', function() {
});

it('should create an authorized request', function(done) {
request_Override = function(opts) {
file.bucket.storage.makeAuthorizedRequest_ = function(opts) {
assert.equal(opts.uri, metadata.mediaLink);
done();
};

file.getMetadata = function(callback) {
callback(null, metadata);
};

file.createReadStream();
});

Expand All @@ -272,33 +275,32 @@ describe('File', function() {

it('should get readable stream from request', function(done) {
var fakeRequest = { a: 'b', c: 'd' };
file.getMetadata = function(callback) {
callback(null, metadata);
};

// Faking a stream implementation so we can simulate an actual Request
// request. The only thing we want to know is if the data passed to
// request was correct.
request_Override = function(req) {
if (!(this instanceof request_Override)) {
return new request_Override(req);
}

stream.Readable.call(this);
this._read = util.noop;

assert.deepEqual(req, fakeRequest);
done();
};
file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
(callback.onAuthorized || callback)(null, fakeRequest);
};
file.createReadStream();
});
nodeutil.inherits(request_Override, stream.Readable);

it('should set readable stream', function() {
var dup = duplexify();
file.getMetadata = function(callback) {
callback(null, metadata);
};
request_Override = function() {
return dup;
};

file.bucket.storage.makeAuthorizedRequest_ = function(opts, callback) {
(callback.onAuthorized || callback)();
(callback.onAuthorized || callback)(null, fakeRequest);
};

file.createReadStream();
assert.deepEqual(readableStream, dup);
readableStream = null;
});
});

Expand Down

0 comments on commit b4f3d8d

Please sign in to comment.