diff --git a/src/domain/polling/index.ts b/src/domain/polling/index.ts index 7b93eef..a575019 100644 --- a/src/domain/polling/index.ts +++ b/src/domain/polling/index.ts @@ -266,7 +266,7 @@ export async function checkForAndPlaceOrder( // It may be handy in other versions of the watch tower implemented in other languages // to not delete owners, so we can keep track of them. - for (const [owner, conditionalOrders] of Array.from(ownerOrders.entries())) { + for (const [owner, conditionalOrders] of ownerOrders) { if (conditionalOrders.size === 0) { ownerOrders.delete(owner); metrics.activeOwnersTotal.labels(chainId.toString()).dec(); @@ -293,7 +293,8 @@ function _deleteOrders( log: LoggerWithMethods, chainId: SupportedChainId ) { - log.debug(`${ordersPendingDelete.length} to delete`); + ordersPendingDelete.length && + log.debug(`${ordersPendingDelete.length} to delete`); for (const conditionalOrder of ordersPendingDelete) { const deleted = conditionalOrders.delete(conditionalOrder); diff --git a/src/services/chain.ts b/src/services/chain.ts index cb314cc..1abe59e 100644 --- a/src/services/chain.ts +++ b/src/services/chain.ts @@ -320,7 +320,17 @@ export class ChainContext { let lastBlockReceived = lastProcessedBlock; provider.on("block", async (blockNumber: number) => { try { + // Decide if we should process this block before polling for events + const shouldProcessBlock = + blockNumber % this.processEveryNumBlocks === 0; + + if (!shouldProcessBlock) { + log.debug(`Skipping block ${blockNumber}`); + return; + } + log.debug(`New block ${blockNumber}`); + const block = await provider.getBlock(blockNumber); // Set the block time metric @@ -348,6 +358,7 @@ export class ChainContext { await processBlockAndPersist({ context: this, + block, blockNumber, events, log, @@ -442,7 +453,7 @@ async function processBlock( blockNumberOverride?: number, blockTimestampOverride?: number ) { - const { provider, chainId, processEveryNumBlocks } = context; + const { provider, chainId } = context; const timer = metrics.processBlockDurationSeconds .labels(context.chainId.toString()) .startTimer(); @@ -471,31 +482,27 @@ async function processBlock( } } - // Decide if we should process this block - const shouldProcessBlock = block.number % processEveryNumBlocks === 0; - // Check programmatic orders and place orders if necessary - if (shouldProcessBlock) { - const result = await checkForAndPlaceOrder( - context, - block, - blockNumberOverride, - blockTimestampOverride - ) - .then(() => true) - .catch(() => { - hasErrors = true; - log.error(`Error running "checkForAndPlaceOrder" action`); - return false; - }); - log.debug( - `Result of "checkForAndPlaceOrder" action for block ${ - block.number - }: ${_formatResult(result)}` - ); - } + const result = await checkForAndPlaceOrder( + context, + block, + blockNumberOverride, + blockTimestampOverride + ) + .then(() => true) + .catch(() => { + hasErrors = true; + log.error(`Error running "checkForAndPlaceOrder" action`); + return false; + }); + log.debug( + `Result of "checkForAndPlaceOrder" action for block ${ + block.number + }: ${_formatResult(result)}` + ); timer(); + if (hasErrors) { throw new Error("Errors found in processing block"); } @@ -524,26 +531,31 @@ async function persistLastProcessedBlock(params: { async function processBlockAndPersist(params: { context: ChainContext; + block?: providers.Block; blockNumber: number; events: ConditionalOrderCreatedEvent[]; currentBlock?: providers.Block; log: LoggerWithMethods; provider: ethers.providers.Provider; }) { - const { context, blockNumber, events, currentBlock, log, provider } = params; - const block = await provider.getBlock(blockNumber); + const { context, block, blockNumber, events, currentBlock, log, provider } = + params; + + // Accept optional block object, in case it was already fetched + const _block = block || (await provider.getBlock(blockNumber)); + try { await processBlock( context, - block, + _block, events, currentBlock?.number, currentBlock?.timestamp ); } catch (err) { - log.error(`Error processing block ${block.number}`, err); + log.error(`Error processing block ${_block.number}`, err); } finally { - return persistLastProcessedBlock({ context, block, log }); + return persistLastProcessedBlock({ context, block: _block, log }); } } @@ -563,27 +575,25 @@ async function pollContractForEvents( topics: [topic], }); - return logs - .map((event) => { - try { - const decoded = composableCow.interface.decodeEventLog( - topic, - event.data, - event.topics - ) as unknown as ConditionalOrderCreatedEvent; + return logs.reduce((acc, event) => { + try { + const decoded = composableCow.interface.decodeEventLog( + topic, + event.data, + event.topics + ) as unknown as ConditionalOrderCreatedEvent; - return { + if (!addresses || addresses.includes(decoded.args.owner)) { + acc.push({ ...decoded, ...event, - }; - } catch { - return null; + }); } - }) - .filter((e): e is ConditionalOrderCreatedEvent => e !== null) - .filter((e): e is ConditionalOrderCreatedEvent => { - return addresses ? addresses.includes(e.args.owner) : true; - }); + } catch { + // Ignore errors and do not add to the accumulator + } + return acc; + }, []); } function _formatResult(result: boolean) { diff --git a/src/types/model.ts b/src/types/model.ts index e6fe803..adc2503 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -106,6 +106,7 @@ export class Registry { network: string; lastNotifiedError: Date | null; lastProcessedBlock: RegistryBlock | null; + readonly logger = getLogger("Registry"); /** * Instantiates a registry. @@ -189,7 +190,11 @@ export class Registry { } get numOrders(): number { - return Array.from(this.ownerOrders.values()).flatMap((o) => o).length; + let count = 0; + for (const orders of this.ownerOrders.values()) { + count += orders.size; // Count each set's size directly + } + return count; } /** @@ -241,13 +246,12 @@ export class Registry { // Write all atomically await batch.write(); - const log = getLogger( - `Registry:write:${this.version}:${this.network}:${ + this.logger.debug( + `write:${this.version}:${this.network}:${ this.lastProcessedBlock?.number - }:${this.lastNotifiedError || ""}` + }:${this.lastNotifiedError || ""}`, + "batch written 📝" ); - - log.debug("batch written 📝"); } public stringifyOrders(): string { @@ -300,9 +304,7 @@ async function loadOwnerOrders( ); // Parse conditional orders registry (for the persisted version, converting it to the last version) - const ownerOrders = parseConditionalOrders(!!str ? str : undefined, version); - - return ownerOrders; + return parseConditionalOrders(!!str ? str : undefined, version); } function parseConditionalOrders(