Skip to content

Commit

Permalink
moved the courier to the same level as kibana, being that it really i…
Browse files Browse the repository at this point in the history
…s external to the angular app
  • Loading branch information
Spencer Alger committed Feb 12, 2014
1 parent 8e3aee4 commit 082426a
Show file tree
Hide file tree
Showing 16 changed files with 406 additions and 260 deletions.
12 changes: 4 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,18 @@
"main": "Gulpfile.js",
"dependencies": {},
"devDependencies": {
"connect": "~2.12.0",
"gulp": "~3.5.2",
"lodash": "~2.4.1",
"expect.js": "~0.2.0",
"gulp-jshint": "git://github.com/spenceralger/gulp-jshint.git#relative_jshintrc",
"jshint-stylish": "~0.1.5",
"grunt": "~0.4.2",
"grunt-contrib-connect": "~0.6.0",
"grunt-contrib-jshint": "~0.8.0",
"mocha": "~1.17.1",
"grunt-mocha": "~0.4.10",
"load-grunt-config": "~0.7.0",
"grunt-mocha": "~0.4.10"
"mocha": "~1.17.1"
},
"scripts": {
"test": "gulp test",
"server": "gulp server"
"test": "grunt test",
"server": "grunt server"
},
"repository": {
"type": "git",
Expand Down
50 changes: 50 additions & 0 deletions scratch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* jshint node: true */
var elasticsearch = require('../elasticsearch-js');
var async = require('async');

var es = new elasticsearch.Client({
host: 'localhost:9200',
sniffOnStart: true,
sniffInterval: 3000,
apiVersion: '1.0',
log: 'trace'
});

var rl = require('readline').createInterface({
input: process.stdin,
output: process.stdout,
terminal: true
});

async.series([
function (done) {
setTimeout(done, 50);
},
function (done) {
console.log(es.transport.connectionPool._conns.index);
es.indices.create({
index: 'index_name'
}, done);
},
function (done) {
rl.question('Is the master down?', function () {
done();
});
},
function (done) {
console.log(es.transport.connectionPool._conns.index);
es.search({ index: 'index_name' }, done);
},
function (done) {
rl.question('Is the slave down now?', function () {
es.search({ body: { query: { match_all: {} } } }, done);
});
},
function (done) {
rl.question('Is the master back up?', function () {
es.search({ body: { query: { match_all: {} } } }, done);
});
}
], function (err) {
console.log(err);
});
229 changes: 229 additions & 0 deletions src/courier/courier.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
define(function (require) {

var DataSource = require('courier/data_source');
var EventEmitter = require('utils/event_emitter');
var inherits = require('utils/inherits');
var errors = require('courier/errors');
var _ = require('lodash');
var angular = require('angular');

function chain(cntx, method) {
return function () {
method.apply(cntx, arguments);
return this;
};
}

function mergeProp(state, filters, val, key) {
switch (key) {
case 'inherits':
// ignore
return;
case 'filter':
filters.push(val);
return;
case 'index':
case 'type':
if (key && state[key] == null) {
state[key] = val;
}
return;
case 'source':
key = '_source';
/* fall through */
}

if (key && state.body[key] == null) {
state.body[key] = val;
}
}

function flattenDataSource(source) {
var state = {
body: {}
};

// all of the filters from the source chain
var filters = [];

var collectProp = _.partial(mergeProp, state, filters);

// walk the chain and merge each property
var current = source;
var currentState;
while (current) {
currentState = current._state();
_.forOwn(currentState, collectProp);
current = currentState.inherits;
}

// defaults for the query
_.forOwn({
query: {
'match_all': {}
}
}, collectProp);

// switch to filtered query if there are filters
if (filters.length) {
state.body.query = {
filtered: {
query: state.body.query,
filter: {
bool: {
must: filters
}
}
}
};
}

return state;
}

function fetch(client, sources, cb) {
if (!client) {
this.emit('error', new Error('Courier does not have a client yet, unable to fetch queries.'));
return;
}

var all = [];
var body = '';
_.each(sources, function (source) {
all.push(source);

var state = flattenDataSource(source);
var header = JSON.stringify({
index: state.index,
type: state.type
});
var doc = JSON.stringify(state.body);

body += header + '\n' + doc + '\n';
});

return client.msearch({ body: body }, function (err, resp) {
if (err) return cb(err);

_.each(resp.responses, function (resp, i) {
sources[i].emit('results', resp);
});

cb(err, resp);
});
}

/**
* Federated query service, supports data sources that inherit properties
* from one another and automatically emit results.
* @param {object} config
* @param {Client} config.client - The elasticsearch.js client to use for querying. Should be setup and ready to go.
* @param {integer} [config.fetchInterval=30000] - The amount in ms between each fetch (deafult is 30 seconds)
*/
function Courier(config) {
if (!(this instanceof Courier)) return new Courier(config);
var opts = {
fetchInterval: 30000
};
var fetchTimer;
var activeRequest;

var sources = [];

function doFetch() {
if (!opts.client) {
this.emit('error', new Error('Courier does not have a client, pass it ' +
'in to the constructor or set it with the .client() method'));
return;
}
if (activeRequest) {
activeRequest.abort();
stopFetching();
this.emit('error', new errors.HastyRefresh());
return;
}

// we need to catch the original promise in order to keep it's abort method
activeRequest = fetch(opts.client, sources, function (err, resp) {
activeRequest = null;
setFetchTimeout();

if (err) {
window.console && console.log(err);
}
});
}

function setFetchTimeout() {
clearTimeout(fetchTimer);
if (opts.fetchInterval) {
fetchTimer = setTimeout(doFetch, opts.fetchInterval);
} else {
fetchTimer = null;
}
}

function stopFetching() {
clearTimeout(fetchTimer);
}

function startFetchingSource(source) {
var existing = _.find(sources, { source: source });
if (existing) return false;

sources.push(source);
}

function stopFetchingSource(source) {
var i = sources.indexOf(source);
if (i !== -1) {
sources.slice(i, 1);
}
if (sources.length === 0) stopFetching();
}

// is there a scheduled request?
function isStarted() {
return !!fetchTimer;
}

// chainable public api
this.isStarted = chain(this, isStarted);
this.start = chain(this, doFetch);
this.startFetchingSource = chain(this, startFetchingSource);
this.stop = chain(this, stopFetching);
this.stopFetchingSource = chain(this, stopFetchingSource);
this.close = chain(this, function stopFetchingAllSources() {
_.each(sources, stopFetchingSource);
});

// setter
this.client = chain(this, function (client) {
opts.client = client;
});

// setter/getter
this.fetchInterval = function (val) {
opts.fetchInterval = val;
if (isStarted()) setFetchTimeout();
return this;
};

// factory
this.createSource = function (state) {
return new DataSource(this, state);
};

// apply the passed in config
_.each(config || {}, function (val, key) {
if (typeof this[key] !== 'function') throw new TypeError('invalid config "' + key + '"');
this[key](val);
}, this);
}

// private api, exposed for testing
Courier._flattenDataSource = flattenDataSource;
inherits(Courier, EventEmitter);

return Courier;
});
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ define(function (require) {
'aggs',
'from',
'size',
'source',
'inherits'
];

Expand Down Expand Up @@ -81,13 +82,13 @@ define(function (require) {
return _.keys(mapping);
});
};
this.extend = function () {
return courier.createSource().inherits(this);
};

// get/set internal state values
optionNames.forEach(function chainableOptions(name) {
optionNames.forEach(function (name) {
this[name] = function (val) {
if (val === void 0) {
return state[name];
}
state[name] = val;
return this;
};
Expand Down
12 changes: 12 additions & 0 deletions src/courier/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
define(function (require) {
function HastyRefresh() {
this.name = 'HastyRefresh';
this.message = 'Courier attempted to start a query before the previous had finished.';
}
HastyRefresh.prototype = new Error();
HastyRefresh.prototype.constructor = HastyRefresh;

return {
HastyRefresh: HastyRefresh
};
});
File renamed without changes.
2 changes: 2 additions & 0 deletions src/courier/test.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
<courier-test type="apache" fields="extension,response,request"></courier-test>
<courier-test type="nginx" fields=""></courier-test>
4 changes: 3 additions & 1 deletion src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
<script>require(['main'], function () {});</script>
</head>
<body>
<div ng-view></div>
<div ng-controller="Kibana">
<div ng-view></div>
</div>
</body>
</html>
Loading

0 comments on commit 082426a

Please sign in to comment.