Skip to content

Commit

Permalink
Merge branch '4.10' of github.com:Automattic/mongoose into 4.10
Browse files Browse the repository at this point in the history
  • Loading branch information
vkarpov15 committed May 18, 2017
2 parents 9d4c9d4 + b8f0dd8 commit e8c8789
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 40 deletions.
8 changes: 6 additions & 2 deletions lib/aggregate.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
* Module dependencies
*/

var util = require('util');
var utils = require('./utils');
var AggregationCursor = require('./cursor/AggregationCursor');
var PromiseProvider = require('./promise_provider');
var Query = require('./query');
var util = require('util');
var utils = require('./utils');
var read = Query.prototype.read;

/**
Expand Down Expand Up @@ -631,6 +632,9 @@ Aggregate.prototype.exec = function(callback) {
callback && callback(null, cursor);
});
});
} else if (options.cursor.useMongooseAggCursor) {
delete options.cursor.useMongooseAggCursor;
return new AggregationCursor(this);
}
var cursor = this._model.collection.
aggregate(this._pipeline, this.options || {});
Expand Down
282 changes: 282 additions & 0 deletions lib/cursor/AggregationCursor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
/*!
* Module dependencies.
*/

var PromiseProvider = require('../promise_provider');
var Readable = require('stream').Readable;
var util = require('util');

/**
* An AggregationCursor is a concurrency primitive for processing aggregation
* results one document at a time. It is analogous to QueryCursor.
*
* An AggregationCursor fulfills the [Node.js streams3 API](https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/),
* in addition to several other mechanisms for loading documents from MongoDB
* one at a time.
*
* Unless you're an advanced user, do **not** instantiate this class directly.
* Use [`Aggregate#cursor()`](/docs/api.html#aggregate_Aggregate-cursor) instead.
*
* @param {Aggregate} agg
* @param {Object} options
* @inherits Readable
* @event `cursor`: Emitted when the cursor is created
* @event `error`: Emitted when an error occurred
* @event `data`: Emitted when the stream is flowing and the next doc is ready
* @event `end`: Emitted when the stream is exhausted
* @api public
*/

function AggregationCursor(agg) {
Readable.call(this, { objectMode: true });

this.cursor = null;
this.agg = agg;
this._transforms = [];
var _this = this;
var model = agg._model;
model.collection.aggregate(agg._pipeline, agg.options || {}, function(err, cursor) {
if (_this._error) {
cursor.close(function() {});
_this.listeners('error').length > 0 && _this.emit('error', _this._error);
}
if (err) {
return _this.emit('error', err);
}
_this.cursor = cursor;
_this.emit('cursor', cursor);
});
}

util.inherits(AggregationCursor, Readable);

/*!
* Necessary to satisfy the Readable API
*/

AggregationCursor.prototype._read = function() {
var _this = this;
_next(this, function(error, doc) {
if (error) {
return _this.emit('error', error);
}
if (!doc) {
_this.push(null);
_this.cursor.close(function(error) {
if (error) {
return _this.emit('error', error);
}
setTimeout(function() {
_this.emit('close');
}, 0);
});
return;
}
_this.push(doc);
});
};

/**
* Registers a transform function which subsequently maps documents retrieved
* via the streams interface or `.next()`
*
* ####Example
*
* // Map documents returned by `data` events
* Thing.
* find({ name: /^hello/ }).
* cursor().
* map(function (doc) {
* doc.foo = "bar";
* return doc;
* })
* on('data', function(doc) { console.log(doc.foo); });
*
* // Or map documents returned by `.next()`
* var cursor = Thing.find({ name: /^hello/ }).
* cursor().
* map(function (doc) {
* doc.foo = "bar";
* return doc;
* });
* cursor.next(function(error, doc) {
* console.log(doc.foo);
* });
*
* @param {Function} fn
* @return {QueryCursor}
* @api public
* @method map
*/

AggregationCursor.prototype.map = function(fn) {
this._transforms.push(fn);
return this;
};

/*!
* Marks this cursor as errored
*/

AggregationCursor.prototype._markError = function(error) {
this._error = error;
return this;
};

/**
* Marks this cursor as closed. Will stop streaming and subsequent calls to
* `next()` will error.
*
* @param {Function} callback
* @return {Promise}
* @api public
* @method close
* @emits close
* @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
*/

AggregationCursor.prototype.close = function(callback) {
var Promise = PromiseProvider.get();
var _this = this;
return new Promise.ES6(function(resolve, reject) {
_this.cursor.close(function(error) {
if (error) {
callback && callback(error);
reject(error);
return _this.listeners('error').length > 0 &&
_this.emit('error', error);
}
_this.emit('close');
resolve();
callback && callback();
});
});
};

/**
* Get the next document from this cursor. Will return `null` when there are
* no documents left.
*
* @param {Function} callback
* @return {Promise}
* @api public
* @method next
*/

