Skip to content

Commit

Permalink
Don't kill SSE stream if channel fills up (sigp#4500)
Browse files Browse the repository at this point in the history
Closes sigp#4245

- If an SSE channel fills up, send a comment instead of terminating the stream.
- Add a CLI flag for scaling up the SSE buffer: `--http-sse-capacity-multiplier N`.

~~Blocked on sigp#4462. I haven't rebased on that PR yet for initial testing, because it still needs some more work to handle long-running HTTP threads.~~

- [x] Add CLI flag tests.
  • Loading branch information
michaelsproul authored and Woodpile37 committed Jan 6, 2024
1 parent 828f3e5 commit e7675fe
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 3 deletions.
7 changes: 5 additions & 2 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ pub struct ServerSentEventHandler<T: EthSpec> {
}

impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn new(log: Logger) -> Self {
Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY)
pub fn new(log: Logger, capacity_multiplier: usize) -> Self {
Self::new_with_capacity(
log,
capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY),
)
}

pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
Expand Down
5 changes: 4 additions & 1 deletion beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ where
let context = runtime_context.service_context("beacon".into());
let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?;
let event_handler = if self.http_api_config.enabled {
Some(ServerSentEventHandler::new(context.log().clone()))
Some(ServerSentEventHandler::new(
context.log().clone(),
self.http_api_config.sse_capacity_multiplier,
))
} else {
None
};
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
allow_sync_stalled: false,
data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR),
spec_fork_name: None,
sse_capacity_multiplier: 1,
enable_beacon_processor: true,
},
chain: Some(chain),
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \
MAINNET.")
)
.arg(
Arg::with_name("http-sse-capacity-multiplier")
.long("http-sse-capacity-multiplier")
.takes_value(true)
.default_value("1")
.value_name("N")
.help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \
Increasing this value can prevent messages from being dropped.")
)
.arg(
Arg::with_name("http-enable-beacon-processor")
.long("http-enable-beacon-processor")
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ pub fn get_config<E: EthSpec>(
client_config.http_api.allow_sync_stalled = true;
}

client_config.http_api.sse_capacity_multiplier =
parse_required(cli_args, "http-sse-capacity-multiplier")?;

client_config.http_api.enable_beacon_processor =
parse_required(cli_args, "http-enable-beacon-processor")?;

Expand Down
15 changes: 15 additions & 0 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2388,3 +2388,18 @@ fn beacon_processor_zero_workers() {
.flag("beacon-processor-max-workers", Some("0"))
.run_with_zero_port();
}

#[test]
fn http_sse_capacity_multiplier_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 1));
}

#[test]
fn http_sse_capacity_multiplier_override() {
CommandLineTest::new()
.flag("http-sse-capacity-multiplier", Some("10"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10));
}

0 comments on commit e7675fe

Please sign in to comment.