From cb8c252ac46fad6d1abf63e39def0caf48a24d93 Mon Sep 17 00:00:00 2001 From: Nitin Mittal Date: Mon, 4 Mar 2024 18:41:55 +0400 Subject: [PATCH 1/4] feat: new changes --- .../matic_transfer/consumer/src/consumer.ts | 6 +- examples/matic_transfer/consumer/src/index.ts | 2 +- .../src/interfaces/transfer_methods.ts | 2 +- examples/matic_transfer/producer/src/index.ts | 6 +- .../matic_transfer/transformer/src/index.ts | 8 +- .../nft_balancer/consumer/src/consumer.ts | 6 +- examples/nft_balancer/consumer/src/index.ts | 2 +- examples/nft_balancer/producer/src/index.ts | 6 +- .../nft_balancer/transformer/src/index.ts | 8 +- internal/block_getters/http_block_getter.ts | 143 ++++++++++ .../block_getters/http_block_getter_worker.ts | 49 ++++ .../block_getters/quicknode_block_getter.ts | 53 ++-- internal/block_producers/block_producer.ts | 6 + .../block_subscription/block_subscription.ts | 2 + internal/coder/protobuf_coder.ts | 24 +- internal/formatters/block_formatter.ts | 250 +++++++++++++----- internal/interfaces/deposit.ts | 3 +- internal/interfaces/http_block.ts | 24 ++ internal/interfaces/http_transaction.ts | 20 ++ internal/interfaces/index.ts | 2 + .../block_producers/block_polling_producer.ts | 12 +- public/block_producers/block_producer.ts | 10 +- .../block_producers/erigon_block_producer.ts | 10 +- public/block_producers/http_block_producer.ts | 72 +++++ .../quicknode_block_producer.ts | 12 +- public/index.ts | 1 + public/interfaces/http_block.ts | 1 + public/interfaces/http_transaction.ts | 1 + schemas/burnblock.proto | 2 +- schemas/depositblock.proto | 1 + 30 files changed, 603 insertions(+), 141 deletions(-) create mode 100644 internal/block_getters/http_block_getter.ts create mode 100644 internal/block_getters/http_block_getter_worker.ts create mode 100644 internal/interfaces/http_block.ts create mode 100644 internal/interfaces/http_transaction.ts create mode 100644 public/block_producers/http_block_producer.ts create mode 100644 public/interfaces/http_block.ts create mode 100644 public/interfaces/http_transaction.ts diff --git a/examples/matic_transfer/consumer/src/consumer.ts b/examples/matic_transfer/consumer/src/consumer.ts index 5d48297..5eee094 100644 --- a/examples/matic_transfer/consumer/src/consumer.ts +++ b/examples/matic_transfer/consumer/src/consumer.ts @@ -26,10 +26,10 @@ dotenv.config() export default async function startConsuming(transferService: TransferService, transferMapper: TransferMapper): Promise { try { consume({ - "metadata.broker.list": process.env.KAFKA_CONNECTION_URL || "localhost:9092", - "group.id": process.env.CONSUMER_GROUP_ID || "matic.transfer.consumer", + "metadata.broker.list": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", + "group.id": process.env.CONSUMER_GROUP_ID ?? "matic.transfer.consumer", "security.protocol": "plaintext", - topic: process.env.TRANSFER_TOPIC || "apps.1.matic.transfer", + topic: process.env.TRANSFER_TOPIC ?? "apps.1.matic.transfer", coders: { fileName: "matic_transfer", packageName: "matictransferpackage", diff --git a/examples/matic_transfer/consumer/src/index.ts b/examples/matic_transfer/consumer/src/index.ts index f92781c..50f72b6 100644 --- a/examples/matic_transfer/consumer/src/index.ts +++ b/examples/matic_transfer/consumer/src/index.ts @@ -23,7 +23,7 @@ async function start(): Promise { } }); - const database = new Database(process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer'); + const database = new Database(process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer'); await database.connect(); const transferService = new TransferService( diff --git a/examples/matic_transfer/consumer/src/interfaces/transfer_methods.ts b/examples/matic_transfer/consumer/src/interfaces/transfer_methods.ts index 9b590d2..56769a0 100644 --- a/examples/matic_transfer/consumer/src/interfaces/transfer_methods.ts +++ b/examples/matic_transfer/consumer/src/interfaces/transfer_methods.ts @@ -16,7 +16,7 @@ const statics = { //@ts-ignore const tx = await this.findOne().sort({ timestamp: -1 }).exec(); - return tx?.blockNumber || 0; + return tx?.blockNumber ?? 0; }, /** diff --git a/examples/matic_transfer/producer/src/index.ts b/examples/matic_transfer/producer/src/index.ts index 10a4d20..702817f 100644 --- a/examples/matic_transfer/producer/src/index.ts +++ b/examples/matic_transfer/producer/src/index.ts @@ -21,12 +21,12 @@ Logger.create({ const producer = produce({ startBlock: parseInt(process.env.START_BLOCK as string), rpcWsEndpoints: process.env.RPC_WS_ENDPOINT_URL_LIST?.split(','), - topic: process.env.PRODUCER_TOPIC || "polygon.1.blocks", + topic: process.env.PRODUCER_TOPIC ?? "polygon.1.blocks", maxReOrgDepth: 96, maxRetries: 5, - mongoUrl: process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer', + mongoUrl: process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer', blockSubscriptionTimeout: 120000, - "bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092", + "bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", "security.protocol": "plaintext", type: "blocks:erigon" }); diff --git a/examples/matic_transfer/transformer/src/index.ts b/examples/matic_transfer/transformer/src/index.ts index d65ba11..7c69f1d 100644 --- a/examples/matic_transfer/transformer/src/index.ts +++ b/examples/matic_transfer/transformer/src/index.ts @@ -25,7 +25,7 @@ Logger.create({ try { startTransforming( { - "bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092", + "bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", "group.id": "matic.transfer.transformer", "security.protocol": "plaintext", "message.max.bytes": 26214400, @@ -35,11 +35,11 @@ try { packageName: "blockpackage", messageType: "Block" }, - topic: process.env.CONSUMER_TOPIC || "polygon.1.blocks", + topic: process.env.CONSUMER_TOPIC ?? "polygon.1.blocks", }, { - topic: process.env.PRODUCER_TOPIC || "apps.1.matic.transfer", - "bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092", + topic: process.env.PRODUCER_TOPIC ?? "apps.1.matic.transfer", + "bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", "security.protocol": "plaintext", "message.max.bytes": 26214400, coder: { diff --git a/examples/nft_balancer/consumer/src/consumer.ts b/examples/nft_balancer/consumer/src/consumer.ts index 6a87922..b939e96 100644 --- a/examples/nft_balancer/consumer/src/consumer.ts +++ b/examples/nft_balancer/consumer/src/consumer.ts @@ -26,10 +26,10 @@ dotenv.config() export default async function startConsuming(transferTokenService: TransferTokenService, transferTokenMapper: TransferTokenMapper): Promise { try { consume({ - "metadata.broker.list": process.env.KAFKA_CONNECTION_URL || "localhost:9092", - "group.id": process.env.CONSUMER_GROUP_ID || "matic.transfer.consumer", + "metadata.broker.list": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", + "group.id": process.env.CONSUMER_GROUP_ID ?? "matic.transfer.consumer", "security.protocol": "plaintext", - topic: process.env.TRANSFER_TOPIC || "apps.1.matic.transfer", + topic: process.env.TRANSFER_TOPIC ?? "apps.1.matic.transfer", coders: { fileName: "nft_transfer", packageName: "nfttransferpackage", diff --git a/examples/nft_balancer/consumer/src/index.ts b/examples/nft_balancer/consumer/src/index.ts index a265103..82e5a63 100644 --- a/examples/nft_balancer/consumer/src/index.ts +++ b/examples/nft_balancer/consumer/src/index.ts @@ -23,7 +23,7 @@ async function start(): Promise { } }); - const database = new Database(process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer'); + const database = new Database(process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer'); await database.connect(); const transferService = new TransferTokenService( diff --git a/examples/nft_balancer/producer/src/index.ts b/examples/nft_balancer/producer/src/index.ts index 7c65205..6721e4f 100644 --- a/examples/nft_balancer/producer/src/index.ts +++ b/examples/nft_balancer/producer/src/index.ts @@ -22,12 +22,12 @@ const producer = produce({ startBlock: parseInt(process.env.START_BLOCK as string), rpcWsEndpoints: process.env.HTTP_PROVIDER ? [process.env.HTTP_PROVIDER] : undefined, blockPollingTimeout: parseInt(process.env.BLOCK_POLLING_TIMEOUT as string), - topic: process.env.PRODUCER_TOPIC || "polygon.1442.blocks", + topic: process.env.PRODUCER_TOPIC ?? "polygon.1442.blocks", maxReOrgDepth: 0, maxRetries: 5, - mongoUrl: process.env.MONGO_URL || 'mongodb://localhost:27017/chain-indexer', + mongoUrl: process.env.MONGO_URL ?? 'mongodb://localhost:27017/chain-indexer', // blockSubscriptionTimeout: 120000, - "bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092", + "bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", "security.protocol": "plaintext", type: "blocks:polling" }); diff --git a/examples/nft_balancer/transformer/src/index.ts b/examples/nft_balancer/transformer/src/index.ts index 0e33215..4b41d4c 100644 --- a/examples/nft_balancer/transformer/src/index.ts +++ b/examples/nft_balancer/transformer/src/index.ts @@ -25,7 +25,7 @@ Logger.create({ try { startTransforming( { - "bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092", + "bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", "group.id": "matic.transfer.transformer", "security.protocol": "plaintext", "message.max.bytes": 26214400, @@ -35,11 +35,11 @@ try { packageName: "blockpackage", messageType: "Block" }, - topic: process.env.CONSUMER_TOPIC || "polygon.1.blocks", + topic: process.env.CONSUMER_TOPIC ?? "polygon.1.blocks", }, { - topic: process.env.PRODUCER_TOPIC || "apps.1.matic.transfer", - "bootstrap.servers": process.env.KAFKA_CONNECTION_URL || "localhost:9092", + topic: process.env.PRODUCER_TOPIC ?? "apps.1.matic.transfer", + "bootstrap.servers": process.env.KAFKA_CONNECTION_URL ?? "localhost:9092", "security.protocol": "plaintext", "message.max.bytes": 26214400, coder: { diff --git a/internal/block_getters/http_block_getter.ts b/internal/block_getters/http_block_getter.ts new file mode 100644 index 0000000..4ce2631 --- /dev/null +++ b/internal/block_getters/http_block_getter.ts @@ -0,0 +1,143 @@ +import { Block, Eth } from "web3-eth"; +import utils from "web3-utils"; +import { WebsocketProvider } from "web3-core"; +import { IBlock } from "../interfaces/block.js"; +import { BlockGetter } from "./block_getter.js"; +import { IBlockGetter } from "../interfaces/block_getter.js"; +import { ITransaction } from "../interfaces/transaction.js"; +import { Logger } from "../logger/logger.js"; +import { IHttpBlock } from "../interfaces/http_block.js"; +import { IHttpTransaction } from "../interfaces/http_transaction.js"; + +/** + * A wrapper class on web3 block related functions + * + */ +export class HttpBlockGetter extends BlockGetter implements IBlockGetter { + /** + * @param {Eth} eth - Eth module from web3.js + * @param {number} maxRetries - The number of times to retry on errors. + * @param {rpcTimeout} rpcTimeout - Option param to set timeout on the RPC call + * + * @constructor + */ + constructor(eth: Eth, maxRetries: number = 0, private rpcTimeout?: number) { + super(eth, maxRetries); + } + + /** + * @async + * Public method to query block data of a single block + * + * @param {number | string} blockNumber - Block number to query the block details for. + * + * @returns {Promise} - Block object + */ + public async getBlock(blockNumber: number | string): Promise { + Logger.debug("getBlock called on getter"); + return await new Promise(async (resolve, reject) => { + const timeout = setTimeout(() => { + reject( + new Error(`Request timed out for block: ${blockNumber}`) + ); + }, this.rpcTimeout ?? 4000); + const eth: Eth = this.eth; + (eth.currentProvider as WebsocketProvider).send( + { + method: "eth_getBlockByNumber", + id: Date.now().toString() + blockNumber, + params: [utils.numberToHex(blockNumber)], + jsonrpc: "2.0", + }, + (error, response) => { + if (error) { + clearTimeout(timeout); + reject(error); + } + + if (!response?.result) { + clearTimeout(timeout); + reject( + new Error( + `null response received for block: ${blockNumber}` + ) + ); + } + + clearTimeout(timeout); + resolve(response?.result); + } + ); + }); + } + + /** + * @async + * Public method to query block data including transaction receipts of a single block. + * + * @param {number | string} blockNumber - The block number for which block data needs to be retrieved. + * + * @returns {Promise} - Block object containing all details including transaction receipts. + * + * @throws {Error} - Throws error object on failure. + */ + public async getBlockWithTransactionReceipts( + blockNumber: number | string + ): Promise { + Logger.debug(`Fetching block ${blockNumber}`); + const block: IHttpBlock = await new Promise(async (resolve, reject) => { + const timeout = setTimeout(() => { + reject( + new Error(`Request timed out for block: ${blockNumber}`) + ); + }, this.rpcTimeout ?? 4000); + const eth: Eth = this.eth; + (eth.currentProvider as WebsocketProvider).send( + { + method: "eth_getBlockByNumber", + id: Date.now().toString() + blockNumber, + params: [utils.numberToHex(blockNumber), true], + jsonrpc: "2.0", + }, + (error, response) => { + if (error) { + clearTimeout(timeout); + reject(error); + } + + if (!response?.result) { + clearTimeout(timeout); + reject( + new Error( + `null response received for block: ${blockNumber}` + ) + ); + } + + clearTimeout(timeout); + resolve(response?.result); + } + ); + }); + Logger.debug( + `Fetching transaction receipts for the following block ${block.number}` + ); + + const transactions: ITransaction[] = []; + + for (const transactionObject of block.transactions) { + Logger.debug(`Processing transaction object ${transactionObject}`); + transactions.push( + this.formatHttpTransactionObject( + transactionObject as IHttpTransaction, + await this.getTransactionReceipt(transactionObject.hash) + ) + ); + } + Logger.debug( + `Fetched transactions receipts successfully for the following block ${block.number}` + ); + + return this.formatHttpBlock(block, transactions); + } +} diff --git a/internal/block_getters/http_block_getter_worker.ts b/internal/block_getters/http_block_getter_worker.ts new file mode 100644 index 0000000..74be656 --- /dev/null +++ b/internal/block_getters/http_block_getter_worker.ts @@ -0,0 +1,49 @@ +import { IBlockWorkerMessage } from "../interfaces/block_worker_message.js"; +import { parentPort, workerData } from "worker_threads"; +import { HttpBlockGetter } from "./http_block_getter.js"; +import EthClass from "web3-eth"; + +if (!workerData || !parentPort) { + process.exit(1); +} + +const blockGetter = new HttpBlockGetter( + //@ts-ignore + new EthClass( + //@ts-ignore + new EthClass.providers.WebsocketProvider( + workerData.endpoint, + { + reconnect: { + auto: true + }, + clientConfig: { + maxReceivedFrameSize: 1000000000, + maxReceivedMessageSize: 1000000000, + }, + timeout: 45000 + } + ) + ), + workerData.maxRetries +); + +parentPort.on( + "message", + async (message: { blockNumber: number; callBackId: number }) => { + try { + parentPort?.postMessage({ + callBackId: message.callBackId, + error: null, + block: await blockGetter.getBlockWithTransactionReceipts( + message.blockNumber + ), + } as IBlockWorkerMessage); + } catch (error) { + parentPort?.postMessage({ + callBackId: message.callBackId, + error: error, + } as IBlockWorkerMessage); + } + } +); diff --git a/internal/block_getters/quicknode_block_getter.ts b/internal/block_getters/quicknode_block_getter.ts index c8cfb70..38e8381 100644 --- a/internal/block_getters/quicknode_block_getter.ts +++ b/internal/block_getters/quicknode_block_getter.ts @@ -36,31 +36,42 @@ export class QuickNodeBlockGetter extends BlockGetter implements IBlockGetter { */ public async getBlockWithTransactionReceipts(blockNumber: number | string, retryCount: number = 0): Promise { try { - const response: IQuickNodeResponse = await new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - reject(new Error(`Request timed out for block: ${blockNumber}`)); - }, this.rpcTimeout || 4000); + const response: IQuickNodeResponse = await new Promise( + async (resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error(`Request timed out for block: ${blockNumber}`)); + }, this.rpcTimeout ?? 4000); - let eth: Eth = this.eth; - if (retryCount > 0 && this.alternateEth) { - eth = this.alternateEth; - } - - (eth.currentProvider as WebsocketProvider).send({ - method: "qn_getBlockWithReceipts", - id: Date.now().toString() + blockNumber, - params: [utils.numberToHex(blockNumber)], - jsonrpc: "2.0" - }, (error, response) => { - if (error) { - clearTimeout(timeout); - reject(error); + let eth: Eth = this.eth; + if (retryCount > 0 && this.alternateEth) { + await new Promise(r => setTimeout(r, 2000)); + eth = this.alternateEth; } - clearTimeout(timeout); - resolve(response?.result); + (eth.currentProvider as WebsocketProvider).send({ + method: "qn_getBlockWithReceipts", + id: Date.now().toString() + blockNumber, + params: [utils.numberToHex(blockNumber)], + jsonrpc: "2.0" + }, (error, response) => { + if (error) { + clearTimeout(timeout); + reject(error); + } + + if (!(response?.result)) { + clearTimeout(timeout); + reject( + new Error( + `null response received for block: ${blockNumber}` + ) + ); + } + + clearTimeout(timeout); + resolve(response?.result); + }); }); - }); const transactions: ITransaction[] = []; diff --git a/internal/block_producers/block_producer.ts b/internal/block_producers/block_producer.ts index 98b17ac..42b71c1 100644 --- a/internal/block_producers/block_producer.ts +++ b/internal/block_producers/block_producer.ts @@ -89,6 +89,12 @@ export class BlockProducer extends AsynchronousProducer { await this.database.connect(); const metadata = await super.start(); + Logger.info({ + location: "block_producer", + function: "start", + message: "Producer started", + }); + this.on("delivered", async (report: DeliveryReport) => { if (report.partition === -1) { const error = new BlockProducerError( diff --git a/internal/block_subscription/block_subscription.ts b/internal/block_subscription/block_subscription.ts index 5ab40fa..3deef3d 100644 --- a/internal/block_subscription/block_subscription.ts +++ b/internal/block_subscription/block_subscription.ts @@ -71,6 +71,7 @@ export class BlockSubscription extends AbstractBlockSubscription { workerData } ); + worker.setMaxListeners(1000); worker.on("exit", () => { this.workers[i] = new Worker( @@ -79,6 +80,7 @@ export class BlockSubscription extends AbstractBlockSubscription { workerData } ); + this.workers[i].setMaxListeners(1000); }); workers.push(worker); diff --git a/internal/coder/protobuf_coder.ts b/internal/coder/protobuf_coder.ts index 750b908..cabbd58 100644 --- a/internal/coder/protobuf_coder.ts +++ b/internal/coder/protobuf_coder.ts @@ -3,6 +3,7 @@ import { CoderError } from "../errors/coder_error.js"; import protobuf, { Root, Type } from "protobufjs"; import { ICoder } from "../interfaces/coder.js"; import { createRequire } from "module"; +import { Logger } from "../logger/logger.js"; const { load } = protobuf; /** @@ -76,8 +77,20 @@ export class Coder implements ICoder { } try { + if (this.messageType === "L1StateBlock") { + Logger.info({message: "In coder deserialize - for L1StateBlock", data: { + base64: buffer.toString("base64"), + stringData: buffer.toString(), + buffer + }}); + } return this.protobufType.decode(buffer); } catch (error) { + Logger.error(error as any); + Logger.info({message: "Decodding Error: deserialize", data: { + buffer: buffer, + string: buffer.toString() + }}); throw new CoderError( "Decoding error", CoderError.codes.DECODING_ERROR, @@ -112,6 +125,15 @@ export class Coder implements ICoder { ); } - return this.protobufType.encode(messageObject).finish(); + const _buffer = this.protobufType.encode(messageObject).finish(); + + if (this.messageType === "L1StateBlock") { + Logger.info({message: "In coder serialize - for L1StateBlock", data: { + base64: (_buffer as Buffer).toString("base64"), + stringData: _buffer.toString(), + _buffer + }}); + } + return _buffer; } } diff --git a/internal/formatters/block_formatter.ts b/internal/formatters/block_formatter.ts index 922e743..9d37a11 100644 --- a/internal/formatters/block_formatter.ts +++ b/internal/formatters/block_formatter.ts @@ -9,6 +9,9 @@ import { IRawTransaction } from "../interfaces/raw_transaction.js"; import { IRawReceipt } from "../interfaces/raw_receipt.js"; import { IWeb3Transaction } from "../interfaces/web3_transaction.js"; import { IWeb3TransactionReceipt } from "../interfaces/web3_transaction_receipt.js"; +import { IHttpBlock } from "../interfaces/http_block.js"; +import { Logger } from "../logger/logger.js"; +import { IHttpTransaction } from "../interfaces/http_transaction.js"; import utils from "web3-utils"; @@ -82,32 +85,87 @@ export class BlockFormatter { /** * @protected - * + * + * Formats a raw block response returned by a JSON RPC request to evm client. + * + * @param {IRawBlock} block - The block object to be formatted. + * @param {[ITransaction]} transactions - Formatted transactions array that needs to be added + * to the formatted block object. + * + * @returns {IBlock} - Formatted block object with transactions and transaction receipts. + */ + protected formatHttpBlock( + block: IHttpBlock, + transactions: ITransaction[] + ): IBlock { + return this.formatHttpBlockWithTransactions( + //@ts-ignore + block, + transactions + ); + } + + /** + * @protected + * * Formats a block object that is returned by 'web3.js'. - * + * * @param {BlockTransactionObject} block - The block object to be formatted returned by 'web3.js'. * @param {[ITransaction]} transactions - Formatted transactions array that needs to be added * to the formatted block object. - * + * * @returns {IBlock} - Formatted block object with transactions and transaction receipts. */ - protected formatBlockWithTransactions(block: BlockTransactionObject, transactions: ITransaction[]): IBlock { + protected formatHttpBlockWithTransactions( + block: IHttpBlock, + transactions: ITransaction[] + ): IBlock { + Logger.debug(`formatting block with transactions ${block.number}`); return { ...block, - nonce: Long.fromValue( - utils.hexToNumberString( - block.nonce - ), - true - ), + nonce: Long.fromValue(utils.hexToNumberString(block.nonce), true), + difficulty: utils.toHex(block.difficulty), + totalDifficulty: utils.toHex(block.totalDifficulty), + timestamp: Long.fromValue(parseInt(block.timestamp) * 1000, true), + number: Long.fromValue(parseInt(block.number), true), + baseFeePerGas: + block.baseFeePerGas || block.baseFeePerGas === "0x0" + ? block.baseFeePerGas + : undefined, + size: block.size, + transactions: transactions, + gasLimit: Long.fromValue(block.gasLimit, true), + gasUsed: Long.fromValue(parseInt(block.gasUsed), true), + }; + } + + /** + * @protected + * + * Formats a block object that is returned by 'web3.js'. + * + * @param {BlockTransactionObject} block - The block object to be formatted returned by 'web3.js'. + * @param {[ITransaction]} transactions - Formatted transactions array that needs to be added + * to the formatted block object. + * + * @returns {IBlock} - Formatted block object with transactions and transaction receipts. + */ + protected formatBlockWithTransactions( + block: BlockTransactionObject, + transactions: ITransaction[] + ): IBlock { + Logger.debug(`formatting block with transactions ${block.number}`); + return { + ...block, + nonce: Long.fromValue(utils.hexToNumberString(block.nonce), true), difficulty: utils.toHex(block.difficulty), totalDifficulty: utils.toHex(block.totalDifficulty), timestamp: Long.fromValue((block.timestamp as number) * 1000, true), number: Long.fromValue(block.number, true), - baseFeePerGas: (block.baseFeePerGas || block.baseFeePerGas === 0 ? - utils.toHex(block.baseFeePerGas) : - undefined - ), + baseFeePerGas: + block.baseFeePerGas || block.baseFeePerGas === 0 + ? utils.toHex(block.baseFeePerGas) + : undefined, size: utils.toHex(block.size), transactions: transactions, gasLimit: Long.fromValue(block.gasLimit, true), @@ -116,70 +174,121 @@ export class BlockFormatter { } /** - * @protected - * - * Formats a raw transaction object that is returned by the 'web3.js' formatter. - * + * @protected + * + * Formats a raw transaction object that is returned by the 'web3.js' formatter. + * * @param {IWeb3Transaction} transactionObject - The transaction object to format. - * @param {ITransactionReceipt} receipt - Formatted transaction receipt object to be + * @param {ITransactionReceipt} receipt - Formatted transaction receipt object to be * added to transaction object. - * + * * @returns {ITransaction} - The formatted transaction object. */ - protected formatTransactionObject(transactionObject: IWeb3Transaction, receipt: ITransactionReceipt): ITransaction { + protected formatTransactionObject( + transactionObject: IWeb3Transaction, + receipt: ITransactionReceipt + ): ITransaction { return { ...transactionObject, receipt, value: utils.toHex(transactionObject.value), - transactionIndex: ( - transactionObject.transactionIndex || transactionObject.transactionIndex === 0 ? - Long.fromValue(transactionObject.transactionIndex, true) : - null - ), + transactionIndex: + transactionObject.transactionIndex || + transactionObject.transactionIndex === 0 + ? Long.fromValue(transactionObject.transactionIndex, true) + : null, gas: Long.fromValue(transactionObject.gas, true), gasPrice: utils.toHex(transactionObject.gasPrice), nonce: Long.fromValue(transactionObject.nonce, true), - maxFeePerGas: ( - transactionObject.maxFeePerGas || transactionObject.maxFeePerGas === 0 ? - utils.toHex( - transactionObject.maxFeePerGas - ) : - undefined - ), - maxPriorityFeePerGas: ( - transactionObject.maxPriorityFeePerGas || transactionObject.maxPriorityFeePerGas === 0 ? - utils.toHex( - transactionObject.maxPriorityFeePerGas - ) : - undefined - ), - blockNumber: ( - transactionObject.blockNumber || transactionObject.blockNumber === 0 ? - Long.fromValue(transactionObject.blockNumber, true) : - null - ) + maxFeePerGas: + transactionObject.maxFeePerGas || + transactionObject.maxFeePerGas === 0 + ? utils.toHex(transactionObject.maxFeePerGas) + : undefined, + maxPriorityFeePerGas: + transactionObject.maxPriorityFeePerGas || + transactionObject.maxPriorityFeePerGas === 0 + ? utils.toHex(transactionObject.maxPriorityFeePerGas) + : undefined, + blockNumber: + transactionObject.blockNumber || + transactionObject.blockNumber === 0 + ? Long.fromValue(transactionObject.blockNumber, true) + : null, + }; + } + + /** + * @protected + * + * Formats a raw transaction object that is returned by the 'web3.js' formatter. + * + * @param {IWeb3Transaction} transactionObject - The transaction object to format. + * @param {ITransactionReceipt} receipt - Formatted transaction receipt object to be + * added to transaction object. + * + * @returns {ITransaction} - The formatted transaction object. + */ + protected formatHttpTransactionObject( + transactionObject: IHttpTransaction, + receipt: ITransactionReceipt + ): ITransaction { + return { + ...transactionObject, + receipt, + value: utils.toHex(transactionObject.value), + transactionIndex: + transactionObject.transactionIndex || + transactionObject.transactionIndex === "0x0" + ? Long.fromValue( + parseInt(transactionObject.transactionIndex), + true + ) + : null, + gas: Long.fromValue(parseInt(transactionObject.gas), true), + gasPrice: transactionObject.gasPrice, + nonce: Long.fromValue(parseInt(transactionObject.nonce), true), + type: Number(transactionObject.nonce), + maxFeePerGas: + transactionObject.maxFeePerGas || + transactionObject.maxFeePerGas === "0x0" + ? transactionObject.maxFeePerGas + : undefined, + maxPriorityFeePerGas: + transactionObject.maxPriorityFeePerGas || + transactionObject.maxPriorityFeePerGas === "0x0" + ? transactionObject.maxPriorityFeePerGas + : undefined, + blockNumber: + transactionObject.blockNumber || + transactionObject.blockNumber === "0" + ? Long.fromValue( + parseInt(transactionObject.blockNumber), + true + ) + : null, }; } /** - * @protected - * - * Formats transaction receipt object returned or formatted by 'web3.js'. - * - * @param {IRawReceipt} transactionReceipt - The transaction receipt to format. - * - * @returns { ITransactionReceipt | void } - The formatted transaction receipt object. - */ - protected formatTransactionReceipt(transactionReceipt: IWeb3TransactionReceipt): ITransactionReceipt { + * @protected + * + * Formats transaction receipt object returned or formatted by 'web3.js'. + * + * @param {IRawReceipt} transactionReceipt - The transaction receipt to format. + * + * @returns { ITransactionReceipt | void } - The formatted transaction receipt object. + */ + protected formatTransactionReceipt( + transactionReceipt: IWeb3TransactionReceipt + ): ITransactionReceipt { return { ...transactionReceipt, - effectiveGasPrice: ( - transactionReceipt.effectiveGasPrice || transactionReceipt.effectiveGasPrice === 0 ? - utils.toHex( - transactionReceipt.effectiveGasPrice - ) : - undefined - ), + effectiveGasPrice: + transactionReceipt.effectiveGasPrice || + transactionReceipt.effectiveGasPrice === 0 + ? utils.toHex(transactionReceipt.effectiveGasPrice) + : undefined, cumulativeGasUsed: Long.fromValue( transactionReceipt.cumulativeGasUsed, true @@ -188,22 +297,19 @@ export class BlockFormatter { transactionReceipt.transactionIndex, true ), - blockNumber: Long.fromValue( - transactionReceipt.blockNumber, - true - ), - gasUsed: Long.fromValue( - transactionReceipt.gasUsed, - true - ), + blockNumber: Long.fromValue(transactionReceipt.blockNumber, true), + gasUsed: Long.fromValue(transactionReceipt.gasUsed, true), logs: transactionReceipt.logs.map((log) => { return { ...log, - transactionIndex: Long.fromValue(log.transactionIndex, true), + transactionIndex: Long.fromValue( + log.transactionIndex, + true + ), logIndex: Long.fromValue(log.logIndex, true), - blockNumber: Long.fromValue(log.blockNumber, true) + blockNumber: Long.fromValue(log.blockNumber, true), }; - }) + }), }; } } diff --git a/internal/interfaces/deposit.ts b/internal/interfaces/deposit.ts index 7e71e17..312fd7a 100644 --- a/internal/interfaces/deposit.ts +++ b/internal/interfaces/deposit.ts @@ -12,5 +12,6 @@ export interface IDeposit { tokenIds?: string[], timestamp?: number, rootTunnelAddress?: string - refuel?: boolean + refuel?: boolean, + nonce?: number } diff --git a/internal/interfaces/http_block.ts b/internal/interfaces/http_block.ts new file mode 100644 index 0000000..e954ca3 --- /dev/null +++ b/internal/interfaces/http_block.ts @@ -0,0 +1,24 @@ +import { IHttpTransaction } from "./http_transaction.js"; + +export interface IHttpBlock { + difficulty: string; + totalDifficulty: string; + number: string; + gasLimit: string; + baseFeePerGas?: string; + gasUsed: string; + logsBloom: string; + hash: string; + parentHash: string; + receiptsRoot: string; + sha3Uncles: string; + size: string; + stateRoot: string; + timestamp: string; + transactionsRoot: string; + miner: string; + nonce: string; + extraData: string; + transactions: IHttpTransaction[]; + uncles?: string[]; +} diff --git a/internal/interfaces/http_transaction.ts b/internal/interfaces/http_transaction.ts new file mode 100644 index 0000000..daa0446 --- /dev/null +++ b/internal/interfaces/http_transaction.ts @@ -0,0 +1,20 @@ +export interface IHttpTransaction { + hash: string; + nonce: string; + blockHash: string | null; + blockNumber: string; + transactionIndex?: string; + from: string; + to: string | null; + value: string | string; + gasPrice: string | string; + gas: string; + input: string; + maxFeePerGas?: string; + maxPriorityFeePerGas?: string; + chainId: string; + v: string; + r: string; + s: string; + type: number; +} diff --git a/internal/interfaces/index.ts b/internal/interfaces/index.ts index c071d2d..89814ea 100644 --- a/internal/interfaces/index.ts +++ b/internal/interfaces/index.ts @@ -15,6 +15,8 @@ export * from "./consumer_queue_object.js"; export * from "./deposit.js"; export * from "./deserialised_kafka_message.js"; export * from "./event_log.js"; +export * from "./http_block.js"; +export * from "./http_transaction.js"; export * from "./kafka_coder_config.js"; export * from "./logger_config.js"; export * from "./mapper.js"; diff --git a/public/block_producers/block_polling_producer.ts b/public/block_producers/block_polling_producer.ts index 8449685..a0042b4 100644 --- a/public/block_producers/block_polling_producer.ts +++ b/public/block_producers/block_polling_producer.ts @@ -22,13 +22,13 @@ export class BlockPollerProducer extends BlockProducer { * @returns {BlockPollerProducer} */ constructor(config: IBlockProducerConfig) { - const endpoint = config.rpcWsEndpoints?.[0] || ""; - const startBlock = config.startBlock || 0; - const mongoUrl = config.mongoUrl || "mongodb://localhost:27017/chain-indexer"; + const endpoint = config.rpcWsEndpoints?.[0] ?? ""; + const startBlock = config.startBlock ?? 0; + const mongoUrl = config.mongoUrl ?? "mongodb://localhost:27017/chain-indexer"; const dbCollection = config.dbCollection ?? "producedblocks"; - const blockPollingTimeout = config.blockPollingTimeout || 2000; - const maxRetries = config.maxRetries || 0; - const maxReOrgDepth = config.maxReOrgDepth || 0; + const blockPollingTimeout = config.blockPollingTimeout ?? 2000; + const maxRetries = config.maxRetries ?? 0; + const maxReOrgDepth = config.maxReOrgDepth ?? 0; delete config.rpcWsEndpoints; delete config.startBlock; diff --git a/public/block_producers/block_producer.ts b/public/block_producers/block_producer.ts index 16bd11c..244f3c3 100644 --- a/public/block_producers/block_producer.ts +++ b/public/block_producers/block_producer.ts @@ -25,12 +25,12 @@ export class BlockProducer extends InternalBlockProducer { * @returns {BlockProducer} */ constructor(config: IBlockProducerConfig) { - const endpoints = config.rpcWsEndpoints || []; - const startBlock = config.startBlock || 0; - const mongoUrl = config.mongoUrl || "mongodb://localhost:27017/chain-indexer"; + const endpoints = config.rpcWsEndpoints ?? []; + const startBlock = config.startBlock ?? 0; + const mongoUrl = config.mongoUrl ?? "mongodb://localhost:27017/chain-indexer"; const dbCollection = config.dbCollection ?? "producedblocks"; - const maxReOrgDepth = config.maxReOrgDepth || 0; - const maxRetries = config.maxRetries || 0; + const maxReOrgDepth = config.maxReOrgDepth ?? 0; + const maxRetries = config.maxRetries ?? 0; const blockSubscriptionTimeout = config.blockSubscriptionTimeout; // Has to be done or Kafka complains later diff --git a/public/block_producers/erigon_block_producer.ts b/public/block_producers/erigon_block_producer.ts index 6374954..edad3de 100644 --- a/public/block_producers/erigon_block_producer.ts +++ b/public/block_producers/erigon_block_producer.ts @@ -23,12 +23,12 @@ export class ErigonBlockProducer extends BlockProducer { * @returns {ErigonBlockProducer} */ constructor(config: IBlockProducerConfig) { - const endpoints = config.rpcWsEndpoints || []; - const startBlock = config.startBlock || 0; - const mongoUrl = config.mongoUrl || "mongodb://localhost:27017/chain-indexer"; + const endpoints = config.rpcWsEndpoints ?? []; + const startBlock = config.startBlock ?? 0; + const mongoUrl = config.mongoUrl ?? "mongodb://localhost:27017/chain-indexer"; const dbCollection = config.dbCollection ?? "producedblocks"; - const maxReOrgDepth = config.maxReOrgDepth || 0; - const maxRetries = config.maxRetries || 0; + const maxReOrgDepth = config.maxReOrgDepth ?? 0; + const maxRetries = config.maxRetries ?? 0; const blockSubscriptionTimeout = config.blockSubscriptionTimeout; // Has to be done or Kafka complains later diff --git a/public/block_producers/http_block_producer.ts b/public/block_producers/http_block_producer.ts new file mode 100644 index 0000000..f7f4844 --- /dev/null +++ b/public/block_producers/http_block_producer.ts @@ -0,0 +1,72 @@ +import { IProducedBlock, ProducedBlocksModel, IProducedBlocksModel } from "@internal/block_producers/produced_blocks_model.js"; +import { IBlockProducerConfig } from "@internal/interfaces/block_producer_config.js"; +import { IProducerConfig } from "@internal/interfaces/producer_config.js"; +import { HttpBlockGetter } from "@internal/block_getters/http_block_getter.js"; +import { Coder } from "@internal/coder/protobuf_coder.js"; +import { BlockPoller } from "@internal/block_subscription/block_polling.js"; +import { Database } from "@internal/mongo/database.js"; +import { BlockProducer } from "@internal/block_producers/block_producer.js"; +import Eth from "web3-eth"; + +/** + * HttpBlockProducer block producer class which retrieves block from http node + * for producing to kafka. + * + * @author - Rajesh Chaganti + */ +export class HttpBlockProducer extends BlockProducer { + /** + * @constructor + * + * @param {IBlockProducerConfig} config + * + * @returns {HttpBlockProducer} + */ + constructor(config: IBlockProducerConfig) { + const endpoint = config.rpcWsEndpoints?.[0] ?? ""; + const startBlock = config.startBlock ?? 0; + const dbCollection = config.dbCollection ?? "producedblocks"; + const mongoUrl = config.mongoUrl ?? "mongodb://localhost:27017/open-api"; + const blockPollingTimeout = config.blockPollingTimeout ?? 2000; + const maxRetries = config.maxRetries ?? 0; + const maxReOrgDepth = config.maxReOrgDepth ?? 0; + + delete config.rpcWsEndpoints; + delete config.startBlock; + delete config.mongoUrl; + delete config.dbCollection; + delete config.maxReOrgDepth; + delete config.maxRetries; + delete config.blockPollingTimeout; + + const database = new Database(mongoUrl); + + const blockGetter = new HttpBlockGetter( + //@ts-ignore + new Eth(endpoint), + maxRetries + ); + + super( + new Coder( + "block", + "blockpackage", + "Block" + ), + config as IProducerConfig, + new BlockPoller( + blockGetter, + blockPollingTimeout + ), + blockGetter, + database, + database.model>( + "ProducedBlocks", + ProducedBlocksModel, + dbCollection + ), + startBlock, + maxReOrgDepth + ); + } +} diff --git a/public/block_producers/quicknode_block_producer.ts b/public/block_producers/quicknode_block_producer.ts index be69d8c..c1ec0ff 100644 --- a/public/block_producers/quicknode_block_producer.ts +++ b/public/block_producers/quicknode_block_producer.ts @@ -24,14 +24,14 @@ export class QuickNodeBlockProducer extends BlockProducer { */ constructor(config: IBlockProducerConfig) { - const endpoints = config.rpcWsEndpoints || []; - const startBlock = config.startBlock || 0; - const mongoUrl = config.mongoUrl || "mongodb://localhost:27017/chain-indexer"; + const endpoints = config.rpcWsEndpoints ?? []; + const startBlock = config.startBlock ?? 0; + const mongoUrl = config.mongoUrl ?? "mongodb://localhost:27017/chain-indexer"; const dbCollection = config.dbCollection ?? "producedblocks"; - const maxReOrgDepth = config.maxReOrgDepth || 0; - const maxRetries = config.maxRetries || 0; + const maxReOrgDepth = config.maxReOrgDepth ?? 0; + const maxRetries = config.maxRetries ?? 0; const blockSubscriptionTimeout = config.blockSubscriptionTimeout; - const blockDelay = config.blockDelay || 0; + const blockDelay = config.blockDelay ?? 0; const alternateEndpoint = config.alternateEndpoint; const rpcTimeout = config.rpcTimeout; diff --git a/public/index.ts b/public/index.ts index d284f37..cd0ecbb 100644 --- a/public/index.ts +++ b/public/index.ts @@ -1,6 +1,7 @@ // block_producer export * from "./block_producers/block_polling_producer.js"; export * from "./block_producers/erigon_block_producer.js"; +export * from "./block_producers/http_block_producer.js"; export * from "./block_producers/quicknode_block_producer.js"; export * from "./block_producers/block_producer.js"; diff --git a/public/interfaces/http_block.ts b/public/interfaces/http_block.ts new file mode 100644 index 0000000..82fcae1 --- /dev/null +++ b/public/interfaces/http_block.ts @@ -0,0 +1 @@ +export * from "@internal/interfaces/http_block.js"; diff --git a/public/interfaces/http_transaction.ts b/public/interfaces/http_transaction.ts new file mode 100644 index 0000000..d1030f9 --- /dev/null +++ b/public/interfaces/http_transaction.ts @@ -0,0 +1 @@ +export * from "@internal/interfaces/http_transaction.js"; diff --git a/schemas/burnblock.proto b/schemas/burnblock.proto index 422b769..84ae987 100644 --- a/schemas/burnblock.proto +++ b/schemas/burnblock.proto @@ -11,7 +11,7 @@ message BurnBlock { repeated string amounts = 6; repeated uint64 tokenIds = 7; string bridgeType = 8; - + bool isCctp = 9; } required uint64 blockNumber = 1; diff --git a/schemas/depositblock.proto b/schemas/depositblock.proto index 11ebbe2..5e17080 100644 --- a/schemas/depositblock.proto +++ b/schemas/depositblock.proto @@ -13,6 +13,7 @@ message DepositBlock { repeated uint64 tokenIds = 8; string rootTunnelAddress = 9; bool refuel = 10; + string nonce = 11; } required uint64 blockNumber = 1; From faae5eb3e1cbbe833a356b43aee9d821c397ff4a Mon Sep 17 00:00:00 2001 From: Nitin Mittal Date: Mon, 4 Mar 2024 19:02:31 +0400 Subject: [PATCH 2/4] rmv: http producer --- internal/block_getters/http_block_getter.ts | 143 ------------------ .../block_getters/http_block_getter_worker.ts | 49 ------ internal/formatters/block_formatter.ts | 110 -------------- internal/interfaces/http_block.ts | 24 --- internal/interfaces/http_transaction.ts | 20 --- internal/interfaces/index.ts | 2 - public/block_producers/http_block_producer.ts | 72 --------- public/index.ts | 1 - public/interfaces/http_block.ts | 1 - public/interfaces/http_transaction.ts | 1 - schemas/mappings.proto | 2 +- 11 files changed, 1 insertion(+), 424 deletions(-) delete mode 100644 internal/block_getters/http_block_getter.ts delete mode 100644 internal/block_getters/http_block_getter_worker.ts delete mode 100644 internal/interfaces/http_block.ts delete mode 100644 internal/interfaces/http_transaction.ts delete mode 100644 public/block_producers/http_block_producer.ts delete mode 100644 public/interfaces/http_block.ts delete mode 100644 public/interfaces/http_transaction.ts diff --git a/internal/block_getters/http_block_getter.ts b/internal/block_getters/http_block_getter.ts deleted file mode 100644 index 4ce2631..0000000 --- a/internal/block_getters/http_block_getter.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { Block, Eth } from "web3-eth"; -import utils from "web3-utils"; -import { WebsocketProvider } from "web3-core"; -import { IBlock } from "../interfaces/block.js"; -import { BlockGetter } from "./block_getter.js"; -import { IBlockGetter } from "../interfaces/block_getter.js"; -import { ITransaction } from "../interfaces/transaction.js"; -import { Logger } from "../logger/logger.js"; -import { IHttpBlock } from "../interfaces/http_block.js"; -import { IHttpTransaction } from "../interfaces/http_transaction.js"; - -/** - * A wrapper class on web3 block related functions - * - */ -export class HttpBlockGetter extends BlockGetter implements IBlockGetter { - /** - * @param {Eth} eth - Eth module from web3.js - * @param {number} maxRetries - The number of times to retry on errors. - * @param {rpcTimeout} rpcTimeout - Option param to set timeout on the RPC call - * - * @constructor - */ - constructor(eth: Eth, maxRetries: number = 0, private rpcTimeout?: number) { - super(eth, maxRetries); - } - - /** - * @async - * Public method to query block data of a single block - * - * @param {number | string} blockNumber - Block number to query the block details for. - * - * @returns {Promise} - Block object - */ - public async getBlock(blockNumber: number | string): Promise { - Logger.debug("getBlock called on getter"); - return await new Promise(async (resolve, reject) => { - const timeout = setTimeout(() => { - reject( - new Error(`Request timed out for block: ${blockNumber}`) - ); - }, this.rpcTimeout ?? 4000); - const eth: Eth = this.eth; - (eth.currentProvider as WebsocketProvider).send( - { - method: "eth_getBlockByNumber", - id: Date.now().toString() + blockNumber, - params: [utils.numberToHex(blockNumber)], - jsonrpc: "2.0", - }, - (error, response) => { - if (error) { - clearTimeout(timeout); - reject(error); - } - - if (!response?.result) { - clearTimeout(timeout); - reject( - new Error( - `null response received for block: ${blockNumber}` - ) - ); - } - - clearTimeout(timeout); - resolve(response?.result); - } - ); - }); - } - - /** - * @async - * Public method to query block data including transaction receipts of a single block. - * - * @param {number | string} blockNumber - The block number for which block data needs to be retrieved. - * - * @returns {Promise} - Block object containing all details including transaction receipts. - * - * @throws {Error} - Throws error object on failure. - */ - public async getBlockWithTransactionReceipts( - blockNumber: number | string - ): Promise { - Logger.debug(`Fetching block ${blockNumber}`); - const block: IHttpBlock = await new Promise(async (resolve, reject) => { - const timeout = setTimeout(() => { - reject( - new Error(`Request timed out for block: ${blockNumber}`) - ); - }, this.rpcTimeout ?? 4000); - const eth: Eth = this.eth; - (eth.currentProvider as WebsocketProvider).send( - { - method: "eth_getBlockByNumber", - id: Date.now().toString() + blockNumber, - params: [utils.numberToHex(blockNumber), true], - jsonrpc: "2.0", - }, - (error, response) => { - if (error) { - clearTimeout(timeout); - reject(error); - } - - if (!response?.result) { - clearTimeout(timeout); - reject( - new Error( - `null response received for block: ${blockNumber}` - ) - ); - } - - clearTimeout(timeout); - resolve(response?.result); - } - ); - }); - Logger.debug( - `Fetching transaction receipts for the following block ${block.number}` - ); - - const transactions: ITransaction[] = []; - - for (const transactionObject of block.transactions) { - Logger.debug(`Processing transaction object ${transactionObject}`); - transactions.push( - this.formatHttpTransactionObject( - transactionObject as IHttpTransaction, - await this.getTransactionReceipt(transactionObject.hash) - ) - ); - } - Logger.debug( - `Fetched transactions receipts successfully for the following block ${block.number}` - ); - - return this.formatHttpBlock(block, transactions); - } -} diff --git a/internal/block_getters/http_block_getter_worker.ts b/internal/block_getters/http_block_getter_worker.ts deleted file mode 100644 index 74be656..0000000 --- a/internal/block_getters/http_block_getter_worker.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { IBlockWorkerMessage } from "../interfaces/block_worker_message.js"; -import { parentPort, workerData } from "worker_threads"; -import { HttpBlockGetter } from "./http_block_getter.js"; -import EthClass from "web3-eth"; - -if (!workerData || !parentPort) { - process.exit(1); -} - -const blockGetter = new HttpBlockGetter( - //@ts-ignore - new EthClass( - //@ts-ignore - new EthClass.providers.WebsocketProvider( - workerData.endpoint, - { - reconnect: { - auto: true - }, - clientConfig: { - maxReceivedFrameSize: 1000000000, - maxReceivedMessageSize: 1000000000, - }, - timeout: 45000 - } - ) - ), - workerData.maxRetries -); - -parentPort.on( - "message", - async (message: { blockNumber: number; callBackId: number }) => { - try { - parentPort?.postMessage({ - callBackId: message.callBackId, - error: null, - block: await blockGetter.getBlockWithTransactionReceipts( - message.blockNumber - ), - } as IBlockWorkerMessage); - } catch (error) { - parentPort?.postMessage({ - callBackId: message.callBackId, - error: error, - } as IBlockWorkerMessage); - } - } -); diff --git a/internal/formatters/block_formatter.ts b/internal/formatters/block_formatter.ts index 9d37a11..dc8496f 100644 --- a/internal/formatters/block_formatter.ts +++ b/internal/formatters/block_formatter.ts @@ -9,9 +9,7 @@ import { IRawTransaction } from "../interfaces/raw_transaction.js"; import { IRawReceipt } from "../interfaces/raw_receipt.js"; import { IWeb3Transaction } from "../interfaces/web3_transaction.js"; import { IWeb3TransactionReceipt } from "../interfaces/web3_transaction_receipt.js"; -import { IHttpBlock } from "../interfaces/http_block.js"; import { Logger } from "../logger/logger.js"; -import { IHttpTransaction } from "../interfaces/http_transaction.js"; import utils from "web3-utils"; @@ -83,62 +81,6 @@ export class BlockFormatter { ); } - /** - * @protected - * - * Formats a raw block response returned by a JSON RPC request to evm client. - * - * @param {IRawBlock} block - The block object to be formatted. - * @param {[ITransaction]} transactions - Formatted transactions array that needs to be added - * to the formatted block object. - * - * @returns {IBlock} - Formatted block object with transactions and transaction receipts. - */ - protected formatHttpBlock( - block: IHttpBlock, - transactions: ITransaction[] - ): IBlock { - return this.formatHttpBlockWithTransactions( - //@ts-ignore - block, - transactions - ); - } - - /** - * @protected - * - * Formats a block object that is returned by 'web3.js'. - * - * @param {BlockTransactionObject} block - The block object to be formatted returned by 'web3.js'. - * @param {[ITransaction]} transactions - Formatted transactions array that needs to be added - * to the formatted block object. - * - * @returns {IBlock} - Formatted block object with transactions and transaction receipts. - */ - protected formatHttpBlockWithTransactions( - block: IHttpBlock, - transactions: ITransaction[] - ): IBlock { - Logger.debug(`formatting block with transactions ${block.number}`); - return { - ...block, - nonce: Long.fromValue(utils.hexToNumberString(block.nonce), true), - difficulty: utils.toHex(block.difficulty), - totalDifficulty: utils.toHex(block.totalDifficulty), - timestamp: Long.fromValue(parseInt(block.timestamp) * 1000, true), - number: Long.fromValue(parseInt(block.number), true), - baseFeePerGas: - block.baseFeePerGas || block.baseFeePerGas === "0x0" - ? block.baseFeePerGas - : undefined, - size: block.size, - transactions: transactions, - gasLimit: Long.fromValue(block.gasLimit, true), - gasUsed: Long.fromValue(parseInt(block.gasUsed), true), - }; - } - /** * @protected * @@ -218,58 +160,6 @@ export class BlockFormatter { }; } - /** - * @protected - * - * Formats a raw transaction object that is returned by the 'web3.js' formatter. - * - * @param {IWeb3Transaction} transactionObject - The transaction object to format. - * @param {ITransactionReceipt} receipt - Formatted transaction receipt object to be - * added to transaction object. - * - * @returns {ITransaction} - The formatted transaction object. - */ - protected formatHttpTransactionObject( - transactionObject: IHttpTransaction, - receipt: ITransactionReceipt - ): ITransaction { - return { - ...transactionObject, - receipt, - value: utils.toHex(transactionObject.value), - transactionIndex: - transactionObject.transactionIndex || - transactionObject.transactionIndex === "0x0" - ? Long.fromValue( - parseInt(transactionObject.transactionIndex), - true - ) - : null, - gas: Long.fromValue(parseInt(transactionObject.gas), true), - gasPrice: transactionObject.gasPrice, - nonce: Long.fromValue(parseInt(transactionObject.nonce), true), - type: Number(transactionObject.nonce), - maxFeePerGas: - transactionObject.maxFeePerGas || - transactionObject.maxFeePerGas === "0x0" - ? transactionObject.maxFeePerGas - : undefined, - maxPriorityFeePerGas: - transactionObject.maxPriorityFeePerGas || - transactionObject.maxPriorityFeePerGas === "0x0" - ? transactionObject.maxPriorityFeePerGas - : undefined, - blockNumber: - transactionObject.blockNumber || - transactionObject.blockNumber === "0" - ? Long.fromValue( - parseInt(transactionObject.blockNumber), - true - ) - : null, - }; - } - /** * @protected * diff --git a/internal/interfaces/http_block.ts b/internal/interfaces/http_block.ts deleted file mode 100644 index e954ca3..0000000 --- a/internal/interfaces/http_block.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { IHttpTransaction } from "./http_transaction.js"; - -export interface IHttpBlock { - difficulty: string; - totalDifficulty: string; - number: string; - gasLimit: string; - baseFeePerGas?: string; - gasUsed: string; - logsBloom: string; - hash: string; - parentHash: string; - receiptsRoot: string; - sha3Uncles: string; - size: string; - stateRoot: string; - timestamp: string; - transactionsRoot: string; - miner: string; - nonce: string; - extraData: string; - transactions: IHttpTransaction[]; - uncles?: string[]; -} diff --git a/internal/interfaces/http_transaction.ts b/internal/interfaces/http_transaction.ts deleted file mode 100644 index daa0446..0000000 --- a/internal/interfaces/http_transaction.ts +++ /dev/null @@ -1,20 +0,0 @@ -export interface IHttpTransaction { - hash: string; - nonce: string; - blockHash: string | null; - blockNumber: string; - transactionIndex?: string; - from: string; - to: string | null; - value: string | string; - gasPrice: string | string; - gas: string; - input: string; - maxFeePerGas?: string; - maxPriorityFeePerGas?: string; - chainId: string; - v: string; - r: string; - s: string; - type: number; -} diff --git a/internal/interfaces/index.ts b/internal/interfaces/index.ts index 89814ea..c071d2d 100644 --- a/internal/interfaces/index.ts +++ b/internal/interfaces/index.ts @@ -15,8 +15,6 @@ export * from "./consumer_queue_object.js"; export * from "./deposit.js"; export * from "./deserialised_kafka_message.js"; export * from "./event_log.js"; -export * from "./http_block.js"; -export * from "./http_transaction.js"; export * from "./kafka_coder_config.js"; export * from "./logger_config.js"; export * from "./mapper.js"; diff --git a/public/block_producers/http_block_producer.ts b/public/block_producers/http_block_producer.ts deleted file mode 100644 index f7f4844..0000000 --- a/public/block_producers/http_block_producer.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { IProducedBlock, ProducedBlocksModel, IProducedBlocksModel } from "@internal/block_producers/produced_blocks_model.js"; -import { IBlockProducerConfig } from "@internal/interfaces/block_producer_config.js"; -import { IProducerConfig } from "@internal/interfaces/producer_config.js"; -import { HttpBlockGetter } from "@internal/block_getters/http_block_getter.js"; -import { Coder } from "@internal/coder/protobuf_coder.js"; -import { BlockPoller } from "@internal/block_subscription/block_polling.js"; -import { Database } from "@internal/mongo/database.js"; -import { BlockProducer } from "@internal/block_producers/block_producer.js"; -import Eth from "web3-eth"; - -/** - * HttpBlockProducer block producer class which retrieves block from http node - * for producing to kafka. - * - * @author - Rajesh Chaganti - */ -export class HttpBlockProducer extends BlockProducer { - /** - * @constructor - * - * @param {IBlockProducerConfig} config - * - * @returns {HttpBlockProducer} - */ - constructor(config: IBlockProducerConfig) { - const endpoint = config.rpcWsEndpoints?.[0] ?? ""; - const startBlock = config.startBlock ?? 0; - const dbCollection = config.dbCollection ?? "producedblocks"; - const mongoUrl = config.mongoUrl ?? "mongodb://localhost:27017/open-api"; - const blockPollingTimeout = config.blockPollingTimeout ?? 2000; - const maxRetries = config.maxRetries ?? 0; - const maxReOrgDepth = config.maxReOrgDepth ?? 0; - - delete config.rpcWsEndpoints; - delete config.startBlock; - delete config.mongoUrl; - delete config.dbCollection; - delete config.maxReOrgDepth; - delete config.maxRetries; - delete config.blockPollingTimeout; - - const database = new Database(mongoUrl); - - const blockGetter = new HttpBlockGetter( - //@ts-ignore - new Eth(endpoint), - maxRetries - ); - - super( - new Coder( - "block", - "blockpackage", - "Block" - ), - config as IProducerConfig, - new BlockPoller( - blockGetter, - blockPollingTimeout - ), - blockGetter, - database, - database.model>( - "ProducedBlocks", - ProducedBlocksModel, - dbCollection - ), - startBlock, - maxReOrgDepth - ); - } -} diff --git a/public/index.ts b/public/index.ts index cd0ecbb..d284f37 100644 --- a/public/index.ts +++ b/public/index.ts @@ -1,7 +1,6 @@ // block_producer export * from "./block_producers/block_polling_producer.js"; export * from "./block_producers/erigon_block_producer.js"; -export * from "./block_producers/http_block_producer.js"; export * from "./block_producers/quicknode_block_producer.js"; export * from "./block_producers/block_producer.js"; diff --git a/public/interfaces/http_block.ts b/public/interfaces/http_block.ts deleted file mode 100644 index 82fcae1..0000000 --- a/public/interfaces/http_block.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "@internal/interfaces/http_block.js"; diff --git a/public/interfaces/http_transaction.ts b/public/interfaces/http_transaction.ts deleted file mode 100644 index d1030f9..0000000 --- a/public/interfaces/http_transaction.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "@internal/interfaces/http_transaction.js"; diff --git a/schemas/mappings.proto b/schemas/mappings.proto index b3176ce..0759d45 100644 --- a/schemas/mappings.proto +++ b/schemas/mappings.proto @@ -6,7 +6,7 @@ message Mappings { message Metadata { string name = 1; string symbol = 2; - uint32 decimal = 3; + uint32 decimals = 3; } message MappingEvent { From 348d44f33afa85c6b2493fe768e2d9f07a544f42 Mon Sep 17 00:00:00 2001 From: Nitin Mittal Date: Mon, 4 Mar 2024 19:11:38 +0400 Subject: [PATCH 3/4] rmv: isCctp --- schemas/burnblock.proto | 1 - 1 file changed, 1 deletion(-) diff --git a/schemas/burnblock.proto b/schemas/burnblock.proto index 84ae987..1908bbc 100644 --- a/schemas/burnblock.proto +++ b/schemas/burnblock.proto @@ -11,7 +11,6 @@ message BurnBlock { repeated string amounts = 6; repeated uint64 tokenIds = 7; string bridgeType = 8; - bool isCctp = 9; } required uint64 blockNumber = 1; From cf4bb98982d134ea8fe32d2156502cef58bd34e5 Mon Sep 17 00:00:00 2001 From: Nitin Mittal Date: Mon, 4 Mar 2024 19:19:06 +0400 Subject: [PATCH 4/4] rmv: logger --- internal/coder/protobuf_coder.ts | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/internal/coder/protobuf_coder.ts b/internal/coder/protobuf_coder.ts index cabbd58..e6e0cf0 100644 --- a/internal/coder/protobuf_coder.ts +++ b/internal/coder/protobuf_coder.ts @@ -86,11 +86,6 @@ export class Coder implements ICoder { } return this.protobufType.decode(buffer); } catch (error) { - Logger.error(error as any); - Logger.info({message: "Decodding Error: deserialize", data: { - buffer: buffer, - string: buffer.toString() - }}); throw new CoderError( "Decoding error", CoderError.codes.DECODING_ERROR, @@ -125,15 +120,6 @@ export class Coder implements ICoder { ); } - const _buffer = this.protobufType.encode(messageObject).finish(); - - if (this.messageType === "L1StateBlock") { - Logger.info({message: "In coder serialize - for L1StateBlock", data: { - base64: (_buffer as Buffer).toString("base64"), - stringData: _buffer.toString(), - _buffer - }}); - } - return _buffer; + return this.protobufType.encode(messageObject).finish(); } }