From 325b204d2c0987abc139d5da4e42d853aa8933d8 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Tue, 26 Jan 2016 13:21:21 +0100 Subject: [PATCH 01/20] initial commit --- lib/databases/elasticsearch.js | 573 +++++++++++++++++++++++++++++++++ 1 file changed, 573 insertions(+) create mode 100644 lib/databases/elasticsearch.js diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js new file mode 100644 index 00000000..175a0f7e --- /dev/null +++ b/lib/databases/elasticsearch.js @@ -0,0 +1,573 @@ +// 'use strict'; + +// var util = require('util'), +// Store = require('../base'), +// _ = require('lodash'), +// async = require('async'), +// mongo = Store.use('mongodb'), +// mongoVersion = Store.use('mongodb/package.json').version, +// isNew = mongoVersion.indexOf('1.') !== 0, +// ObjectID = isNew ? mongo.ObjectID : mongo.BSONPure.ObjectID, +// debug = require('debug')('eventstore:store:mongodb'); + +// function Mongo(options) { +// options = options || {}; + +// Store.call(this, options); + +// var defaults = { +// host: 'localhost', +// port: 27017, +// dbName: 'eventstore', +// eventsCollectionName: 'events', +// snapshotsCollectionName: 'snapshots', +// transactionsCollectionName: 'transactions' +// }; + +// _.defaults(options, defaults); + +// var defaultOpt = { +// auto_reconnect: false, +// ssl: false +// }; + +// options.options = options.options || {}; + +// _.defaults(options.options, defaultOpt); + +// this.options = options; +// } + +// util.inherits(Mongo, Store); + +// _.extend(Mongo.prototype, { + +// connect: function (callback) { +// var self = this; + +// var options = this.options; + +// var server; + +// if (options.servers && Array.isArray(options.servers)){ +// var servers = []; + +// options.servers.forEach(function(item){ +// if(item.host && item.port) { +// servers.push(new mongo.Server(item.host, item.port, item.options)); +// } +// }); + +// server = new mongo.ReplSet(servers); +// } else { +// server = new mongo.Server(options.host, options.port, options.options); +// } + +// this.db = new mongo.Db(options.dbName, server, { safe: true }); +// this.db.on('close', function() { +// self.emit('disconnect'); +// self.stopHeartbeat(); +// }); + +// this.db.open(function (err, client) { +// if (err) { +// debug(err); +// if (callback) callback(err); +// return; +// } +// function finish (err) { +// if (err) { +// debug(err); +// if (callback) callback(err); +// return; +// } + +// self.client = client; + +// self.events = self.db.collection(options.eventsCollectionName); +// self.events.ensureIndex({ aggregateId: 1, streamRevision: 1 }, +// function (err) { if (err) { debug(err); } }); +// self.events.ensureIndex({ commitStamp: 1 }, +// function (err) { if (err) { debug(err); } }); +// self.events.ensureIndex({ dispatched: 1 }, { sparse: true }, +// function (err) { if (err) { debug(err); } }); + +// self.snapshots = self.db.collection(options.snapshotsCollectionName); +// self.snapshots.ensureIndex({ aggregateId: 1, revision: -1 }, +// function (err) { if (err) { debug(err); } }); + +// self.transactions = self.db.collection(options.transactionsCollectionName); +// self.transactions.ensureIndex({ aggregateId: 1, 'events.streamRevision': 1 }, +// function (err) { if (err) { debug(err); } }); + +// self.emit('connect'); +// if (self.options.heartbeat) { +// self.startHeartbeat(); +// } +// if (callback) callback(null, self); +// } + +// // Authenticate with authSource +// if (options.authSource && options.username) { +// // Authenticate with authSource +// return client.authenticate(options.username, options.password, {authSource: options.authSource}, finish); +// } + +// if (options.username) { +// return client.authenticate(options.username, options.password, finish); +// } + +// finish(); +// }); +// }, + +// stopHeartbeat: function () { +// if (this.heartbeatInterval) { +// clearInterval(this.heartbeatInterval); +// delete this.heartbeatInterval; +// } +// }, + +// startHeartbeat: function () { +// var self = this; + +// var gracePeriod = Math.round(this.options.heartbeat / 2); +// this.heartbeatInterval = setInterval(function () { +// var graceTimer = setTimeout(function () { +// if (self.heartbeatInterval) { +// console.error((new Error ('Heartbeat timeouted after ' + gracePeriod + 'ms')).stack); +// self.db.close(function () {}); +// } +// }, gracePeriod); + +// self.db.command({ ping: 1 }, function (err) { +// if (graceTimer) clearTimeout(graceTimer); +// if (err) { +// console.error(err.stack || err); +// self.db.close(function () {}); +// } +// }); +// }, this.options.heartbeat); +// }, + +// disconnect: function (callback) { +// this.stopHeartbeat(); + +// if (!this.db) { +// if (callback) callback(null); +// return; +// } + +// this.db.close(function (err) { +// if (err) { +// debug(err); +// } +// if (callback) callback(err); +// }); +// }, + +// clear: function (callback) { +// var self = this; +// async.parallel([ +// function (callback) { +// self.events.remove({}, callback); +// }, +// function (callback) { +// self.snapshots.remove({}, callback); +// }, +// function (callback) { +// self.transactions.remove({}, callback); +// } +// ], function (err) { +// if (err) { +// debug(err); +// } +// if (callback) callback(err); +// }); +// }, + +// getNewId: function(callback) { +// callback(null, new ObjectID().toString()); +// }, + +// addEvents: function (events, callback) { +// if (events.length === 0) { +// if (callback) { callback(null); } +// return; +// } + +// var commitId = events[0].commitId; + +// var noAggregateId = false, +// invalidCommitId = false; + +// _.forEach(events, function (evt) { +// if (!evt.aggregateId) { +// noAggregateId = true; +// } + +// if (!evt.commitId || evt.commitId !== commitId) { +// invalidCommitId = true; +// } + +// evt._id = evt.id; +// evt.dispatched = false; +// }); + +// if (noAggregateId) { +// var errMsg = 'aggregateId not defined!'; +// debug(errMsg); +// if (callback) callback(new Error(errMsg)); +// return; +// } + +// if (invalidCommitId) { +// var errMsg = 'commitId not defined or different!'; +// debug(errMsg); +// if (callback) callback(new Error(errMsg)); +// return; +// } + +// var self = this; + +// if (events.length === 1) { +// return this.events.insert(events, callback); +// } + +// var tx = { +// _id: commitId, +// events: events, +// aggregateId: events[0].aggregateId, +// aggregate: events[0].aggregate, +// context: events[0].context +// }; + +// this.transactions.insert(tx, function (err) { +// if (err) { +// debug(err); +// if (callback) callback(err); +// return; +// } + +// self.events.insert(events, function (err) { +// if (err) { +// debug(err); +// if (callback) callback(err); +// return; +// } + +// self.removeTransactions(events[events.length - 1]); + +// if (callback) { callback(null); } +// }); +// }); +// }, + +// getEvents: function (query, skip, limit, callback) { +// var findStatement = {}; + +// if (query.aggregate) { +// findStatement.aggregate = query.aggregate; +// } + +// if (query.context) { +// findStatement.context = query.context; +// } + +// if (query.aggregateId) { +// findStatement.aggregateId = query.aggregateId; +// } + +// if (limit === -1) { +// return this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).toArray(callback); +// } + +// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).limit(limit).toArray(callback); +// }, + +// getEventsSince: function (date, skip, limit, callback) { +// var findStatement = { commitStamp: { '$gte': date } }; + +// if (limit === -1) { +// return this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).toArray(callback); +// } + +// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).limit(limit).toArray(callback); +// }, + +// getEventsByRevision: function (query, revMin, revMax, callback) { +// if (!query.aggregateId) { +// var errMsg = 'aggregateId not defined!'; +// debug(errMsg); +// if (callback) callback(new Error(errMsg)); +// return; +// } + +// var streamRevOptions = { '$gte': revMin, '$lt': revMax }; +// if (revMax === -1) { +// streamRevOptions = { '$gte': revMin }; +// } + +// var findStatement = { +// aggregateId: query.aggregateId, +// streamRevision: streamRevOptions +// }; + +// if (query.aggregate) { +// findStatement.aggregate = query.aggregate; +// } + +// if (query.context) { +// findStatement.context = query.context; +// } + +// var self = this; + +// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(function (err, res) { +// if (err) { +// debug(err); +// return callback(err); +// } + +// if (!res || res.length === 0) { +// return callback(null, []); +// } + +// var lastEvt = res[res.length - 1]; + +// var txOk = (revMax === -1 && !lastEvt.restInCommitStream) || +// (revMax !== -1 && (lastEvt.streamRevision === revMax - 1 || !lastEvt.restInCommitStream)); + +// if (txOk) { +// // the following is usually unnecessary +// self.removeTransactions(lastEvt); + +// return callback(null, res); +// } + +// self.repairFailedTransaction(lastEvt, function (err) { +// if (err) { +// if (err.message.indexOf('missing tx entry') >= 0) { +// return callback(null, res); +// } +// debug(err); +// return callback(err); +// } + +// self.getEventsByRevision(query, revMin, revMax, callback); +// }); +// }); +// }, + +// getUndispatchedEvents: function (query, callback) { +// var findStatement = { +// dispatched: false +// }; + +// if (query && query.aggregate) { +// findStatement.aggregate = query.aggregate; +// } + +// if (query && query.context) { +// findStatement.context = query.context; +// } + +// if (query && query.aggregateId) { +// findStatement.aggregateId = query.aggregateId; +// } + +// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(callback); +// }, + +// setEventToDispatched: function (id, callback) { +// var updateCommand = { '$unset' : { 'dispatched': null } }; +// this.events.update({'_id' : id}, updateCommand, callback); +// }, + +// addSnapshot: function(snap, callback) { +// if (!snap.aggregateId) { +// var errMsg = 'aggregateId not defined!'; +// debug(errMsg); +// if (callback) callback(new Error(errMsg)); +// return; +// } + +// snap._id = snap.id; +// this.snapshots.insert(snap, callback); +// }, + +// getSnapshot: function (query, revMax, callback) { +// if (!query.aggregateId) { +// var errMsg = 'aggregateId not defined!'; +// debug(errMsg); +// if (callback) callback(new Error(errMsg)); +// return; +// } + +// var findStatement = { +// aggregateId: query.aggregateId +// }; + +// if (query.aggregate) { +// findStatement.aggregate = query.aggregate; +// } + +// if (query.context) { +// findStatement.context = query.context; +// } + +// if (revMax > -1) { +// findStatement.revision = { '$lte': revMax }; +// } + +// this.snapshots.findOne(findStatement, { sort: [['revision', 'desc'], ['version', 'desc'], ['commitStamp', 'desc']] }, callback); +// }, + +// removeTransactions: function (evt, callback) { +// if (!evt.aggregateId) { +// var errMsg = 'aggregateId not defined!'; +// debug(errMsg); +// if (callback) callback(new Error(errMsg)); +// return; +// } + +// var findStatement = { aggregateId: evt.aggregateId }; + +// if (evt.aggregate) { +// findStatement.aggregate = evt.aggregate; +// } + +// if (evt.context) { +// findStatement.context = evt.context; +// } + +// // the following is usually unnecessary +// this.transactions.remove(findStatement, function (err) { +// if (err) { +// debug(err); +// } +// if (callback) { callback(err); } +// }); +// }, + +// getPendingTransactions: function (callback) { +// var self = this; +// this.transactions.find({}).toArray(function (err, txs) { +// if (err) { +// debug(err); +// return callback(err); +// } + +// if (txs.length === 0) { +// return callback(null, txs); +// } + +// var goodTxs = []; + +// async.map(txs, function (tx, clb) { +// var findStatement = { commitId: tx._id, aggregateId: tx.aggregateId }; + +// if (tx.aggregate) { +// findStatement.aggregate = tx.aggregate; +// } + +// if (tx.context) { +// findStatement.context = tx.context; +// } + +// self.events.findOne(findStatement, function (err, evt) { +// if (err) { +// return clb(err); +// } + +// if (evt) { +// goodTxs.push(evt); +// } else { +// self.transactions.remove({ _id: tx._id }, function (err) { +// if (err) { +// debug(err); +// } +// }); +// } + +// clb(null); +// }); +// }, function (err) { +// if (err) { +// debug(err); +// return callback(err); +// } + +// callback(null, goodTxs); +// }) +// }); +// }, + +// getLastEvent: function (query, callback) { +// if (!query.aggregateId) { +// var errMsg = 'aggregateId not defined!'; +// debug(errMsg); +// if (callback) callback(new Error(errMsg)); +// return; +// } + +// var findStatement = { aggregateId: query.aggregateId }; + +// if (query.aggregate) { +// findStatement.aggregate = query.aggregate; +// } + +// if (query.context) { +// findStatement.context = query.context; +// } + +// this.events.findOne(findStatement, { sort: [['commitStamp', 'desc'], ['streamRevision', 'desc'], ['commitSequence', 'desc']] }, callback); +// }, + +// repairFailedTransaction: function (lastEvt, callback) { +// var self = this; + +// //var findStatement = { +// // aggregateId: lastEvt.aggregateId, +// // 'events.streamRevision': lastEvt.streamRevision + 1 +// //}; +// // +// //if (lastEvt.aggregate) { +// // findStatement.aggregate = lastEvt.aggregate; +// //} +// // +// //if (lastEvt.context) { +// // findStatement.context = lastEvt.context; +// //} + +// //this.transactions.findOne(findStatement, function (err, tx) { +// this.transactions.findOne({ _id: lastEvt.commitId }, function (err, tx) { +// if (err) { +// debug(err); +// return callback(err); +// } + +// if (!tx) { +// var err = new Error('missing tx entry for aggregate ' + lastEvt.aggregateId); +// debug(err); +// return callback(err); +// } + +// var missingEvts = tx.events.slice(tx.events.length - lastEvt.restInCommitStream); + +// self.events.insert(missingEvts, function (err) { +// if (err) { +// debug(err); +// return callback(err); +// } + +// self.removeTransactions(lastEvt); + +// callback(null); +// }); +// }); +// } + +// }); + +// module.exports = Mongo; From 4b89e3e91718af51a2d393bcfdb0a3b9ef5df3cd Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Tue, 26 Jan 2016 14:40:19 +0100 Subject: [PATCH 02/20] first starting point elasticsearch driver --- lib/databases/elasticsearch.js | 264 +++++++++------------------------ package.json | 4 +- test/eventstoreTest.js | 2 +- test/storeTest.js | 2 +- 4 files changed, 76 insertions(+), 196 deletions(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index 175a0f7e..7703202a 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -1,194 +1,72 @@ -// 'use strict'; - -// var util = require('util'), -// Store = require('../base'), -// _ = require('lodash'), -// async = require('async'), -// mongo = Store.use('mongodb'), -// mongoVersion = Store.use('mongodb/package.json').version, -// isNew = mongoVersion.indexOf('1.') !== 0, -// ObjectID = isNew ? mongo.ObjectID : mongo.BSONPure.ObjectID, -// debug = require('debug')('eventstore:store:mongodb'); - -// function Mongo(options) { -// options = options || {}; - -// Store.call(this, options); - -// var defaults = { -// host: 'localhost', -// port: 27017, -// dbName: 'eventstore', -// eventsCollectionName: 'events', -// snapshotsCollectionName: 'snapshots', -// transactionsCollectionName: 'transactions' -// }; - -// _.defaults(options, defaults); - -// var defaultOpt = { -// auto_reconnect: false, -// ssl: false -// }; - -// options.options = options.options || {}; - -// _.defaults(options.options, defaultOpt); - -// this.options = options; -// } - -// util.inherits(Mongo, Store); - -// _.extend(Mongo.prototype, { - -// connect: function (callback) { -// var self = this; - -// var options = this.options; - -// var server; - -// if (options.servers && Array.isArray(options.servers)){ -// var servers = []; - -// options.servers.forEach(function(item){ -// if(item.host && item.port) { -// servers.push(new mongo.Server(item.host, item.port, item.options)); -// } -// }); - -// server = new mongo.ReplSet(servers); -// } else { -// server = new mongo.Server(options.host, options.port, options.options); -// } - -// this.db = new mongo.Db(options.dbName, server, { safe: true }); -// this.db.on('close', function() { -// self.emit('disconnect'); -// self.stopHeartbeat(); -// }); - -// this.db.open(function (err, client) { -// if (err) { -// debug(err); -// if (callback) callback(err); -// return; -// } -// function finish (err) { -// if (err) { -// debug(err); -// if (callback) callback(err); -// return; -// } - -// self.client = client; - -// self.events = self.db.collection(options.eventsCollectionName); -// self.events.ensureIndex({ aggregateId: 1, streamRevision: 1 }, -// function (err) { if (err) { debug(err); } }); -// self.events.ensureIndex({ commitStamp: 1 }, -// function (err) { if (err) { debug(err); } }); -// self.events.ensureIndex({ dispatched: 1 }, { sparse: true }, -// function (err) { if (err) { debug(err); } }); - -// self.snapshots = self.db.collection(options.snapshotsCollectionName); -// self.snapshots.ensureIndex({ aggregateId: 1, revision: -1 }, -// function (err) { if (err) { debug(err); } }); - -// self.transactions = self.db.collection(options.transactionsCollectionName); -// self.transactions.ensureIndex({ aggregateId: 1, 'events.streamRevision': 1 }, -// function (err) { if (err) { debug(err); } }); - -// self.emit('connect'); -// if (self.options.heartbeat) { -// self.startHeartbeat(); -// } -// if (callback) callback(null, self); -// } - -// // Authenticate with authSource -// if (options.authSource && options.username) { -// // Authenticate with authSource -// return client.authenticate(options.username, options.password, {authSource: options.authSource}, finish); -// } - -// if (options.username) { -// return client.authenticate(options.username, options.password, finish); -// } - -// finish(); -// }); -// }, - -// stopHeartbeat: function () { -// if (this.heartbeatInterval) { -// clearInterval(this.heartbeatInterval); -// delete this.heartbeatInterval; -// } -// }, - -// startHeartbeat: function () { -// var self = this; - -// var gracePeriod = Math.round(this.options.heartbeat / 2); -// this.heartbeatInterval = setInterval(function () { -// var graceTimer = setTimeout(function () { -// if (self.heartbeatInterval) { -// console.error((new Error ('Heartbeat timeouted after ' + gracePeriod + 'ms')).stack); -// self.db.close(function () {}); -// } -// }, gracePeriod); - -// self.db.command({ ping: 1 }, function (err) { -// if (graceTimer) clearTimeout(graceTimer); -// if (err) { -// console.error(err.stack || err); -// self.db.close(function () {}); -// } -// }); -// }, this.options.heartbeat); -// }, - -// disconnect: function (callback) { -// this.stopHeartbeat(); - -// if (!this.db) { -// if (callback) callback(null); -// return; -// } - -// this.db.close(function (err) { -// if (err) { -// debug(err); -// } -// if (callback) callback(err); -// }); -// }, - -// clear: function (callback) { -// var self = this; -// async.parallel([ -// function (callback) { -// self.events.remove({}, callback); -// }, -// function (callback) { -// self.snapshots.remove({}, callback); -// }, -// function (callback) { -// self.transactions.remove({}, callback); -// } -// ], function (err) { -// if (err) { -// debug(err); -// } -// if (callback) callback(err); -// }); -// }, - -// getNewId: function(callback) { -// callback(null, new ObjectID().toString()); -// }, +'use strict'; + +var util = require('util'), + Store = require('../base'), + _ = require('lodash'), + async = require('async'), + uuid = require('uuid'), + elasticsearch = Store.use('elasticsearch'), + elasticsearchVersion = Store.use('elasticsearch/package.json').version, + // isNew = mongoVersion.indexOf('1.') !== 0, + // ObjectID = isNew ? mongo.ObjectID : mongo.BSONPure.ObjectID, + debug = require('debug')('eventstore:store:elasticsearch'); + +function Elastic(options) { + options = options || {}; + + Store.call(this, options); + + var defaults = { + host: '52.58.4.8', + port: 9200, + indexName: 'eventstore', + eventsTypeName: 'events', + snapshotsTypeName: 'snapshots', + transactionsTypeName: 'transactions', + log: 'trace' + }; + + _.defaults(options, defaults); + + var defaultOpt = { + auto_reconnect: false, + ssl: false + }; + + options.options = options.options || {}; + + _.defaults(options.options, defaultOpt); + + this.options = options; +} + +util.inherits(Elastic, Store); + +_.extend(Elastic.prototype, { + + connect: function (callback) { + var options = this.options; + this.client = new elasticsearch.Client({host: options.host+':'+options.port, log: options.log}); + this.emit('connect'); + if (callback) callback(null); + }, + + disconnect: function (callback) { + this.client = null; + this.emit('disconnect'); + if (callback) callback(null); + }, + + clear: function (callback) { + var options = this.options; + this.client.indices.delete({index: options.indexName}, function (err) { + if (callback) callback(err); + }); + }, + + getNewId: function(callback) { + callback(null, uuid.v4()); + }, // addEvents: function (events, callback) { // if (events.length === 0) { @@ -568,6 +446,6 @@ // }); // } -// }); +}); -// module.exports = Mongo; +module.exports = Elastic; diff --git a/package.json b/package.json index dac43765..2ee03332 100644 --- a/package.json +++ b/package.json @@ -46,11 +46,13 @@ "lodash": "3.10.1", "node-uuid": "1.4.7", "parent-require": "1.0.0", - "tolerance": "1.0.0" + "tolerance": "1.0.0", + "uuid": "^2.0.1" }, "devDependencies": { "azure-storage": ">= 0.3.0", "cradle": ">=0.6.7", + "elasticsearch": ">= 10.0.0", "eslint": ">=1.0.0", "expect.js": ">= 0.1.2", "mocha": ">= 1.0.1", diff --git a/test/eventstoreTest.js b/test/eventstoreTest.js index 3027635d..b46751ee 100644 --- a/test/eventstoreTest.js +++ b/test/eventstoreTest.js @@ -800,7 +800,7 @@ describe('eventstore', function () { describe('with options containing a type property with the value of', function () { - var types = ['inmemory', 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; + var types = [/*'inmemory', */'elasticsearch'/*, 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; types.forEach(function (type) { diff --git a/test/storeTest.js b/test/storeTest.js index 088436de..63154074 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -3,7 +3,7 @@ var expect = require('expect.js'), async = require('async'), _ = require('lodash'); -var types = ['inmemory', 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; +var types = [/*'inmemory', */'elasticsearch'/*, 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; types.forEach(function (type) { From 4e284f3777382e7ecf12a20036a1b8809a9a6ba5 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Tue, 26 Jan 2016 15:16:09 +0100 Subject: [PATCH 03/20] added the addEvents function --- lib/databases/elasticsearch.js | 111 +++++++++++---------------------- 1 file changed, 38 insertions(+), 73 deletions(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index 7703202a..49f9a4a1 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -58,9 +58,16 @@ _.extend(Elastic.prototype, { }, clear: function (callback) { + var self = this; var options = this.options; - this.client.indices.delete({index: options.indexName}, function (err) { - if (callback) callback(err); + this.client.indices.exists({index: options.indexName}, function (err, result) { + if (result){ + self.client.indices.delete({index: options.indexName}, function (err) { + if (callback) callback(err); + }); + } else { + if (callback) callback(err); + } }); }, @@ -68,78 +75,36 @@ _.extend(Elastic.prototype, { callback(null, uuid.v4()); }, -// addEvents: function (events, callback) { -// if (events.length === 0) { -// if (callback) { callback(null); } -// return; -// } - -// var commitId = events[0].commitId; - -// var noAggregateId = false, -// invalidCommitId = false; - -// _.forEach(events, function (evt) { -// if (!evt.aggregateId) { -// noAggregateId = true; -// } - -// if (!evt.commitId || evt.commitId !== commitId) { -// invalidCommitId = true; -// } - -// evt._id = evt.id; -// evt.dispatched = false; -// }); - -// if (noAggregateId) { -// var errMsg = 'aggregateId not defined!'; -// debug(errMsg); -// if (callback) callback(new Error(errMsg)); -// return; -// } - -// if (invalidCommitId) { -// var errMsg = 'commitId not defined or different!'; -// debug(errMsg); -// if (callback) callback(new Error(errMsg)); -// return; -// } - -// var self = this; - -// if (events.length === 1) { -// return this.events.insert(events, callback); -// } - -// var tx = { -// _id: commitId, -// events: events, -// aggregateId: events[0].aggregateId, -// aggregate: events[0].aggregate, -// context: events[0].context -// }; - -// this.transactions.insert(tx, function (err) { -// if (err) { -// debug(err); -// if (callback) callback(err); -// return; -// } - -// self.events.insert(events, function (err) { -// if (err) { -// debug(err); -// if (callback) callback(err); -// return; -// } - -// self.removeTransactions(events[events.length - 1]); + addEvents: function (events, callback) { + var options = this.options; + + if (events.length === 0) { + if (callback) callback(null); + return; + } + + var noAggId = false + var bulkMap = []; + + _.forEach(events, function (evt) { + if (!evt.aggregateId) { + noAggId = true; + } + bulkMap.push({index: options.indexName, type: options.eventsTypeName, id: evt.id}); + bulkMap.push(evt); + }); -// if (callback) { callback(null); } -// }); -// }); -// }, + if (noAggId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + this.client.bulk({body: bulkMap}, function(err){ + if (callback) callback(err); + }); + }, // getEvents: function (query, skip, limit, callback) { // var findStatement = {}; From 2f5e5fe00d96cf0ff53789e5fa3ed82d38f58e0c Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Tue, 26 Jan 2016 15:32:22 +0100 Subject: [PATCH 04/20] add action name to bulk; --- lib/databases/elasticsearch.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index 49f9a4a1..394e6389 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -90,7 +90,7 @@ _.extend(Elastic.prototype, { if (!evt.aggregateId) { noAggId = true; } - bulkMap.push({index: options.indexName, type: options.eventsTypeName, id: evt.id}); + bulkMap.push({create: {index: options.indexName, type: options.eventsTypeName, id: evt.id}}); bulkMap.push(evt); }); From cd0d5497bae09524c3505ff51ceca517c92733f6 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Wed, 27 Jan 2016 17:17:24 +0100 Subject: [PATCH 05/20] progress on elasticsearch driver --- lib/databases/elasticsearch.js | 439 ++++++++++----------------------- 1 file changed, 131 insertions(+), 308 deletions(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index 394e6389..a28fefd1 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -22,8 +22,7 @@ function Elastic(options) { indexName: 'eventstore', eventsTypeName: 'events', snapshotsTypeName: 'snapshots', - transactionsTypeName: 'transactions', - log: 'trace' + log: 'warning' }; _.defaults(options, defaults); @@ -90,7 +89,8 @@ _.extend(Elastic.prototype, { if (!evt.aggregateId) { noAggId = true; } - bulkMap.push({create: {index: options.indexName, type: options.eventsTypeName, id: evt.id}}); + evt.dispatched = false; + bulkMap.push({create: {_index: options.indexName, _type: options.eventsTypeName, _id: evt.id}}); bulkMap.push(evt); }); @@ -100,316 +100,139 @@ _.extend(Elastic.prototype, { if (callback) callback(new Error(errMsg)); return; } + this.client.bulk({body: bulkMap, refresh: true}, function(error, response){ + if (callback) callback(error); + }); + }, + + _search: function (type, find, sort, skip, limit, callback) { + const searchOptions = { + index: this.options.indexName, + type: type, + q: find.join(' AND '), + sort: sort, + defaultOperator: 'AND', + from: (!skip ? 0 : skip), + size: (!limit || limit === -1 ? 10000 : limit) + }; + this.client.search(searchOptions, function (error, response) { + var dataList = []; + if (response && response.hits && response.hits.hits && response.hits.hits.length) { + dataList = response.hits.hits.map((data) => { + return data._source; + }); + } + callback(null, dataList); + }); + }, + + _searchEvents: function(find, skip, limit, callback) { + this._search(this.options.eventsTypeName, find, ['commitStamp:asc', 'streamRevision:asc', 'commitSequence:asc'], skip, limit, callback); + }, + + _searchSnapshots: function(find, skip, limit, callback) { + this._search(this.options.snapshotsTypeName, find, ['revision:desc', 'version:desc', 'commitStamp:desc'], skip, limit, callback); + }, + + getEvents: function (query, skip, limit, callback) { + var findStatement = []; + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + if (query.aggregateId) findStatement.push('aggregateId:' + query.aggregateId); + + this._searchEvents(findStatement, skip, limit, callback); + }, + + getEventsSince: function (date, skip, limit, callback) { + var findStatement = ['commitStamp:[' + date.toJSON() + ' TO *]']; + + this._searchEvents(findStatement, skip, limit, callback); + }, + + getEventsByRevision: function (query, revMin, revMax, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = []; + if (revMax === -1) { + findStatement.push('streamRevision:[' + revMin + ' TO *]'); + } else { + findStatement.push('streamRevision:[' + revMin + ' TO ' + revMax + '}'); + } + findStatement.push('aggregateId:' + query.aggregateId); + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + + this._searchEvents(findStatement, null, null, callback); + }, + + getUndispatchedEvents: function (query, callback) { + var findStatement = ['dispatched:false']; + if (query && query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query && query.context) findStatement.push('context:' + query.context); + if (query && query.aggregateId) findStatement.push('aggregateId:' + query.aggregateId); - this.client.bulk({body: bulkMap}, function(err){ - if (callback) callback(err); + this._searchEvents(findStatement, null, null, callback); + }, + + setEventToDispatched: function (id, callback) { + this.client.update({ + index: this.options.indexName, + type: this.options.eventsTypeName, + id: id, + body: { + doc: { + dispatched: true + } + }, + refresh: true + }, function (error, response) { + if (callback) callback(error); }); }, -// getEvents: function (query, skip, limit, callback) { -// var findStatement = {}; - -// if (query.aggregate) { -// findStatement.aggregate = query.aggregate; -// } - -// if (query.context) { -// findStatement.context = query.context; -// } - -// if (query.aggregateId) { -// findStatement.aggregateId = query.aggregateId; -// } - -// if (limit === -1) { -// return this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).toArray(callback); -// } - -// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).limit(limit).toArray(callback); -// }, + addSnapshot: function(snap, callback) { + if (!snap.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } -// getEventsSince: function (date, skip, limit, callback) { -// var findStatement = { commitStamp: { '$gte': date } }; - -// if (limit === -1) { -// return this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).toArray(callback); -// } - -// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).skip(skip).limit(limit).toArray(callback); -// }, - -// getEventsByRevision: function (query, revMin, revMax, callback) { -// if (!query.aggregateId) { -// var errMsg = 'aggregateId not defined!'; -// debug(errMsg); -// if (callback) callback(new Error(errMsg)); -// return; -// } - -// var streamRevOptions = { '$gte': revMin, '$lt': revMax }; -// if (revMax === -1) { -// streamRevOptions = { '$gte': revMin }; -// } - -// var findStatement = { -// aggregateId: query.aggregateId, -// streamRevision: streamRevOptions -// }; - -// if (query.aggregate) { -// findStatement.aggregate = query.aggregate; -// } - -// if (query.context) { -// findStatement.context = query.context; -// } - -// var self = this; - -// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(function (err, res) { -// if (err) { -// debug(err); -// return callback(err); -// } - -// if (!res || res.length === 0) { -// return callback(null, []); -// } - -// var lastEvt = res[res.length - 1]; - -// var txOk = (revMax === -1 && !lastEvt.restInCommitStream) || -// (revMax !== -1 && (lastEvt.streamRevision === revMax - 1 || !lastEvt.restInCommitStream)); - -// if (txOk) { -// // the following is usually unnecessary -// self.removeTransactions(lastEvt); - -// return callback(null, res); -// } - -// self.repairFailedTransaction(lastEvt, function (err) { -// if (err) { -// if (err.message.indexOf('missing tx entry') >= 0) { -// return callback(null, res); -// } -// debug(err); -// return callback(err); -// } - -// self.getEventsByRevision(query, revMin, revMax, callback); -// }); -// }); -// }, - -// getUndispatchedEvents: function (query, callback) { -// var findStatement = { -// dispatched: false -// }; - -// if (query && query.aggregate) { -// findStatement.aggregate = query.aggregate; -// } - -// if (query && query.context) { -// findStatement.context = query.context; -// } - -// if (query && query.aggregateId) { -// findStatement.aggregateId = query.aggregateId; -// } - -// this.events.find(findStatement, { sort: [['commitStamp', 'asc'], ['streamRevision', 'asc'], ['commitSequence', 'asc']] }).toArray(callback); -// }, - -// setEventToDispatched: function (id, callback) { -// var updateCommand = { '$unset' : { 'dispatched': null } }; -// this.events.update({'_id' : id}, updateCommand, callback); -// }, - -// addSnapshot: function(snap, callback) { -// if (!snap.aggregateId) { -// var errMsg = 'aggregateId not defined!'; -// debug(errMsg); -// if (callback) callback(new Error(errMsg)); -// return; -// } - -// snap._id = snap.id; -// this.snapshots.insert(snap, callback); -// }, - -// getSnapshot: function (query, revMax, callback) { -// if (!query.aggregateId) { -// var errMsg = 'aggregateId not defined!'; -// debug(errMsg); -// if (callback) callback(new Error(errMsg)); -// return; -// } - -// var findStatement = { -// aggregateId: query.aggregateId -// }; - -// if (query.aggregate) { -// findStatement.aggregate = query.aggregate; -// } - -// if (query.context) { -// findStatement.context = query.context; -// } - -// if (revMax > -1) { -// findStatement.revision = { '$lte': revMax }; -// } - -// this.snapshots.findOne(findStatement, { sort: [['revision', 'desc'], ['version', 'desc'], ['commitStamp', 'desc']] }, callback); -// }, - -// removeTransactions: function (evt, callback) { -// if (!evt.aggregateId) { -// var errMsg = 'aggregateId not defined!'; -// debug(errMsg); -// if (callback) callback(new Error(errMsg)); -// return; -// } - -// var findStatement = { aggregateId: evt.aggregateId }; - -// if (evt.aggregate) { -// findStatement.aggregate = evt.aggregate; -// } - -// if (evt.context) { -// findStatement.context = evt.context; -// } - -// // the following is usually unnecessary -// this.transactions.remove(findStatement, function (err) { -// if (err) { -// debug(err); -// } -// if (callback) { callback(err); } -// }); -// }, - -// getPendingTransactions: function (callback) { -// var self = this; -// this.transactions.find({}).toArray(function (err, txs) { -// if (err) { -// debug(err); -// return callback(err); -// } - -// if (txs.length === 0) { -// return callback(null, txs); -// } - -// var goodTxs = []; - -// async.map(txs, function (tx, clb) { -// var findStatement = { commitId: tx._id, aggregateId: tx.aggregateId }; - -// if (tx.aggregate) { -// findStatement.aggregate = tx.aggregate; -// } - -// if (tx.context) { -// findStatement.context = tx.context; -// } - -// self.events.findOne(findStatement, function (err, evt) { -// if (err) { -// return clb(err); -// } - -// if (evt) { -// goodTxs.push(evt); -// } else { -// self.transactions.remove({ _id: tx._id }, function (err) { -// if (err) { -// debug(err); -// } -// }); -// } - -// clb(null); -// }); -// }, function (err) { -// if (err) { -// debug(err); -// return callback(err); -// } - -// callback(null, goodTxs); -// }) -// }); -// }, - -// getLastEvent: function (query, callback) { -// if (!query.aggregateId) { -// var errMsg = 'aggregateId not defined!'; -// debug(errMsg); -// if (callback) callback(new Error(errMsg)); -// return; -// } - -// var findStatement = { aggregateId: query.aggregateId }; - -// if (query.aggregate) { -// findStatement.aggregate = query.aggregate; -// } - -// if (query.context) { -// findStatement.context = query.context; -// } - -// this.events.findOne(findStatement, { sort: [['commitStamp', 'desc'], ['streamRevision', 'desc'], ['commitSequence', 'desc']] }, callback); -// }, - -// repairFailedTransaction: function (lastEvt, callback) { -// var self = this; - -// //var findStatement = { -// // aggregateId: lastEvt.aggregateId, -// // 'events.streamRevision': lastEvt.streamRevision + 1 -// //}; -// // -// //if (lastEvt.aggregate) { -// // findStatement.aggregate = lastEvt.aggregate; -// //} -// // -// //if (lastEvt.context) { -// // findStatement.context = lastEvt.context; -// //} - -// //this.transactions.findOne(findStatement, function (err, tx) { -// this.transactions.findOne({ _id: lastEvt.commitId }, function (err, tx) { -// if (err) { -// debug(err); -// return callback(err); -// } - -// if (!tx) { -// var err = new Error('missing tx entry for aggregate ' + lastEvt.aggregateId); -// debug(err); -// return callback(err); -// } - -// var missingEvts = tx.events.slice(tx.events.length - lastEvt.restInCommitStream); - -// self.events.insert(missingEvts, function (err) { -// if (err) { -// debug(err); -// return callback(err); -// } - -// self.removeTransactions(lastEvt); - -// callback(null); -// }); -// }); -// } + this.client.create({ + index: this.options.indexName, + type: this.options.snapshotsTypeName, + id: snap.id, + body: snap, + refresh: true + }, function (error, response) { + if (callback) callback(error); + }); + }, + + getSnapshot: function (query, revMax, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = ['aggregateId:' + query.aggregateId]; + + if (query.context) findStatement.push('aggregate:' + query.aggregate); + if (query.aggregate) findStatement.push('context:' + query.context); + if (revMax > -1) findStatement.push('revision:[* TO ' + revMax + ']'); + + this._searchSnapshots(findStatement, 0, 1, function(error, response){ + const snap = response && response.length ? response[0] : null; + if (callback) callback(null, snap); + }); + } }); From f53c559feb402f3340057df83b66c164d565a0a6 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Thu, 28 Jan 2016 10:33:22 +0100 Subject: [PATCH 06/20] elasticsearch store passing all tests --- lib/databases/elasticsearch.js | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index a28fefd1..acd5ab3e 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -7,8 +7,6 @@ var util = require('util'), uuid = require('uuid'), elasticsearch = Store.use('elasticsearch'), elasticsearchVersion = Store.use('elasticsearch/package.json').version, - // isNew = mongoVersion.indexOf('1.') !== 0, - // ObjectID = isNew ? mongo.ObjectID : mongo.BSONPure.ObjectID, debug = require('debug')('eventstore:store:elasticsearch'); function Elastic(options) { @@ -22,7 +20,8 @@ function Elastic(options) { indexName: 'eventstore', eventsTypeName: 'events', snapshotsTypeName: 'snapshots', - log: 'warning' + log: 'warning', + maxSearchResults: 10000 }; _.defaults(options, defaults); @@ -106,19 +105,28 @@ _.extend(Elastic.prototype, { }, _search: function (type, find, sort, skip, limit, callback) { + var options = this.options; const searchOptions = { index: this.options.indexName, type: type, - q: find.join(' AND '), - sort: sort, defaultOperator: 'AND', from: (!skip ? 0 : skip), - size: (!limit || limit === -1 ? 10000 : limit) + size: (!limit || limit === -1 ? options.maxSearchResults : limit) }; + if (find && find.length) searchOptions.q = find.join(' AND '); + if (sort && sort.length) searchOptions.sort = sort; + this.client.search(searchOptions, function (error, response) { var dataList = []; if (response && response.hits && response.hits.hits && response.hits.hits.length) { + if (response.hits.hits.length >= options.maxSearchResults){ + var errMsg = 'reached maximum of ' + options.maxSearchResults + ' search results!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } dataList = response.hits.hits.map((data) => { + data._source.commitStamp = new Date(data._source.commitStamp); return data._source; }); } @@ -224,8 +232,8 @@ _.extend(Elastic.prototype, { var findStatement = ['aggregateId:' + query.aggregateId]; - if (query.context) findStatement.push('aggregate:' + query.aggregate); - if (query.aggregate) findStatement.push('context:' + query.context); + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); if (revMax > -1) findStatement.push('revision:[* TO ' + revMax + ']'); this._searchSnapshots(findStatement, 0, 1, function(error, response){ From 715e81b7072a3400c45ae69eedf791340647615d Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Thu, 28 Jan 2016 10:34:10 +0100 Subject: [PATCH 07/20] restored tests of other database types --- test/eventstoreTest.js | 2 +- test/storeTest.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/eventstoreTest.js b/test/eventstoreTest.js index b46751ee..fac498da 100644 --- a/test/eventstoreTest.js +++ b/test/eventstoreTest.js @@ -800,7 +800,7 @@ describe('eventstore', function () { describe('with options containing a type property with the value of', function () { - var types = [/*'inmemory', */'elasticsearch'/*, 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; + var types = ['inmemory', 'elasticsearch', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; types.forEach(function (type) { diff --git a/test/storeTest.js b/test/storeTest.js index 63154074..a3c716f3 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -3,7 +3,7 @@ var expect = require('expect.js'), async = require('async'), _ = require('lodash'); -var types = [/*'inmemory', */'elasticsearch'/*, 'mongodb', 'tingodb', 'redis'/*, 'azuretable'*/]; +var types = ['inmemory', 'elasticsearch', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; types.forEach(function (type) { From 288749838893430d8b2f11ba84dadd2a41617cd0 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Thu, 28 Jan 2016 10:35:26 +0100 Subject: [PATCH 08/20] dont test elasticsearch twice --- test/eventstoreTest.js | 2 +- test/storeTest.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/eventstoreTest.js b/test/eventstoreTest.js index fac498da..03b8f0ec 100644 --- a/test/eventstoreTest.js +++ b/test/eventstoreTest.js @@ -800,7 +800,7 @@ describe('eventstore', function () { describe('with options containing a type property with the value of', function () { - var types = ['inmemory', 'elasticsearch', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; + var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; types.forEach(function (type) { diff --git a/test/storeTest.js b/test/storeTest.js index a3c716f3..dfdca0f9 100644 --- a/test/storeTest.js +++ b/test/storeTest.js @@ -3,7 +3,7 @@ var expect = require('expect.js'), async = require('async'), _ = require('lodash'); -var types = ['inmemory', 'elasticsearch', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; +var types = ['inmemory', 'mongodb', 'tingodb', 'redis', 'elasticsearch'/*, 'azuretable'*/]; types.forEach(function (type) { From 5de4cd34327cac3a2c9b2bf8939d5afb4b365a30 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Thu, 28 Jan 2016 10:46:44 +0100 Subject: [PATCH 09/20] updated readme to include elasticsearch --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index efe3385a..993ea0c7 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ The project goal is to provide an eventstore implementation for node.js: - load and store events via EventStream object - event dispatching to your publisher (optional) -- supported Dbs (inmemory, mongodb, redis, tingodb, azuretable) +- supported Dbs (inmemory, mongodb, redis, tingodb, elasticsearch, azuretable) - snapshot support - query your events @@ -81,6 +81,19 @@ example with tingodb: timeout: 10000 // optional }); +example with elasticsearch: + + var es = require('eventstore')({ + type: 'elasticsearch', + host: 'localhost', // optional + port: 9200, // optional + indexName: 'eventstore', // optional + eventsTypeName: 'events', // optional + snapshotsTypeName: 'snapshots', // optional + log: 'warning', // optional + maxSearchResults: 10000 // optional + }); + example with azuretable: var es = require('eventstore')({ From e2942c258cf883d452b5d67f809b7b972cd85056 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Thu, 28 Jan 2016 10:53:07 +0100 Subject: [PATCH 10/20] use localhost as default --- README.md | 3 +-- lib/databases/elasticsearch.js | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 993ea0c7..3199bc6a 100644 --- a/README.md +++ b/README.md @@ -85,8 +85,7 @@ example with elasticsearch: var es = require('eventstore')({ type: 'elasticsearch', - host: 'localhost', // optional - port: 9200, // optional + host: 'localhost:9200', // optional indexName: 'eventstore', // optional eventsTypeName: 'events', // optional snapshotsTypeName: 'snapshots', // optional diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index acd5ab3e..a42c0da8 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -15,8 +15,7 @@ function Elastic(options) { Store.call(this, options); var defaults = { - host: '52.58.4.8', - port: 9200, + host: 'localhost:9200', indexName: 'eventstore', eventsTypeName: 'events', snapshotsTypeName: 'snapshots', @@ -44,7 +43,7 @@ _.extend(Elastic.prototype, { connect: function (callback) { var options = this.options; - this.client = new elasticsearch.Client({host: options.host+':'+options.port, log: options.log}); + this.client = new elasticsearch.Client({host: options.host, log: options.log}); this.emit('connect'); if (callback) callback(null); }, From 24fc25ed65453620f29b545120e8a904156931f7 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Thu, 28 Jan 2016 13:16:02 +0100 Subject: [PATCH 11/20] remove unused import --- lib/databases/elasticsearch.js | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index a42c0da8..30913553 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -3,7 +3,6 @@ var util = require('util'), Store = require('../base'), _ = require('lodash'), - async = require('async'), uuid = require('uuid'), elasticsearch = Store.use('elasticsearch'), elasticsearchVersion = Store.use('elasticsearch/package.json').version, From 26294d3031928ce11607978ff60e44c5f72ed6df Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Fri, 29 Jan 2016 17:16:27 +0100 Subject: [PATCH 12/20] add getLastEvent --- lib/databases/elasticsearch.js | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index 30913553..c2bf0a35 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -14,7 +14,7 @@ function Elastic(options) { Store.call(this, options); var defaults = { - host: 'localhost:9200', + host: '52.58.4.8:9200', indexName: 'eventstore', eventsTypeName: 'events', snapshotsTypeName: 'snapshots', @@ -149,6 +149,24 @@ _.extend(Elastic.prototype, { this._searchEvents(findStatement, skip, limit, callback); }, + getLastEvent: function (query, callback) { + if (!query.aggregateId) { + var errMsg = 'aggregateId not defined!'; + debug(errMsg); + if (callback) callback(new Error(errMsg)); + return; + } + + var findStatement = [ 'aggregateId:' + query.aggregateId ]; + if (query.aggregate) findStatement.push('aggregate:' + query.aggregate); + if (query.context) findStatement.push('context:' + query.context); + + this._search(this.options.eventsTypeName, findStatement, ['commitStamp:desc', 'streamRevision:desc', 'commitSequence:desc'], 0, 1, function(error, response){ + const event = response && response.length ? response[0] : null; + if (callback) callback(null, event); + }); + }, + getEventsSince: function (date, skip, limit, callback) { var findStatement = ['commitStamp:[' + date.toJSON() + ' TO *]']; From 975af33d05b9e94cf0393945880d270fc0e7e3e2 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 11:54:48 +0100 Subject: [PATCH 13/20] use localhost as default --- lib/databases/elasticsearch.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index c2bf0a35..c228193d 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -14,7 +14,7 @@ function Elastic(options) { Store.call(this, options); var defaults = { - host: '52.58.4.8:9200', + host: 'localhost:9200', indexName: 'eventstore', eventsTypeName: 'events', snapshotsTypeName: 'snapshots', From dfc140a652a92d478ee1380764b040a16dc0a533 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 12:25:47 +0100 Subject: [PATCH 14/20] add elasticsearch to travis.yml --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 8decc815..35c0dbed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ sudo: false services: - mongodb - redis-server + - elasticsearch # - couchdb language: node_js From e1061fc03b3d861f0ddda3389351d0e5374e1fb9 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 12:28:01 +0100 Subject: [PATCH 15/20] add the elasticsearch branch to travis.yml --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 35c0dbed..ad54aba6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,6 +15,7 @@ node_js: branches: only: - master + - elasticsearch notifications: email: From 7fb4354f2be266db9a286d5b21313e4f1fb3298e Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 12:31:28 +0100 Subject: [PATCH 16/20] use var for search options --- lib/databases/elasticsearch.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index c228193d..ac6c2ca6 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -104,7 +104,7 @@ _.extend(Elastic.prototype, { _search: function (type, find, sort, skip, limit, callback) { var options = this.options; - const searchOptions = { + var searchOptions = { index: this.options.indexName, type: type, defaultOperator: 'AND', From e44bd693205d315f3d9af0ecb5db87de0c95a931 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 12:33:30 +0100 Subject: [PATCH 17/20] removed the fat arrow --- lib/databases/elasticsearch.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index ac6c2ca6..41137f7b 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -123,7 +123,7 @@ _.extend(Elastic.prototype, { if (callback) callback(new Error(errMsg)); return; } - dataList = response.hits.hits.map((data) => { + dataList = response.hits.hits.map(function (data) { data._source.commitStamp = new Date(data._source.commitStamp); return data._source; }); From 2fa2e633e8239d2fb588bce9849368de3267ee46 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 12:35:48 +0100 Subject: [PATCH 18/20] remove two more consts --- lib/databases/elasticsearch.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/databases/elasticsearch.js b/lib/databases/elasticsearch.js index 41137f7b..77acd1ba 100644 --- a/lib/databases/elasticsearch.js +++ b/lib/databases/elasticsearch.js @@ -162,7 +162,7 @@ _.extend(Elastic.prototype, { if (query.context) findStatement.push('context:' + query.context); this._search(this.options.eventsTypeName, findStatement, ['commitStamp:desc', 'streamRevision:desc', 'commitSequence:desc'], 0, 1, function(error, response){ - const event = response && response.length ? response[0] : null; + var event = response && response.length ? response[0] : null; if (callback) callback(null, event); }); }, @@ -253,7 +253,7 @@ _.extend(Elastic.prototype, { if (revMax > -1) findStatement.push('revision:[* TO ' + revMax + ']'); this._searchSnapshots(findStatement, 0, 1, function(error, response){ - const snap = response && response.length ? response[0] : null; + var snap = response && response.length ? response[0] : null; if (callback) callback(null, snap); }); } From 77868e02f9a51c5ea606515cbad5ea497dcae53f Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 12:37:56 +0100 Subject: [PATCH 19/20] remove elasticsearch branch again from travis.yml --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index ad54aba6..35c0dbed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,6 @@ node_js: branches: only: - master - - elasticsearch notifications: email: From 123477afe572e94c1910041d71f5253ba4ac5929 Mon Sep 17 00:00:00 2001 From: Gerben Meyer Date: Mon, 1 Feb 2016 12:43:10 +0100 Subject: [PATCH 20/20] add elasticsearch keyword --- package.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index d5f05b66..68d84514 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,8 @@ "tingodb", "azure", "azuretable", - "inmemory" + "inmemory", + "elasticsearch" ], "main": "./index.js", "directories": {