Skip to content

Commit

Permalink
Remove unnecessary poison messages (#6)
Browse files Browse the repository at this point in the history
* Remove unnecessary poison messages

* Re-add Poison for CoreMessage
  • Loading branch information
GnomedDev authored Jun 27, 2022
1 parent 90f813a commit c069ac4
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 75 deletions.
7 changes: 1 addition & 6 deletions src/driver/tasks/disposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,5 @@ use tracing::instrument;
/// to prevent deadline misses.
#[instrument(skip(mix_rx))]
pub(crate) fn runner(mix_rx: Receiver<DisposalMessage>) {
loop {
match mix_rx.recv() {
Err(_) | Ok(DisposalMessage::Poison) => break,
_ => {},
}
}
while mix_rx.recv().is_ok() {}
}
26 changes: 12 additions & 14 deletions src/driver/tasks/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe
let mut states: Vec<TrackState> = vec![];
let mut handles: Vec<TrackHandle> = vec![];

loop {
match evt_rx.recv_async().await {
Ok(EventMessage::AddGlobalEvent(data)) => {
while let Ok(msg) = evt_rx.recv_async().await {
match msg {
EventMessage::AddGlobalEvent(data) => {
info!("Global event added.");
global.add_event(data);
},
Ok(EventMessage::AddTrackEvent(i, data)) => {
EventMessage::AddTrackEvent(i, data) => {
info!("Adding event to track {}.", i);

let event_store = events
Expand All @@ -32,7 +32,7 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe

event_store.add_event(data, state.position);
},
Ok(EventMessage::FireCoreEvent(ctx)) => {
EventMessage::FireCoreEvent(ctx) => {
let ctx = ctx.to_user_context();
let evt = ctx
.to_core_event()
Expand All @@ -42,17 +42,17 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe

global.fire_core_event(evt, ctx).await;
},
Ok(EventMessage::RemoveGlobalEvents) => {
EventMessage::RemoveGlobalEvents => {
global.remove_handlers();
},
Ok(EventMessage::AddTrack(store, state, handle)) => {
EventMessage::AddTrack(store, state, handle) => {
events.push(store);
states.push(state);
handles.push(handle);

info!("Event state for track {} added", events.len());
},
Ok(EventMessage::ChangeState(i, change)) => {
EventMessage::ChangeState(i, change) => {
let max_states = states.len();
debug!(
"Changing state for track {} of {}: {:?}",
Expand Down Expand Up @@ -107,27 +107,25 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe
},
}
},
Ok(EventMessage::RemoveTrack(i)) => {
EventMessage::RemoveTrack(i) => {
info!("Event state for track {} of {} removed.", i, events.len());

events.swap_remove(i);
states.swap_remove(i);
handles.swap_remove(i);
},
Ok(EventMessage::RemoveAllTracks) => {
EventMessage::RemoveAllTracks => {
info!("Event state for all tracks removed.");

events.clear();
states.clear();
handles.clear();
},
Ok(EventMessage::Tick) => {
EventMessage::Tick => {
// NOTE: this should fire saved up blocks of state change evts.
global.tick(&mut events, &mut states, &mut handles).await;
},
Err(_) | Ok(EventMessage::Poison) => {
break;
},
EventMessage::Poison => break,
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/driver/tasks/message/disposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,4 @@ use crate::{driver::tasks::mixer::InternalTrack, tracks::TrackHandle};
pub enum DisposalMessage {
Track(Box<InternalTrack>),
Handle(TrackHandle),

Poison,
}
7 changes: 0 additions & 7 deletions src/driver/tasks/message/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ pub struct MixerConnection {
pub udp_tx: Sender<UdpTxMessage>,
}

impl Drop for MixerConnection {
fn drop(&mut self) {
drop(self.udp_rx.send(UdpRxMessage::Poison));
drop(self.udp_tx.send(UdpTxMessage::Poison));
}
}

pub enum MixerMessage {
AddTrack(TrackContext),
SetTrack(Option<TrackContext>),
Expand Down
2 changes: 0 additions & 2 deletions src/driver/tasks/message/udp_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,4 @@ use crate::driver::Config;
pub enum UdpRxMessage {
SetConfig(Config),
ReplaceInterconnect(Interconnect),

Poison,
}
6 changes: 2 additions & 4 deletions src/driver/tasks/message/udp_tx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#![allow(missing_docs)]

pub enum UdpTxMessage {
Packet(Vec<u8>), // TODO: do something cheaper.
Poison,
}
// TODO: do something cheaper.
pub type UdpTxMessage = Vec<u8>;
3 changes: 0 additions & 3 deletions src/driver/tasks/message/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
use super::Interconnect;
use crate::ws::WsStream;

#[allow(dead_code)]
pub enum WsMessage {
Ws(Box<WsStream>),
ReplaceInterconnect(Interconnect),
SetKeepalive(f64),
Speaking(bool),

Poison,
}
12 changes: 3 additions & 9 deletions src/driver/tasks/mixer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,7 @@ impl Mixer {
// Test mode: send unencrypted (compressed) packets to local receiver.
drop(tx.send(self.packet[..index].to_vec().into()));
} else {
conn.udp_tx
.send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?;
conn.udp_tx.send(self.packet[..index].to_vec())?;
}

#[cfg(not(test))]
Expand All @@ -815,8 +814,7 @@ impl Mixer {
// TODO: This is dog slow, don't do this.
// Can we replace this with a shared ring buffer + semaphore?
// or the BBQueue crate?
conn.udp_tx
.send(UdpTxMessage::Packet(self.packet[..index].to_vec()))?;
conn.udp_tx.send(self.packet[..index].to_vec())?;
}

let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
Expand Down Expand Up @@ -955,9 +953,5 @@ pub(crate) fn runner(
async_handle: Handle,
config: Config,
) {
let mut mixer = Mixer::new(mix_rx, async_handle, interconnect, config);

mixer.run();

drop(mixer.disposer.send(DisposalMessage::Poison));
Mixer::new(mix_rx, async_handle, interconnect, config).run();
}
38 changes: 18 additions & 20 deletions src/driver/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
Config,
ConnectionInfo,
};
use flume::{Receiver, RecvError, Sender};
use flume::{Receiver, Sender};
use message::*;
use tokio::{runtime::Handle, spawn, time::sleep as tsleep};
use tracing::{debug, instrument, trace};
Expand Down Expand Up @@ -70,9 +70,9 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
let mut retrying = None;
let mut attempt_idx = 0;

loop {
match rx.recv_async().await {
Ok(CoreMessage::ConnectWithResult(info, tx)) => {
while let Ok(msg) = rx.recv_async().await {
match msg {
CoreMessage::ConnectWithResult(info, tx) => {
config = if let Some(new_config) = next_config.take() {
drop(
interconnect
Expand All @@ -98,7 +98,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
drop(tx.send(Ok(())));
}
},
Ok(CoreMessage::RetryConnect(retry_idx)) => {
CoreMessage::RetryConnect(retry_idx) => {
debug!("Retrying idx: {} (vs. {})", retry_idx, attempt_idx);
if retry_idx == attempt_idx {
if let Some(progress) = retrying.take() {
Expand All @@ -108,7 +108,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
}
}
},
Ok(CoreMessage::Disconnect) => {
CoreMessage::Disconnect => {
let last_conn = connection.take();
drop(interconnect.mixer.send(MixerMessage::DropConn));
drop(interconnect.mixer.send(MixerMessage::RebuildEncoder));
Expand All @@ -123,7 +123,7 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
)));
}
},
Ok(CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason)) => {
CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason) => {
// if idx is not a match, quash reason
// (i.e., prevent users from mistakenly trying to reconnect for an *old* dead conn).
// if it *is* a match, the conn needs to die!
Expand All @@ -144,32 +144,32 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
}),
)));
},
Ok(CoreMessage::SetTrack(s)) => {
CoreMessage::SetTrack(s) => {
drop(interconnect.mixer.send(MixerMessage::SetTrack(s)));
},
Ok(CoreMessage::AddTrack(s)) => {
CoreMessage::AddTrack(s) => {
drop(interconnect.mixer.send(MixerMessage::AddTrack(s)));
},
Ok(CoreMessage::SetBitrate(b)) => {
CoreMessage::SetBitrate(b) => {
drop(interconnect.mixer.send(MixerMessage::SetBitrate(b)));
},
Ok(CoreMessage::SetConfig(mut new_config)) => {
CoreMessage::SetConfig(mut new_config) => {
next_config = Some(new_config.clone());

new_config.make_safe(&config, connection.is_some());

drop(interconnect.mixer.send(MixerMessage::SetConfig(new_config)));
},
Ok(CoreMessage::AddEvent(evt)) => {
CoreMessage::AddEvent(evt) => {
drop(interconnect.events.send(EventMessage::AddGlobalEvent(evt)));
},
Ok(CoreMessage::RemoveGlobalEvents) => {
CoreMessage::RemoveGlobalEvents => {
drop(interconnect.events.send(EventMessage::RemoveGlobalEvents));
},
Ok(CoreMessage::Mute(m)) => {
CoreMessage::Mute(m) => {
drop(interconnect.mixer.send(MixerMessage::SetMute(m)));
},
Ok(CoreMessage::Reconnect) => {
CoreMessage::Reconnect => {
if let Some(mut conn) = connection.take() {
// try once: if interconnect, try again.
// if still issue, full connect.
Expand Down Expand Up @@ -208,20 +208,18 @@ async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
}
}
},
Ok(CoreMessage::FullReconnect) =>
CoreMessage::FullReconnect =>
if let Some(conn) = connection.take() {
let info = conn.info.clone();

connection = ConnectionRetryData::reconnect(info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
},
Ok(CoreMessage::RebuildInterconnect) => {
CoreMessage::RebuildInterconnect => {
interconnect.restart_volatile_internals();
},
Err(RecvError::Disconnected) | Ok(CoreMessage::Poison) => {
break;
},
CoreMessage::Poison => break,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/driver/tasks/udp_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl UdpRx {
Ok(UdpRxMessage::SetConfig(c)) => {
self.config = c;
},
Ok(UdpRxMessage::Poison) | Err(_) => break,
Err(flume::RecvError::Disconnected) => break,
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/driver/tasks/udp_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,12 @@ impl UdpTx {
}
ka_time += UDP_KEEPALIVE_GAP;
},
Ok(Ok(UdpTxMessage::Packet(p))) =>
Ok(Ok(p)) =>
if let Err(e) = self.udp_tx.send(&p[..]).await {
error!("Fatal UDP packet send error: {:?}.", e);
break;
},
Ok(Err(e)) => {
error!("Fatal UDP packet receive error: {:?}.", e);
break;
},
Ok(Ok(UdpTxMessage::Poison)) => {
Ok(Err(flume::RecvError::Disconnected)) => {
break;
},
}
Expand Down
2 changes: 1 addition & 1 deletion src/driver/tasks/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl AuxNetwork {
}
}
},
Err(_) | Ok(WsMessage::Poison) => {
Err(flume::RecvError::Disconnected) => {
break;
},
}
Expand Down

0 comments on commit c069ac4

Please sign in to comment.