forked from onebeyond/confabulous-vault-loader
-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
134 lines (124 loc) · 5.45 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
var debug = require('debug')('confabulous:loaders:vault')
var EventEmitter = require('events').EventEmitter
var request = require('request')
var async = require('async')
var duration = require('parse-duration')
var merge = require('lodash.merge')
var contains = require('lodash.contains')
var format = require('util').format
var crypto = require('crypto')
module.exports = function (_options, postProcessors) {
// Accept the singular `path` shorthand by promoting it to a one-element `paths` list.
// Note that this writes onto the object supplied by the caller.
if (_options && _options.path && !_options.paths) _options.paths = [_options.path]

// Baseline settings; anything supplied in `_options` is merged over these.
var defaults = {
  mandatory: true,
  keepPaths: false,
  watch: false,
  request: {
    json: true,
    timeout: 10000,
    gzip: true,
    forever: false,
    headers: {}
  }
}
var options = merge({}, defaults, _options)

// Set to true by `load` when a path answers 200; `watch` reads it to detect secrets appearing or disappearing
var exists = false

// A 404 is tolerated only when the loader is not mandatory
var allowedResponseCodes = options.mandatory ? [200] : [200, 404]

// Checksum of the data last seen for each path, keyed by path
var checksums = new Map()

// Returned from the loader function; `watch` emits `change` and `error` on it
var emitter = new EventEmitter()
return function (confabulous, cb) {
debug('running')
// Run the pipeline on a later tick; the emitter is returned synchronously right after this call.
setImmediate(() => {
  const steps = [validate, authenticate, watch, load]
  async.waterfall(steps, (err, results) => {
    if (err) {
      cb(err)
      return
    }
    // `load` yields one { [path]: data } object per path; fold them into a single object keyed by path
    const byPath = Object.assign({}, ...results)
    // Unless keepPaths is set, drop the path level and shallow-merge every path's data together
    const config = options.keepPaths ? byPath : Object.assign({}, ...Object.values(byPath))
    // Chain the post processors over the loaded config and hand the outcome to the caller
    async.seq.apply(async, postProcessors)(config, cb)
  })
})
return emitter
/**
 * Checks the merged loader options before any request is made.
 *
 * @param {Function} cb - Receives an Error for the first failed check. Otherwise it
 *   receives a boolean: `true` when url, paths or method is missing, `false` when all
 *   three are present.
 */
function validate (cb) {
  debug('validate: %s', JSON.stringify(options))

  var mandatory = options.mandatory
  var method = options.method

  // [failed?, message] pairs, in the order they are reported
  var checks = [
    [mandatory && !options.url, 'url is required'],
    [mandatory && !options.paths, 'paths is required'],
    [mandatory && !method, 'method is required'],
    [method === 'app-id' && !options.appId, 'appId is required'],
    [method === 'app-id' && !options.userId, 'userId is required'],
    [method === 'token' && !options.token, 'token is required'],
    [options.watch && !options.watch.interval, 'watch interval is required']
  ]
  var failed = checks.find(function (check) { return check[0] })
  if (failed) return cb(new Error(failed[1]))

  // A boolean (not an Error) goes into the error position on purpose: `true` stops the
  // remaining steps when a non-mandatory loader has nothing to load.
  // NOTE(review): presumably confabulous treats `true` as "skip this loader" - confirm.
  cb(!options.url || !options.paths || !options.method) // eslint-disable-line
}
/**
 * Obtains a Vault token for the configured auth method and stores it in the
 * `X-Vault-Token` entry of `options.request.headers`, which `get` and `post` send.
 *
 * - `app-id`: POSTs `app_id` / `user_id` to `<url>/v1/auth/app-id/login` and uses
 *   `body.auth.client_token` from the response.
 * - `token`: uses `options.token`, calling it first when it is a function.
 *
 * @param {Function} cb - Called with no arguments on success, or with an Error when the
 *   request fails, the login does not return 200, the response carries no client token,
 *   or the method is not supported.
 */
function authenticate (cb) {
  debug('authenticate: method=%s url=%s', options.method, options.url)
  switch (options.method) {
    case 'app-id': {
      var loginUrl = options.url + '/v1/auth/app-id/login'
      post({ url: loginUrl, body: { app_id: options.appId, user_id: options.userId } }, function (err, res, body) {
        if (err) return cb(err)
        if (res.statusCode !== 200) return cb(new Error(format('%s returned %d', options.url, res.statusCode)))
        // A 200 without `auth.client_token` (e.g. a non-JSON body) used to throw a TypeError
        // inside this callback; report it through cb instead.
        var clientToken = body && body.auth && body.auth.client_token
        if (!clientToken) return cb(new Error(format('%s did not return a client token', loginUrl)))
        options.request.headers['X-Vault-Token'] = clientToken
        cb()
      })
      break
    }
    case 'token': {
      options.request.headers['X-Vault-Token'] = typeof options.token === 'function' ? options.token() : options.token
      cb()
      break
    }
    default: {
      cb(new Error(format('Unsupported authentication method: %s', options.method)))
    }
  }
}
/**
 * Starts one polling timer per configured path when `options.watch` is set; otherwise
 * does nothing for that path.
 *
 * On every tick the loader re-authenticates and fetches the path. It emits `change` on
 * the shared emitter when a previously loaded secret now answers 404, or when a 200
 * response is new or has a different checksum. Authentication failures, request
 * failures and disallowed status codes are emitted as `error`.
 *
 * Each timer is unref'd and is cleared when confabulous emits `reloading`.
 *
 * @param {Function} cb - Passed to `async.each` as the completion callback.
 */
function watch (cb) {
  async.each(options.paths, (path, next) => {
    // Guard first, so a disabled watch never reads `options.watch.interval`
    if (!options.watch) return next()
    var configUrl = options.url + '/v1/' + path
    debug('watch: %s, interval:%s', configUrl, options.watch.interval)
    var watcher = setInterval(function () {
      debug('checking for changes to: %s', configUrl)
      authenticate(function (err) {
        // EventEmitter has no `error` method; failures have to go through emit('error', ...)
        if (err) return emitter.emit('error', err)
        get({ url: configUrl }, function (err, res) {
          // The timer was cleared while the request was in flight; drop the stale response
          if (!watcher) return
          if (err) return emitter.emit('error', err)
          if (!contains(allowedResponseCodes, res.statusCode)) return emitter.emit('error', new Error(format('%s returned %d', configUrl, res.statusCode)))
          var disappeared = res.statusCode === 404 && exists
          // isModified also records the new checksum, so it is only reached for a 200 on an existing secret
          var appearedOrChanged = res.statusCode === 200 && (!exists || isModified(res, path))
          if (disappeared || appearedOrChanged) emitter.emit('change')
        })
      })
    }, duration(options.watch.interval))
    watcher.unref()
    confabulous.on('reloading', function () {
      clearInterval(watcher)
      watcher = null
    })
    return next()
  }, cb)
}
/**
 * Fetches every configured path from Vault.
 *
 * For each path the result is a single-key object `{ [path]: data }`, where `data` is
 * `body.data` for a 200 response and an empty array for a tolerated 404. A 200 also
 * marks the secret as existing and records a checksum of its data for the watcher.
 *
 * @param {Function} cb - Passed to `async.map`; receives the request or status error,
 *   or the list of per-path result objects.
 */
function load (cb) {
  const fetchPath = (path, done) => {
    const configUrl = options.url + '/v1/' + path
    debug('load: %s', configUrl)
    exists = false
    get({ url: configUrl }, (err, res, body) => {
      if (err) return done(err)
      const status = res.statusCode
      if (!contains(allowedResponseCodes, status)) {
        return done(new Error(format('%s returned %d', configUrl, status)))
      }
      if (status === 404) return done(null, { [path]: [] })
      exists = true
      checksums.set(path, generateChecksum(res.body.data))
      done(err, { [path]: body.data })
    })
  }
  async.map(options.paths, fetchPath, cb)
}
/**
 * Issues a GET through `request`, merging `options.request` and then `args` over a
 * `{ method: 'GET' }` base.
 *
 * @param {Object} [args] - Per-call request options. May be omitted, in which case the
 *   only argument is the callback.
 * @param {Function} cb - Handed to `request` unchanged.
 */
function get (args, cb) {
  if (arguments.length !== 1) {
    request(merge({ method: 'GET' }, options.request, args), cb)
    return
  }
  // Single-argument form: the lone argument is the callback
  return get({}, arguments[0])
}
/**
 * Issues a POST through `request`, merging `options.request` and then `args` over a
 * `{ method: 'POST' }` base.
 *
 * @param {Object} [args] - Per-call request options. May be omitted, in which case the
 *   only argument is the callback.
 * @param {Function} cb - Handed to `request` unchanged.
 */
function post (args, cb) {
  // Single-argument form: re-enter post (this used to delegate to get, which sent a GET)
  if (arguments.length === 1) return post({}, arguments[0])
  request(merge({ method: 'POST' }, options.request, args), cb)
}
/**
 * Reports whether the data in `res` differs from what was last recorded for `path`,
 * and records the new checksum either way.
 *
 * @param {Object} res - Response whose `body.data` is checksummed.
 * @param {string} path - Key under which the checksum is stored.
 * @returns {boolean} `true` when the stored checksum is absent or different.
 */
function isModified (res, path) {
  const current = generateChecksum(res.body.data)
  const previous = checksums.get(path)
  checksums.set(path, current)
  return previous !== current
}
/**
 * Returns the hex-encoded MD5 digest of the JSON serialisation of `data`.
 * The loader compares these digests to tell whether a path's data has changed.
 *
 * @param {*} data - Any value `JSON.stringify` turns into a string.
 * @returns {string} 32-character hex digest.
 */
function generateChecksum (data) {
  const hash = crypto.createHash('md5')
  hash.update(JSON.stringify(data))
  return hash.digest('hex')
}
}
}