Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add timeouts for request / response stream connections #8434

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b51e908
feat: cleanup publisher
LHerskind Aug 22, 2024
da2eb7a
refactor: get rid of timetraveler from l1-publisher
LHerskind Aug 28, 2024
3388966
feat: revert if timestamp in future
LHerskind Aug 28, 2024
13a60a3
feat: including txhashes explicitly in the rollup attestations
Maddiaa0 Aug 29, 2024
86026f2
temp
Maddiaa0 Aug 30, 2024
f3eac5b
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Aug 30, 2024
fc7a04a
temp
Maddiaa0 Aug 30, 2024
9eed298
temp
Maddiaa0 Sep 2, 2024
cc09455
temp
Maddiaa0 Sep 2, 2024
06f950f
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Sep 2, 2024
4727cd9
temp: get passing with txhash payloads
Maddiaa0 Sep 3, 2024
b4c2a46
fix: make sure transactions are available in the tx pool
Maddiaa0 Sep 5, 2024
4a8d178
chore: remove logs
Maddiaa0 Sep 5, 2024
b4324fc
fmt
Maddiaa0 Sep 5, 2024
052641a
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Sep 5, 2024
a803a94
🪿
Maddiaa0 Sep 5, 2024
164c117
chore: validator tests
Maddiaa0 Sep 6, 2024
27da59d
Add timeouts to individual reqresp connections
Maddiaa0 Sep 6, 2024
9e7d2d8
fix
Maddiaa0 Sep 6, 2024
3c8e1b9
chore: include tests for specific error messages
Maddiaa0 Sep 6, 2024
9a738e5
fmt
Maddiaa0 Sep 6, 2024
c10260c
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Sep 6, 2024
045af5a
clean
Maddiaa0 Sep 6, 2024
73d26ec
🧹
Maddiaa0 Sep 6, 2024
d358228
chore: fix sequencing tests
Maddiaa0 Sep 7, 2024
4b31953
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Sep 7, 2024
2f82a8f
Merge branch 'md/check-tx-requests-before-signing' into md/09-06-add_…
Maddiaa0 Sep 7, 2024
f673593
fmt
Maddiaa0 Sep 7, 2024
a15ab17
fmt
Maddiaa0 Sep 7, 2024
2e3f80b
fmt solidity
Maddiaa0 Sep 7, 2024
ae2a05e
Merge branch 'md/check-tx-requests-before-signing' into md/09-06-add_…
Maddiaa0 Sep 7, 2024
5734006
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Sep 7, 2024
1bde1fe
fix: test hash
Maddiaa0 Sep 8, 2024
c775b26
Merge branch 'md/check-tx-requests-before-signing' into md/09-06-add_…
Maddiaa0 Sep 8, 2024
7a50a2b
exp: adjust test nodes
Maddiaa0 Sep 8, 2024
72f98bd
Merge branch 'md/check-tx-requests-before-signing' into md/09-06-add_…
Maddiaa0 Sep 8, 2024
998f38c
chore: add reqresp configuration values to p2p config
Maddiaa0 Sep 8, 2024
1c2b151
fix: use abi.encode vs encodePacked
Maddiaa0 Sep 11, 2024
e6e7f6b
fix
Maddiaa0 Sep 11, 2024
6f417fc
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Sep 11, 2024
cde6283
fix: merge fix
Maddiaa0 Sep 11, 2024
8290c99
fmt
Maddiaa0 Sep 11, 2024
b13ca93
Merge branch 'md/check-tx-requests-before-signing' into md/09-06-add_…
Maddiaa0 Sep 11, 2024
1452017
Merge branch 'master' into md/check-tx-requests-before-signing
Maddiaa0 Sep 11, 2024
120c9a3
Merge branch 'md/check-tx-requests-before-signing' into md/09-06-add_…
Maddiaa0 Sep 11, 2024
9b90fe1
Merge branch 'master' into md/09-06-add_timeouts_to_individual_reqres…
Maddiaa0 Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ describe('L1Publisher integration', () => {

writeJson(`mixed_block_${block.number}`, block, l1ToL2Content, recipientAddress, deployerAccount.address);

await publisher.processL2Block(block);
await publisher.proposeL2Block(block);

const logs = await publicClient.getLogs({
address: rollupAddress,
Expand Down Expand Up @@ -485,7 +485,7 @@ describe('L1Publisher integration', () => {

writeJson(`empty_block_${block.number}`, block, [], AztecAddress.ZERO, deployerAccount.address);

await publisher.processL2Block(block);
await publisher.proposeL2Block(block);

const logs = await publicClient.getLogs({
address: rollupAddress,
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export type EnvVar =
| 'TX_GOSSIP_VERSION'
| 'P2P_QUERY_FOR_IP'
| 'P2P_TX_POOL_KEEP_PROVEN_FOR'
| 'P2P_REQRESP_OVERALL_REQUEST_TIMEOUT_MS'
| 'P2P_REQRESP_INDIVIDUAL_REQUEST_TIMEOUT_MS'
| 'TELEMETRY'
| 'OTEL_SERVICE_NAME'
| 'OTEL_EXPORTER_OTLP_METRICS_ENDPOINT'
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/foundation/src/timer/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export { TimeoutTask } from './timeout.js';
export { TimeoutTask, executeTimeoutWithCustomError } from './timeout.js';
export { Timer } from './timer.js';
export { elapsed, elapsedSync } from './elapsed.js';
19 changes: 17 additions & 2 deletions yarn-project/foundation/src/timer/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ export class TimeoutTask<T> {
private interrupt = () => {};
private totalTime = 0;

constructor(private fn: () => Promise<T>, private timeout = 0, fnName = '') {
constructor(
private fn: () => Promise<T>,
private timeout = 0,
fnName = '',
error = () => new Error(`Timeout${fnName ? ` running ${fnName}` : ''} after ${timeout}ms.`),
) {
this.interruptPromise = new Promise<T>((_, reject) => {
this.interrupt = () => reject(new Error(`Timeout${fnName ? ` running ${fnName}` : ''} after ${timeout}ms.`));
this.interrupt = () => reject(error());
});
}

Expand Down Expand Up @@ -62,3 +67,13 @@ export const executeTimeout = async <T>(fn: () => Promise<T>, timeout = 0, fnNam
const task = new TimeoutTask(fn, timeout, fnName);
return await task.exec();
};

export const executeTimeoutWithCustomError = async <T>(
fn: () => Promise<T>,
timeout = 0,
error = () => new Error('No custom error provided'),
fnName = '',
) => {
const task = new TimeoutTask(fn, timeout, fnName, error);
return await task.exec();
};
5 changes: 4 additions & 1 deletion yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import {
pickConfigMappings,
} from '@aztec/foundation/config';

import { type P2PReqRespConfig, p2pReqRespConfigMappings } from './service/reqresp/config.js';

/**
* P2P client configuration values.
*/
export interface P2PConfig {
export interface P2PConfig extends P2PReqRespConfig {
/**
* A flag dictating whether the P2P subsystem should be enabled.
*/
Expand Down Expand Up @@ -170,6 +172,7 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
'How many blocks have to pass after a block is proven before its txs are deleted (zero to delete immediately once proven)',
...numberConfigHelper(0),
},
...p2pReqRespConfigMappings,
};

/**
Expand Down
21 changes: 21 additions & 0 deletions yarn-project/p2p/src/errors/reqresp.error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/** Individual request timeout error
*
* This error will be thrown when a request to a specific peer times out.
* @category Errors
*/
export class IndiviualReqRespTimeoutError extends Error {
constructor() {
super(`Request to peer timed out`);
}
}

/** Collective request timeout error
*
* This error will be thrown when a req resp request times out regardless of the peer.
* @category Errors
*/
export class CollectiveReqRespTimeoutError extends Error {
constructor() {
super(`Request to all peers timed out`);
}
}
7 changes: 6 additions & 1 deletion yarn-project/p2p/src/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { bootstrap } from '@libp2p/bootstrap';
import { tcp } from '@libp2p/tcp';
import { type Libp2p, type Libp2pOptions, createLibp2p } from 'libp2p';

import { type P2PReqRespConfig } from '../service/reqresp/config.js';
import { pingHandler, statusHandler } from '../service/reqresp/handlers.js';
import {
PING_PROTOCOL,
Expand Down Expand Up @@ -80,7 +81,11 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise<void> => {
// Create a req resp node, exposing the underlying p2p node
export const createReqResp = async (): Promise<ReqRespNode> => {
const p2p = await createLibp2pNode();
const req = new ReqResp(p2p);
const config: P2PReqRespConfig = {
overallRequestTimeoutMs: 4000,
individualRequestTimeoutMs: 2000,
};
const req = new ReqResp(config, p2p);
return {
p2p,
req,
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/p2p/src/service/discv5_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { BootstrapNode } from '../bootstrap/bootstrap.js';
import { type P2PConfig } from '../config.js';
import { DiscV5Service } from './discV5_service.js';
import { createLibP2PPeerId } from './libp2p_service.js';
import { DEFAULT_P2P_REQRESP_CONFIG } from './reqresp/config.js';
import { PeerDiscoveryState } from './service.js';

const waitForPeers = (node: DiscV5Service, expectedCount: number): Promise<void> => {
Expand Down Expand Up @@ -135,6 +136,7 @@ describe('Discv5Service', () => {
p2pEnabled: true,
l2QueueSize: 100,
keepProvenTxsInPoolFor: 0,
...DEFAULT_P2P_REQRESP_CONFIG,
};
return new DiscV5Service(peerId, config);
};
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/service/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export class LibP2PService implements P2PService {
private logger = createDebugLogger('aztec:libp2p_service'),
) {
this.peerManager = new PeerManager(node, peerDiscoveryService, config, logger);
this.reqresp = new ReqResp(node);
this.reqresp = new ReqResp(config, node);

this.blockReceivedCallback = (block: BlockProposal): Promise<BlockAttestation | undefined> => {
this.logger.verbose(
Expand Down
35 changes: 35 additions & 0 deletions yarn-project/p2p/src/service/reqresp/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { type ConfigMapping, numberConfigHelper } from '@aztec/foundation/config';

export const DEFAULT_INDIVIDUAL_REQUEST_TIMEOUT_MS = 2000;
export const DEFAULT_OVERALL_REQUEST_TIMEOUT_MS = 4000;

// For use in tests.
export const DEFAULT_P2P_REQRESP_CONFIG: P2PReqRespConfig = {
overallRequestTimeoutMs: DEFAULT_OVERALL_REQUEST_TIMEOUT_MS,
individualRequestTimeoutMs: DEFAULT_INDIVIDUAL_REQUEST_TIMEOUT_MS,
};

export interface P2PReqRespConfig {
/**
* The overall timeout for a request response operation.
*/
overallRequestTimeoutMs: number;

/**
* The timeout for an individual request response peer interaction.
*/
individualRequestTimeoutMs: number;
}

export const p2pReqRespConfigMappings: Record<keyof P2PReqRespConfig, ConfigMapping> = {
overallRequestTimeoutMs: {
env: 'P2P_REQRESP_OVERALL_REQUEST_TIMEOUT_MS',
description: 'The overall timeout for a request response operation.',
...numberConfigHelper(DEFAULT_OVERALL_REQUEST_TIMEOUT_MS),
},
individualRequestTimeoutMs: {
env: 'P2P_REQRESP_INDIVIDUAL_REQUEST_TIMEOUT_MS',
description: 'The timeout for an individual request response peer interaction.',
...numberConfigHelper(DEFAULT_INDIVIDUAL_REQUEST_TIMEOUT_MS),
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { type P2PClient } from '../../client/p2p_client.js';
import { type BootnodeConfig, type P2PConfig } from '../../config.js';
import { type TxPool } from '../../tx_pool/index.js';
import { createLibP2PPeerId } from '../index.js';
import { DEFAULT_P2P_REQRESP_CONFIG } from './config.js';

/**
* Mockify helper for testing purposes.
Expand Down Expand Up @@ -92,6 +93,7 @@ describe('Req Resp p2p client integration', () => {
queryForIp: false,
dataDirectory: undefined,
l1Contracts: { rollupAddress: EthAddress.ZERO },
...DEFAULT_P2P_REQRESP_CONFIG,
};

txPool = {
Expand Down
59 changes: 58 additions & 1 deletion yarn-project/p2p/src/service/reqresp/reqresp.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { TxHash, mockTx } from '@aztec/circuit-types';
import { sleep } from '@aztec/foundation/sleep';

import { describe, expect, it } from '@jest/globals';
import { describe, expect, it, jest } from '@jest/globals';

import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js';
import { MOCK_SUB_PROTOCOL_HANDLERS, connectToPeers, createNodes, startNodes, stopNodes } from '../../mocks/index.js';
import { PING_PROTOCOL, TX_REQ_PROTOCOL } from './interface.js';

Expand Down Expand Up @@ -120,5 +121,61 @@ describe('ReqResp', () => {

await stopNodes(nodes);
});

it('Should hit individual timeout if nothing is returned over the stream', async () => {
const nodes = await createNodes(2);

await startNodes(nodes);

jest.spyOn((nodes[1].req as any).subProtocolHandlers, TX_REQ_PROTOCOL).mockImplementation(() => {
return new Promise(() => {});
});

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');

await sleep(500);
await connectToPeers(nodes);
await sleep(500);

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, Buffer.from('tx'));
expect(res).toBeUndefined();

// Make sure the error message is logged
const errorMessage = `${
new IndiviualReqRespTimeoutError().message
} | peerId: ${nodes[1].p2p.peerId.toString()} | subProtocol: ${TX_REQ_PROTOCOL}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});

it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => {
const nodes = await createNodes(4);

await startNodes(nodes);

for (let i = 1; i < nodes.length; i++) {
jest.spyOn((nodes[i].req as any).subProtocolHandlers, TX_REQ_PROTOCOL).mockImplementation(() => {
return new Promise(() => {});
});
}

// Spy on the logger to make sure the error message is logged
const loggerSpy = jest.spyOn((nodes[0].req as any).logger, 'error');

await sleep(500);
await connectToPeers(nodes);
await sleep(500);

const res = await nodes[0].req.sendRequest(TX_REQ_PROTOCOL, Buffer.from('tx'));
expect(res).toBeUndefined();

// Make sure the error message is logged
const errorMessage = `${new CollectiveReqRespTimeoutError().message} | subProtocol: ${TX_REQ_PROTOCOL}`;
expect(loggerSpy).toHaveBeenCalledWith(errorMessage);

await stopNodes(nodes);
});
});
});
Loading
Loading