From 36c9e5116a32a49d03b3cce0f81dec10ed2598a0 Mon Sep 17 00:00:00 2001
From: Brian Warner
Date: Tue, 8 Jun 2021 11:24:20 -0700
Subject: [PATCH] fix(swingset): fix refcounts for messages queued to a promise

As described in #3264, we were not maintaining correct reference counts
when messages get queued to an unresolved promise.

In the new approach, the lifetime of a message is:

* all krefs (target, args, and optional result) of a message are increfed
  during `doSend`, as before
* this adds a `type: 'send'` event to the run-queue
* when the `send` event is pulled off the run-queue, all krefs are decrefed
  (as before)
* if the event is delivered to a vat, great
* if the event is delivered to a promise,
  `kernelKeeper.addMessageToPromiseQueue` increfs all krefs
* (this is new: previously `addMessageToPromiseQueue` did not change the
  refcounts)
* later, if/when the promise is resolved, the messages are transferred
  laterally from the promise queue to `send` events on the run-queue,
  without changing their refcounts
* (this is new: previously the transfer was done with `doSend`, which
  incremented the refcounts)
* the counts are decremented when the `send` event is removed from the
  run-queue, as usual

The result is the same number of increments and decrements as before, but
the increment is done earlier, so messages on a promise queue will maintain
a refcount on all their krefs (target, args, and any result). This protects
objects and promises which would otherwise have been eligible for
collection while they sat on the promise queue.

