Skip to content

Commit

Permalink
update -- use method instead of timer, which is more performant
Browse files Browse the repository at this point in the history
  • Loading branch information
epicfaace committed Mar 13, 2022
1 parent 043c20e commit 4250046
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 28 deletions.
38 changes: 38 additions & 0 deletions web3.js/examples/get_account_info_batch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import * as web3 from '@solana/web3.js';

(async () => {
// Connect to cluster
var connection = new web3.Connection(
web3.clusterApiUrl('devnet'),
'confirmed',
);

// Generate a new wallet keypair and airdrop SOL
var wallet1 = web3.Keypair.generate();
var airdropSignature = await connection.requestAirdrop(
wallet1.publicKey,
web3.LAMPORTS_PER_SOL,
);

//wait for airdrop confirmation
await connection.confirmTransaction(airdropSignature);

// Generate a new wallet keypair and airdrop SOL
var wallet2 = web3.Keypair.generate();
var airdropSignature2 = await connection.requestAirdrop(
wallet2.publicKey,
web3.LAMPORTS_PER_SOL,
);

//wait for airdrop confirmation
await connection.confirmTransaction(airdropSignature);

// get both accounts' info through a single JSON RPC batch transaction
// account data is bytecode that needs to be deserialized
// serialization and deserialization is program specific
let [account1, account2] = await connection.performBatchRequest([
() => connection.getAccountInfo(wallet1.publicKey),
() => connection.getAccountInfo(wallet2.publicKey)
]);
console.log(account1, account2);
})();
102 changes: 74 additions & 28 deletions web3.js/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -865,33 +865,18 @@ function createRpcRequest(
connection: Connection,
): RpcRequest {
return (method, args) => {
if (connection._autoBatch) {
if (connection._numRequestsToBatch > 0) {
return new Promise((resolve, reject) => {
// Automatically batch requests every 100 ms.
const BATCH_INTERVAL_MS = 100;

connection._batchRequests.push([
client.request(method, args),
// Automatically queue request to be processed in this batch.
connection._batchedRequests.push({
params: {methodName: method, args},
resolve,
reject,
]);

if (!connection._pendingBatchTimer) {
connection._pendingBatchTimer = setTimeout(() => {
const batch = client.batchRequests.map((e: any) => e[0]);
client.request(batch, (err: any, response: any) => {
if (err) {
// Call reject handler of each promise
connection._batchRequests.map((e: any) => e[2](err));
} else {
// Call resolve handler of each promise
connection._batchRequests.map((e: any, i: number) =>
e[1](response[i]),
);
}
connection._pendingBatchTimer = 0;
});
}, BATCH_INTERVAL_MS);
});
if (
connection._batchedRequests.length === connection._numRequestsToBatch
) {
connection._resolvePendingBatchRequests();
}
});
} else {
Expand Down Expand Up @@ -2119,9 +2104,14 @@ export class Connection {
/** @internal */ _confirmTransactionInitialTimeout?: number;
/** @internal */ _rpcEndpoint: string;
/** @internal */ _rpcWsEndpoint: string;
/** @internal */ _autoBatch?: boolean;
/** @internal */ _batchRequests: any[] = [];
/** @internal */ _pendingBatchTimer: number = 0;
/** @internal */ _numRequestsToBatch: number = 0;
/** @internal */ _resolvePendingBatchRequests: (value?: unknown) => void =
() => null;
/** @internal */ _batchedRequests: {
params: RpcParams;
resolve: (value?: unknown) => void;
reject: (reason?: any) => void;
}[] = [];
/** @internal */ _rpcClient: RpcClient;
/** @internal */ _rpcRequest: RpcRequest;
/** @internal */ _rpcBatchRequest: RpcBatchRequest;
Expand Down Expand Up @@ -2210,7 +2200,6 @@ export class Connection {
httpHeaders = commitmentOrConfig.httpHeaders;
fetchMiddleware = commitmentOrConfig.fetchMiddleware;
disableRetryOnRateLimit = commitmentOrConfig.disableRetryOnRateLimit;
this._autoBatch = commitmentOrConfig.autoBatch;
}

this._rpcEndpoint = endpoint;
Expand Down Expand Up @@ -4009,6 +3998,63 @@ export class Connection {
return res.result;
}

/**
* Perform the provided requests in a single batch JSON RPC request. Basically, this function allows you to
* replace the following code, which executes multiple JSON RPC requests in parallel:
*
* Promise.all(addresses.map(address => connection.getSignaturesForAddress(address, undefined, 'confirmed'))
*
* with the below code, which batches all requests into a single JSON RPC request:
*
* connection.performBatchRequest(addresses.map(address => () => connection.getSignaturesForAddress(address, undefined, 'confirmed'))
*
* @param deferredRequests an array of functions, each which returns a promise to be batched. Each promise should call a
* method on this Connection instance that performs a non-batched request. Note: only methods on
* the Connection class that call _rpcRequest are supported (most do).
* @return {Promise<Array<any>>} an array of responses that correspond to each request.
*/
async performBatchRequest(
deferredRequests: Array<() => Promise<any>>,
): Promise<Array<any>> {
this._numRequestsToBatch = deferredRequests.length;
let promises: Array<any> = [];
await new Promise((resolve, reject) => {
this._resolvePendingBatchRequests = resolve;

// Begin executing the promises.
promises = deferredRequests.map(e => e().catch(reject));

// Each promise generates an RPC payload, and it stores
// that payload, resolve function, and reject function
// in this._batchedRequests.
//
// This outer Promise is resolved only when all the entries
// in this._batchedRequests are created, at which
// point _resolvePendingBatchRequests is called.
});

assert(
this._batchedRequests.length === this._numRequestsToBatch,
'all requests were not properly batched',
);

// Now call the RPC batch request with the data.
try {
const unsafeRes = await this._rpcBatchRequest(
this._batchedRequests.map(e => e.params),
);

// Finally, resolve the promises created by deferredRequests with the appropriate data for each promise.
this._batchedRequests.forEach(({resolve}, i) => resolve(unsafeRes[i]));
} catch (err) {
// Propagate the error to the promises created by deferredRequests.
this._batchedRequests.forEach(({reject}) => reject(err));
}

// Await all promises so we return a list of the results from each one.
return Promise.all(promises);
}

/**
* @internal
*/
Expand Down
80 changes: 80 additions & 0 deletions web3.js/test/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,86 @@ describe('Connection', () => {
expect(balance).to.be.at.least(0);
});

it('get balance - batch a single request', async () => {
const account = Keypair.generate();

await mockRpcBatchResponse({
batch: [
{
methodName: 'getBalance',
args: [account.publicKey.toBase58()],
},
],
result: [
{
context: {
slot: 11,
},
value: 5,
},
],
});

const [balance] = await connection.performBatchRequest([
() => connection.getBalance(account.publicKey),
]);
expect(balance).to.equal(5);
});

it('get balance - batch multiple requests', async () => {
const account1 = Keypair.generate();
const account2 = Keypair.generate();
const account3 = Keypair.generate();

await mockRpcBatchResponse({
batch: [
{
methodName: 'getBalance',
args: [account1.publicKey.toBase58()],
},
{
methodName: 'getBalance',
args: [account2.publicKey.toBase58()],
},
{
methodName: 'getBalance',
args: [account3.publicKey.toBase58()],
},
],
result: [
{
context: {
slot: 11,
},
value: 5,
},
{
context: {
slot: 11,
},
value: 10,
},
{
context: {
slot: 11,
},
value: 15,
},
],
});

const [balance1, balance2, balance3] = await connection.performBatchRequest(
[
() => connection.getBalance(account1.publicKey),
() => connection.getBalance(account2.publicKey),
() => connection.getBalance(account3.publicKey),
],
);
expect(balance1).to.equal(5);
expect(balance2).to.equal(10);
expect(balance3).to.equal(15);
});

it('get inflation', async () => {
await mockRpcResponse({
method: 'getInflationGovernor',
Expand Down

0 comments on commit 4250046

Please sign in to comment.