diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index fed05032374..b267cc853f8 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -21,8 +21,11 @@ pub struct ServerSentEventHandler { } impl ServerSentEventHandler { - pub fn new(log: Logger) -> Self { - Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY) + pub fn new(log: Logger, capacity_multiplier: usize) -> Self { + Self::new_with_capacity( + log, + capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY), + ) } pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 8383963b7c2..bd2946bb7a4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -157,7 +157,10 @@ where let context = runtime_context.service_context("beacon".into()); let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?; let event_handler = if self.http_api_config.enabled { - Some(ServerSentEventHandler::new(context.log().clone())) + Some(ServerSentEventHandler::new( + context.log().clone(), + self.http_api_config.sse_capacity_multiplier, + )) } else { None }; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4d5b98a8238..8e316834d0e 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -65,7 +65,10 @@ use tokio::sync::{ mpsc::{Sender, UnboundedSender}, oneshot, }; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; +use tokio_stream::{ + wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, + StreamExt, +}; use types::{ Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, @@ -132,6 +135,7 @@ pub struct Config { pub allow_sync_stalled: bool, pub spec_fork_name: Option, pub data_dir: PathBuf, + pub sse_capacity_multiplier: usize, pub enable_beacon_processor: bool, } @@ -146,6 +150,7 @@ impl Default for Config { allow_sync_stalled: false, spec_fork_name: None, data_dir: PathBuf::from(DEFAULT_ROOT_DIR), + sse_capacity_multiplier: 1, enable_beacon_processor: true, } } @@ -4348,22 +4353,29 @@ pub fn serve( } }; - receivers.push(BroadcastStream::new(receiver).map(|msg| { - match msg { - Ok(data) => Event::default() - .event(data.topic_name()) - .json_data(data) - .map_err(|e| { - warp_utils::reject::server_sent_event_error(format!( - "{:?}", - e - )) - }), - Err(e) => Err(warp_utils::reject::server_sent_event_error( - format!("{:?}", e), - )), - } - })); + receivers.push( + BroadcastStream::new(receiver) + .map(|msg| { + match msg { + Ok(data) => Event::default() + .event(data.topic_name()) + .json_data(data) + .unwrap_or_else(|e| { + Event::default() + .comment(format!("error - bad json: {e:?}")) + }), + // Do not terminate the stream if the channel fills + // up. Just drop some messages and send a comment to + // the client. + Err(BroadcastStreamRecvError::Lagged(n)) => { + Event::default().comment(format!( + "error - dropped {n} messages" + )) + } + } + }) + .map(Ok::<_, std::convert::Infallible>), + ); } } else { return Err(warp_utils::reject::custom_server_error( @@ -4373,7 +4385,7 @@ pub fn serve( let s = futures::stream::select_all(receivers); - Ok::<_, warp::Rejection>(warp::sse::reply(warp::sse::keep_alive().stream(s))) + Ok(warp::sse::reply(warp::sse::keep_alive().stream(s))) }) }, ); diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 0367776f8de..dcc25322297 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -225,6 +225,7 @@ pub async fn create_api_server_on_port( allow_sync_stalled: false, data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR), spec_fork_name: None, + sse_capacity_multiplier: 1, enable_beacon_processor: true, }, chain: Some(chain), diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 0330bd3f7cc..974dabbf0cf 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -382,6 +382,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \ MAINNET.") ) + .arg( + Arg::with_name("http-sse-capacity-multiplier") + .long("http-sse-capacity-multiplier") + .takes_value(true) + .default_value("1") + .value_name("N") + .help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \ + Increasing this value can prevent messages from being dropped.") + ) .arg( Arg::with_name("http-enable-beacon-processor") .long("http-enable-beacon-processor") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 21df86620df..0d2a5d812be 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -149,6 +149,9 @@ pub fn get_config( client_config.http_api.allow_sync_stalled = true; } + client_config.http_api.sse_capacity_multiplier = + parse_required(cli_args, "http-sse-capacity-multiplier")?; + client_config.http_api.enable_beacon_processor = parse_required(cli_args, "http-enable-beacon-processor")?; diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index ecc936cbfb4..02b95120127 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2349,3 +2349,18 @@ fn beacon_processor_zero_workers() { .flag("beacon-processor-max-workers", Some("0")) .run_with_zero_port(); } + +#[test] +fn http_sse_capacity_multiplier_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 1)); +} + +#[test] +fn http_sse_capacity_multiplier_override() { + CommandLineTest::new() + .flag("http-sse-capacity-multiplier", Some("10")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10)); +}