diff --git a/chain/network/src/concurrency/demux.rs b/chain/network/src/concurrency/demux.rs index 251fe5c3a1c..08b9ccee3d6 100644 --- a/chain/network/src/concurrency/demux.rs +++ b/chain/network/src/concurrency/demux.rs @@ -122,6 +122,7 @@ impl Demux { if tokens < rl.burst && next_token.is_none() { next_token = Some(tokio::time::Instant::now() + interval); } + tokio::select! { // TODO(gprusak): implement sleep future support for FakeClock, // so that we don't use tokio directly here. @@ -139,35 +140,50 @@ impl Demux { }, } if !calls.is_empty() && tokens > 0 { + // First pop all the elements already accumulated on the queue. + // TODO(gprusak): technically calling try_recv() in a loop may cause a starvation, + // in case elements are added to the queue faster than we can take them out, + // so ideally we should rather atomically dump the content of the queue: + // we can achieve that by maintaining an atomic counter with number of elements in + // the queue. + while let Ok(call) = recv.try_recv() { + calls.push(call); + } + tokens -= 1; // TODO(gprusak): as of now Demux (as a concurrency primitive) doesn't support // cancellation. Once we add cancellation support, this task could accept a context sum: // the sum is valid iff any context is valid. let calls = std::mem::take(&mut calls); - // TODO(gprusak): don't spawn on the "current" runtime. See one of the previous TODOs. - tokio::spawn(async move { - let mut args = vec![]; - let mut outs = vec![]; - let mut handlers = vec![]; - for call in calls { - args.push(call.arg); - outs.push(call.out); - handlers.push(call.handler); - } - let res = handlers.swap_remove(0)(args).await; - assert_eq!( - res.len(), - outs.len(), - "demux handler returned {} results, expected {}", - res.len(), - outs.len(), - ); - for (res, out) in std::iter::zip(res, outs) { - // If the caller is no longer interested in the result, - // the channel will be closed. Ignore that. - let _ = out.send(res); - } - }); + let mut args = vec![]; + let mut outs = vec![]; + let mut handlers = vec![]; + // TODO(gprusak): due to inlining the call at most 1 call is executed at any + // given time. Ideally we should have a separate limit for the number of + // in-flight calls. It would be dangerous to have it unbounded, especially in a + // case when the concurrent calls would cause a contention. + // TODO(gprusak): add metrics for: + // - demuxed call latency (the one inlined here) + // - outer call latency (i.e. latency of the whole Demux.call, from the PoV of + // the caller). + for call in calls { + args.push(call.arg); + outs.push(call.out); + handlers.push(call.handler); + } + let res = handlers.swap_remove(0)(args).await; + assert_eq!( + res.len(), + outs.len(), + "demux handler returned {} results, expected {}", + res.len(), + outs.len(), + ); + for (res, out) in std::iter::zip(res, outs) { + // If the caller is no longer interested in the result, + // the channel will be closed. Ignore that. + let _ = out.send(res); + } } } });