Skip to content

Commit

Permalink
Driver: Fix scheduler crash after task closure
Browse files Browse the repository at this point in the history
A removed audio task could still have one or more driver messages left in its queue, leading to a crash when the id->mixer lookup failed. This removes an unwrap which is invalid under these assumptions and includes an extra cleanup measure for message forwarders under the same circumstances.

This was tested using `cargo make ready`.
  • Loading branch information
FelixMcFelix committed Nov 20, 2023
1 parent c976d50 commit 77e3916
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 19 deletions.
30 changes: 18 additions & 12 deletions src/driver/scheduler/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use flume::{Receiver, Sender};
use nohash_hasher::{BuildNoHashHasher, IntMap};
use tokio::time::{Instant as TokInstant, Interval};
use tracing::warn;

use crate::constants::*;

Expand Down Expand Up @@ -71,30 +72,32 @@ impl Idle {
biased;
msg = self.rx.recv_async() => match msg {
Ok(SchedulerMessage::NewMixer(rx, ic, cfg)) => {
let mixer = ParkedMixer::new(rx, ic, cfg);
let mut mixer = ParkedMixer::new(rx, ic, cfg);
let id = self.next_id.incr();

mixer.spawn_forwarder(self.tx.clone(), id);
self.tasks.insert(id, mixer);
self.stats.add_idle_mixer();
},
Ok(SchedulerMessage::Demote(id, task)) => {
Ok(SchedulerMessage::Demote(id, mut task)) => {
task.send_gateway_not_speaking();

task.spawn_forwarder(self.tx.clone(), id);
self.tasks.insert(id, task);
},
Ok(SchedulerMessage::Do(id, mix_msg)) => {
let now_live = mix_msg.is_mixer_now_live();
let task = self.tasks.get_mut(&id).unwrap();

match task.handle_message(mix_msg) {
Ok(false) if now_live => {
let task = self.tasks.remove(&id).unwrap();
self.schedule_mixer(task, id, None);
},
Ok(false) => {},
Ok(true) | Err(_) => self.to_cull.push(id),
if let Some(task) = self.tasks.get_mut(&id) {
match task.handle_message(mix_msg) {
Ok(false) if now_live => {
let task = self.tasks.remove(&id).unwrap();
self.schedule_mixer(task, id, None);
},
Ok(false) => {},
Ok(true) | Err(_) => self.to_cull.push(id),
}
} else {
warn!("Received post-cull message for {id:?}, discarding.");
}
},
Ok(SchedulerMessage::Overspill(worker_id, id, task)) => {
Expand Down Expand Up @@ -136,7 +139,9 @@ impl Idle {
}

for id in self.to_cull.drain(..) {
self.tasks.remove(&id);
if let Some(tx) = self.tasks.remove(&id).and_then(|t| t.cull_handle) {
_ = tx.send_async(()).await;
}
}

true
Expand Down Expand Up @@ -292,6 +297,7 @@ mod test {
rtp_timestamp: i,
park_time: TokInstant::now().into(),
last_cost: None,
cull_handle: None,
};
core.stats.add_idle_mixer();
core.stats.move_mixer_to_live();
Expand Down
2 changes: 2 additions & 0 deletions src/driver/scheduler/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ impl Live {
rtp_timestamp: id_0 as u32,
park_time: Instant::now(),
last_cost: None,
cull_handle: None,
},
id,
Instant::now(),
Expand Down Expand Up @@ -617,6 +618,7 @@ impl Live {
rtp_timestamp,
park_time,
last_cost: None,
cull_handle: None,
},
)
})
Expand Down
29 changes: 23 additions & 6 deletions src/driver/scheduler/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ pub struct ParkedMixer {
/// The last known cost of executing this task, if it had to be moved
/// due to a limit on thread resources.
pub last_cost: Option<Duration>,
/// Handle to any forwarder task, used if this mixer is culled while idle.
pub cull_handle: Option<Sender<()>>,
}

#[allow(missing_docs)]
Expand All @@ -88,21 +90,36 @@ impl ParkedMixer {
rtp_timestamp: random::<u32>(),
park_time: Instant::now(),
last_cost: None,
cull_handle: None,
}
}

/// Spawn a tokio task which forwards any mixer messages to the central `Idle` task pool.
///
/// Any requests which would cause this mixer to become live will terminate
/// this task.
pub fn spawn_forwarder(&self, tx: Sender<SchedulerMessage>, id: TaskId) {
pub fn spawn_forwarder(&mut self, tx: Sender<SchedulerMessage>, id: TaskId) {
let (kill_tx, kill_rx) = flume::bounded(1);
self.cull_handle = Some(kill_tx);

let remote_rx = self.mixer.mix_rx.clone();
tokio::spawn(async move {
while let Ok(msg) = remote_rx.recv_async().await {
let exit = msg.is_mixer_now_live();
let dead = tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err();
if exit || dead {
break;
loop {
tokio::select! {
biased;
_ = kill_rx.recv_async() => break,
msg = remote_rx.recv_async() => {
let exit = if let Ok(msg) = msg {
let remove_self = msg.is_mixer_now_live();
tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err() || remove_self
} else {
true
};

if exit {
break;
}
}
}
}
});
Expand Down
1 change: 0 additions & 1 deletion src/input/sources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use reqwest::{
header::{HeaderMap, ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_TYPE, RANGE, RETRY_AFTER},
Client,
};
use std::fmt::format;
use std::{
io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
Expand Down

0 comments on commit 77e3916

Please sign in to comment.