set the in-flight limit on the demuxed calls to 1 for now (#8249)
Lack of an in-flight limit caused a bug in the implementation of network graph recomputation:
- the recomputation demux rate limit was set to 1/s
- recomputation was taking >10s on mainnet (for unrelated reasons)
- recomputations cannot be executed in parallel (they acquire a single mutex)
- a steady inflow of new edges was triggering a recomputation every 1s, while each recomputation took >10s (after acquiring the mutex), so recomputations queued up without bound
- the delay before an edge was added to the graph grew indefinitely, eventually causing edges to expire before they were even added to the graph (a toy model of this feedback loop is sketched below)
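A toy model of that feedback loop (illustrative only — this is not nearcore code; the names and durations are made up to match the numbers above). Spawning a mutex-guarded recomputation on every trigger, with no in-flight limit, lets tasks pile up behind the mutex faster than they can drain:

use std::{sync::Arc, time::Duration};

#[tokio::main]
async fn main() {
    // The single recomputation mutex from the bug description.
    let graph = Arc::new(tokio::sync::Mutex::new(()));
    // A steady inflow of new edges triggers a recomputation every ~1s.
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    loop {
        tick.tick().await;
        let graph = graph.clone();
        // No in-flight limit: a new task is spawned every second, but each one
        // holds the mutex for ~10s, so the wait queue grows without bound.
        tokio::spawn(async move {
            let _guard = graph.lock().await;
            tokio::time::sleep(Duration::from_secs(10)).await; // the ">10s" recomputation
        });
    }
}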
pompon0 authored Dec 21, 2022
1 parent d57ae75 commit d462b8f
Showing 1 changed file with 40 additions and 24 deletions.
chain/network/src/concurrency/demux.rs (40 additions, 24 deletions)
@@ -122,6 +122,7 @@ impl<Arg: 'static + Send, Res: 'static + Send> Demux<Arg, Res> {
                 if tokens < rl.burst && next_token.is_none() {
                     next_token = Some(tokio::time::Instant::now() + interval);
                 }
+
                 tokio::select! {
                     // TODO(gprusak): implement sleep future support for FakeClock,
                     // so that we don't use tokio directly here.
@@ -139,35 +140,50 @@ impl<Arg: 'static + Send, Res: 'static + Send> Demux<Arg, Res> {
                     },
                 }
                 if !calls.is_empty() && tokens > 0 {
+                    // First pop all the elements already accumulated on the queue.
+                    // TODO(gprusak): technically calling try_recv() in a loop may cause a starvation,
+                    // in case elements are added to the queue faster than we can take them out,
+                    // so ideally we should rather atomically dump the content of the queue:
+                    // we can achieve that by maintaining an atomic counter with number of elements in
+                    // the queue.
+                    while let Ok(call) = recv.try_recv() {
+                        calls.push(call);
+                    }
+
                     tokens -= 1;
                     // TODO(gprusak): as of now Demux (as a concurrency primitive) doesn't support
                     // cancellation. Once we add cancellation support, this task could accept a context sum:
                     // the sum is valid iff any context is valid.
                     let calls = std::mem::take(&mut calls);
-                    // TODO(gprusak): don't spawn on the "current" runtime. See one of the previous TODOs.
-                    tokio::spawn(async move {
-                        let mut args = vec![];
-                        let mut outs = vec![];
-                        let mut handlers = vec![];
-                        for call in calls {
-                            args.push(call.arg);
-                            outs.push(call.out);
-                            handlers.push(call.handler);
-                        }
-                        let res = handlers.swap_remove(0)(args).await;
-                        assert_eq!(
-                            res.len(),
-                            outs.len(),
-                            "demux handler returned {} results, expected {}",
-                            res.len(),
-                            outs.len(),
-                        );
-                        for (res, out) in std::iter::zip(res, outs) {
-                            // If the caller is no longer interested in the result,
-                            // the channel will be closed. Ignore that.
-                            let _ = out.send(res);
-                        }
-                    });
+                    let mut args = vec![];
+                    let mut outs = vec![];
+                    let mut handlers = vec![];
+                    // TODO(gprusak): due to inlining the call at most 1 call is executed at any
+                    // given time. Ideally we should have a separate limit for the number of
+                    // in-flight calls. It would be dangerous to have it unbounded, especially in a
+                    // case when the concurrent calls would cause a contention.
+                    // TODO(gprusak): add metrics for:
+                    // - demuxed call latency (the one inlined here)
+                    // - outer call latency (i.e. latency of the whole Demux.call, from the PoV of
+                    //   the caller).
+                    for call in calls {
+                        args.push(call.arg);
+                        outs.push(call.out);
+                        handlers.push(call.handler);
+                    }
+                    let res = handlers.swap_remove(0)(args).await;
+                    assert_eq!(
+                        res.len(),
+                        outs.len(),
+                        "demux handler returned {} results, expected {}",
+                        res.len(),
+                        outs.len(),
+                    );
+                    for (res, out) in std::iter::zip(res, outs) {
+                        // If the caller is no longer interested in the result,
+                        // the channel will be closed. Ignore that.
+                        let _ = out.send(res);
+                    }
                 }
             }
         });
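The shape of the fix, as a standalone sketch (simplified and hypothetical — the real Demux also rate-limits with tokens and fans results back out through per-caller channels, as the diff above shows). Draining the queue into one batch and then awaiting the handler inline means the event loop itself cannot start a second call until the first one finishes, which is exactly an in-flight limit of 1:

use std::time::Duration;
use tokio::sync::mpsc;

// Await the batched handler inline: the next batch cannot start until the
// current one finishes, so at most one call is ever in flight.
async fn demux_loop(mut recv: mpsc::UnboundedReceiver<u64>) {
    while let Some(first) = recv.recv().await {
        let mut batch = vec![first];
        // Mirrors the try_recv() dump added in this commit: grab everything
        // that accumulated while the previous call was running.
        while let Ok(item) = recv.try_recv() {
            batch.push(item);
        }
        handle_batch(batch).await; // inlined, not tokio::spawn-ed
    }
}

async fn handle_batch(batch: Vec<u64>) {
    println!("processing a batch of {} item(s)", batch.len());
    tokio::time::sleep(Duration::from_millis(500)).await; // pretend this is slow
}

#[tokio::main]
async fn main() {
    let (send, recv) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        for i in 0..10u64 {
            let _ = send.send(i);
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
    demux_loop(recv).await;
}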