Skip to content

Commit

Permalink
fix(swingset): fix refcounts for messages queued to a promise
Browse files Browse the repository at this point in the history
As described in #3264, we were not maintaining correct reference counts when
messages get queued to an unresolved promise.

In the new approach, the lifetime of a message is:

* all krefs (target, args, and optional result) of a message are increfed
  during `doSend`, as before
  * this adds a `type: 'send'` event to the run-queue
* when the `send` event is pulled off the run-queue, all krefs are
  decrefed (as before)
* if the event is delivered to a vat, great
* if the event is delivered to a promise,
  `kernelKeeper.addMessageToPromiseQueue` increfs all krefs
  * (this is new: previously `addMessageToPromiseQueue` did not change the
    refcounts)
* later, if/when the promise is resolved, the messages are transferred
  laterally from the promise queue to `send` events on the run-queue, without
  changing their refcounts
  * (this is new: previously the transfer was done with `doSend`, which
    incremented the refcounts)
  * the counts are decremented when the `send` event is removed from the
    run-queue, as usual

The result is the same number of increments and decrements as before, but the
increment is done earlier, so messages on a promise queue will maintain a
refcount on all their krefs (target, args, and any result). This protects
objects and promises which would otherwise have been eligible for collection
while they sat on the promise queue.

Strictly speaking we don't need to maintain a refcount on the target (which
is also kind of redundant: everything on the queue for promise `kp1`
obviously has `target: 'kp1'`), since that will always be additionally held
by the c-list of the decider (at least). But by making all promise queues do
the same thing as the main run-queue, we can laterally transfer messages from
promise- to run-queue without DB churn (decrementing then immediately
incrementing the counts).

This change moves the responsibility for the lateral transfer from
`notifySubscribersAndQueue` in kernel.js to
`kernelKeeper.resolveKernelPromise()`, deleting the former entirely and
merging the subscription loop into `doResolve`.

closes #3264
  • Loading branch information
warner committed Jun 8, 2021
1 parent 06785a2 commit 36c9e51
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 33 deletions.
35 changes: 9 additions & 26 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,26 +283,6 @@ export default function buildKernel(
kernelKeeper.addToRunQueue(m);
}

function notifySubscribersAndQueue(kpid, resolvingVatID, subscribers, queue) {
insistKernelType('promise', kpid);
for (const vatID of subscribers) {
if (vatID !== resolvingVatID) {
notify(vatID, kpid);
}
}
// re-deliver msg to the now-settled promise, which will forward or
// reject depending on the new state of the promise
for (const msg of queue) {
// todo: this is slightly lazy, sending the message back to the same
// promise that just got resolved. When this message makes it to the
// front of the run-queue, we'll look up the resolution. Instead, we
// could maybe look up the resolution *now* and set the correct target
// early. Doing that might make it easier to remove the Promise Table
// entry earlier.
kernelSyscallHandler.send(kpid, msg);
}
}

