fix: attempt to fix memory leak (#161)
# Description

I noticed that since my last changes #159 and #160, there has been a
very visible memory leak, especially on arb1.

![image](https://github.com/user-attachments/assets/f447e5dd-c503-4eed-bce9-3c50771bdcbd)

[graph link](https://g-0263500beb.grafana-workspace.eu-central-1.amazonaws.com/goto/sjeXFwkHg?orgId=1)

While I have not been able to pinpoint the source of the memory leak, this PR
includes several improvements that might help.

# Changes

- [x] Remove a couple of unnecessary Array conversions, using iterators instead (see the sketch after this list)
- [x] Use a single logger instance in the Registry class
- [x] Move block-skipping logic earlier, avoiding unnecessary RPC calls
- [x] Reuse the `block` instance if it was already queried
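
For illustration, a minimal, self-contained sketch of the Array-conversion removals (hypothetical data, not the actual watch-tower types): iterating the `Map` and its `Set` values directly avoids allocating throwaway intermediate arrays on every poll.

```ts
// Hypothetical standalone illustration of the pattern; not the watch-tower code itself.
const ownerOrders = new Map<string, Set<string>>([
  ["0xowner1", new Set<string>(["order-a", "order-b"])],
  ["0xowner2", new Set<string>()],
]);

// Before: Array.from(ownerOrders.entries()) materialised a throwaway array first.
// After: the Map's own iterator is consumed directly.
for (const [owner, conditionalOrders] of ownerOrders) {
  if (conditionalOrders.size === 0) {
    ownerOrders.delete(owner); // drop owners that have no orders left
  }
}

// Before: Array.from(values()).flatMap((o) => o).length copied every order.
// After: sum each Set's size without building intermediate arrays.
let numOrders = 0;
for (const orders of ownerOrders.values()) {
  numOrders += orders.size;
}
console.log(numOrders); // 2
```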

## How to test
Run it locally; it should work.

![image](https://github.com/user-attachments/assets/f2583ed1-4ebf-4d20-a53d-86824d7c7ac2)

~I'll also run it on staging for a while before taking this PR out of
draft.~ Can't apply it; Pulumi is not happy with me.
alfetopito authored Oct 9, 2024
1 parent 990dbb7 commit b50d5b4
Showing 3 changed files with 69 additions and 56 deletions.
5 changes: 3 additions & 2 deletions src/domain/polling/index.ts
@@ -266,7 +266,7 @@ export async function checkForAndPlaceOrder(

// It may be handy in other versions of the watch tower implemented in other languages
// to not delete owners, so we can keep track of them.
for (const [owner, conditionalOrders] of Array.from(ownerOrders.entries())) {
for (const [owner, conditionalOrders] of ownerOrders) {
if (conditionalOrders.size === 0) {
ownerOrders.delete(owner);
metrics.activeOwnersTotal.labels(chainId.toString()).dec();
@@ -293,7 +293,8 @@ function _deleteOrders(
log: LoggerWithMethods,
chainId: SupportedChainId
) {
log.debug(`${ordersPendingDelete.length} to delete`);
ordersPendingDelete.length &&
log.debug(`${ordersPendingDelete.length} to delete`);

for (const conditionalOrder of ordersPendingDelete) {
const deleted = conditionalOrders.delete(conditionalOrder);
100 changes: 55 additions & 45 deletions src/services/chain.ts
@@ -320,7 +320,17 @@ export class ChainContext
let lastBlockReceived = lastProcessedBlock;
provider.on("block", async (blockNumber: number) => {
try {
// Decide if we should process this block before polling for events
const shouldProcessBlock =
blockNumber % this.processEveryNumBlocks === 0;

if (!shouldProcessBlock) {
log.debug(`Skipping block ${blockNumber}`);
return;
}

log.debug(`New block ${blockNumber}`);

const block = await provider.getBlock(blockNumber);

// Set the block time metric
@@ -348,6 +358,7 @@ export class ChainContext {

await processBlockAndPersist({
context: this,
block,
blockNumber,
events,
log,
@@ -442,7 +453,7 @@ async function processBlock(
blockNumberOverride?: number,
blockTimestampOverride?: number
) {
const { provider, chainId, processEveryNumBlocks } = context;
const { provider, chainId } = context;
const timer = metrics.processBlockDurationSeconds
.labels(context.chainId.toString())
.startTimer();
@@ -471,31 +482,27 @@
}
}

// Decide if we should process this block
const shouldProcessBlock = block.number % processEveryNumBlocks === 0;

// Check programmatic orders and place orders if necessary
if (shouldProcessBlock) {
const result = await checkForAndPlaceOrder(
context,
block,
blockNumberOverride,
blockTimestampOverride
)
.then(() => true)
.catch(() => {
hasErrors = true;
log.error(`Error running "checkForAndPlaceOrder" action`);
return false;
});
log.debug(
`Result of "checkForAndPlaceOrder" action for block ${
block.number
}: ${_formatResult(result)}`
);
}
const result = await checkForAndPlaceOrder(
context,
block,
blockNumberOverride,
blockTimestampOverride
)
.then(() => true)
.catch(() => {
hasErrors = true;
log.error(`Error running "checkForAndPlaceOrder" action`);
return false;
});
log.debug(
`Result of "checkForAndPlaceOrder" action for block ${
block.number
}: ${_formatResult(result)}`
);

timer();

if (hasErrors) {
throw new Error("Errors found in processing block");
}
@@ -524,26 +531,31 @@ async function persistLastProcessedBlock(params: {

async function processBlockAndPersist(params: {
context: ChainContext;
block?: providers.Block;
blockNumber: number;
events: ConditionalOrderCreatedEvent[];
currentBlock?: providers.Block;
log: LoggerWithMethods;
provider: ethers.providers.Provider;
}) {
const { context, blockNumber, events, currentBlock, log, provider } = params;
const block = await provider.getBlock(blockNumber);
const { context, block, blockNumber, events, currentBlock, log, provider } =
params;

// Accept optional block object, in case it was already fetched
const _block = block || (await provider.getBlock(blockNumber));

try {
await processBlock(
context,
block,
_block,
events,
currentBlock?.number,
currentBlock?.timestamp
);
} catch (err) {
log.error(`Error processing block ${block.number}`, err);
log.error(`Error processing block ${_block.number}`, err);
} finally {
return persistLastProcessedBlock({ context, block, log });
return persistLastProcessedBlock({ context, block: _block, log });
}
}

@@ -563,27 +575,25 @@ async function pollContractForEvents(
topics: [topic],
});

  return logs
    .map((event) => {
      try {
        const decoded = composableCow.interface.decodeEventLog(
          topic,
          event.data,
          event.topics
        ) as unknown as ConditionalOrderCreatedEvent;

        return {
          ...decoded,
          ...event,
        };
      } catch {
        return null;
      }
    })
    .filter((e): e is ConditionalOrderCreatedEvent => e !== null)
    .filter((e): e is ConditionalOrderCreatedEvent => {
      return addresses ? addresses.includes(e.args.owner) : true;
    });
  return logs.reduce<ConditionalOrderCreatedEvent[]>((acc, event) => {
    try {
      const decoded = composableCow.interface.decodeEventLog(
        topic,
        event.data,
        event.topics
      ) as unknown as ConditionalOrderCreatedEvent;

      if (!addresses || addresses.includes(decoded.args.owner)) {
        acc.push({
          ...decoded,
          ...event,
        });
      }
    } catch {
      // Ignore errors and do not add to the accumulator
    }
    return acc;
  }, []);
}

function _formatResult(result: boolean) {
20 changes: 11 additions & 9 deletions src/types/model.ts
@@ -106,6 +106,7 @@ export class Registry {
network: string;
lastNotifiedError: Date | null;
lastProcessedBlock: RegistryBlock | null;
readonly logger = getLogger("Registry");

/**
* Instantiates a registry.
@@ -189,7 +190,11 @@ export class Registry {
}

get numOrders(): number {
return Array.from(this.ownerOrders.values()).flatMap((o) => o).length;
let count = 0;
for (const orders of this.ownerOrders.values()) {
count += orders.size; // Count each set's size directly
}
return count;
}

/**
@@ -241,13 +246,12 @@ export class Registry {
// Write all atomically
await batch.write();

    const log = getLogger(
      `Registry:write:${this.version}:${this.network}:${
        this.lastProcessedBlock?.number
      }:${this.lastNotifiedError || ""}`
    );

    log.debug("batch written 📝");
    this.logger.debug(
      `write:${this.version}:${this.network}:${
        this.lastProcessedBlock?.number
      }:${this.lastNotifiedError || ""}`,
      "batch written 📝"
    );
}

public stringifyOrders(): string {
@@ -300,9 +304,7 @@ async function loadOwnerOrders(
);

// Parse conditional orders registry (for the persisted version, converting it to the last version)
const ownerOrders = parseConditionalOrders(!!str ? str : undefined, version);

return ownerOrders;
return parseConditionalOrders(!!str ? str : undefined, version);
}

function parseConditionalOrders(
