diff --git a/src/driver/scheduler/idle.rs b/src/driver/scheduler/idle.rs index ad4042b1c..4c3797411 100644 --- a/src/driver/scheduler/idle.rs +++ b/src/driver/scheduler/idle.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use flume::{Receiver, Sender}; use nohash_hasher::{BuildNoHashHasher, IntMap}; use tokio::time::{Instant as TokInstant, Interval}; +use tracing::warn; use crate::constants::*; @@ -71,14 +72,14 @@ impl Idle { biased; msg = self.rx.recv_async() => match msg { Ok(SchedulerMessage::NewMixer(rx, ic, cfg)) => { - let mixer = ParkedMixer::new(rx, ic, cfg); + let mut mixer = ParkedMixer::new(rx, ic, cfg); let id = self.next_id.incr(); mixer.spawn_forwarder(self.tx.clone(), id); self.tasks.insert(id, mixer); self.stats.add_idle_mixer(); }, - Ok(SchedulerMessage::Demote(id, task)) => { + Ok(SchedulerMessage::Demote(id, mut task)) => { task.send_gateway_not_speaking(); task.spawn_forwarder(self.tx.clone(), id); @@ -86,15 +87,17 @@ impl Idle { }, Ok(SchedulerMessage::Do(id, mix_msg)) => { let now_live = mix_msg.is_mixer_now_live(); - let task = self.tasks.get_mut(&id).unwrap(); - - match task.handle_message(mix_msg) { - Ok(false) if now_live => { - let task = self.tasks.remove(&id).unwrap(); - self.schedule_mixer(task, id, None); - }, - Ok(false) => {}, - Ok(true) | Err(_) => self.to_cull.push(id), + if let Some(task) = self.tasks.get_mut(&id) { + match task.handle_message(mix_msg) { + Ok(false) if now_live => { + let task = self.tasks.remove(&id).unwrap(); + self.schedule_mixer(task, id, None); + }, + Ok(false) => {}, + Ok(true) | Err(_) => self.to_cull.push(id), + } + } else { + warn!("Received post-cull message for {id:?}, discarding."); } }, Ok(SchedulerMessage::Overspill(worker_id, id, task)) => { @@ -136,7 +139,9 @@ impl Idle { } for id in self.to_cull.drain(..) { - self.tasks.remove(&id); + if let Some(tx) = self.tasks.remove(&id).and_then(|t| t.cull_handle) { + _ = tx.send_async(()).await; + } } true @@ -292,6 +297,7 @@ mod test { rtp_timestamp: i, park_time: TokInstant::now().into(), last_cost: None, + cull_handle: None, }; core.stats.add_idle_mixer(); core.stats.move_mixer_to_live(); diff --git a/src/driver/scheduler/live.rs b/src/driver/scheduler/live.rs index 76b6fba1d..46ead6e4d 100644 --- a/src/driver/scheduler/live.rs +++ b/src/driver/scheduler/live.rs @@ -554,6 +554,7 @@ impl Live { rtp_timestamp: id_0 as u32, park_time: Instant::now(), last_cost: None, + cull_handle: None, }, id, Instant::now(), @@ -617,6 +618,7 @@ impl Live { rtp_timestamp, park_time, last_cost: None, + cull_handle: None, }, ) }) diff --git a/src/driver/scheduler/task.rs b/src/driver/scheduler/task.rs index a3d7f8537..e30ba10f1 100644 --- a/src/driver/scheduler/task.rs +++ b/src/driver/scheduler/task.rs @@ -75,6 +75,8 @@ pub struct ParkedMixer { /// The last known cost of executing this task, if it had to be moved /// due to a limit on thread resources. pub last_cost: Option, + /// Handle to any forwarder task, used if this mixer is culled while idle. + pub cull_handle: Option>, } #[allow(missing_docs)] @@ -88,6 +90,7 @@ impl ParkedMixer { rtp_timestamp: random::(), park_time: Instant::now(), last_cost: None, + cull_handle: None, } } @@ -95,14 +98,28 @@ impl ParkedMixer { /// /// Any requests which would cause this mixer to become live will terminate /// this task. - pub fn spawn_forwarder(&self, tx: Sender, id: TaskId) { + pub fn spawn_forwarder(&mut self, tx: Sender, id: TaskId) { + let (kill_tx, kill_rx) = flume::bounded(1); + self.cull_handle = Some(kill_tx); + let remote_rx = self.mixer.mix_rx.clone(); tokio::spawn(async move { - while let Ok(msg) = remote_rx.recv_async().await { - let exit = msg.is_mixer_now_live(); - let dead = tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err(); - if exit || dead { - break; + loop { + tokio::select! { + biased; + _ = kill_rx.recv_async() => break, + msg = remote_rx.recv_async() => { + let exit = if let Ok(msg) = msg { + let remove_self = msg.is_mixer_now_live(); + tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err() || remove_self + } else { + true + }; + + if exit { + break; + } + } } } }); diff --git a/src/input/sources/http.rs b/src/input/sources/http.rs index efcec23d2..d729178a5 100644 --- a/src/input/sources/http.rs +++ b/src/input/sources/http.rs @@ -13,7 +13,6 @@ use reqwest::{ header::{HeaderMap, ACCEPT_RANGES, CONTENT_LENGTH, CONTENT_TYPE, RANGE, RETRY_AFTER}, Client, }; -use std::fmt::format; use std::{ io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, SeekFrom}, pin::Pin,