Skip to content

Commit

Permalink
fix: rename connection to channel
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Mar 22, 2020
1 parent 2dc3e07 commit f50a94b
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 72 deletions.
4 changes: 2 additions & 2 deletions packages/cosmic-swingset/lib/ag-solo/vats/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ export default function setup(syscall, state, helpers) {

// This will allow dApp developers to register in their api/deploy.js
const httpRegCallback = {
sendMulticast(obj, connectionHandles) {
return E(vats.http).sendMulticast(obj, connectionHandles);
send(obj, channelHandles) {
return E(vats.http).send(obj, channelHandles);
},
registerAPIHandler(handler) {
return E(vats.http).registerURLHandler(handler, '/api');
Expand Down
20 changes: 10 additions & 10 deletions packages/cosmic-swingset/lib/ag-solo/vats/captp.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,36 @@
import { makeCapTP } from '@agoric/captp/lib/captp';
import harden from '@agoric/harden';

export const getCapTPHandler = (E, sendMulticast, getBootstrapObject) => {
const conns = new Map();
export const getCapTPHandler = (E, send, getBootstrapObject) => {
const chans = new Map();
const handler = harden({
onConnect(_obj, meta) {
const { connectionHandle, origin = 'unknown' } = meta || {};
onOpen(_obj, meta) {
const { channelHandle, origin = 'unknown' } = meta || {};
console.info(`Starting CapTP`, meta);
const sendObj = o => {
sendMulticast(o, [connectionHandle]);
send(o, [channelHandle]);
};
const { dispatch, abort } = makeCapTP(
origin,
sendObj,
getBootstrapObject,
);
conns.set(connectionHandle, [dispatch, abort]);
chans.set(channelHandle, [dispatch, abort]);
},
onDisconnect(_obj, meta) {
onClose(_obj, meta) {
console.log(`Finishing CapTP`, meta);
const dispatchAbort = conns.get(meta.connectionHandle);
const dispatchAbort = chans.get(meta.channelHandle);
if (dispatchAbort) {
(1, dispatchAbort[1])();
}
conns.delete(meta.connectionHandle);
chans.delete(meta.channelHandle);
},
onError(obj, meta) {
console.log(`Error in CapTP`, meta, obj.error);
},
onMessage(obj, meta) {
console.error('processing inbound', obj);
const dispatchAbort = conns.get(meta.connectionHandle);
const dispatchAbort = chans.get(meta.channelHandle);
if (!dispatchAbort || !(1, dispatchAbort[0])(obj)) {
console.error(`Could not find CapTP handler ${obj.type}`, meta);
return false;
Expand Down
12 changes: 6 additions & 6 deletions packages/cosmic-swingset/lib/ag-solo/vats/repl.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export function stringify(value, spaces, already = new WeakSet()) {
return ret;
}

export function getReplHandler(E, homeObjects, sendMulticast) {
export function getReplHandler(E, homeObjects, send) {
const commands = {};
const history = {};
const display = {};
Expand All @@ -60,7 +60,7 @@ export function getReplHandler(E, homeObjects, sendMulticast) {

function updateHistorySlot(histnum) {
// console.log(`sendBroadcast ${histnum}`);
sendMulticast(
send(
{
type: 'updateHistory',
histnum,
Expand Down Expand Up @@ -141,11 +141,11 @@ export function getReplHandler(E, homeObjects, sendMulticast) {
};

const commandHandler = harden({
onConnect(_obj, meta) {
replHandles.add(meta.connectionHandle);
onOpen(_obj, meta) {
replHandles.add(meta.channelHandle);
},
onDisconnect(_obj, meta) {
replHandles.delete(meta.connectionHandle);
onClose(_obj, meta) {
replHandles.delete(meta.channelHandle);
},

onMessage(obj, meta) {
Expand Down
42 changes: 21 additions & 21 deletions packages/cosmic-swingset/lib/ag-solo/vats/vat-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { getCapTPHandler } from './captp';
function build(E, D) {
let commandDevice;
let provisioner;
const connectionIdToHandle = new Map();
const connectionHandleToId = new WeakMap();
const channelIdToHandle = new Map();
const channelHandleToId = new WeakMap();
const loaded = {};
loaded.p = new Promise((resolve, reject) => {
loaded.res = resolve;
Expand All @@ -31,12 +31,12 @@ function build(E, D) {
},
};

const sendMulticast = (obj, connectionHandles) => {
const send = (obj, channelHandles) => {
// TODO: Make this sane by adding support for multicast to the commandDevice.
for (const connectionHandle of connectionHandles) {
const connectionID = connectionHandleToId.get(connectionHandle);
if (connectionID) {
const o = { ...obj, meta: { connectionID } };
for (const channelHandle of channelHandles) {
const channelID = channelHandleToId.get(channelHandle);
if (channelID) {
const o = { ...obj, meta: { channelID } };
D(commandDevice).sendBroadcast(o);
}
}
Expand Down Expand Up @@ -68,12 +68,12 @@ function build(E, D) {
if (ROLES.client) {
handler.readyForClient = () => readyForClient.p;

const replHandler = getReplHandler(E, homeObjects, sendMulticast);
const replHandler = getReplHandler(E, homeObjects, send);
registerURLHandler(replHandler, '/private/repl');

// Assign the captp handler.
// TODO: Break this out into a separate vat.
const captpHandler = getCapTPHandler(E, sendMulticast, () =>
const captpHandler = getCapTPHandler(E, send, () =>
// Harden only our exported objects.
harden(exportedToCapTP),
);
Expand Down Expand Up @@ -109,7 +109,7 @@ function build(E, D) {

registerURLHandler,
registerAPIHandler: h => registerURLHandler(h, '/api'),
sendMulticast,
send,

setProvisioner(p) {
provisioner = p;
Expand Down Expand Up @@ -138,19 +138,19 @@ function build(E, D) {
const { type, meta: rawMeta = {} } = rawObj || {};
const {
url = '/private/repl',
connectionID: rawConnectionID,
channelID: rawChannelID,
dispatcher = 'onMessage',
} = rawMeta;

try {
let connectionHandle = connectionIdToHandle.get(rawConnectionID);
if (dispatcher === 'onConnect') {
connectionHandle = harden({});
connectionIdToHandle.set(rawConnectionID, connectionHandle);
connectionHandleToId.set(connectionHandle, rawConnectionID);
} else if (dispatcher === 'onDisconnect') {
connectionIdToHandle.delete(rawConnectionID);
connectionHandleToId.delete(connectionHandle);
let channelHandle = channelIdToHandle.get(rawChannelID);
if (dispatcher === 'onOpen') {
channelHandle = harden({});
channelIdToHandle.set(rawChannelID, channelHandle);
channelHandleToId.set(channelHandle, rawChannelID);
} else if (dispatcher === 'onClose') {
channelIdToHandle.delete(rawChannelID);
channelHandleToId.delete(channelHandle);
}

const obj = {
Expand All @@ -160,9 +160,9 @@ function build(E, D) {

const meta = {
...rawMeta,
connectionHandle,
channelHandle,
};
delete meta.connectionID;
delete meta.channelID;

if (url === '/private/repl') {
// Use our local handler object (compatibility).
Expand Down
20 changes: 10 additions & 10 deletions packages/cosmic-swingset/lib/ag-solo/vats/vat-wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ function build(E, D, _log) {
notify(m) {
pursesState = m;
if (http) {
E(http).sendMulticast(
E(http).send(
{
type: 'walletUpdatePurses',
data: pursesState,
Expand All @@ -108,7 +108,7 @@ function build(E, D, _log) {
notify(m) {
inboxState = m;
if (http) {
E(http).sendMulticast(
E(http).send(
{
type: 'walletUpdateInbox',
data: inboxState,
Expand All @@ -123,13 +123,13 @@ function build(E, D, _log) {

function getCommandHandler() {
return harden({
onConnect(_obj, meta) {
onOpen(_obj, meta) {
console.error('Adding adminHandle', meta);
adminHandles.add(meta.connectionHandle);
adminHandles.add(meta.channelHandle);
},
onDisconnect(_obj, meta) {
onClose(_obj, meta) {
console.error('Removing adminHandle', meta);
adminHandles.delete(meta.connectionHandle);
adminHandles.delete(meta.channelHandle);
},
onMessage: adminOnMessage,
});
Expand All @@ -139,11 +139,11 @@ function build(E, D, _log) {
return harden({
getCommandHandler() {
return harden({
onConnect(_obj, meta) {
bridgeHandles.add(meta.connectionHandle);
onOpen(_obj, meta) {
bridgeHandles.add(meta.channelHandle);
},
onDisconnect(_obj, meta) {
bridgeHandles.delete(meta.connectionHandle);
onClose(_obj, meta) {
bridgeHandles.delete(meta.channelHandle);
},
onMessage(obj, meta) {
const { type } = obj;
Expand Down
44 changes: 21 additions & 23 deletions packages/cosmic-swingset/lib/ag-solo/web.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const morgan = require('morgan');

const log = anylogger('web');

const points = new Map();
const channels = new Map();

const send = (ws, msg) => {
if (ws.readyState === ws.OPEN) {
Expand All @@ -26,12 +26,12 @@ export function makeHTTPListener(basedir, port, host, rawInboundCommand) {
// Enrich the inbound command with some metadata.
const inboundCommand = (
body,
{ connectionID, dispatcher, url, headers: { origin } = {} } = {},
{ channelID, dispatcher, url, headers: { origin } = {} } = {},
id = undefined,
) => {
const obj = {
...body,
meta: { connectionID, dispatcher, origin, url, date: Date.now() },
meta: { channelID, dispatcher, origin, url, date: Date.now() },
};
return rawInboundCommand(obj).catch(err => {
const idpfx = id ? `${id} ` : '';
Expand Down Expand Up @@ -124,11 +124,9 @@ export function makeHTTPListener(basedir, port, host, rawInboundCommand) {
.catch(_ => {});
});

// accept WebSocket connections at the root path.
// This senses the Connection-Upgrade header to distinguish between plain
// GETs (which should return index.html) and WebSocket requests.. it might
// be better to move the WebSocket listener off to e.g. /ws with a 'path:
// "ws"' option.
// accept WebSocket channels at the root path.
// This senses the Upgrade header to distinguish between plain
// GETs (which should return index.html) and WebSocket requests.
const wss = new WebSocket.Server({ noServer: true });
server.on('upgrade', (req, socket, head) => {
if (!validateOrigin(req)) {
Expand All @@ -143,18 +141,18 @@ export function makeHTTPListener(basedir, port, host, rawInboundCommand) {

server.listen(port, host, () => log.info('Listening on', `${host}:${port}`));

let lastConnectionID = 0;
let lastChannelID = 0;

function newConnection(ws, req) {
lastConnectionID += 1;
const connectionID = lastConnectionID;
const meta = { ...req, connectionID };
const id = `${req.socket.remoteAddress}:${req.socket.remotePort}[${connectionID}]:`;
function newChannel(ws, req) {
lastChannelID += 1;
const channelID = lastChannelID;
const meta = { ...req, channelID };
const id = `${req.socket.remoteAddress}:${req.socket.remotePort}[${channelID}]:`;

log.info(id, `new WebSocket connection ${req.url}`);

// Register the point-to-point connection.
points.set(connectionID, ws);
// Register the point-to-point channel.
channels.set(channelID, ws);

ws.on('error', err => {
log.error(id, 'client error', err);
Expand All @@ -164,14 +162,14 @@ export function makeHTTPListener(basedir, port, host, rawInboundCommand) {
log.info(id, 'client closed');
inboundCommand(
{ type: 'ws/meta' },
{ ...meta, dispatcher: 'onDisconnect' },
{ ...meta, dispatcher: 'onClose' },
id,
).finally(() => points.delete(connectionID));
).finally(() => channels.delete(channelID));
});

inboundCommand(
{ type: 'ws/meta' },
{ ...meta, dispatcher: 'onConnect' },
{ ...meta, dispatcher: 'onOpen' },
id,
);

Expand All @@ -192,19 +190,19 @@ export function makeHTTPListener(basedir, port, host, rawInboundCommand) {
}
});
}
wss.on('connection', newConnection);
wss.on('connection', newChannel);

function sendJSON(rawObj) {
const { meta: { connectionID } = {} } = rawObj;
const { meta: { channelID } = {} } = rawObj;
const obj = { ...rawObj };
delete obj.meta;

// Point-to-point.
const c = points.get(connectionID);
const c = channels.get(channelID);
if (c) {
send(c, JSON.stringify(obj));
} else {
log.error(`[${connectionID}]: connection not found`);
log.error(`[${channelID}]: channel not found`);
}
}

Expand Down

0 comments on commit f50a94b

Please sign in to comment.