Skip to content

Commit

Permalink
Index alerts (#18)
Browse files Browse the repository at this point in the history
Create an index of all alerts published over MQTT and their current
state. Make this information available over API endpoints.
  • Loading branch information
AlexGodbehere authored Jan 8, 2024
2 parents 947d6da + 60cc33a commit eca2083
Show file tree
Hide file tree
Showing 10 changed files with 318 additions and 24 deletions.
5 changes: 0 additions & 5 deletions config.mk

This file was deleted.

65 changes: 59 additions & 6 deletions lib/api_v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import url from "node:url";
import express from "express";
import OpenApiValidator from "express-openapi-validator";
import {Perm} from "./uuids.js";
import {Perm, Special} from "./uuids.js";

function fqdn_split (fqdn) {
return fqdn.match(/([^.]*)\.(.*)/).slice(1);
Expand Down Expand Up @@ -39,11 +39,11 @@ export default class API {
let api = this.routes;

/* Validate against the spec */
const spec = url.fileURLToPath(new URL("../api/openapi.yaml", import.meta.url));
console.log(`Loading OpenAPI spec from ${spec}...`);
api.use(OpenApiValidator.middleware({
apiSpec: spec,
}));
//const spec = url.fileURLToPath(new URL("../api/openapi.yaml", import.meta.url));
//console.log(`Loading OpenAPI spec from ${spec}...`);
//api.use(OpenApiValidator.middleware({
// apiSpec: spec,
//}));

api.get("/search", this.search.bind(this));

Expand Down Expand Up @@ -73,6 +73,12 @@ export default class API {
.put(this.service_advert_put.bind(this));
/* Delete unimplemented for now. */
// .delete(this.service_advert_del.bind(this));

api.get("/alert", this.alert_list.bind(this, false));
api.get("/alert/active", this.alert_list.bind(this, true));
api.get("/alert/type/:type", this.alert_list.bind(this, false));
api.get("/alert/type/:type/active", this.alert_list.bind(this, true));
api.get("/alert/:uuid", this.alert_single.bind(this));
}

async search(req, res) {
Expand Down Expand Up @@ -266,4 +272,51 @@ export default class API {
uuid: uuid,
});
}

acl (principal) {
/* We can't use the library method as that is too specialised
* for the normal use case. So just do a straight fetch. */
return this.fplus.Auth.fetch({
url: "authz/acl",
query: { principal, permission: Perm.All },
});
}

async alert_list (active_only, req, res) {
const {type} = req.params;

const [st, acl] = await this.acl(req.auth);
if (st != 200)
return res.status(503).end();

const perm = p => acl.filter(a => a.permission == p)
.map(a => a.target);
const types = perm(Perm.Read_Alert_Type);
const devices = perm(Perm.Read_Device_Alerts);
const ifwild = l => l.includes(Special.Null) ? null : l;

const alerts = await this.model.alert_list({
type, active_only,
types: ifwild(types),
devices: ifwild(devices),
});
return res.status(200).json(alerts);
}

async alert_single (req, res) {
const {uuid} = req.params;
const auth = this.fplus.Auth;

const alrt = await this.model.alert_get(uuid);

/* We have to unhelpfully return 403 here to avoid exposing
* alert existence to unauthorised clients. */
if (alrt == null) return res.status(403).end();

/* XXX timing attack */
const ok = auth.check_acl(req.auth, Perm.Read_Alert_Type, alrt.type, true)
|| auth.check_acl(req.auth, Perm.Read_Device_Alerts, alrt.device, true);
if (!ok) return res.status(403).end();
return res.status(200).json(alrt);
}
}
13 changes: 13 additions & 0 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ export default class Model extends Queries {
for (const schema of opts.schemas)
await q.record_schema(sess, schema);

for (const alrt of opts.alerts)
await q.record_alert(devid, alrt);

if (opts.service) {
await q.record_service({
service: opts.service.uuid,
Expand All @@ -99,4 +102,14 @@ export default class Model extends Queries {

await this.txn(q => q.record_death(time, addr));
}

/* ALERTS */

update_alerts (updates) {
return this.txn(async q => {
for (const up of updates) {
await q.update_alert_active(...up);
}
});
}
}
108 changes: 98 additions & 10 deletions lib/mqttcli.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import timers from "timers/promises";
import async from "async";
import {Address, Debug, MetricBranch, MetricBuilder, MetricTree, SpB, Topic, UUIDs} from "@amrc-factoryplus/utilities";
import {Device_Info} from "./constants.js";
import { Schema } from "./uuids.js";

function sym_diff(one, two) {
const diff = new Set(one);
Expand All @@ -20,8 +21,28 @@ function sym_diff(one, two) {
return diff;
}

function map_get_or_create(map, key, factory) {
if (map.has(key)) return map.get(key);
const rv = factory();
map.set(key, rv);
return rv;
}

const debug = new Debug();

function for_each_metric(tree, action) {
for (const [name, metric] of Object.entries(tree)) {
if (metric instanceof MetricBranch) {
if (metric.$metric)
action(tree, name, metric.$metric)
for_each_metric(metric, action);
}
else {
action(tree, name, metric);
}
}
}

export default class MQTTCli {
constructor(opts) {
this.fplus = opts.fplus;
Expand All @@ -34,10 +55,12 @@ export default class MQTTCli {
this.address = Address.parse(opts.sparkplug_address);
this.seq = 0;

this.online = new Set();
this.rebirths = {
pending: {},
sent: {},
};
this.alerts = new Map();
}

async init() {
Expand Down Expand Up @@ -199,7 +222,7 @@ export default class MQTTCli {
}

on_queue_error(error, qitem) {
debug.log("mqtt", `Error handling ${qitem.topic}: ${error}`);
debug.log("mqtt", "Error handling %s: %o", qitem.topic, error);
}

on_notify(msg) {
Expand All @@ -212,6 +235,9 @@ export default class MQTTCli {
}

switch (table) {
case "alert":
this.on_alert_notify(id);
break;
case "session":
this.on_session_notify(id);
break;
Expand All @@ -223,6 +249,12 @@ export default class MQTTCli {
}
}

async on_alert_notify (id) {
const alrt = await this.model.alert_by_id(id);

this.publish_changed([["Alert_Type", alrt.type]]);
}

async on_session_notify(id) {
const session = await this.model.session_notification_info(id);
const schemas = await this.model.session_schemas(id);
Expand Down Expand Up @@ -289,6 +321,54 @@ export default class MQTTCli {
};
}

find_alerts(metrics, timestamp) {
const alerts = [];

for_each_metric(metrics, (branch, leaf, metric) => {
if (leaf != "Schema_UUID" || metric.value != Schema.Alert)
return;
alerts.push({
uuid: branch.Instance_UUID.value,
type: branch.Type.value,
metric: branch.Active.name,
active: branch.Active.value,
alias: branch.Active.alias,
stamp: new Date(metric.timestamp ?? timestamp),
});
});

debug.log("alerts", "Found alerts: %o", alerts);
return alerts;
}

record_alert_metrics(address, alerts) {
const metrics = map_get_or_create(
this.alerts, address,
() => ({ name: new Map(), alias: new Map() }));

for (const alrt of alerts.values()) {
metrics.name.set(alrt.metric, alrt.uuid);
if (alrt.alias != undefined) {
metrics.alias.set(alrt.alias, alrt.uuid);
}
}
}

handle_alert_updates(address, payload) {
const alerts = this.alerts.get(address);
if (!alerts) return;

const updates = payload.metrics.flatMap(m => {
const uuid = "alias" in m
? alerts.alias.get(m.alias) : alerts.name.get(m.name);
if (!uuid) return [];
const stamp = new Date(m.timestamp ?? payload.timestamp);
return [[uuid, m.value, stamp]];
});

this.model.update_alerts(updates);
}

publish_changed(changes) {
debug.log("change", "Publish changed: %o", changes);
this.publish("DATA", changes.map(
Expand All @@ -301,6 +381,8 @@ export default class MQTTCli {

async on_birth(address, payload) {
debug.log("device", `Registering BIRTH for ${address}`);

this.online.add(address);

let tree;
if (payload.uuid === UUIDs.FactoryPlus) {
Expand All @@ -310,13 +392,17 @@ export default class MQTTCli {
tree = {};
}

const alerts = this.find_alerts(tree, payload.timestamp);
this.record_alert_metrics(address, alerts);

await this.model.birth({
time: new Date(payload.timestamp ?? 0),
address,
uuid: tree.Instance_UUID?.value,
top_schema: tree.Schema_UUID?.value,
schemas: this.find_schemas(tree),
service: this.find_service(tree),
alerts,
});

debug.log("device", `Finished BIRTH for ${address}`);
Expand All @@ -327,6 +413,8 @@ export default class MQTTCli {

debug.log("device", `Registering DEATH for ${address}`);

this.alerts.delete(address);
this.online.delete(address);
await this.model.death({address, time});

debug.log("device", `Finished DEATH for ${address}`);
Expand Down Expand Up @@ -355,6 +443,9 @@ export default class MQTTCli {
* to copy Ignition's trick of rebirthing anyone I don't recognise.
*/
async on_data(addr, payload) {
/* Don't await this, allow it to run on its own */
this.handle_alert_updates(addr, payload);

if (this.silent) return;
if (!await this.do_we_rebirth(addr)) return;

Expand Down Expand Up @@ -391,23 +482,20 @@ export default class MQTTCli {
/* If we've got a pending rebirth, don't send another. */
if (pending[addr]) return false;

/* If we think this device is online, everything's OK. */
/* XXX This makes a database query for every DATA packet. This
* will be slow. We need to cache on/offline status in-process
* and only query the DB if we don't know about this device. */
let online = await this.model.is_addr_online(addr);
if (online || pending[addr]) return false;
/* If we think this device is online, everything's OK. This will
* rebirth any device we haven't seen a birth for, even if the
* database says it's online. This is important because we might
* have the wrong schema information. */
if (this.online.has(addr) || pending[addr]) return false;

/* Mark that we're working on this device and wait 5-10s to see if
* it rebirths on its own. */
pending[addr] = 1;
await timers.setTimeout(5000 + Math.random() * 5000);
online = await this.model.is_addr_online(addr);

/* Clear our marker first so we retry next time */
delete (pending[addr]);

if (online) return false;
if (this.online.has(addr)) return false;

sent[addr] = Date.now();
return true;
Expand Down
Loading

0 comments on commit eca2083

Please sign in to comment.