function doResolve(vatID, resolutions) {
if (vatID) {
insistVatID(vatID);
Expand All @@ -312,14 +292,13 @@ export default function buildKernel(
insistKernelType('promise', kpid);
insistCapData(data);
const p = kernelKeeper.getResolveablePromise(kpid, vatID);
const { subscribers, queue } = p;
let idx = 0;
for (const dataSlot of data.slots) {
kernelKeeper.incrementRefCount(dataSlot, `resolve|s${idx}`);
idx += 1;
const { subscribers } = p;
for (const subscriber of subscribers) {
if (subscriber !== vatID) {
notify(subscriber, kpid);
}
}
kernelKeeper.resolveKernelPromise(kpid, rejected, data);
notifySubscribersAndQueue(kpid, vatID, subscribers, queue);
const tag = rejected ? 'rejected' : 'fulfilled';
if (p.policy === 'logAlways' || (rejected && p.policy === 'logFailure')) {
console.log(
Expand Down Expand Up @@ -594,6 +573,10 @@ export default function buildKernel(
try {
processQueueRunning = Error('here');
terminationTrigger = null;
// Decref everything in the message, under the assumption that most of
// the time we're delivering to a vat or answering the result promise
// with an error. If we wind up queueing it on a promise, we'll
// re-increment everything there.
if (message.type === 'send') {
kernelKeeper.decrementRefCount(message.target, `deq|msg|t`);
if (message.msg.result) {
Expand Down
45 changes: 45 additions & 0 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,30 @@ export default function makeKernelKeeper(kvStore, streamStore, kernelSlog) {
function resolveKernelPromise(kernelSlot, rejected, capdata) {
insistKernelType('promise', kernelSlot);
insistCapData(capdata);

let idx = 0;
for (const dataSlot of capdata.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(dataSlot, `resolve|${kernelSlot}|s${idx}`);
idx += 1;
}

// Re-queue all messages, so they can be delivered to the resolution.
// This is a lateral move, so we retain their original refcounts. TODO:
// this is slightly lazy, sending the message back to the same promise
// that just got resolved. When this message makes it to the front of the
// run-queue, we'll look up the resolution. Instead, we could maybe look
// up the resolution *now* and set the correct target early. Doing that
// might make it easier to remove the Promise Table entry earlier.
const p = getKernelPromise(kernelSlot);
const runQueue = JSON.parse(getRequired('runQueue'));
for (const msg of p.queue) {
const entry = harden({ type: 'send', target: kernelSlot, msg });
runQueue.push(entry);
}
kvStore.set('runQueue', JSON.stringify(runQueue));
incStat('runQueueLength', p.queue.length);

deleteKernelPromiseState(kernelSlot);
decStat('kpUnresolved');

Expand Down Expand Up @@ -513,6 +537,27 @@ export default function makeKernelKeeper(kvStore, streamStore, kernelSlog) {
insistKernelType('promise', kernelSlot);
insistMessage(msg);

// Each message on a promise's queue maintains a refcount to the promise
// itself. This isn't strictly necessary (the promise will be kept alive
// by the deciding vat's clist, or the queued message that holds this
// promise as its result), but it matches our policy with run-queue
// messages (each holds a refcount on its target), and makes it easier to
// transfer these messages back to the run-queue in
// resolveKernelPromise() (which doesn't touch any of the refcounts).

// eslint-disable-next-line no-use-before-define
incrementRefCount(kernelSlot, `pq|${kernelSlot}|t`);
if (msg.result) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(msg.result, `pq|${kernelSlot}|r`);
}
let idx = 0;
for (const kref of msg.args.slots) {
// eslint-disable-next-line no-use-before-define
incrementRefCount(kref, `pq|${kernelSlot}|s${idx}`);
idx += 1;
}

const p = getKernelPromise(kernelSlot);
assert(
p.state === 'unresolved',
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export function makeVatKeeper(
} else {
assert.fail(X`unknown type ${type}`);
}
incrementRefCount(kernelSlot, `${vatID}[kv|clist`);
incrementRefCount(kernelSlot, `${vatID}|kv|clist`);
const vatSlot = makeVatSlot(type, false, id);

const vatKey = `${vatID}.c.${vatSlot}`;
Expand Down
6 changes: 3 additions & 3 deletions packages/SwingSet/test/test-kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ test('non-pipelined promise queueing', async t => {
id: p1ForKernel,
state: 'unresolved',
policy: 'ignore',
refCount: 2,
refCount: 3,
decider: vatB,
subscribers: [],
queue: [
Expand All @@ -1112,7 +1112,7 @@ test('non-pipelined promise queueing', async t => {
id: p2ForKernel,
state: 'unresolved',
policy: 'ignore',
refCount: 1,
refCount: 3,
decider: undefined,
subscribers: [],
queue: [
Expand All @@ -1127,7 +1127,7 @@ test('non-pipelined promise queueing', async t => {
id: p3ForKernel,
state: 'unresolved',
policy: 'ignore',
refCount: 1,
refCount: 2,
decider: undefined,
subscribers: [],
queue: [],
Expand Down
10 changes: 7 additions & 3 deletions packages/SwingSet/test/test-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -508,25 +508,29 @@ test('kernelKeeper promises', async t => {
const expectedRunqueue = [];
const m1 = { method: 'm1', args: { body: '', slots: [] } };
k.addMessageToPromiseQueue(p1, m1);
t.deepEqual(k.getKernelPromise(p1).refCount, 1);
expectedRunqueue.push({ type: 'send', target: 'kp40', msg: m1 });

const m2 = { method: 'm2', args: { body: '', slots: [] } };
k.addMessageToPromiseQueue(p1, m2);
t.deepEqual(k.getKernelPromise(p1).queue, [m1, m2]);
t.deepEqual(k.getKernelPromise(p1).refCount, 2);
expectedRunqueue.push({ type: 'send', target: 'kp40', msg: m2 });

commitCrank();
k2 = duplicateKeeper(getState);
t.deepEqual(k2.getKernelPromise(p1).queue, [m1, m2]);

// when we resolve the promise, all its queued messages are moved to the
// run-queue, and its refcount remains the same
const capdata = harden({
body: '{"@qclass":"slot","index":0}',
slots: ['ko44'],
});
k.resolveKernelPromise(p1, false, capdata);
t.deepEqual(k.getKernelPromise(p1), {
state: 'fulfilled',
refCount: 0,
refCount: 2,
data: capdata,
});
t.truthy(k.hasKernelPromise(p1));
Expand All @@ -541,14 +545,14 @@ test('kernelKeeper promises', async t => {
['vat.dynamicIDs', '[]'],
['device.names', '[]'],
['gcActions', '[]'],
['runQueue', '[]'],
['runQueue', JSON.stringify(expectedRunqueue)],
['kd.nextID', '30'],
['ko.nextID', '20'],
['kp.nextID', '41'],
['kp40.data.body', '{"@qclass":"slot","index":0}'],
['kp40.data.slots', 'ko44'],
['kp40.state', 'fulfilled'],
['kp40.refCount', '0'],
['kp40.refCount', '2'],
['kernel.defaultManagerType', 'local'],
]);
});
Expand Down

0 comments on commit 36c9e51

Please sign in to comment.