Skip to content

Commit

Permalink
chore: idle debug logs (#1368)
Browse files Browse the repository at this point in the history
* chore: add logs

* chore: changeset

---------

Co-authored-by: typedarray <90073088+0xOlias@users.noreply.github.com>
  • Loading branch information
typedarray and typedarray authored Dec 20, 2024
1 parent 709a6e4 commit 492d7e7
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
5 changes: 5 additions & 0 deletions .changeset/warm-coats-tickle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"ponder": patch
---

Improved debug-level logs for historical indexing observability.
47 changes: 43 additions & 4 deletions packages/core/src/bin/utils/run.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { IndexingBuild, SchemaBuild } from "@/build/index.js";
import { runCodegen } from "@/common/codegen.js";
import type { Common } from "@/common/common.js";
import { getAppProgress } from "@/common/metrics.js";
import type { Database } from "@/database/index.js";
import { createHistoricalIndexingStore } from "@/indexing-store/historical.js";
import { getMetadataStore } from "@/indexing-store/metadata.js";
Expand All @@ -15,6 +16,7 @@ import {
encodeCheckpoint,
zeroCheckpoint,
} from "@/utils/checkpoint.js";
import { formatEta, formatPercentage } from "@/utils/format.js";
import { never } from "@/utils/never.js";
import { createQueue } from "@ponder/common";

Expand Down Expand Up @@ -160,10 +162,25 @@ export async function run({
for await (const { events, checkpoint } of sync.getEvents()) {
end = checkpoint;

const result = await handleEvents(
decodeEvents(common, indexingBuild.sources, events),
checkpoint,
);
const decodedEvents = decodeEvents(common, indexingBuild.sources, events);
const result = await handleEvents(decodedEvents, checkpoint);

// underlying metrics collection is actually synchronous
// https://github.com/siimon/prom-client/blob/master/lib/histogram.js#L102-L125
const { eta, progress } = await getAppProgress(common.metrics);
if (events.length > 0) {
if (eta === undefined || progress === undefined) {
common.logger.info({
service: "app",
msg: `Indexed ${events.length} events`,
});
} else {
common.logger.info({
service: "app",
msg: `Indexed ${events.length} events with ${formatPercentage(progress)} complete and ${formatEta(eta)} remaining`,
});
}
}

// Persist the indexing store to the db if it is too full. The `finalized`
// checkpoint is used as a mutex. Any rows in the reorg table that may
Expand All @@ -175,6 +192,18 @@ export async function run({
lastFlush + 5_000 < Date.now() &&
events.length > 0)
) {
if (historicalIndexingStore.isCacheFull()) {
common.logger.debug({
service: "indexing",
msg: `Indexing cache has exceeded ${common.options.indexingCacheMaxBytes} MB limit, starting flush`,
});
} else {
common.logger.debug({
service: "indexing",
msg: "Dev server periodic flush triggered, starting flush",
});
}

await database.finalize({
checkpoint: encodeCheckpoint(zeroCheckpoint),
});
Expand All @@ -186,6 +215,11 @@ export async function run({
checkpoint: events[events.length - 1]!.checkpoint,
});
lastFlush = Date.now();

common.logger.debug({
service: "indexing",
msg: "Completed flush",
});
}

await metadataStore.setStatus(sync.getStatus());
Expand All @@ -204,6 +238,11 @@ export async function run({
// have been written because of raw sql access are deleted. Also must truncate
// the reorg tables that may have been written because of raw sql access.

common.logger.debug({
service: "indexing",
msg: "Completed all historical events, starting final flush",
});

await database.finalize({ checkpoint: encodeCheckpoint(zeroCheckpoint) });
await historicalIndexingStore.flush();
await database.complete({ checkpoint: encodeCheckpoint(zeroCheckpoint) });
Expand Down
30 changes: 11 additions & 19 deletions packages/core/src/sync/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Common } from "@/common/common.js";
import { getAppProgress } from "@/common/metrics.js";
import type { Network } from "@/config/networks.js";
import {
type HistoricalSync,
Expand Down Expand Up @@ -502,6 +501,11 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
});
consecutiveErrors = 0;

args.common.logger.debug({
service: "sync",
msg: `Fetched ${events.length} events from the database for a ${formatEta(estimateSeconds * 1000)} range from ${decodeCheckpoint(from).blockTimestamp}`,
});

for (const network of args.networks) {
updateHistoricalStatus({ events, checkpoint: cursor, network });
}
Expand All @@ -519,27 +523,15 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {

yield { events, checkpoint: to };
from = cursor;

// underlying metrics collection is actually synchronous
// https://github.com/siimon/prom-client/blob/master/lib/histogram.js#L102-L125
const { eta, progress } = await getAppProgress(args.common.metrics);

if (events.length > 0) {
if (eta === undefined || progress === undefined) {
args.common.logger.info({
service: "app",
msg: `Indexed ${events.length} events`,
});
} else {
args.common.logger.info({
service: "app",
msg: `Indexed ${events.length} events with ${formatPercentage(progress)} complete and ${formatEta(eta)} remaining`,
});
}
}
} catch (error) {
// Handle errors by reducing the requested range by 10x
estimateSeconds = Math.max(10, Math.round(estimateSeconds / 10));

args.common.logger.debug({
service: "sync",
msg: `Failed to fetch events from the database, retrying with a ${formatEta(estimateSeconds * 1000)} range`,
});

if (++consecutiveErrors > 4) throw error;
}
}
Expand Down

0 comments on commit 492d7e7

Please sign in to comment.