From 42500462a9095f955e3beee2cba4ed9e5038e6e0 Mon Sep 17 00:00:00 2001 From: Ashwin Ramaswami Date: Sun, 13 Mar 2022 04:01:09 +0000 Subject: [PATCH] update -- use method instead of timer, which is more performant --- web3.js/examples/get_account_info_batch.js | 38 ++++++++ web3.js/src/connection.ts | 102 +++++++++++++++------ web3.js/test/connection.test.ts | 80 ++++++++++++++++ 3 files changed, 192 insertions(+), 28 deletions(-) create mode 100644 web3.js/examples/get_account_info_batch.js diff --git a/web3.js/examples/get_account_info_batch.js b/web3.js/examples/get_account_info_batch.js new file mode 100644 index 00000000000000..400444f830d84b --- /dev/null +++ b/web3.js/examples/get_account_info_batch.js @@ -0,0 +1,38 @@ +import * as web3 from '@solana/web3.js'; + +(async () => { + // Connect to cluster + var connection = new web3.Connection( + web3.clusterApiUrl('devnet'), + 'confirmed', + ); + + // Generate a new wallet keypair and airdrop SOL + var wallet1 = web3.Keypair.generate(); + var airdropSignature = await connection.requestAirdrop( + wallet1.publicKey, + web3.LAMPORTS_PER_SOL, + ); + + //wait for airdrop confirmation + await connection.confirmTransaction(airdropSignature); + + // Generate a new wallet keypair and airdrop SOL + var wallet2 = web3.Keypair.generate(); + var airdropSignature2 = await connection.requestAirdrop( + wallet2.publicKey, + web3.LAMPORTS_PER_SOL, + ); + + //wait for airdrop confirmation + await connection.confirmTransaction(airdropSignature); + + // get both accounts' info through a single JSON RPC batch transaction + // account data is bytecode that needs to be deserialized + // serialization and deserialization is program specific + let [account1, account2] = await connection.performBatchRequest([ + () => connection.getAccountInfo(wallet1.publicKey), + () => connection.getAccountInfo(wallet2.publicKey) + ]); + console.log(account1, account2); +})(); diff --git a/web3.js/src/connection.ts b/web3.js/src/connection.ts index 13eedf899c0811..bba0213259e5c3 100644 --- a/web3.js/src/connection.ts +++ b/web3.js/src/connection.ts @@ -865,33 +865,18 @@ function createRpcRequest( connection: Connection, ): RpcRequest { return (method, args) => { - if (connection._autoBatch) { + if (connection._numRequestsToBatch > 0) { return new Promise((resolve, reject) => { - // Automatically batch requests every 100 ms. - const BATCH_INTERVAL_MS = 100; - - connection._batchRequests.push([ - client.request(method, args), + // Automatically queue request to be processed in this batch. + connection._batchedRequests.push({ + params: {methodName: method, args}, resolve, reject, - ]); - - if (!connection._pendingBatchTimer) { - connection._pendingBatchTimer = setTimeout(() => { - const batch = client.batchRequests.map((e: any) => e[0]); - client.request(batch, (err: any, response: any) => { - if (err) { - // Call reject handler of each promise - connection._batchRequests.map((e: any) => e[2](err)); - } else { - // Call resolve handler of each promise - connection._batchRequests.map((e: any, i: number) => - e[1](response[i]), - ); - } - connection._pendingBatchTimer = 0; - }); - }, BATCH_INTERVAL_MS); + }); + if ( + connection._batchedRequests.length === connection._numRequestsToBatch + ) { + connection._resolvePendingBatchRequests(); } }); } else { @@ -2119,9 +2104,14 @@ export class Connection { /** @internal */ _confirmTransactionInitialTimeout?: number; /** @internal */ _rpcEndpoint: string; /** @internal */ _rpcWsEndpoint: string; - /** @internal */ _autoBatch?: boolean; - /** @internal */ _batchRequests: any[] = []; - /** @internal */ _pendingBatchTimer: number = 0; + /** @internal */ _numRequestsToBatch: number = 0; + /** @internal */ _resolvePendingBatchRequests: (value?: unknown) => void = + () => null; + /** @internal */ _batchedRequests: { + params: RpcParams; + resolve: (value?: unknown) => void; + reject: (reason?: any) => void; + }[] = []; /** @internal */ _rpcClient: RpcClient; /** @internal */ _rpcRequest: RpcRequest; /** @internal */ _rpcBatchRequest: RpcBatchRequest; @@ -2210,7 +2200,6 @@ export class Connection { httpHeaders = commitmentOrConfig.httpHeaders; fetchMiddleware = commitmentOrConfig.fetchMiddleware; disableRetryOnRateLimit = commitmentOrConfig.disableRetryOnRateLimit; - this._autoBatch = commitmentOrConfig.autoBatch; } this._rpcEndpoint = endpoint; @@ -4009,6 +3998,63 @@ export class Connection { return res.result; } + /** + * Perform the provided requests in a single batch JSON RPC request. Basically, this function allows you to + * replace the following code, which executes multiple JSON RPC requests in parallel: + * + * Promise.all(addresses.map(address => connection.getSignaturesForAddress(address, undefined, 'confirmed')) + * + * with the below code, which batches all requests into a single JSON RPC request: + * + * connection.performBatchRequest(addresses.map(address => () => connection.getSignaturesForAddress(address, undefined, 'confirmed')) + * + * @param deferredRequests an array of functions, each which returns a promise to be batched. Each promise should call a + * method on this Connection instance that performs a non-batched request. Note: only methods on + * the Connection class that call _rpcRequest are supported (most do). + * @return {Promise>} an array of responses that correspond to each request. + */ + async performBatchRequest( + deferredRequests: Array<() => Promise>, + ): Promise> { + this._numRequestsToBatch = deferredRequests.length; + let promises: Array = []; + await new Promise((resolve, reject) => { + this._resolvePendingBatchRequests = resolve; + + // Begin executing the promises. + promises = deferredRequests.map(e => e().catch(reject)); + + // Each promise generates an RPC payload, and it stores + // that payload, resolve function, and reject function + // in this._batchedRequests. + // + // This outer Promise is resolved only when all the entries + // in this._batchedRequests are created, at which + // point _resolvePendingBatchRequests is called. + }); + + assert( + this._batchedRequests.length === this._numRequestsToBatch, + 'all requests were not properly batched', + ); + + // Now call the RPC batch request with the data. + try { + const unsafeRes = await this._rpcBatchRequest( + this._batchedRequests.map(e => e.params), + ); + + // Finally, resolve the promises created by deferredRequests with the appropriate data for each promise. + this._batchedRequests.forEach(({resolve}, i) => resolve(unsafeRes[i])); + } catch (err) { + // Propagate the error to the promises created by deferredRequests. + this._batchedRequests.forEach(({reject}) => reject(err)); + } + + // Await all promises so we return a list of the results from each one. + return Promise.all(promises); + } + /** * @internal */ diff --git a/web3.js/test/connection.test.ts b/web3.js/test/connection.test.ts index aa2aba618b4bcf..5de015b6c50ede 100644 --- a/web3.js/test/connection.test.ts +++ b/web3.js/test/connection.test.ts @@ -655,6 +655,86 @@ describe('Connection', () => { expect(balance).to.be.at.least(0); }); + it('get balance - batch a single request', async () => { + const account = Keypair.generate(); + + await mockRpcBatchResponse({ + batch: [ + { + methodName: 'getBalance', + args: [account.publicKey.toBase58()], + }, + ], + result: [ + { + context: { + slot: 11, + }, + value: 5, + }, + ], + }); + + const [balance] = await connection.performBatchRequest([ + () => connection.getBalance(account.publicKey), + ]); + expect(balance).to.equal(5); + }); + + it('get balance - batch multiple requests', async () => { + const account1 = Keypair.generate(); + const account2 = Keypair.generate(); + const account3 = Keypair.generate(); + + await mockRpcBatchResponse({ + batch: [ + { + methodName: 'getBalance', + args: [account1.publicKey.toBase58()], + }, + { + methodName: 'getBalance', + args: [account2.publicKey.toBase58()], + }, + { + methodName: 'getBalance', + args: [account3.publicKey.toBase58()], + }, + ], + result: [ + { + context: { + slot: 11, + }, + value: 5, + }, + { + context: { + slot: 11, + }, + value: 10, + }, + { + context: { + slot: 11, + }, + value: 15, + }, + ], + }); + + const [balance1, balance2, balance3] = await connection.performBatchRequest( + [ + () => connection.getBalance(account1.publicKey), + () => connection.getBalance(account2.publicKey), + () => connection.getBalance(account3.publicKey), + ], + ); + expect(balance1).to.equal(5); + expect(balance2).to.equal(10); + expect(balance3).to.equal(15); + }); + it('get inflation', async () => { await mockRpcResponse({ method: 'getInflationGovernor',