Skip to content

Commit

Permalink
Fix device registration over HTTP (#19)
Browse files Browse the repository at this point in the history
Service registration over HTTP was not correctly recording the
associated Sparkplug device. This was causing issues with clients that
need to access the change-notify interface.

More fundamentally, the service registration table was allowing a
distinction between 'the principal which registered this service' and
'the Sparkplug device associated with this service'. This was because
when the interface was designed I was trying to allow a Kerberos
principal and its associated Sparkplug Node to use different UUIDs.
Since then I have decided that this is incorrect, and the Sparkplug Node
address is simply an alternative identifier for the same principal as
the Kerberos UPN. Remove the separate `principal` table, and simply
record service registrations as owned by their Sparkplug device.
  • Loading branch information
amrc-benmorrow authored Jan 9, 2024
2 parents eca2083 + 7b55e33 commit 133b504
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 2,991 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules
package-lock.json
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
# Local build
config.mk

# Gitignore package-lock as it messes up cross-platform builds
package-lock.json

# Environment
.env

Expand Down
2 changes: 1 addition & 1 deletion api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ components:
online at the moment. lastChange indicates the last time a BIRTH or DEATH was seen.
schemas is a list of all schemas used by the most recent birth certificate.
type: object
required: [uuid, group_id, node_id, online, last_change]
required: [uuid, group_id, node_id, online, last_change, schemas, top_schema]
properties:
uuid: { type: string, format: uuid }
group_id: { $ref: "#/components/schemas/sparkplug-name" }
Expand Down
5 changes: 0 additions & 5 deletions bin/directory-mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ fplus.set_service_discovery(model.find_service_url.bind(model));
const app = await new MQTTCli({
fplus,
model,
mqtt_broker: process.env.MQTT_BROKER,

device_uuid: process.env.DEVICE_UUID,
sparkplug_address: process.env.SPARKPLUG_ADDRESS,

url: process.env.HTTP_API_URL,
silent: !!process.env.MQTT_MONITOR_ONLY,
}).init();
Expand Down
22 changes: 16 additions & 6 deletions lib/api_v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ export default class API {
?? await this.fplus.resolve_principal({kerberos: req.auth});
if (owner == null) return res.status(400).end();

/* Specifying a device is for back-compat only. This version
* only supports principals which are also Sparkplug Nodes
* registering their own URLs. */
if (device != undefined && device != owner)
return res.status(403).end();

const ckown = (p, t) => {
console.log(`Checking ACL: ${owner}, ${p}, ${t}`);
return this.fplus.check_acl({uuid: owner}, p, t, true);
Expand All @@ -206,14 +212,18 @@ export default class API {
return this.fplus.check_acl({kerberos: req.auth}, p, t, true);
};

const ok =
(!setid || await ckreq(Perm.Manage_Service, owner))
&& (await ckown(Perm.Advertise_Service, service)
|| await ckreq(Perm.Override_Service, service));
const owner_ok = !setid || await ckreq(Perm.Manage_Service, owner);
const override_ok = setid
&& await ckreq(Perm.Override_Service, service);
const service_ok = override_ok
|| await ckown(Perm.Advertise_Service, service);
const ok = owner_ok && service_ok;

if (!ok) return res.status(403).end();

const st = await this.model.record_service({
service, owner, device, url
const st = await this.model.record_service({
service, url,
device: owner,
});

if (st == null)
Expand Down
3 changes: 1 addition & 2 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ export default class Model extends Queries {
if (opts.service) {
await q.record_service({
service: opts.service.uuid,
owner: uuid,
device: devid,
device: uuid,
url: opts.service.url,
});
}
Expand Down
35 changes: 25 additions & 10 deletions lib/mqttcli.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@

import timers from "timers/promises";
import async from "async";
import {Address, Debug, MetricBranch, MetricBuilder, MetricTree, SpB, Topic, UUIDs} from "@amrc-factoryplus/utilities";
import {Device_Info} from "./constants.js";
import Long from "long";

import {
Address, Debug,
MetricBranch, MetricBuilder, MetricTree,
SpB, Topic, UUIDs
} from "@amrc-factoryplus/utilities";
import { Device_Info } from "./constants.js";
import { Schema } from "./uuids.js";

function sym_diff(one, two) {
Expand All @@ -28,6 +34,11 @@ function map_get_or_create(map, key, factory) {
return rv;
}

function ts_date (ts) {
const val = Long.isLong(ts) ? ts.toNumber() : ts;
return new Date(val);
}

const debug = new Debug();

function for_each_metric(tree, action) {
Expand All @@ -48,11 +59,9 @@ export default class MQTTCli {
this.fplus = opts.fplus;
this.model = opts.model;

this.device_uuid = opts.device_uuid;
this.url = opts.url;
this.silent = opts.silent;

this.address = Address.parse(opts.sparkplug_address);
this.seq = 0;

this.online = new Set();
Expand All @@ -64,6 +73,13 @@ export default class MQTTCli {
}

async init() {
const ids = await this.fplus.Auth.find_principal();
if (!ids || !ids.uuid || !ids.sparkplug)
throw new Error("Can't find my identity!");

this.device_uuid = ids.uuid;
this.address = ids.sparkplug;

await this.model.init();

this.msg_q = async.queue(this.on_queued_message.bind(this));
Expand Down Expand Up @@ -333,7 +349,7 @@ export default class MQTTCli {
metric: branch.Active.name,
active: branch.Active.value,
alias: branch.Active.alias,
stamp: new Date(metric.timestamp ?? timestamp),
stamp: ts_date(metric.timestamp ?? timestamp),
});
});

Expand Down Expand Up @@ -362,7 +378,7 @@ export default class MQTTCli {
const uuid = "alias" in m
? alerts.alias.get(m.alias) : alerts.name.get(m.name);
if (!uuid) return [];
const stamp = new Date(m.timestamp ?? payload.timestamp);
const stamp = ts_date(m.timestamp ?? payload.timestamp);
return [[uuid, m.value, stamp]];
});

Expand All @@ -381,7 +397,6 @@ export default class MQTTCli {

async on_birth(address, payload) {
debug.log("device", `Registering BIRTH for ${address}`);

this.online.add(address);

let tree;
Expand All @@ -396,7 +411,7 @@ export default class MQTTCli {
this.record_alert_metrics(address, alerts);

await this.model.birth({
time: new Date(payload.timestamp ?? 0),
time: ts_date(payload.timestamp ?? 0),
address,
uuid: tree.Instance_UUID?.value,
top_schema: tree.Schema_UUID?.value,
Expand All @@ -409,7 +424,7 @@ export default class MQTTCli {
}

async on_death(address, payload) {
const time = new Date(payload.timestamp ?? 0);
const time = ts_date(payload.timestamp ?? 0);

debug.log("device", `Registering DEATH for ${address}`);

Expand Down Expand Up @@ -468,7 +483,7 @@ export default class MQTTCli {
async do_we_rebirth(addr) {
const {pending, sent} = this.rebirths;

debug.log("rebirth", `Checking whether we should rebirth ${addr}`);
//debug.log("rebirth", `Checking whether we should rebirth ${addr}`);

/* If we've rebirthed this device in the last 5 minutes, don't
* do it again. */
Expand Down
44 changes: 19 additions & 25 deletions lib/queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function sym_diff(one, two) {
* the database directly, and sometimes we need to query using a query
* function for a transaction. The model inherits from this class. */
export default class Queries {
static DBVersion = 9;
static DBVersion = 10;

constructor(query) {
this.query = query;
Expand Down Expand Up @@ -70,7 +70,7 @@ export default class Queries {
async devices() {
let dbr = await this.query(`
select uuid
from device
from device_status
`);

return dbr.rows.map(r => r.uuid);
Expand Down Expand Up @@ -320,44 +320,38 @@ export default class Queries {
}

async record_service(opts) {
if (!valid_uuid(opts.service) || !valid_uuid(opts.owner))
return null;

const dev = await this.find_or_create("device", opts.device);
const srv = await this.find_or_create("service", opts.service);
const own = await this.find_or_create("principal", opts.owner);
if (!dev || !srv) return null;

/* Insert if we can; otherwise only update if we are changing
* something. This both avoids unnecessary churn on the DB and
* allows us to do correct change-notify over MQTT. */
const ins = await this.query(`
insert into service_provider as prv
(service, owner, device, url)
values ($1, $2, $3, $4)
on conflict (service, owner) do
update
set device = $3, url = $4
where prv.device is distinct
from $3
or prv.url is distinct
from $4
returning 1 ok
`, [srv, own, opts.device, opts.url]);
insert into service_provider as prv (service, device, url)
values ($1, $2, $3)
on conflict (service, device) do update
set url = $3
where prv.device is distinct from $2
or prv.url is distinct from $3
returning 1 ok
`, [srv, dev, opts.url]);

return ins.rowCount == 1 ? opts.service : false;
}

async service_advert(service, owner) {
if (!valid_uuid(service) || !valid_uuid(owner))
async service_advert(service, device) {
if (!valid_uuid(service) || !valid_uuid(device))
return null;

const dbr = await this.query(`
select svp.device, svp.url
select svp.url
from service_provider svp
join service sv on sv.id = svp.service
join principal pr on pr.id = svp.owner
join service sv on sv.id = svp.service
join device dv on dv.id = svp.device
where sv.uuid = $1
and pr.uuid = $2
`, [service, owner]);
and dv.uuid = $2
`, [service, device]);
return dbr.rows[0];
}

Expand Down
Loading

0 comments on commit 133b504

Please sign in to comment.