Strictly speaking we don't need to maintain a refcount on the target (which
is also kind of redundant: everything on the queue for promise `kp1`
obviously has `target: 'kp1'`), since that will always be additionally held
by the c-list of the decider (at least).
But by making all promise queues do the same thing as the main run-queue, we can laterally transfer messages from promise- to run-queue without DB churn (decrementing then immediately incrementing the counts). This change moves the responsibility for the lateral transfer from `notifySubscribersAndQueue` in kernel.js to `kernelKeeper.resolveKernelPromise()`, deleting the former entirely and merging the subscription loop into `doResolve`. closes #3264 --- packages/SwingSet/src/kernel/kernel.js | 35 ++++----------- .../SwingSet/src/kernel/state/kernelKeeper.js | 45 +++++++++++++++++++ .../SwingSet/src/kernel/state/vatKeeper.js | 2 +- packages/SwingSet/test/test-kernel.js | 6 +-- packages/SwingSet/test/test-state.js | 10 +++-- 5 files changed, 65 insertions(+), 33 deletions(-) diff --git a/packages/SwingSet/src/kernel/kernel.js b/packages/SwingSet/src/kernel/kernel.js index 2af6295b65e4..6cef19c235f1 100644 --- a/packages/SwingSet/src/kernel/kernel.js +++ b/packages/SwingSet/src/kernel/kernel.js @@ -283,26 +283,6 @@ export default function buildKernel( kernelKeeper.addToRunQueue(m); } - function notifySubscribersAndQueue(kpid, resolvingVatID, subscribers, queue) { - insistKernelType('promise', kpid); - for (const vatID of subscribers) { - if (vatID !== resolvingVatID) { - notify(vatID, kpid); - } - } - // re-deliver msg to the now-settled promise, which will forward or - // reject depending on the new state of the promise - for (const msg of queue) { - // todo: this is slightly lazy, sending the message back to the same - // promise that just got resolved. When this message makes it to the - // front of the run-queue, we'll look up the resolution. Instead, we - // could maybe look up the resolution *now* and set the correct target - // early. Doing that might make it easier to remove the Promise Table - // entry earlier. 
- kernelSyscallHandler.send(kpid, msg); - } - } - function doResolve(vatID, resolutions) { if (vatID) { insistVatID(vatID); @@ -312,14 +292,13 @@ export default function buildKernel( insistKernelType('promise', kpid); insistCapData(data); const p = kernelKeeper.getResolveablePromise(kpid, vatID); - const { subscribers, queue } = p; - let idx = 0; - for (const dataSlot of data.slots) { - kernelKeeper.incrementRefCount(dataSlot, `resolve|s${idx}`); - idx += 1; + const { subscribers } = p; + for (const subscriber of subscribers) { + if (subscriber !== vatID) { + notify(subscriber, kpid); + } } kernelKeeper.resolveKernelPromise(kpid, rejected, data); - notifySubscribersAndQueue(kpid, vatID, subscribers, queue); const tag = rejected ? 'rejected' : 'fulfilled'; if (p.policy === 'logAlways' || (rejected && p.policy === 'logFailure')) { console.log( @@ -594,6 +573,10 @@ export default function buildKernel( try { processQueueRunning = Error('here'); terminationTrigger = null; + // Decref everything in the message, under the assumption that most of + // the time we're delivering to a vat or answering the result promise + // with an error. If we wind up queueing it on a promise, we'll + // re-increment everything there. 
if (message.type === 'send') { kernelKeeper.decrementRefCount(message.target, `deq|msg|t`); if (message.msg.result) { diff --git a/packages/SwingSet/src/kernel/state/kernelKeeper.js b/packages/SwingSet/src/kernel/state/kernelKeeper.js index c74386bf1014..f2688a5214a0 100644 --- a/packages/SwingSet/src/kernel/state/kernelKeeper.js +++ b/packages/SwingSet/src/kernel/state/kernelKeeper.js @@ -420,6 +420,30 @@ export default function makeKernelKeeper(kvStore, streamStore, kernelSlog) { function resolveKernelPromise(kernelSlot, rejected, capdata) { insistKernelType('promise', kernelSlot); insistCapData(capdata); + + let idx = 0; + for (const dataSlot of capdata.slots) { + // eslint-disable-next-line no-use-before-define + incrementRefCount(dataSlot, `resolve|${kernelSlot}|s${idx}`); + idx += 1; + } + + // Re-queue all messages, so they can be delivered to the resolution. + // This is a lateral move, so we retain their original refcounts. TODO: + // this is slightly lazy, sending the message back to the same promise + // that just got resolved. When this message makes it to the front of the + // run-queue, we'll look up the resolution. Instead, we could maybe look + // up the resolution *now* and set the correct target early. Doing that + // might make it easier to remove the Promise Table entry earlier. + const p = getKernelPromise(kernelSlot); + const runQueue = JSON.parse(getRequired('runQueue')); + for (const msg of p.queue) { + const entry = harden({ type: 'send', target: kernelSlot, msg }); + runQueue.push(entry); + } + kvStore.set('runQueue', JSON.stringify(runQueue)); + incStat('runQueueLength', p.queue.length); + deleteKernelPromiseState(kernelSlot); decStat('kpUnresolved'); @@ -513,6 +537,27 @@ export default function makeKernelKeeper(kvStore, streamStore, kernelSlog) { insistKernelType('promise', kernelSlot); insistMessage(msg); + // Each message on a promise's queue maintains a refcount to the promise + // itself. 
This isn't strictly necessary (the promise will be kept alive + // by the deciding vat's clist, or the queued message that holds this + // promise as its result), but it matches our policy with run-queue + // messages (each holds a refcount on its target), and makes it easier to + // transfer these messages back to the run-queue in + // resolveKernelPromise() (which doesn't touch any of the refcounts). + + // eslint-disable-next-line no-use-before-define + incrementRefCount(kernelSlot, `pq|${kernelSlot}|t`); + if (msg.result) { + // eslint-disable-next-line no-use-before-define + incrementRefCount(msg.result, `pq|${kernelSlot}|r`); + } + let idx = 0; + for (const kref of msg.args.slots) { + // eslint-disable-next-line no-use-before-define + incrementRefCount(kref, `pq|${kernelSlot}|s${idx}`); + idx += 1; + } + const p = getKernelPromise(kernelSlot); assert( p.state === 'unresolved', diff --git a/packages/SwingSet/src/kernel/state/vatKeeper.js b/packages/SwingSet/src/kernel/state/vatKeeper.js index 0f487c4eb5da..5b19ca5e7422 100644 --- a/packages/SwingSet/src/kernel/state/vatKeeper.js +++ b/packages/SwingSet/src/kernel/state/vatKeeper.js @@ -234,7 +234,7 @@ export function makeVatKeeper( } else { assert.fail(X`unknown type ${type}`); } - incrementRefCount(kernelSlot, `${vatID}[kv|clist`); + incrementRefCount(kernelSlot, `${vatID}|kv|clist`); const vatSlot = makeVatSlot(type, false, id); const vatKey = `${vatID}.c.${vatSlot}`; diff --git a/packages/SwingSet/test/test-kernel.js b/packages/SwingSet/test/test-kernel.js index b9580d4fd200..d7d38e75c616 100644 --- a/packages/SwingSet/test/test-kernel.js +++ b/packages/SwingSet/test/test-kernel.js @@ -1097,7 +1097,7 @@ test('non-pipelined promise queueing', async t => { id: p1ForKernel, state: 'unresolved', policy: 'ignore', - refCount: 2, + refCount: 3, decider: vatB, subscribers: [], queue: [ @@ -1112,7 +1112,7 @@ test('non-pipelined promise queueing', async t => { id: p2ForKernel, state: 'unresolved', policy: 'ignore', 
- refCount: 1, + refCount: 3, decider: undefined, subscribers: [], queue: [ @@ -1127,7 +1127,7 @@ test('non-pipelined promise queueing', async t => { id: p3ForKernel, state: 'unresolved', policy: 'ignore', - refCount: 1, + refCount: 2, decider: undefined, subscribers: [], queue: [], diff --git a/packages/SwingSet/test/test-state.js b/packages/SwingSet/test/test-state.js index 564edcc29949..2da6a82b0f12 100644 --- a/packages/SwingSet/test/test-state.js +++ b/packages/SwingSet/test/test-state.js @@ -508,17 +508,21 @@ test('kernelKeeper promises', async t => { const expectedRunqueue = []; const m1 = { method: 'm1', args: { body: '', slots: [] } }; k.addMessageToPromiseQueue(p1, m1); + t.deepEqual(k.getKernelPromise(p1).refCount, 1); expectedRunqueue.push({ type: 'send', target: 'kp40', msg: m1 }); const m2 = { method: 'm2', args: { body: '', slots: [] } }; k.addMessageToPromiseQueue(p1, m2); t.deepEqual(k.getKernelPromise(p1).queue, [m1, m2]); + t.deepEqual(k.getKernelPromise(p1).refCount, 2); expectedRunqueue.push({ type: 'send', target: 'kp40', msg: m2 }); commitCrank(); k2 = duplicateKeeper(getState); t.deepEqual(k2.getKernelPromise(p1).queue, [m1, m2]); + // when we resolve the promise, all its queued messages are moved to the + // run-queue, and its refcount remains the same const capdata = harden({ body: '{"@qclass":"slot","index":0}', slots: ['ko44'], @@ -526,7 +530,7 @@ test('kernelKeeper promises', async t => { k.resolveKernelPromise(p1, false, capdata); t.deepEqual(k.getKernelPromise(p1), { state: 'fulfilled', - refCount: 0, + refCount: 2, data: capdata, }); t.truthy(k.hasKernelPromise(p1)); @@ -541,14 +545,14 @@ test('kernelKeeper promises', async t => { ['vat.dynamicIDs', '[]'], ['device.names', '[]'], ['gcActions', '[]'], - ['runQueue', '[]'], + ['runQueue', JSON.stringify(expectedRunqueue)], ['kd.nextID', '30'], ['ko.nextID', '20'], ['kp.nextID', '41'], ['kp40.data.body', '{"@qclass":"slot","index":0}'], ['kp40.data.slots', 'ko44'], ['kp40.state', 
'fulfilled'], - ['kp40.refCount', '0'], + ['kp40.refCount', '2'], ['kernel.defaultManagerType', 'local'], ]); });