diff --git a/.changeset/rude-eels-count.md b/.changeset/rude-eels-count.md new file mode 100644 index 000000000000..9b10bfd2a3bf --- /dev/null +++ b/.changeset/rude-eels-count.md @@ -0,0 +1,5 @@ +--- +'@eth-optimism/data-transport-layer': patch +--- + +Add highest L1 and L2 block number Gauge metrics to DTL diff --git a/.changeset/tall-plums-behave.md b/.changeset/tall-plums-behave.md new file mode 100644 index 000000000000..3459ad6e5e1a --- /dev/null +++ b/.changeset/tall-plums-behave.md @@ -0,0 +1,5 @@ +--- +'@eth-optimism/core-utils': patch +--- + +improved watcher ability to find transactions during periods of high load diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 000000000000..db78da54acff --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,12 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=827846 to learn about workspace recommendations. + // Extension identifier format: ${publisher}.${name}. Example: vscode.csharp + + // List of extensions which should be recommended for users of this workspace. + "recommendations": [ + "dbaeumer.vscode-eslint", + "editorconfig.editorconfig", + "juanblanco.solidity", + "golang.go", + ], +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 000000000000..a7defeb9329f --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,11 @@ +{ + "editor.formatOnSave": true, + "[typescript]": { + "editor.defaultFormatter": "dbaeumer.vscode-eslint", + "editor.formatOnSave": true, + }, + "eslint.nodePath": "./node_modules/eslint/bin/", + "eslint.format.enable": true, + "editorconfig.generateAuto": false, + "files.trimTrailingWhitespace": true, +} diff --git a/package.json b/package.json index b424d60f1095..163c5ef289ac 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "lint:check": "yarn lerna run lint:check", "lint:fix": "yarn lerna run lint:fix", "postinstall": "patch-package", + "ready": "yarn lint && yarn test", "release": "yarn build && yarn changeset publish" }, "dependencies": { diff --git a/packages/core-utils/src/watcher.ts b/packages/core-utils/src/watcher.ts index 49f69bd57ce3..b22cd2e24f95 100644 --- a/packages/core-utils/src/watcher.ts +++ b/packages/core-utils/src/watcher.ts @@ -73,22 +73,31 @@ export class Watcher { msgHash: string, pollForPending: boolean = true ): Promise { - const blockNumber = await layer.provider.getBlockNumber() - const startingBlock = Math.max(blockNumber - this.NUM_BLOCKS_TO_FETCH, 0) - const successFilter = { - address: layer.messengerAddress, - topics: [ethers.utils.id(`RelayedMessage(bytes32)`)], - fromBlock: startingBlock, - } - const failureFilter = { - address: layer.messengerAddress, - topics: [ethers.utils.id(`FailedRelayedMessage(bytes32)`)], - fromBlock: startingBlock, + let matches: ethers.providers.Log[] = [] + + // scan for transaction with specified message + while (matches.length === 0) { + const blockNumber = await layer.provider.getBlockNumber() + const startingBlock = Math.max(blockNumber - this.NUM_BLOCKS_TO_FETCH, 0) + const successFilter: ethers.providers.Filter = { + address: layer.messengerAddress, + topics: [ethers.utils.id(`RelayedMessage(bytes32)`)], + fromBlock: startingBlock + } + const failureFilter: ethers.providers.Filter = { + address: layer.messengerAddress, + topics: [ethers.utils.id(`FailedRelayedMessage(bytes32)`)], + fromBlock: startingBlock + } + const successLogs = await layer.provider.getLogs(successFilter) + const failureLogs = await layer.provider.getLogs(failureFilter) + const logs = successLogs.concat(failureLogs) + matches = logs.filter((log: ethers.providers.Log) => log.data === msgHash) + // exit loop after first iteration if not polling + if (!pollForPending) { + break + } } - const successLogs = await layer.provider.getLogs(successFilter) - const failureLogs = await layer.provider.getLogs(failureFilter) - const logs = successLogs.concat(failureLogs) - const matches = logs.filter((log: any) => log.data === msgHash) // Message was relayed in the past if (matches.length > 0) { @@ -98,30 +107,8 @@ export class Watcher { ) } return layer.provider.getTransactionReceipt(matches[0].transactionHash) - } - if (!pollForPending) { + } else { return Promise.resolve(undefined) } - - // Message has yet to be relayed, poll until it is found - return new Promise(async (resolve, reject) => { - const handleEvent = async (log: any) => { - if (log.data === msgHash) { - try { - const txReceipt = await layer.provider.getTransactionReceipt( - log.transactionHash - ) - layer.provider.off(successFilter) - layer.provider.off(failureFilter) - resolve(txReceipt) - } catch (e) { - reject(e) - } - } - } - - layer.provider.on(successFilter, handleEvent) - layer.provider.on(failureFilter, handleEvent) - }) } } diff --git a/packages/data-transport-layer/package.json b/packages/data-transport-layer/package.json index 7982c69b08f1..5e85d1ee437b 100644 --- a/packages/data-transport-layer/package.json +++ b/packages/data-transport-layer/package.json @@ -68,6 +68,7 @@ "mocha": "^8.3.2", "pino-pretty": "^4.7.1", "prettier": "^2.2.1", + "prom-client": "^13.1.0", "rimraf": "^3.0.2", "ts-node": "^9.1.1", "typescript": "^4.2.3" diff --git a/packages/data-transport-layer/src/services/l1-ingestion/service.ts b/packages/data-transport-layer/src/services/l1-ingestion/service.ts index a758d79bf75d..ad8894c8db5c 100644 --- a/packages/data-transport-layer/src/services/l1-ingestion/service.ts +++ b/packages/data-transport-layer/src/services/l1-ingestion/service.ts @@ -1,9 +1,10 @@ /* Imports: External */ import { fromHexString, EventArgsAddressSet } from '@eth-optimism/core-utils' -import { BaseService } from '@eth-optimism/common-ts' +import { BaseService, Metrics } from '@eth-optimism/common-ts' import { JsonRpcProvider } from '@ethersproject/providers' import { LevelUp } from 'levelup' import { ethers, constants } from 'ethers' +import { Gauge } from 'prom-client' /* Imports: Internal */ import { TransportDB } from '../../db/transport-db' @@ -21,9 +22,25 @@ import { handleEventsStateBatchAppended } from './handlers/state-batch-appended' import { L1DataTransportServiceOptions } from '../main/service' import { MissingElementError, EventName } from './handlers/errors' +interface L1IngestionMetrics { + highestSyncedL1Block: Gauge +} + +const registerMetrics = ({ + client, + registry, +}: Metrics): L1IngestionMetrics => ({ + highestSyncedL1Block: new client.Gauge({ + name: 'data_transport_layer_highest_synced_l1_block', + help: 'Highest Synced L1 Block Number', + registers: [registry], + }), +}) + export interface L1IngestionServiceOptions extends L1DataTransportServiceOptions { db: LevelUp + metrics: Metrics } const optionSettings = { @@ -61,6 +78,8 @@ export class L1IngestionService extends BaseService { super('L1_Ingestion_Service', options, optionSettings) } + private l1IngestionMetrics: L1IngestionMetrics + private state: { db: TransportDB contracts: OptimismContracts @@ -72,6 +91,8 @@ export class L1IngestionService extends BaseService { protected async _init(): Promise { this.state.db = new TransportDB(this.options.db) + this.l1IngestionMetrics = registerMetrics(this.metrics) + this.state.l1RpcProvider = typeof this.options.l1RpcProvider === 'string' ? new JsonRpcProvider(this.options.l1RpcProvider) @@ -199,6 +220,8 @@ export class L1IngestionService extends BaseService { await this.state.db.setHighestSyncedL1Block(targetL1Block) + this.l1IngestionMetrics.highestSyncedL1Block.set(targetL1Block) + if ( currentL1Block - highestSyncedL1Block < this.options.logsPerPollingInterval @@ -240,6 +263,10 @@ export class L1IngestionService extends BaseService { lastGoodElement.blockNumber ) + this.l1IngestionMetrics.highestSyncedL1Block.set( + lastGoodElement.blockNumber + ) + // Something we should be keeping track of. this.logger.warn('recovering from a missing event', { eventName, diff --git a/packages/data-transport-layer/src/services/l2-ingestion/service.ts b/packages/data-transport-layer/src/services/l2-ingestion/service.ts index dae374f7c756..50ea6e514874 100644 --- a/packages/data-transport-layer/src/services/l2-ingestion/service.ts +++ b/packages/data-transport-layer/src/services/l2-ingestion/service.ts @@ -1,10 +1,11 @@ /* Imports: External */ -import { BaseService } from '@eth-optimism/common-ts' +import { BaseService, Metrics } from '@eth-optimism/common-ts' import { JsonRpcProvider } from '@ethersproject/providers' import { BigNumber } from 'ethers' import { LevelUp } from 'levelup' import axios from 'axios' import bfj from 'bfj' +import { Gauge } from 'prom-client' /* Imports: Internal */ import { TransportDB } from '../../db/transport-db' @@ -12,6 +13,21 @@ import { sleep, toRpcHexString, validators } from '../../utils' import { L1DataTransportServiceOptions } from '../main/service' import { handleSequencerBlock } from './handlers/transaction' +interface L2IngestionMetrics { + highestSyncedL2Block: Gauge +} + +const registerMetrics = ({ + client, + registry, +}: Metrics): L2IngestionMetrics => ({ + highestSyncedL2Block: new client.Gauge({ + name: 'data_transport_layer_highest_synced_l2_block', + help: 'Highest Synced L2 Block Number', + registers: [registry], + }), +}) + export interface L2IngestionServiceOptions extends L1DataTransportServiceOptions { db: LevelUp @@ -52,6 +68,8 @@ export class L2IngestionService extends BaseService { super('L2_Ingestion_Service', options, optionSettings) } + private l2IngestionMetrics: L2IngestionMetrics + private state: { db: TransportDB l2RpcProvider: JsonRpcProvider @@ -64,6 +82,8 @@ export class L2IngestionService extends BaseService { ) } + this.l2IngestionMetrics = registerMetrics(this.metrics) + this.state.db = new TransportDB(this.options.db) this.state.l2RpcProvider = @@ -113,6 +133,8 @@ export class L2IngestionService extends BaseService { await this.state.db.setHighestSyncedUnconfirmedBlock(targetL2Block) + this.l2IngestionMetrics.highestSyncedL2Block.set(targetL2Block) + if ( currentL2Block - highestSyncedL2BlockNumber < this.options.transactionsPerPollingInterval diff --git a/packages/data-transport-layer/src/services/main/service.ts b/packages/data-transport-layer/src/services/main/service.ts index b4d0839a360e..c8567a448d3b 100644 --- a/packages/data-transport-layer/src/services/main/service.ts +++ b/packages/data-transport-layer/src/services/main/service.ts @@ -1,5 +1,5 @@ /* Imports: External */ -import { BaseService, Logger } from '@eth-optimism/common-ts' +import { BaseService, Logger, Metrics } from '@eth-optimism/common-ts' import { LevelUp } from 'levelup' import level from 'level' @@ -31,7 +31,6 @@ export interface L1DataTransportServiceOptions { useSentry?: boolean sentryDsn?: string sentryTraceRate?: number - enableMetrics?: boolean defaultBackend: string } @@ -65,8 +64,18 @@ export class L1DataTransportService extends BaseService { private _initializeApp() { // TODO: Maybe pass this in as a parameter instead of creating it here? this.state.app = express() + if (this.options.useSentry) { this._initSentry() } - if (this.options.enableMetrics) { - this._initMetrics() - } + this.state.app.use(cors()) + + // Add prometheus middleware to express BEFORE route registering + this.state.app.use( + // This also serves metrics on port 3000 at /metrics + promBundle({ + // Provide metrics registry that other metrics uses + promRegistry: this.metrics.registry, + includeMethod: true, + includePath: true, + }) + ) + this._registerAllRoutes() + // Sentry error handling must be after all controllers // and before other error middleware if (this.options.useSentry) { @@ -148,25 +162,6 @@ export class L1TransportServer extends BaseService { this.state.app.use(Sentry.Handlers.tracingHandler()) } - /** - * Initialize Prometheus metrics collection and endpoint - */ - private _initMetrics() { - this.metrics = new Metrics({ - labels: { - environment: this.options.nodeEnv, - network: this.options.ethNetworkName, - release: this.options.release, - service: this.name, - }, - }) - const metricsMiddleware = promBundle({ - includeMethod: true, - includePath: true, - }) - this.state.app.use(metricsMiddleware) - } - /** * Registers a route on the server. *