-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
125 lines (99 loc) · 3.68 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
'use strict';
var EventEmitter = require('events').EventEmitter
, utils = require('util')
, fs = require('fs')
, path = require('path')
, doT = require('dot')
, pg = require('pg')
, _ = require('mol-proto')
, installSQLtemplate = getInstallSQLtemplate();
/**
 * Default options. PGObserver shallow-merges the caller's options over a copy
 * of this object (and the caller's `names` over a copy of `names`).
 * The merged options object is also what the install SQL template is rendered with.
 */
var DEFAULTS = {
// channel name used in the LISTEN statement issued by subscribe()
changesChannel: 'data_changes',
// Names of database objects. addNotifyTrigger / removeNotifyTrigger are the
// procedures invoked by subscribe() / unsubscribe(); notifyTrigger and
// notifyTriggerFunc are not read by the JS code in this file - presumably they
// are consumed by the install.sql template (TODO confirm).
names: {
notifyTrigger: 'data_change_notify_trigger',
notifyTriggerFunc: 'data_change_notify_trigger_func',
addNotifyTrigger: 'add_notify_trigger_to_table',
// NOTE(review): "..._to_table" looks copied from the add procedure name; it is a
// runtime identifier, so it must stay in sync with whatever install.sql creates.
removeNotifyTrigger: 'remove_notify_trigger_to_table'
},
// when truthy, subscribe() also emits '<table>:<operation>' events (operation lower-cased)
operationEvents: true,
checkUpdates: true, // only notify if update changes record, can be set to false if record comparison is expensive
// The constructor rewrites `true` to the column name 'id' before storing the options.
sendRecordId: false // send only record IDs, true - to send column 'id', string to send column with a given name
};
/**
 * Observer of table changes; stores a database client and the merged options
 * used by install(), subscribe() and unsubscribe().
 * May be called with or without `new`.
 *
 * @param {Object} [opts] options, merged over DEFAULTS
 * @param {Object} [opts.client] existing database client to use; when absent a
 *   new pg.Client is created from opts.conString
 * @param {String} [opts.conString] connection string for the created client
 * @param {Object} [opts.names] overrides for individual DEFAULTS.names entries
 * @param {Boolean|String} [opts.sendRecordId] `true` is normalised to 'id'
 */
function PGObserver (opts) {
  if (!(this instanceof PGObserver)) return new PGObserver(opts);

  // Run the parent constructor so emitter state is initialised per instance.
  EventEmitter.call(this);

  // Tolerate a missing options object instead of throwing on opts.client.
  opts = opts || {};

  this.client = opts.client || new pg.Client(opts.conString);

  // Build the options on fresh objects: the caller's `opts` is never written to,
  // and `names` is always a private copy so DEFAULTS.names is never shared.
  var options = _.extend(_.clone(DEFAULTS), opts);
  options.names = _.extend(_.clone(DEFAULTS.names), opts.names || {});

  if (options.sendRecordId === true) options.sendRecordId = 'id';

  this.options = options;
}
// Make PGObserver inherit from EventEmitter; subscribe() relies on the inherited
// emit() to publish change events. This must run before the methods are attached.
utils.inherits(PGObserver, EventEmitter);
// Attach the public methods (implemented as function declarations in this file).
// They read this.client / this.options, so they are meant to be called on an
// instance - presumably extendProto copies them onto PGObserver.prototype
// (TODO confirm against mol-proto).
_.extendProto(PGObserver, {
install: install,
subscribe: subscribe,
unsubscribe: unsubscribe
});
// The constructor is the module's export.
module.exports = PGObserver;
/**
 * Renders the install SQL from the observer options and runs it on the client,
 * creating the procedures in the database.
 * NOTE(review): connect() is called without a callback, so a connection failure
 * is not reported through `cb` here.
 * @param {Function} cb optional callback, handed to client.query as is
 */
function install(cb) {
  var renderedSQL = installSQLtemplate(this.options)
    , db = this.client;

  db.connect();
  db.query(renderedSQL, cb);
}
/**
 * Subscribes to changes in the table(s): adds the notify trigger to each table
 * and, on first use, issues LISTEN on the changes channel and starts re-emitting
 * notifications as events named '<table>' and (when options.operationEvents is
 * set) '<table>:<operation in lower case>'.
 *
 * @param {String|Array<String>} tables table name (or array of table names) to subscribe to
 * @param {Function} [cb] optional callback, called with (err, queryResult)
 */
function subscribe(tables, cb) {
  var self = this;
  // The callback is documented as optional, so never call it blindly.
  var done = typeof cb === 'function' ? cb : function () {};
  var sql = getSQL.call(this, tables, addTrigger);

  this.client.query(sql, function (err, data) {
    if (err) return done(err, data);
    if (self._listening) return done(null, data);
    startListening(self, done);
  });
}

