-
-
Notifications
You must be signed in to change notification settings - Fork 3
/
app.js
131 lines (119 loc) · 4.82 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
require('dotenv').config()
const mqtt = require('mqtt');
const mqttClient = mqtt.connect(process.env.MQTT_BROKER_ADDRESS);
const { Pool } = require('pg')
const pm2 = require('pm2');
const pool = new Pool()
// Open the PM2 event bus and echo selected bus events to this process's stdout.
pm2.launchBus((err, bus) => {
  if (err) {
    // Without a bus there is nothing to subscribe to; calling `bus.on` below
    // would throw a TypeError, so report the failure and stop here.
    console.error('Failed to launch PM2 bus:', err);
    return;
  }
  console.log('connected', bus);

  // Log every argument the bus passes for 'process:exception' events.
  bus.on('process:exception', (...args) => {
    console.log(args);
  });

  // Log every argument the bus passes for 'log:err' events.
  bus.on('log:err', (...args) => {
    console.log('logged error', args);
  });

  bus.on('reconnect attempt', () => {
    console.log('Bus reconnecting');
  });

  bus.on('close', () => {
    console.log('Bus closed');
  });
});
// Once the broker connection is established, subscribe to every topic
// ('#' is the MQTT multi-level wildcard).
mqttClient.on('connect', () => {
  console.log("Connected!");
  mqttClient.subscribe('#', (err) => {
    // A failed subscription would otherwise go unnoticed and the service
    // would sit connected but never receive any sensor data.
    if (err) {
      console.error('Failed to subscribe to MQTT topics:', err);
    }
  });
});
/**
 * Returns `reading` without its final character, e.g. the trailing 'V' of a
 * voltage or the trailing '%' of a state of charge.
 *
 * @param {string} reading - Value carrying a one-character unit suffix.
 * @returns {string} The value with the last character removed.
 */
function stripUnitSuffix(reading) {
  return reading.substring(0, reading.length - 1);
}

// Each writer turns a parsed payload into the list of pending insert promises
// for that message, so the caller can attach error handling to every one.

/** Stores one battery row; voltage and state_of_charge lose their unit suffix. */
const writeBattery = (json) => [
  insertBattery(
    json.datetime,
    stripUnitSuffix(json.voltage),
    stripUnitSuffix(json.state_of_charge),
    json.mac,
  ),
];

/** Stores one temperature row per sensor (sensor 0 and sensor 1). */
const writeTemperature = (json) => [
  insertTemperature(json.datetime, json.temperature_0, 0, json.mac),
  insertTemperature(json.datetime, json.temperature_1, 1, json.mac),
];

/** Stores one heap row. NOTE(review): values are assumed to be decimal — confirm against the firmware. */
const writeHeap = (json) => [
  insertHeap(
    Number.parseInt(json.free_heap_size, 10),
    Number.parseInt(json.low_water_mark, 10),
    json.mac,
  ),
];

/**
 * Stores one thermocouple row.
 * NOTE(review): parseInt discards any fractional part of the readings —
 * confirm that integer precision is intended here.
 */
const writeThermocouple = (json) => [
  insertThermocouple(
    Number.parseInt(json.cold_junction, 10),
    Number.parseInt(json.temperature, 10),
    json.mac,
  ),
];

// Topic -> writer lookup. The 'esp32/...' and 'esp32/2/...' topics carry the
// same payloads and therefore share writers.
const topicWriters = new Map([
  ['esp32/battery', writeBattery],
  ['esp32/temperature', writeTemperature],
  ['esp32/heap', writeHeap],
  ['esp32/2/battery', writeBattery],
  ['esp32/2/temperature', writeTemperature],
  ['esp32/2/heap', writeHeap],
  ['esp32/3/thermocouple', writeThermocouple],
]);

/**
 * Routes one MQTT message to the matching database insert(s).
 *
 * Messages on unknown topics are logged and skipped without being parsed.
 * A payload that is not valid JSON, or that lacks the fields a writer needs,
 * is logged and dropped instead of throwing out of the event handler (the
 * client subscribes to every topic, so arbitrary payloads can arrive).
 * Rejections from the insert functions are logged rather than left unhandled.
 *
 * @param {string} topic - Topic the message was published on.
 * @param {Buffer|string} message - Raw payload; converted with `toString()` before parsing.
 * @returns {void}
 */
function handleMessage(topic, message) {
  const writer = topicWriters.get(topic);
  if (writer === undefined) {
    console.log("Unexpected message: " + topic + ": " + message);
    return;
  }

  let pendingInserts;
  try {
    pendingInserts = writer(JSON.parse(message.toString()));
  } catch (error) {
    console.error(`Dropping malformed message on ${topic}: ${error.message}`);
    return;
  }

  for (const pending of pendingInserts) {
    pending.catch((error) => console.error(error));
  }
}

