diff --git a/src/cache.ts b/src/cache.ts
index d28e647..da49e2e 100644
--- a/src/cache.ts
+++ b/src/cache.ts
@@ -8,6 +8,7 @@ export interface Block {
 }
 
 export interface Cache {
+  migrate(): Promise<void>;
   insertEvents(args: {
     chainId: number;
     events: Event[];
@@ -41,6 +42,5 @@ export interface Cache {
     chainId: number;
     blockNumber: bigint;
   }): Promise<Block | null>;
 
-  insertBlock(args: Block): Promise<void>;
 }
diff --git a/src/cache/postgres.test.ts b/src/cache/postgres.test.ts
new file mode 100644
index 0000000..34d3c70
--- /dev/null
+++ b/src/cache/postgres.test.ts
@@ -0,0 +1,266 @@
+import { Block } from "@/cache";
+import { createPostgresCache } from "@/cache/postgres";
+import { Event } from "@/types";
+import { describe, it, expect } from "vitest";
+import { Pool } from "pg";
+
+const makeEvent = (blockNumber: bigint): Event => ({
+  name: "EventName",
+  params: { string: "value", bigint: blockNumber },
+  address: "0x123",
+  topic: "0x456",
+  transactionHash: "0x789",
+  blockNumber,
+  logIndex: 0,
+});
+
+const DATABASE_URL = process.env.DATABASE_URL;
+
+async function createNewPostgresCache() {
+  const pool = new Pool({
+    connectionString: DATABASE_URL,
+  });
+
+  const cache = createPostgresCache({
+    connectionPool: pool,
+    schemaName: `test_${Math.random().toString(36).substring(7)}`,
+  });
+
+  await cache.migrate();
+
+  return cache;
+}
+
+describe.runIf(DATABASE_URL !== undefined)("postgres cache", () => {
+  it("inserts and retrieves events", async () => {
+    const max256BitBigInt = 2n ** 256n - 1n;
+    const cache = await createNewPostgresCache();
+
+    const event: Event = {
+      name: "EventName",
+      params: { string: "value", bigint: max256BitBigInt },
+      address: "0x123",
+      topic: "0x456",
+      transactionHash: "0x789",
+      blockNumber: 100n,
+      logIndex: 0,
+    };
+
+    await cache.insertEvents({
+      chainId: 1,
+      events: [event],
+      address: "0x123",
+      fromBlock: 100n,
+      toBlock: 150n,
+    });
+
+    {
+      const storedEvents = await cache.getEvents({
+        chainId: 1,
+        address: "0x123",
+        fromBlock: 0n,
+        toBlock: 150n,
+      });
+
+      expect(storedEvents).not.toBe(null);
+      if (storedEvents !== null) {
+        expect(storedEvents.events.length).toBe(1);
+        expect(storedEvents.events[0]).toEqual(event);
+        expect(storedEvents.fromBlock).toEqual(100n);
+        expect(storedEvents.toBlock).toEqual(150n);
+      }
+    }
+
+    {
+      const storedEvents = await cache.getEvents({
+        chainId: 1,
+        address: "0x123",
+        topic0: "0x456",
+        fromBlock: 0n,
+        toBlock: 99n,
+      });
+
+      expect(storedEvents).toBe(null);
+    }
+
+    {
+      const storedEvents = await cache.getEvents({
+        chainId: 1,
+        address: "0x123",
+        topic0: "0x456",
+        fromBlock: 100n,
+        toBlock: 101n,
+      });
+
+      expect(storedEvents).not.toBe(null);
+      if (storedEvents !== null) {
+        expect(storedEvents.events.length).toBe(1);
+        expect(storedEvents.events[0]).toEqual(event);
+        expect(storedEvents.fromBlock).toEqual(100n);
+        expect(storedEvents.toBlock).toEqual(101n);
+      }
+    }
+
+    {
+      const storedEvents = await cache.getEvents({
+        chainId: 1,
+        address: "0x123",
+        topic0: "0x456",
+        fromBlock: 0n,
+        toBlock: 100n,
+      });
+
+      expect(storedEvents).not.toBe(null);
+      if (storedEvents !== null) {
+        expect(storedEvents.events.length).toBe(1);
+        expect(storedEvents.events[0]).toEqual(event);
+        expect(storedEvents.fromBlock).toEqual(100n);
+        expect(storedEvents.toBlock).toEqual(100n);
+      }
+    }
+  });
+
+  it("merges sequential log ranges", async () => {
+    const cache = await createNewPostgresCache();
+
+    const makeEvent = (blockNumber: bigint): Event => ({
+      name: "EventName",
+      params: { string: "value", bigint: blockNumber },
+      address: "0x123",
"0x123", + topic: "0x456", + transactionHash: "0x789", + blockNumber, + logIndex: 0, + }); + + const eventsBatch1 = [makeEvent(1n), makeEvent(2n)]; + const eventsBatch2 = [makeEvent(3n), makeEvent(4n)]; + + await cache.insertEvents({ + chainId: 1, + events: eventsBatch1, + address: "0x123", + fromBlock: 1n, + toBlock: 2n, + }); + + await cache.insertEvents({ + chainId: 1, + events: eventsBatch2, + address: "0x123", + fromBlock: 3n, + toBlock: 4n, + }); + + const storedEvents = await cache.getEvents({ + chainId: 1, + address: "0x123", + topic0: "0x456", + fromBlock: 1n, + toBlock: 5n, + }); + + expect(storedEvents).not.toBeNull(); + + if (storedEvents !== null) { + expect(storedEvents.events.length).toBe(4); + const expectedEvents = [...eventsBatch1, ...eventsBatch2]; + expect(storedEvents.events).toEqual(expectedEvents); + expect(storedEvents.fromBlock).toEqual(1n); + expect(storedEvents.toBlock).toEqual(4n); + } + }); + + it("merges overlapping log ranges", async () => { + const cache = await createNewPostgresCache(); + + const eventsBatch1 = [makeEvent(1n), makeEvent(2n)]; + const eventsBatch2 = [makeEvent(2n), makeEvent(4n)]; + + await cache.insertEvents({ + chainId: 1, + events: eventsBatch1, + address: "0x123", + fromBlock: 1n, + toBlock: 2n, + }); + + await cache.insertEvents({ + chainId: 1, + events: eventsBatch2, + address: "0x123", + fromBlock: 2n, + toBlock: 4n, + }); + + const storedEvents = await cache.getEvents({ + chainId: 1, + address: "0x123", + topic0: "0x456", + fromBlock: 1n, + toBlock: 5n, + }); + + expect(storedEvents).not.toBeNull(); + + if (storedEvents !== null) { + expect(storedEvents.events.length).toBe(3); + const expectedEvents = [makeEvent(1n), makeEvent(2n), makeEvent(4n)]; + expect(storedEvents.events).toEqual(expectedEvents); + expect(storedEvents.fromBlock).toEqual(1n); + expect(storedEvents.toBlock).toEqual(4n); + } + }); + + it("returns empty array if range not fetched", async () => { + const cache = await createNewPostgresCache(); + + await cache.insertEvents({ + chainId: 1, + events: [makeEvent(1n)], + address: "0x123", + fromBlock: 1n, + toBlock: 2n, + }); + + const storedEvents = await cache.getEvents({ + chainId: 1, + address: "0x123", + topic0: "0x456", + fromBlock: 3n, + toBlock: 4n, + }); + + expect(storedEvents).toBeNull(); + }); + + describe("block cache", async () => { + it("returns null on non existent blocks", async () => { + const cache = await createNewPostgresCache(); + const cachedBlock = await cache.getBlockByNumber({ + chainId: 1, + blockNumber: 1n, + }); + + expect(cachedBlock).toBeNull(); + }); + + it("returns block on existent blocks", async () => { + const cache = await createNewPostgresCache(); + const block: Block = { + chainId: 1, + blockNumber: 1n, + blockHash: "0x123", + timestamp: 123, + }; + await cache.insertBlock(block); + + const cachedBlock = await cache.getBlockByNumber({ + chainId: 1, + blockNumber: 1n, + }); + + expect(cachedBlock).toEqual(block); + }); + }); +}); diff --git a/src/cache/postgres.ts b/src/cache/postgres.ts new file mode 100644 index 0000000..0398cde --- /dev/null +++ b/src/cache/postgres.ts @@ -0,0 +1,323 @@ +import { Pool } from "pg"; + +import { Event, Hex } from "@/types"; +import { Block, Cache } from "@/cache"; +import { encodeJsonWithBigInts, decodeJsonWithBigInts } from "@/utils"; + +type BlockRow = { + chainid: number; + blocknumber: string; + blockhash: Hex; + timestamp: number; +}; + +const migration = ` +CREATE TABLE IF NOT EXISTS "$1"."events" ( + chainId INTEGER, + name TEXT, + params TEXT, 
+  address TEXT,
+  topic0 TEXT,
+  transactionHash TEXT,
+  blockNumber INTEGER,
+  logIndex INTEGER,
+  PRIMARY KEY (chainId, blockNumber, logIndex)
+);
+CREATE TABLE IF NOT EXISTS "$1"."logRanges" (
+  chainId INTEGER,
+  address TEXT,
+  fromBlock INTEGER,
+  toBlock INTEGER
+);
+CREATE INDEX IF NOT EXISTS idx_events ON "$1"."events" (chainId, address, blockNumber, name, params, transactionHash, logIndex);
+CREATE INDEX IF NOT EXISTS idx_logranges_search ON "$1"."logRanges" (chainId, address, fromBlock, toBlock);
+CREATE TABLE IF NOT EXISTS "$1"."contractReads" (
+  chainId INTEGER,
+  address TEXT,
+  data TEXT,
+  functionName TEXT,
+  blockNumber INTEGER,
+  result TEXT,
+  PRIMARY KEY (chainId, address, data, functionName, blockNumber)
+);
+CREATE TABLE IF NOT EXISTS "$1".blocks (
+  chainId INTEGER,
+  blockNumber TEXT,
+  blockHash TEXT,
+  timestamp INTEGER,
+  PRIMARY KEY (chainId, blockHash)
+);
+CREATE INDEX IF NOT EXISTS idx_blocks ON "$1".blocks (chainId, blockNumber);
+`;
+
+export function createPostgresCache(args: {
+  connectionPool: Pool;
+  schemaName?: string;
+}): Cache {
+  const pool = args.connectionPool;
+  const schema = args.schemaName ?? "public";
+
+  const tableName = (name: string) => `"${schema}"."${name}"`;
+
+  return {
+    migrate: async () => {
+      await pool.query(`CREATE SCHEMA IF NOT EXISTS ${schema}`);
+      await pool.query(migration.replaceAll(/\$1/g, schema));
+    },
+
+    async getBlockByNumber(args: {
+      chainId: number;
+      blockNumber: bigint;
+    }): Promise<Block | null> {
+      const client = await pool.connect();
+      try {
+        const res = await client.query<BlockRow>(
+          `SELECT * FROM ${tableName(
+            "blocks"
+          )} WHERE chainId = $1 AND blockNumber = $2`,
+          [args.chainId, args.blockNumber.toString()]
+        );
+
+        if (res.rows.length === 0) {
+          return null;
+        }
+
+        return {
+          chainId: res.rows[0].chainid,
+          blockNumber: BigInt(res.rows[0].blocknumber),
+          blockHash: res.rows[0].blockhash,
+          timestamp: res.rows[0].timestamp,
+        };
+      } finally {
+        client.release();
+      }
+    },
+
+    async insertBlock(args: Block) {
+      const client = await pool.connect();
+      try {
+        await client.query(
+          `INSERT INTO ${tableName(
+            "blocks"
+          )} (chainId, blockNumber, blockHash, timestamp) VALUES ($1, $2, $3, $4)`,
+          [
+            args.chainId,
+            args.blockNumber.toString(),
+            args.blockHash,
+            args.timestamp,
+          ]
+        );
+      } finally {
+        client.release();
+      }
+    },
+
+    async insertEvents(args: {
+      chainId: number;
+      events: Event[];
+      address: Hex;
+      topics: Hex[];
+      fromBlock: bigint;
+      toBlock: bigint;
+    }) {
+      const client = await pool.connect();
+      try {
+        await client.query("BEGIN");
+
+        for (const event of args.events) {
+          await client.query(
+            `INSERT INTO ${tableName(
+              "events"
+            )} (chainId, name, params, address, topic0, transactionHash, blockNumber, logIndex)
+             VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+             ON CONFLICT (chainId, blockNumber, logIndex) DO UPDATE SET
+               name = EXCLUDED.name,
+               params = EXCLUDED.params,
+               address = EXCLUDED.address,
+               topic0 = EXCLUDED.topic0,
+               transactionHash = EXCLUDED.transactionHash,
+               blockNumber = EXCLUDED.blockNumber,
+               logIndex = EXCLUDED.logIndex
+            `,
+            [
+              args.chainId,
+              event.name,
+              encodeJsonWithBigInts(event.params),
+              event.address,
+              event.topic,
+              event.transactionHash,
+              event.blockNumber.toString(),
+              event.logIndex,
+            ]
+          );
+        }
+
+        // Check for adjacent ranges in logRanges
+        const adjacentRanges = await client.query(
+          `SELECT fromBlock, toBlock FROM ${tableName(
+            "logRanges"
+          )} WHERE chainId = $1 AND address = $2 AND toBlock >= $3 - 1 AND fromBlock <= $4 + 1`,
+          [
+            args.chainId,
+            args.address,
+            Number(args.fromBlock),
+            Number(args.toBlock),
+          ]
+        );
+
+        let newFrom = Number(args.fromBlock);
+        let newTo = Number(args.toBlock);
+
+        for (const range of adjacentRanges.rows) {
+          newFrom = Math.min(newFrom, range.fromblock);
+          newTo = Math.max(newTo, range.toblock);
+        }
+
+        // Remove old overlapping ranges
+        await client.query(
+          `DELETE FROM ${tableName(
+            "logRanges"
+          )} WHERE chainId = $1 AND address = $2 AND fromBlock >= $3 AND toBlock <= $4`,
+          [args.chainId, args.address, newFrom, newTo]
+        );
+
+        // Insert the new merged range
+        await client.query(
+          `INSERT INTO ${tableName(
+            "logRanges"
+          )} (chainId, address, fromBlock, toBlock) VALUES ($1, $2, $3, $4)`,
+          [args.chainId, args.address, newFrom, newTo]
+        );
+
+        await client.query("COMMIT");
+      } catch (e) {
+        await client.query("ROLLBACK");
+        throw e;
+      } finally {
+        client.release();
+      }
+    },
+
+    async getEvents(args: {
+      chainId: number;
+      address: Hex;
+      topic0: Hex;
+      fromBlock: bigint;
+      toBlock: bigint;
+    }): Promise<{
+      fromBlock: bigint;
+      toBlock: bigint;
+      events: Event[];
+    } | null> {
+      const client = await pool.connect();
+
+      try {
+        const range = await client.query(
+          `SELECT fromBlock, toBlock
+           FROM ${tableName("logRanges")}
+           WHERE chainId = $1 AND address = $2
+           AND ((fromBlock <= $3 AND toBlock >= $4) OR (fromBlock <= $5 AND toBlock >= $6))
+           LIMIT 1`,
+          [
+            args.chainId,
+            args.address,
+            Number(args.toBlock),
+            Number(args.fromBlock),
+            Number(args.toBlock),
+            Number(args.fromBlock),
+          ]
+        );
+
+        if (range.rows.length > 0) {
+          const fromBlock = Math.max(
+            range.rows[0].fromblock,
+            Number(args.fromBlock)
+          );
+
+          const toBlock = Math.min(range.rows[0].toblock, Number(args.toBlock));
+
+          const rows = await client.query(
+            `SELECT * FROM ${tableName(
+              "events"
+            )} WHERE chainId = $1 AND address = $2 AND blockNumber >= $3 AND blockNumber <= $4`,
+            [args.chainId, args.address, fromBlock, toBlock]
+          );
+
+          return {
+            fromBlock: BigInt(fromBlock),
+            toBlock: BigInt(toBlock),
+            events: rows.rows.map((row) => ({
+              name: row.name,
+              params: decodeJsonWithBigInts(row.params),
+              address: row.address,
+              topic: row.topic0,
+              transactionHash: row.transactionhash,
+              blockNumber: BigInt(row.blocknumber),
+              logIndex: row.logindex,
+            })),
+          };
+        } else {
+          return null;
+        }
+      } finally {
+        client.release();
+      }
+    },
+
+    async getContractRead(args: {
+      chainId: number;
+      address: Hex;
+      data: Hex;
+      functionName: string;
+      blockNumber: bigint;
+    }): Promise<Hex | null> {
+      const client = await pool.connect();
+      try {
+        const res = await client.query(
+          `SELECT result FROM ${tableName(
+            "contractReads"
+          )} WHERE chainId = $1 AND address = $2 AND data = $3 AND functionName = $4 AND blockNumber = $5`,
+          [
+            args.chainId,
+            args.address,
+            args.data,
+            args.functionName,
+            args.blockNumber.toString(),
+          ]
+        );
+        return res.rows.length > 0 ? res.rows[0].result : null;
+      } finally {
+        client.release();
+      }
+    },
+
+    async insertContractRead(args: {
+      chainId: number;
+      address: Hex;
+      data: Hex;
+      functionName: string;
+      blockNumber: bigint;
+      result: Hex;
+    }): Promise<void> {
+      const client = await pool.connect();
+      try {
+        await client.query(
+          `INSERT INTO ${tableName(
+            "contractReads"
+          )} (chainId, address, data, functionName, blockNumber, result)
+           VALUES ($1, $2, $3, $4, $5, $6)`,
+          [
+            args.chainId,
+            args.address,
+            args.data,
+            args.functionName,
+            args.blockNumber.toString(),
+            args.result,
+          ]
+        );
+      } finally {
+        client.release();
+      }
+    },
+  };
+}
diff --git a/src/cache/sqlite.ts b/src/cache/sqlite.ts
index 494e731..43aee30 100644
--- a/src/cache/sqlite.ts
+++ b/src/cache/sqlite.ts
@@ -141,6 +141,9 @@ export function createSqliteCache(dbPath: string): Cache {
   }
 
   return {
+    async migrate() {
+      // empty
+    },
     async insertEvents(args: {
       chainId: number;
       events: Event[];
diff --git a/src/index.ts b/src/index.ts
index fa4689e..0dd16d7 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -7,6 +7,7 @@ export { Database } from "@/storage";
 export { createJsonDatabase } from "@/storage/json";
 export { Cache, Block } from "@/cache";
 export { createSqliteCache } from "@/cache/sqlite";
+export { createPostgresCache } from "@/cache/postgres";
 export { SubscriptionStore } from "@/subscriptionStore";
 export { createSqliteSubscriptionStore } from "@/subscriptionStore/sqlite";
 
diff --git a/src/indexer.ts b/src/indexer.ts
index 838e748..fd12083 100644
--- a/src/indexer.ts
+++ b/src/indexer.ts
@@ -367,6 +367,10 @@ export function createIndexer<
         );
       }
     }
+
+    if (config.cache) {
+      await config.cache.migrate();
+    }
   }
 
   async function stop() {
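
For reference, a minimal usage sketch of the new cache, mirroring the test setup above. The schema name here is illustrative, and DATABASE_URL is assumed to point at a reachable Postgres instance; since the migration only issues `CREATE ... IF NOT EXISTS` statements, calling `migrate()` against an existing schema is harmless.

```ts
import { Pool } from "pg";
import { createPostgresCache } from "@/cache/postgres";

// Assumes DATABASE_URL points at a reachable Postgres instance.
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// schemaName is optional and defaults to "public"; "indexer_cache" is illustrative.
const cache = createPostgresCache({
  connectionPool: pool,
  schemaName: "indexer_cache",
});

// Create the schema and tables. createIndexer() also calls migrate() when a
// cache is configured, so a manual call is only needed for standalone use.
await cache.migrate();
```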