From c069ac4cfb8ddd6202fe856a1d49a109cb13c41c Mon Sep 17 00:00:00 2001 From: Gnome! <45660393+GnomedDev@users.noreply.github.com> Date: Mon, 27 Jun 2022 23:20:06 +0100 Subject: [PATCH] Remove unnecessary poison messages (#6) * Remove unnecessary poison messages * Re-add Poison for CoreMessage --- src/driver/tasks/disposal.rs | 7 +---- src/driver/tasks/events.rs | 26 +++++++++---------- src/driver/tasks/message/disposal.rs | 2 -- src/driver/tasks/message/mixer.rs | 7 ----- src/driver/tasks/message/udp_rx.rs | 2 -- src/driver/tasks/message/udp_tx.rs | 6 ++--- src/driver/tasks/message/ws.rs | 3 --- src/driver/tasks/mixer/mod.rs | 12 +++------ src/driver/tasks/mod.rs | 38 +++++++++++++--------------- src/driver/tasks/udp_rx.rs | 2 +- src/driver/tasks/udp_tx.rs | 8 ++---- src/driver/tasks/ws.rs | 2 +- 12 files changed, 40 insertions(+), 75 deletions(-) diff --git a/src/driver/tasks/disposal.rs b/src/driver/tasks/disposal.rs index b10a56f39..0b13014b0 100644 --- a/src/driver/tasks/disposal.rs +++ b/src/driver/tasks/disposal.rs @@ -9,10 +9,5 @@ use tracing::instrument; /// to prevent deadline misses. #[instrument(skip(mix_rx))] pub(crate) fn runner(mix_rx: Receiver) { - loop { - match mix_rx.recv() { - Err(_) | Ok(DisposalMessage::Poison) => break, - _ => {}, - } - } + while mix_rx.recv().is_ok() {} } diff --git a/src/driver/tasks/events.rs b/src/driver/tasks/events.rs index eaf464b93..9becd2db0 100644 --- a/src/driver/tasks/events.rs +++ b/src/driver/tasks/events.rs @@ -14,13 +14,13 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver = vec![]; let mut handles: Vec = vec![]; - loop { - match evt_rx.recv_async().await { - Ok(EventMessage::AddGlobalEvent(data)) => { + while let Ok(msg) = evt_rx.recv_async().await { + match msg { + EventMessage::AddGlobalEvent(data) => { info!("Global event added."); global.add_event(data); }, - Ok(EventMessage::AddTrackEvent(i, data)) => { + EventMessage::AddTrackEvent(i, data) => { info!("Adding event to track {}.", i); let event_store = events @@ -32,7 +32,7 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + EventMessage::FireCoreEvent(ctx) => { let ctx = ctx.to_user_context(); let evt = ctx .to_core_event() @@ -42,17 +42,17 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + EventMessage::RemoveGlobalEvents => { global.remove_handlers(); }, - Ok(EventMessage::AddTrack(store, state, handle)) => { + EventMessage::AddTrack(store, state, handle) => { events.push(store); states.push(state); handles.push(handle); info!("Event state for track {} added", events.len()); }, - Ok(EventMessage::ChangeState(i, change)) => { + EventMessage::ChangeState(i, change) => { let max_states = states.len(); debug!( "Changing state for track {} of {}: {:?}", @@ -107,27 +107,25 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + EventMessage::RemoveTrack(i) => { info!("Event state for track {} of {} removed.", i, events.len()); events.swap_remove(i); states.swap_remove(i); handles.swap_remove(i); }, - Ok(EventMessage::RemoveAllTracks) => { + EventMessage::RemoveAllTracks => { info!("Event state for all tracks removed."); events.clear(); states.clear(); handles.clear(); }, - Ok(EventMessage::Tick) => { + EventMessage::Tick => { // NOTE: this should fire saved up blocks of state change evts. global.tick(&mut events, &mut states, &mut handles).await; }, - Err(_) | Ok(EventMessage::Poison) => { - break; - }, + EventMessage::Poison => break, } } diff --git a/src/driver/tasks/message/disposal.rs b/src/driver/tasks/message/disposal.rs index 3940046e7..795e5bcef 100644 --- a/src/driver/tasks/message/disposal.rs +++ b/src/driver/tasks/message/disposal.rs @@ -5,6 +5,4 @@ use crate::{driver::tasks::mixer::InternalTrack, tracks::TrackHandle}; pub enum DisposalMessage { Track(Box), Handle(TrackHandle), - - Poison, } diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index 45063212f..175295aa7 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -18,13 +18,6 @@ pub struct MixerConnection { pub udp_tx: Sender, } -impl Drop for MixerConnection { - fn drop(&mut self) { - drop(self.udp_rx.send(UdpRxMessage::Poison)); - drop(self.udp_tx.send(UdpTxMessage::Poison)); - } -} - pub enum MixerMessage { AddTrack(TrackContext), SetTrack(Option), diff --git a/src/driver/tasks/message/udp_rx.rs b/src/driver/tasks/message/udp_rx.rs index 9034090ae..12a6d0c31 100644 --- a/src/driver/tasks/message/udp_rx.rs +++ b/src/driver/tasks/message/udp_rx.rs @@ -6,6 +6,4 @@ use crate::driver::Config; pub enum UdpRxMessage { SetConfig(Config), ReplaceInterconnect(Interconnect), - - Poison, } diff --git a/src/driver/tasks/message/udp_tx.rs b/src/driver/tasks/message/udp_tx.rs index d3dbf3602..16b7ad1fd 100644 --- a/src/driver/tasks/message/udp_tx.rs +++ b/src/driver/tasks/message/udp_tx.rs @@ -1,6 +1,4 @@ #![allow(missing_docs)] -pub enum UdpTxMessage { - Packet(Vec), // TODO: do something cheaper. - Poison, -} +// TODO: do something cheaper. +pub type UdpTxMessage = Vec; diff --git a/src/driver/tasks/message/ws.rs b/src/driver/tasks/message/ws.rs index 1cd7e49e6..4faf68367 100644 --- a/src/driver/tasks/message/ws.rs +++ b/src/driver/tasks/message/ws.rs @@ -3,12 +3,9 @@ use super::Interconnect; use crate::ws::WsStream; -#[allow(dead_code)] pub enum WsMessage { Ws(Box), ReplaceInterconnect(Interconnect), SetKeepalive(f64), Speaking(bool), - - Poison, } diff --git a/src/driver/tasks/mixer/mod.rs b/src/driver/tasks/mixer/mod.rs index 2f36a3748..d8a771781 100644 --- a/src/driver/tasks/mixer/mod.rs +++ b/src/driver/tasks/mixer/mod.rs @@ -804,8 +804,7 @@ impl Mixer { // Test mode: send unencrypted (compressed) packets to local receiver. drop(tx.send(self.packet[..index].to_vec().into())); } else { - conn.udp_tx - .send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?; + conn.udp_tx.send(self.packet[..index].to_vec())?; } #[cfg(not(test))] @@ -815,8 +814,7 @@ impl Mixer { // TODO: This is dog slow, don't do this. // Can we replace this with a shared ring buffer + semaphore? // or the BBQueue crate? - conn.udp_tx - .send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?; + conn.udp_tx.send(self.packet[..index].to_vec())?; } let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( @@ -955,9 +953,5 @@ pub(crate) fn runner( async_handle: Handle, config: Config, ) { - let mut mixer = Mixer::new(mix_rx, async_handle, interconnect, config); - - mixer.run(); - - drop(mixer.disposer.send(DisposalMessage::Poison)); + Mixer::new(mix_rx, async_handle, interconnect, config).run(); } diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index 275d3a318..945c6e222 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -21,7 +21,7 @@ use crate::{ Config, ConnectionInfo, }; -use flume::{Receiver, RecvError, Sender}; +use flume::{Receiver, Sender}; use message::*; use tokio::{runtime::Handle, spawn, time::sleep as tsleep}; use tracing::{debug, instrument, trace}; @@ -70,9 +70,9 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + while let Ok(msg) = rx.recv_async().await { + match msg { + CoreMessage::ConnectWithResult(info, tx) => { config = if let Some(new_config) = next_config.take() { drop( interconnect @@ -98,7 +98,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::RetryConnect(retry_idx) => { debug!("Retrying idx: {} (vs. {})", retry_idx, attempt_idx); if retry_idx == attempt_idx { if let Some(progress) = retrying.take() { @@ -108,7 +108,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::Disconnect => { let last_conn = connection.take(); drop(interconnect.mixer.send(MixerMessage::DropConn)); drop(interconnect.mixer.send(MixerMessage::RebuildEncoder)); @@ -123,7 +123,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason) => { // if idx is not a match, quash reason // (i.e., prevent users from mistakenly trying to reconnect for an *old* dead conn). // if it *is* a match, the conn needs to die! @@ -144,32 +144,32 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::SetTrack(s) => { drop(interconnect.mixer.send(MixerMessage::SetTrack(s))); }, - Ok(CoreMessage::AddTrack(s)) => { + CoreMessage::AddTrack(s) => { drop(interconnect.mixer.send(MixerMessage::AddTrack(s))); }, - Ok(CoreMessage::SetBitrate(b)) => { + CoreMessage::SetBitrate(b) => { drop(interconnect.mixer.send(MixerMessage::SetBitrate(b))); }, - Ok(CoreMessage::SetConfig(mut new_config)) => { + CoreMessage::SetConfig(mut new_config) => { next_config = Some(new_config.clone()); new_config.make_safe(&config, connection.is_some()); drop(interconnect.mixer.send(MixerMessage::SetConfig(new_config))); }, - Ok(CoreMessage::AddEvent(evt)) => { + CoreMessage::AddEvent(evt) => { drop(interconnect.events.send(EventMessage::AddGlobalEvent(evt))); }, - Ok(CoreMessage::RemoveGlobalEvents) => { + CoreMessage::RemoveGlobalEvents => { drop(interconnect.events.send(EventMessage::RemoveGlobalEvents)); }, - Ok(CoreMessage::Mute(m)) => { + CoreMessage::Mute(m) => { drop(interconnect.mixer.send(MixerMessage::SetMute(m))); }, - Ok(CoreMessage::Reconnect) => { + CoreMessage::Reconnect => { if let Some(mut conn) = connection.take() { // try once: if interconnect, try again. // if still issue, full connect. @@ -208,7 +208,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender + CoreMessage::FullReconnect => if let Some(conn) = connection.take() { let info = conn.info.clone(); @@ -216,12 +216,10 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { + CoreMessage::RebuildInterconnect => { interconnect.restart_volatile_internals(); }, - Err(RecvError::Disconnected) | Ok(CoreMessage::Poison) => { - break; - }, + CoreMessage::Poison => break, } } diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index bd024fc4a..c0bd3d2d1 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -259,7 +259,7 @@ impl UdpRx { Ok(UdpRxMessage::SetConfig(c)) => { self.config = c; }, - Ok(UdpRxMessage::Poison) | Err(_) => break, + Err(flume::RecvError::Disconnected) => break, } } } diff --git a/src/driver/tasks/udp_tx.rs b/src/driver/tasks/udp_tx.rs index 88b685725..7eb9e852d 100644 --- a/src/driver/tasks/udp_tx.rs +++ b/src/driver/tasks/udp_tx.rs @@ -34,16 +34,12 @@ impl UdpTx { } ka_time += UDP_KEEPALIVE_GAP; }, - Ok(Ok(UdpTxMessage::Packet(p))) => + Ok(Ok(p)) => if let Err(e) = self.udp_tx.send(&p[..]).await { error!("Fatal UDP packet send error: {:?}.", e); break; }, - Ok(Err(e)) => { - error!("Fatal UDP packet receive error: {:?}.", e); - break; - }, - Ok(Ok(UdpTxMessage::Poison)) => { + Ok(Err(flume::RecvError::Disconnected)) => { break; }, } diff --git a/src/driver/tasks/ws.rs b/src/driver/tasks/ws.rs index c9137f361..c7a3626a3 100644 --- a/src/driver/tasks/ws.rs +++ b/src/driver/tasks/ws.rs @@ -140,7 +140,7 @@ impl AuxNetwork { } } }, - Err(_) | Ok(WsMessage::Poison) => { + Err(flume::RecvError::Disconnected) => { break; }, }