mqttClient.on('message', handleMessage);
/**
 * Inserts one row into the `battery` table.
 *
 * If no client can be obtained from the pool, the failure is logged and the
 * returned promise resolves without inserting. A failing query rejects the
 * returned promise; the client is released back to the pool either way.
 *
 * @param {*} time - Value stored in `client_time`.
 * @param {*} voltage - Value stored in `voltage`.
 * @param {*} stateOfCharge - Value stored in `state_of_charge`.
 * @param {*} mac - Value stored in `mac`.
 * @returns {Promise<void>}
 */
async function insertBattery(time, voltage, stateOfCharge, mac) {
  const statement = 'INSERT INTO battery(voltage, state_of_charge, client_time, mac) VALUES($1, $2, $3, $4);';
  const values = [voltage, stateOfCharge, time, mac];

  let connection;
  try {
    connection = await pool.connect();
  } catch (err) {
    console.error(`Failed to connect to database pool due to: ${err.message}`);
    return;
  }

  try {
    const outcome = await connection.query(statement, values);
    // Exactly one row is expected to be written; anything else is suspicious.
    if (outcome.rowCount != 1) {
      console.warn("Row count was not equal to 1 in: ", outcome);
    }
  } finally {
    connection.release();
  }
}
/**
 * Inserts one row into the `temperature` table.
 *
 * If no client can be obtained from the pool, the failure is logged and the
 * returned promise resolves without inserting. A failing query rejects the
 * returned promise; the client is released back to the pool either way.
 *
 * @param {*} time - Value stored in `client_time`.
 * @param {*} temperature - Value stored in `temperature`.
 * @param {*} sensorNumber - Value stored in `sensor`.
 * @param {*} mac - Value stored in `mac`.
 * @returns {Promise<void>}
 */
async function insertTemperature(time, temperature, sensorNumber, mac) {
  const statement = 'INSERT INTO temperature(client_time, temperature, sensor, mac) VALUES($1, $2, $3, $4);';
  const values = [time, temperature, sensorNumber, mac];

  let connection;
  try {
    connection = await pool.connect();
  } catch (err) {
    console.error(`Failed to connect to database pool due to: ${err.message}`);
    return;
  }

  try {
    const outcome = await connection.query(statement, values);
    // Exactly one row is expected to be written; anything else is suspicious.
    if (outcome.rowCount != 1) {
      console.warn("Row count was not equal to 1 in: ", outcome);
    }
  } finally {
    connection.release();
  }
}
/**
 * Inserts one row into the `heap` table.
 *
 * If no client can be obtained from the pool, the failure is logged and the
 * returned promise resolves without inserting. A failing query rejects the
 * returned promise; the client is released back to the pool either way.
 *
 * @param {*} free_heap - Value stored in `free_heap`.
 * @param {*} low_water_mark - Value stored in `low_water_mark`.
 * @param {*} mac - Value stored in `mac`.
 * @returns {Promise<void>}
 */
async function insertHeap(free_heap, low_water_mark, mac) {
  const statement = 'INSERT INTO heap(free_heap, low_water_mark, mac) VALUES($1, $2, $3);';
  const values = [free_heap, low_water_mark, mac];

  let connection;
  try {
    connection = await pool.connect();
  } catch (err) {
    console.error(`Failed to connect to database pool due to: ${err.message}`);
    return;
  }

  try {
    const outcome = await connection.query(statement, values);
    // Exactly one row is expected to be written; anything else is suspicious.
    if (outcome.rowCount != 1) {
      console.warn("Row count was not equal to 1 in: ", outcome);
    }
  } finally {
    connection.release();
  }
}
/**
 * Inserts one row into the `thermocouple` table.
 *
 * If no client can be obtained from the pool, the failure is logged and the
 * returned promise resolves without inserting. A failing query rejects the
 * returned promise; the client is released back to the pool either way.
 *
 * @param {*} cold_junction - Value stored in `cold_junction`.
 * @param {*} temperature - Value stored in `temperature`.
 * @param {*} mac - Value stored in `mac`.
 * @returns {Promise<void>}
 */
async function insertThermocouple(cold_junction, temperature, mac) {
  const statement = 'INSERT INTO thermocouple(cold_junction, temperature, mac) VALUES($1, $2, $3);';
  const values = [cold_junction, temperature, mac];

  let connection;
  try {
    connection = await pool.connect();
  } catch (err) {
    console.error(`Failed to connect to database pool due to: ${err.message}`);
    return;
  }

  try {
    const outcome = await connection.query(statement, values);
    // Exactly one row is expected to be written; anything else is suspicious.
    if (outcome.rowCount != 1) {
      console.warn("Row count was not equal to 1 in: ", outcome);
    }
  } finally {
    connection.release();
  }
}