Skip to content

Commit

Permalink
fix(request-response): Report failure when streams are at capacity
Browse files Browse the repository at this point in the history
Fixes potential hanging issue if use relies on response or failures to make progress

Pull-Request: #5417.
  • Loading branch information
oblique authored Jun 4, 2024
1 parent 68301b8 commit af42122
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 4 deletions.
3 changes: 3 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 0.26.3

- Report failure when streams are at capacity.
See [PR 5417](https://github.com/libp2p/rust-libp2p/pull/5417).

- Report dial IO errors to the user.
See [PR 5429](https://github.com/libp2p/rust-libp2p/pull/5429).

Expand Down
8 changes: 7 additions & 1 deletion protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ where
}
};

// Inbound connections are reported to the upper layer from within the above task,
// so by failing to schedule it, it means the upper layer will never know about the
// inbound request. Because of that we do not report any inbound failure.
if self
.worker_streams
.try_push(RequestId::Inbound(request_id), recv.boxed())
Expand Down Expand Up @@ -204,7 +207,10 @@ where
.try_push(RequestId::Outbound(request_id), send.boxed())
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
self.pending_events.push_back(Event::OutboundStreamFailed {
request_id: message.request_id,
error: io::Error::new(io::ErrorKind::Other, "max sub-streams reached"),
});
}
}

Expand Down
67 changes: 64 additions & 3 deletions protocols/request-response/tests/error_reporting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,58 @@ async fn report_outbound_timeout_on_read_response() {
futures::future::select(server_task, client_task).await;
}

#[async_std::test]
async fn report_outbound_failure_on_max_streams() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

// `swarm2` will be able to handle only 1 stream per time.
let swarm2_config = request_response::Config::default()
.with_request_timeout(Duration::from_millis(100))
.with_max_concurrent_streams(1);

let (peer1_id, mut swarm1) = new_swarm();
let (peer2_id, mut swarm2) = new_swarm_with_config(swarm2_config);

swarm1.listen().with_memory_addr_external().await;
swarm2.connect(&mut swarm1).await;

let swarm1_task = async move {
let _req_id = swarm1
.behaviour_mut()
.send_request(&peer2_id, Action::FailOnMaxStreams);

// Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead.
wait_no_events(&mut swarm1).await;
};

// Expects OutboundFailure::Io failure.
let swarm2_task = async move {
let (peer, _inbound_req_id, action, _resp_channel) =
wait_request(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(action, Action::FailOnMaxStreams);

// A task for sending back a response is already scheduled so max concurrent
// streams is reached and no new tasks can be sheduled.
//
// We produce the failure by creating new request before we response.
let outbound_req_id = swarm2
.behaviour_mut()
.send_request(&peer1_id, Action::FailOnMaxStreams);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(req_id_done, outbound_req_id);
assert!(matches!(error, OutboundFailure::Io(_)));
};

let swarm1_task = pin!(swarm1_task);
let swarm2_task = pin!(swarm2_task);
futures::future::select(swarm1_task, swarm2_task).await;
}

#[async_std::test]
async fn report_inbound_failure_on_read_request() {
let _ = tracing_subscriber::fmt()
Expand Down Expand Up @@ -332,6 +384,7 @@ enum Action {
FailOnWriteRequest,
FailOnWriteResponse,
TimeoutOnWriteResponse,
FailOnMaxStreams,
}

impl From<Action> for u8 {
Expand All @@ -343,6 +396,7 @@ impl From<Action> for u8 {
Action::FailOnWriteRequest => 3,
Action::FailOnWriteResponse => 4,
Action::TimeoutOnWriteResponse => 5,
Action::FailOnMaxStreams => 6,
}
}
}
Expand All @@ -358,6 +412,7 @@ impl TryFrom<u8> for Action {
3 => Ok(Action::FailOnWriteRequest),
4 => Ok(Action::FailOnWriteResponse),
5 => Ok(Action::TimeoutOnWriteResponse),
6 => Ok(Action::FailOnMaxStreams),
_ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")),
}
}
Expand Down Expand Up @@ -468,11 +523,10 @@ impl Codec for TestCodec {
}
}

fn new_swarm_with_timeout(
timeout: Duration,
fn new_swarm_with_config(
cfg: request_response::Config,
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full));
let cfg = request_response::Config::default().with_request_timeout(timeout);

let swarm =
Swarm::new_ephemeral(|_| request_response::Behaviour::<TestCodec>::new(protocols, cfg));
Expand All @@ -481,6 +535,13 @@ fn new_swarm_with_timeout(
(peed_id, swarm)
}

fn new_swarm_with_timeout(
timeout: Duration,
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
let cfg = request_response::Config::default().with_request_timeout(timeout);
new_swarm_with_config(cfg)
}

fn new_swarm() -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
new_swarm_with_timeout(Duration::from_millis(100))
}
Expand Down

0 comments on commit af42122

Please sign in to comment.