AggregationCursor.prototype.next = function(callback) {
var Promise = PromiseProvider.get();
var _this = this;
return new Promise.ES6(function(resolve, reject) {
_next(_this, function(error, doc) {
if (error) {
callback && callback(error);
return reject(error);
}
callback && callback(null, doc);
resolve(doc);
});
});
};

/**
* Execute `fn` for every document in the cursor. If `fn` returns a promise,
* will wait for the promise to resolve before iterating on to the next one.
* Returns a promise that resolves when done.
*
* @param {Function} fn
* @param {Function} [callback] executed when all docs have been processed
* @return {Promise}
* @api public
* @method eachAsync
*/

AggregationCursor.prototype.eachAsync = function(fn, callback) {
var Promise = PromiseProvider.get();
var _this = this;

var handleNextResult = function(doc, callback) {
var promise = fn(doc);
if (promise && typeof promise.then === 'function') {
promise.then(
function() { callback(null); },
function(error) { callback(error); });
} else {
callback(null);
}
};

var iterate = function(callback) {
return _next(_this, function(error, doc) {
if (error) {
return callback(error);
}
if (!doc) {
return callback(null);
}
handleNextResult(doc, function(error) {
if (error) {
return callback(error);
}
// Make sure to clear the stack re: gh-4697
setTimeout(function() {
iterate(callback);
}, 0);
});
});
};

return new Promise.ES6(function(resolve, reject) {
iterate(function(error) {
if (error) {
callback && callback(error);
return reject(error);
}
callback && callback(null);
return resolve();
});
});
};

/*!
* Get the next doc from the underlying cursor and mongooseify it
* (populate, etc.)
*/

function _next(ctx, cb) {
var callback = cb;
if (ctx._transforms.length) {
callback = function(err, doc) {
if (err || doc === null) {
return cb(err, doc);
}
cb(err, ctx._transforms.reduce(function(doc, fn) {
return fn(doc);
}, doc));
};
}

if (ctx._error) {
return process.nextTick(function() {
callback(ctx._error);
});
}

if (ctx.cursor) {
return ctx.cursor.next(function(error, doc) {
if (error) {
return callback(error);
}
if (!doc) {
return callback(null, null);
}

callback(null, doc);
});
} else {
ctx.once('cursor', function() {
_next(ctx, cb);
});
}
}

module.exports = AggregationCursor;
4 changes: 4 additions & 0 deletions lib/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,10 @@ Schema.interpretAsType = function(path, obj, options) {
'You can only nest using refs or arrays.');
}

obj = utils.clone(obj, { retainKeyOrder: true });
if (!('runSettersOnQuery' in obj)) {
obj.runSettersOnQuery = options.runSettersOnQuery;
}
return new MongooseTypes[name](path, obj);
};

Expand Down
4 changes: 2 additions & 2 deletions lib/schema/boolean.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ SchemaBoolean.prototype.castForQuery = function($conditional, val) {
return handler.call(this, val);
}

return this.cast(val);
return this._castForQuery(val);
}

return this.cast($conditional);
return this._castForQuery($conditional);
};

/*!
Expand Down
2 changes: 1 addition & 1 deletion lib/schema/buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ SchemaBuffer.prototype.castForQuery = function($conditional, val) {
return handler.call(this, val);
}
val = $conditional;
var casted = this.cast(val);
var casted = this._castForQuery(val);
return casted ? casted.toObject({ transform: false, virtuals: false }) : casted;
};

Expand Down
2 changes: 1 addition & 1 deletion lib/schema/date.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ SchemaDate.prototype.castForQuery = function($conditional, val) {
var handler;

if (arguments.length !== 2) {
return this.cast($conditional);
return this._castForQuery($conditional);
}

handler = this.$conditionalHandlers[$conditional];
Expand Down
20 changes: 0 additions & 20 deletions lib/schema/decimal128.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,6 @@ Decimal128.prototype.$conditionalHandlers =
$lte: handleSingle
});

/**
* Casts contents for queries.
*
* @param {String} $conditional
* @param {any} [val]
* @api private
*/

Decimal128.prototype.castForQuery = function($conditional, val) {
var handler;
if (arguments.length === 2) {
handler = this.$conditionalHandlers[$conditional];
if (!handler) {
throw new Error('Can\'t use ' + $conditional + ' with ObjectId.');
}
return handler.call(this, val);
}
return this.cast($conditional);
};

/*!
* Module exports.
*/
Expand Down
4 changes: 4 additions & 0 deletions lib/schema/embedded.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ Embedded.prototype.castForQuery = function($conditional, val) {
return val;
}

if (this.options.runSetters) {
val = this._applySetters(val);
}

return new this.caster(val);
};

Expand Down
2 changes: 1 addition & 1 deletion lib/schema/number.js
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ SchemaNumber.prototype.castForQuery = function($conditional, val) {
}
return handler.call(this, val);
}
val = this.cast($conditional);
val = this._castForQuery($conditional);
return val;
};

Expand Down
Loading

0 comments on commit e8c8789

Please sign in to comment.