Skip to content

Commit

Permalink
Merge pull request #5253 from Automattic/5145
Browse files Browse the repository at this point in the history
Aggregation cursor
  • Loading branch information
vkarpov15 authored May 18, 2017
2 parents f3805fa + 2373502 commit b8f0dd8
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 2 deletions.
8 changes: 6 additions & 2 deletions lib/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
* Module dependencies
*/

var util = require('util');
var utils = require('./utils');
var AggregationCursor = require('./cursor/AggregationCursor');
var PromiseProvider = require('./promise_provider');
var Query = require('./query');
var util = require('util');
var utils = require('./utils');
var read = Query.prototype.read;

/**
Expand Down Expand Up @@ -631,6 +632,9 @@ Aggregate.prototype.exec = function(callback) {
callback && callback(null, cursor);
});
});
} else if (options.cursor.useMongooseAggCursor) {
delete options.cursor.useMongooseAggCursor;
return new AggregationCursor(this);
}
var cursor = this._model.collection.
aggregate(this._pipeline, this.options || {});
Expand Down
282 changes: 282 additions & 0 deletions lib/cursor/AggregationCursor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
/*!
* Module dependencies.
*/

var PromiseProvider = require('../promise_provider');
var Readable = require('stream').Readable;
var util = require('util');

/**
* An AggregationCursor is a concurrency primitive for processing aggregation
* results one document at a time. It is analogous to QueryCursor.
*
* An AggregationCursor fulfills the [Node.js streams3 API](https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/),
* in addition to several other mechanisms for loading documents from MongoDB
* one at a time.
*
* Unless you're an advanced user, do **not** instantiate this class directly.
* Use [`Aggregate#cursor()`](/docs/api.html#aggregate_Aggregate-cursor) instead.
*
* @param {Aggregate} agg
* @param {Object} options
* @inherits Readable
* @event `cursor`: Emitted when the cursor is created
* @event `error`: Emitted when an error occurred
* @event `data`: Emitted when the stream is flowing and the next doc is ready
* @event `end`: Emitted when the stream is exhausted
* @api public
*/

function AggregationCursor(agg) {
Readable.call(this, { objectMode: true });

this.cursor = null;
this.agg = agg;
this._transforms = [];
var _this = this;
var model = agg._model;
model.collection.aggregate(agg._pipeline, agg.options || {}, function(err, cursor) {
if (_this._error) {
cursor.close(function() {});
_this.listeners('error').length > 0 && _this.emit('error', _this._error);
}
if (err) {
return _this.emit('error', err);
}
_this.cursor = cursor;
_this.emit('cursor', cursor);
});
}

util.inherits(AggregationCursor, Readable);

/*!
* Necessary to satisfy the Readable API
*/

AggregationCursor.prototype._read = function() {
var _this = this;
_next(this, function(error, doc) {
if (error) {
return _this.emit('error', error);
}
if (!doc) {
_this.push(null);
_this.cursor.close(function(error) {
if (error) {
return _this.emit('error', error);
}
setTimeout(function() {
_this.emit('close');
}, 0);
});
return;
}
_this.push(doc);
});
};

/**
* Registers a transform function which subsequently maps documents retrieved
* via the streams interface or `.next()`
*
* ####Example
*
* // Map documents returned by `data` events
* Thing.
* find({ name: /^hello/ }).
* cursor().
* map(function (doc) {
* doc.foo = "bar";
* return doc;
* })
* on('data', function(doc) { console.log(doc.foo); });
*
* // Or map documents returned by `.next()`
* var cursor = Thing.find({ name: /^hello/ }).
* cursor().
* map(function (doc) {
* doc.foo = "bar";
* return doc;
* });
* cursor.next(function(error, doc) {
* console.log(doc.foo);
* });
*
* @param {Function} fn
* @return {QueryCursor}
* @api public
* @method map
*/

AggregationCursor.prototype.map = function(fn) {
this._transforms.push(fn);
return this;
};

/*!
* Marks this cursor as errored
*/

AggregationCursor.prototype._markError = function(error) {
this._error = error;
return this;
};

/**
* Marks this cursor as closed. Will stop streaming and subsequent calls to
* `next()` will error.
*
* @param {Function} callback
* @return {Promise}
* @api public
* @method close
* @emits close
* @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
*/

AggregationCursor.prototype.close = function(callback) {
var Promise = PromiseProvider.get();
var _this = this;
return new Promise.ES6(function(resolve, reject) {
_this.cursor.close(function(error) {
if (error) {
callback && callback(error);
reject(error);
return _this.listeners('error').length > 0 &&
_this.emit('error', error);
}
_this.emit('close');
resolve();
callback && callback();
});
});
};

/**
* Get the next document from this cursor. Will return `null` when there are
* no documents left.
*
* @param {Function} callback
* @return {Promise}
* @api public
* @method next
*/

AggregationCursor.prototype.next = function(callback) {
var Promise = PromiseProvider.get();
var _this = this;
return new Promise.ES6(function(resolve, reject) {
_next(_this, function(error, doc) {
if (error) {
callback && callback(error);
return reject(error);
}
callback && callback(null, doc);
resolve(doc);
});
});
};

/**
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*
* @param {Function} fn
* @param {Function} [callback] executed when all docs have been processed
* @return {Promise}
* @api public
* @method eachAsync
*/

AggregationCursor.prototype.eachAsync = function(fn, callback) {
var Promise = PromiseProvider.get();
var _this = this;

var handleNextResult = function(doc, callback) {
var promise = fn(doc);
if (promise && typeof promise.then === 'function') {
promise.then(
function() { callback(null); },
function(error) { callback(error); });
} else {
callback(null);
}
};

var iterate = function(callback) {
return _next(_this, function(error, doc) {
if (error) {
return callback(error);
}
if (!doc) {
return callback(null);
}
handleNextResult(doc, function(error) {
if (error) {
return callback(error);
}
// Make sure to clear the stack re: gh-4697
setTimeout(function() {
iterate(callback);
}, 0);
});
});
};

return new Promise.ES6(function(resolve, reject) {
iterate(function(error) {
if (error) {
callback && callback(error);
return reject(error);
}
callback && callback(null);
return resolve();
});
});
};

/*!
* Get the next doc from the underlying cursor and mongooseify it
* (populate, etc.)
*/

function _next(ctx, cb) {
var callback = cb;
if (ctx._transforms.length) {
callback = function(err, doc) {
if (err || doc === null) {
return cb(err, doc);
}
cb(err, ctx._transforms.reduce(function(doc, fn) {
return fn(doc);
}, doc));
};
}

if (ctx._error) {
return process.nextTick(function() {
callback(ctx._error);
});
}

if (ctx.cursor) {
return ctx.cursor.next(function(error, doc) {
if (error) {
return callback(error);
}
if (!doc) {
return callback(null, null);
}

callback(null, doc);
});
} else {
ctx.once('cursor', function() {
_next(ctx, cb);
});
}
}

module.exports = AggregationCursor;
13 changes: 13 additions & 0 deletions test/aggregate.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,19 @@ describe('aggregate: ', function() {
});
});

it('cursor() with useMongooseAggCursor (gh-5145)', function(done) {
var db = start();

var MyModel = db.model('gh5145', { name: String });

var cursor = MyModel.
aggregate([{ $match: { name: 'test' } }]).
cursor({ useMongooseAggCursor: true }).
exec();
assert.ok(cursor instanceof require('stream').Readable);
done();
});

it('cursor() eachAsync (gh-4300)', function(done) {
var db = start();

Expand Down

0 comments on commit b8f0dd8

Please sign in to comment.