/**
 * Issues LISTEN once per observer. Callers arriving while the LISTEN query is
 * still in flight are queued and share its outcome, so only one 'notification'
 * handler is ever attached to the client.
 */
function startListening(observer, done) {
  if (observer._listenWaiters) {
    observer._listenWaiters.push(done);
    return;
  }

  var waiters = observer._listenWaiters = [done];
  var listenSQL = 'LISTEN ' + observer.options.changesChannel;

  observer.client.query(listenSQL, function (err, data) {
    observer._listenWaiters = null;

    if (!err) {
      observer.client.on('notification', function (msg) {
        dispatchNotification(observer, msg);
      });
      observer._listening = true;
    }

    waiters.forEach(function (waiter) {
      waiter(err || null, data);
    });
  });
}

/**
 * Turns one client notification into observer events. Notifications for other
 * channels are ignored; an unparsable payload is reported as an 'error' event
 * instead of throwing from inside the client's event handler.
 */
function dispatchNotification(observer, msg) {
  var options = observer.options
    , change;

  try {
    if (typeof msg === 'string') msg = JSON.parse(msg);

    // The client may be shared and listening on other channels too. Compared
    // case-insensitively because LISTEN takes the channel as an unquoted
    // identifier, which the server folds to lower case.
    if (typeof msg.channel === 'string'
        && msg.channel.toLowerCase() !== String(options.changesChannel).toLowerCase())
      return;

    change = JSON.parse(msg.payload);
    if (!change || typeof change !== 'object')
      throw new TypeError('Notification payload is not a JSON object');
  } catch (e) {
    observer.emit('error', e);
    return;
  }

  observer.emit(change.table, change);
  if (options.operationEvents && typeof change.operation === 'string')
    observer.emit(change.table + ':' + change.operation.toLowerCase(), change);
}
/**
 * Unsubscribes from changes in the table(s) by running the "remove notify
 * trigger" statement for each of them in a single query.
 * @param {String|Array<String>} tables table name (or array of table names) to unsubscribe from
 * @param {Function} cb optional callback, handed to client.query as is
 */
function unsubscribe(tables, cb) {
  var dropTriggersSQL = getSQL.call(this, tables, removeTrigger);
  var db = this.client;

  db.query(dropTriggersSQL, cb);
}
/**
 * Reads install.sql from this module's directory and compiles it with doT.
 * @return {Function} compiled template; called with the observer options it
 *   returns the SQL text that install() runs
 */
function getInstallSQLtemplate() {
  // Join path segments rather than concatenating, and read the file as text so
  // the template compiler receives a string instead of a Buffer.
  var sqlSource = fs.readFileSync(path.join(__dirname, 'install.sql'), 'utf8')
    , settings = _.clone(doT.templateSettings);

  // `strip` is switched off on a private copy, leaving doT's shared
  // templateSettings untouched (presumably to keep the SQL's whitespace intact).
  settings.strip = false;
  return doT.template(sqlSource, settings);
}
/**
 * Builds the SQL for one table or a list of tables.
 * `func` is called with the same `this` as getSQL and must return the statement
 * for a table; statements for an array are joined with a single space.
 * @param {String|Array<String>} tables table name or array of table names
 * @param {Function} func statement builder
 * @return {String} SQL text
 */
function getSQL(tables, func) {
  if (!Array.isArray(tables)) return func.call(this, tables);

  var statements = tables.map(func, this);
  return statements.join(' ');
}
/**
 * Builds the statement that calls the "add notify trigger" procedure for a table.
 * Must be called with `this` set to the observer (reads this.options.names).
 * @param {String} table table name, passed to the procedure as a string literal
 * @return {String} SQL statement
 */
function addTrigger(table) {
  // Double any embedded single quote so the name cannot end the SQL string
  // literal early (broken statement or injected SQL).
  var tableLiteral = "'" + String(table).replace(/'/g, "''") + "'";
  return 'SELECT ' + this.options.names.addNotifyTrigger + '(' + tableLiteral + ');';
}
/**
 * Builds the statement that calls the "remove notify trigger" procedure for a table.
 * Must be called with `this` set to the observer (reads this.options.names).
 * @param {String} table table name, passed to the procedure as a string literal
 * @return {String} SQL statement
 */
function removeTrigger(table) {
  // Double any embedded single quote so the name cannot end the SQL string
  // literal early (broken statement or injected SQL).
  var tableLiteral = "'" + String(table).replace(/'/g, "''") + "'";
  return 'SELECT ' + this.options.names.removeNotifyTrigger + '(' + tableLiteral + ');';
}