Skip to content

Commit

Permalink
fix: durable approach to watching purse balances
Browse files Browse the repository at this point in the history
  • Loading branch information
dckc committed Dec 4, 2023
1 parent e10b3fa commit 43355f6
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 35 deletions.
4 changes: 2 additions & 2 deletions packages/boot/test/bootstrapTests/test-wallet-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ test('update purse balance across zoe upgrade', async t => {
consume: { namesByAddressAdmin: true, zoe: true },
instance: { consume: { reserve: true } },
}),
js_code: `(${sendInvitationScript})()`,
js_code: `(${sendInvitationScript})()`.replace('ADDRESS', oraAddr),
},
]);

Expand Down Expand Up @@ -152,7 +152,7 @@ test('update purse balance across walletFactory upgrade', async t => {
consume: { namesByAddressAdmin: true, zoe: true },
instance: { consume: { reserve: true } },
}),
js_code: `(${sendInvitationScript})()`,
js_code: `(${sendInvitationScript})()`.replace('ADDRESS', oraAddr),
},
]);

Expand Down
2 changes: 1 addition & 1 deletion packages/boot/test/bootstrapTests/wallet-scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export const restartWalletFactoryScript = () => {
};

export const sendInvitationScript = () => {
const addr = 'agoric1oracle-operator';
const addr = 'ADDRESS';
const sendIt = async powers => {
// namesByAddress is broken #8113
const {
Expand Down
98 changes: 66 additions & 32 deletions packages/smart-wallet/src/smartWallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import {
import {
makeScalarBigMapStore,
makeScalarBigWeakMapStore,
prepareExoClass,
prepareExoClassKit,
provide,
watchPromise,
} from '@agoric/vat-data';
import {
prepareRecorderKit,
Expand All @@ -44,24 +46,6 @@ const { Fail, quote: q } = assert;

const trace = makeTracer('SmrtWlt');

/**
* Like `subscribeLatest` but that one swallows the upgrade error that this module needs to detect and handle.
*
* @template T
* @param {ERef<LatestTopic<T>>} topic
*/
async function* subscribeLatestSimple(topic) {
let lastUpdate;
await null; // for jessie
while (true) {
const updateRecord =
// eslint-disable-next-line no-await-in-loop
await E(topic).getUpdateSince(lastUpdate);
lastUpdate = updateRecord.updateCount;
yield updateRecord.value;
}
}

/**
* @file Smart wallet module
*
Expand Down Expand Up @@ -284,6 +268,58 @@ export const prepareSmartWallet = (baggage, shared) => {

const makeOfferWatcher = makeOfferWatcherMaker(baggage);

const NotifierShape = M.remotable();
const updateShape = {
value: AmountShape,
updateCount: M.bigint(),
};
const amountWatcherGuard = M.interface('paymentWatcher', {
onFulfilled: M.call(updateShape, NotifierShape).returns(),
onRejected: M.call(M.any(), NotifierShape).returns(M.promise()),
});
const prepareAmountWatcher = () =>
prepareExoClass(
baggage,
'AmountWatcher',
amountWatcherGuard,
/**
* @param {Purse} purse
* @param {ReturnType<makeWalletWithResolvedStorageNodes>['helper']} helper
* @param {string} address
*/
(purse, helper, address) => ({ purse, helper, address }),
{
/**
* @param {{ value: Amount, updateCount: number }} updateRecord
* @param { Notifier<Amount> } notifier
* @returns {void}
*/
onFulfilled(updateRecord, notifier) {
const { helper, purse } = this.state;
helper.updateBalance(purse, updateRecord.value);
helper.watchNextBalance(
this.self,
notifier,
updateRecord.updateCount,
);
},
/**
* @param {unknown} err
* @returns {Promise<void>}
*/
onRejected(err) {
const { helper, purse, address } = this.state;
if (isUpgradeDisconnection(err)) {
return helper.watchPurse(purse); // retry
}
console.error(`*** ${address} failed amount observer, ${err} ***`);
throw err;
},
},
);

const makeAmountWatcher = prepareAmountWatcher();

/**
* @param {UniqueParams} unique
* @returns {State}
Expand Down Expand Up @@ -369,6 +405,7 @@ export const prepareSmartWallet = (baggage, shared) => {
.returns(M.promise()),
publishCurrentState: M.call().returns(),
watchPurse: M.call(M.eref(PurseShape)).returns(M.promise()),
watchNextBalance: M.call(M.any(), NotifierShape, M.bigint()).returns(),
repairUnwatchedSeats: M.call().returns(),
updateStatus: M.call(M.any(), M.string()).returns(),
addContinuingOffer: M.call(
Expand Down Expand Up @@ -491,21 +528,18 @@ export const prepareSmartWallet = (baggage, shared) => {
const purse = await purseRef; // promises don't fit in durable storage

// This would seem to fit the observeNotifier() pattern,
// but purse notifiers are not (always) durable.
// but purse notifiers are not necessarily durable.
// If there is an error due to upgrade, retry watchPurse().
const notifier = E(purse).getCurrentAmountNotifier();
try {
// eslint-disable-next-line no-await-in-loop
for await (const newBalance of subscribeLatestSimple(notifier)) {
helper.updateBalance(purse, newBalance);
}
} catch (err) {
if (isUpgradeDisconnection(err)) {
return helper.watchPurse(purse); // retry
}
console.error(`*** ${address} failed amount observer, ${err} ***`);
throw err;
}
const notifier = await E(purse).getCurrentAmountNotifier();

const handler = makeAmountWatcher(purse, helper, address);
const startP = E(notifier).getUpdateSince(undefined);
watchPromise(startP, handler, notifier);
},

watchNextBalance(handler, notifier, updateCount) {
const nextP = E(notifier).getUpdateSince(updateCount);
watchPromise(nextP, handler, notifier);
},

/**
Expand Down

0 comments on commit 43355f6

Please sign in to comment.