Skip to content

Commit

Permalink
fix: durable approach to watching purse balances
Browse files Browse the repository at this point in the history
  • Loading branch information
dckc committed Jan 5, 2024
1 parent 4e0e796 commit 401e87a
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 37 deletions.
4 changes: 2 additions & 2 deletions packages/boot/test/bootstrapTests/test-wallet-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ test('update purse balance across zoe upgrade', async t => {
consume: { namesByAddressAdmin: true, zoe: true },
instance: { consume: { reserve: true } },
}),
js_code: `(${sendInvitationScript})()`,
js_code: `(${sendInvitationScript})()`.replace('ADDRESS', oraAddr),
},
]);

Expand Down Expand Up @@ -152,7 +152,7 @@ test('update purse balance across walletFactory upgrade', async t => {
consume: { namesByAddressAdmin: true, zoe: true },
instance: { consume: { reserve: true } },
}),
js_code: `(${sendInvitationScript})()`,
js_code: `(${sendInvitationScript})()`.replace('ADDRESS', oraAddr),
},
]);

Expand Down
2 changes: 1 addition & 1 deletion packages/boot/test/bootstrapTests/wallet-scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export const restartWalletFactoryScript = () => {
};

export const sendInvitationScript = () => {
const addr = 'agoric1oracle-operator';
const addr = 'ADDRESS';
const sendIt = async powers => {
// namesByAddress is broken #8113
const {
Expand Down
99 changes: 65 additions & 34 deletions packages/smart-wallet/src/smartWallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import {
import {
makeScalarBigMapStore,
makeScalarBigWeakMapStore,
prepareExoClass,
prepareExoClassKit,
provide,
watchPromise,
} from '@agoric/vat-data';
import {
prepareRecorderKit,
Expand All @@ -44,24 +46,6 @@ const { Fail, quote: q } = assert;

const trace = makeTracer('SmrtWlt');

/**
* Like `subscribeLatest` but that one swallows the upgrade error that this module needs to detect and handle.
*
* @template T
* @param {ERef<LatestTopic<T>>} topic
*/
async function* subscribeLatestSimple(topic) {
let lastUpdate;
await null; // for jessie
while (true) {
const updateRecord =
// eslint-disable-next-line no-await-in-loop
await E(topic).getUpdateSince(lastUpdate);
lastUpdate = updateRecord.updateCount;
yield updateRecord.value;
}
}

/**
* @file Smart wallet module
*
Expand Down Expand Up @@ -289,6 +273,57 @@ export const prepareSmartWallet = (baggage, shared) => {

const makeOfferWatcher = prepareOfferWatcher(baggage);

const NotifierShape = M.remotable();
const updateShape = {
value: AmountShape,
updateCount: M.bigint(),
};
const amountWatcherGuard = M.interface('paymentWatcher', {
onFulfilled: M.call(updateShape, NotifierShape).returns(),
onRejected: M.call(M.any(), NotifierShape).returns(M.promise()),
});
const prepareAmountWatcher = () =>
prepareExoClass(
baggage,
'AmountWatcher',
amountWatcherGuard,
/**
* @param {Purse} purse
* @param {ReturnType<makeWalletWithResolvedStorageNodes>['helper']} helper
*/
(purse, helper) => ({ purse, helper }),
{
/**
* @param {{ value: Amount, updateCount: bigint | undefined }} updateRecord
* @param { Notifier<Amount> } notifier
* @returns {void}
*/
onFulfilled(updateRecord, notifier) {
const { helper, purse } = this.state;
helper.updateBalance(purse, updateRecord.value);
helper.watchNextBalance(
this.self,
notifier,
updateRecord.updateCount,
);
},
/**
* @param {unknown} err
* @returns {Promise<void>}
*/
onRejected(err) {
const { helper, purse } = this.state;
if (isUpgradeDisconnection(err)) {
return helper.watchPurse(purse); // retry
}
helper.logWalletError(`failed amount observer`, err);
throw err;
},
},
);

const makeAmountWatcher = prepareAmountWatcher();

/**
* @param {UniqueParams} unique
* @returns {State}
Expand Down Expand Up @@ -374,6 +409,7 @@ export const prepareSmartWallet = (baggage, shared) => {
.returns(M.promise()),
publishCurrentState: M.call().returns(),
watchPurse: M.call(M.eref(PurseShape)).returns(M.promise()),
watchNextBalance: M.call(M.any(), NotifierShape, M.bigint()).returns(),
repairUnwatchedSeats: M.call().returns(),
updateStatus: M.call(M.any()).returns(),
addContinuingOffer: M.call(
Expand Down Expand Up @@ -496,28 +532,23 @@ export const prepareSmartWallet = (baggage, shared) => {

/** @type {(purse: ERef<Purse>) => Promise<void>} */
async watchPurse(purseRef) {
const { facets } = this;

const purse = await purseRef; // promises don't fit in durable storage

const { helper } = this.facets;
// publish purse's balance and changes
// This would seem to fit the observeNotifier() pattern,
// but purse notifiers are not (always) durable.
// but purse notifiers are not necessarily durable.
// If there is an error due to upgrade, retry watchPurse().
const notifier = E(purse).getCurrentAmountNotifier();
try {
// eslint-disable-next-line no-await-in-loop
for await (const newBalance of subscribeLatestSimple(notifier)) {
helper.updateBalance(purse, newBalance);
}
} catch (err) {
if (isUpgradeDisconnection(err)) {
helper.watchPurse(purse); // retry
}
facets.helper.logWalletError('⚠️ failed amount observer', err);
throw err;
}
const notifier = await E(purse).getCurrentAmountNotifier();

const handler = makeAmountWatcher(purse, helper);
const startP = E(notifier).getUpdateSince(undefined);
watchPromise(startP, handler, notifier);
},

watchNextBalance(handler, notifier, updateCount) {
const nextP = E(notifier).getUpdateSince(updateCount);
watchPromise(nextP, handler, notifier);
},

/**
Expand Down

0 comments on commit 401e87a

Please sign in to comment.