Skip to content

Commit

Permalink
Prevent mixer thread from waking while inactive (#46)
Browse files Browse the repository at this point in the history
This change prevents mixer threads from waking every 20ms without an active voice connection. This was leading to unacceptably high CPU usage in cases where users needed to preserve this state between many active connections. Additionally, this modifies the documentation of `Songbird::leave` to emphasise why users would prefer to `remove` their calls.

This was tested by examining the CPU usage in task manager before and after the change was made, using a control of 10k manually created `Driver` instances. After creation is finished, the Drivers no longer saturate a 6-core laptop Intel i7 (while they very much did so before).

Closes #42.
  • Loading branch information
FelixMcFelix authored Mar 14, 2021
1 parent c488ce3 commit a9b4cb7
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 118 deletions.
275 changes: 157 additions & 118 deletions src/driver/tasks/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,131 +109,54 @@ impl Mixer {
let mut conn_failure = false;

'runner: loop {
loop {
use MixerMessage::*;

let error = match self.mix_rx.try_recv() {
Ok(AddTrack(mut t)) => {
t.source.prep_with_handle(self.async_handle.clone());
self.add_track(t)
},
Ok(SetTrack(t)) => {
self.tracks.clear();

let mut out = self.fire_event(EventMessage::RemoveAllTracks);

if let Some(mut t) = t {
t.source.prep_with_handle(self.async_handle.clone());

// Do this unconditionally: this affects local state infallibly,
// with the event installation being the remote part.
if let Err(e) = self.add_track(t) {
out = Err(e);
if self.conn_active.is_some() {
loop {
use MixerMessage::*;

match self.mix_rx.try_recv() {
Ok(m) => {
let (events, conn, should_exit) = self.handle_message(m);
events_failure |= events;
conn_failure |= conn;

if should_exit {
break 'runner;
}
}
},

out
},
Ok(SetBitrate(b)) => {
self.bitrate = b;
if let Err(e) = self.set_bitrate(b) {
error!("Failed to update bitrate {:?}", e);
}
Ok(())
},
Ok(SetMute(m)) => {
self.muted = m;
Ok(())
},
Ok(SetConn(conn, ssrc)) => {
self.conn_active = Some(conn);
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
rtp.set_ssrc(ssrc);
rtp.set_sequence(random::<u16>().into());
rtp.set_timestamp(random::<u32>().into());
self.deadline = Instant::now();
Ok(())
},
Ok(DropConn) => {
self.conn_active = None;
Ok(())
},
Ok(ReplaceInterconnect(i)) => {
self.prevent_events = false;
if let Some(ws) = &self.ws {
conn_failure |=
ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err();
}
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::ReplaceInterconnect(i.clone()))
.is_err();
}
self.interconnect = i;
Err(TryRecvError::Disconnected) => {
break 'runner;
},

self.rebuild_tracks()
},
Ok(SetConfig(new_config)) => {
self.config = new_config.clone();
Err(TryRecvError::Empty) => {
break;
},
};
}

if self.tracks.capacity() < self.config.preallocated_tracks {
self.tracks
.reserve(self.config.preallocated_tracks - self.tracks.len());
}
if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) {
events_failure |= e.should_trigger_interconnect_rebuild();
conn_failure |= e.should_trigger_connect();

if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::SetConfig(new_config))
.is_err();
error!("Mixer thread cycle: {:?}", e);
}
} else {
match self.mix_rx.recv() {
Ok(m) => {
let (events, conn, should_exit) = self.handle_message(m);
events_failure |= events;
conn_failure |= conn;

if should_exit {
break 'runner;
}

Ok(())
},
Ok(RebuildEncoder) => match new_encoder(self.bitrate) {
Ok(encoder) => {
self.encoder = encoder;
Ok(())
},
Err(e) => {
error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e);
self.bitrate = DEFAULT_BITRATE;
self.encoder = new_encoder(self.bitrate)
.expect("Failed fallback rebuild of OpusEncoder with safe inputs.");
Ok(())
},
},
Ok(Ws(new_ws_handle)) => {
self.ws = new_ws_handle;
Ok(())
},

Err(TryRecvError::Disconnected) | Ok(Poison) => {
Err(_) => {
break 'runner;
},

Err(TryRecvError::Empty) => {
break;
},
};

if let Err(e) = error {
events_failure |= e.should_trigger_interconnect_rebuild();
conn_failure |= e.should_trigger_connect();
}
}

if let Err(e) = self.cycle().and_then(|_| self.audio_commands_events()) {
events_failure |= e.should_trigger_interconnect_rebuild();
conn_failure |= e.should_trigger_connect();

error!("Mixer thread cycle: {:?}", e);
}

// event failure? rebuild interconnect.
// ws or udp failure? full connect
// (soft reconnect is covered by the ws task.)
Expand Down Expand Up @@ -266,6 +189,126 @@ impl Mixer {
}
}

#[inline]
fn handle_message(&mut self, msg: MixerMessage) -> (bool, bool, bool) {
let mut events_failure = false;
let mut conn_failure = false;
let mut should_exit = false;

use MixerMessage::*;

let error = match msg {
AddTrack(mut t) => {
t.source.prep_with_handle(self.async_handle.clone());
self.add_track(t)
},
SetTrack(t) => {
self.tracks.clear();

let mut out = self.fire_event(EventMessage::RemoveAllTracks);

if let Some(mut t) = t {
t.source.prep_with_handle(self.async_handle.clone());

// Do this unconditionally: this affects local state infallibly,
// with the event installation being the remote part.
if let Err(e) = self.add_track(t) {
out = Err(e);
}
}

out
},
SetBitrate(b) => {
self.bitrate = b;
if let Err(e) = self.set_bitrate(b) {
error!("Failed to update bitrate {:?}", e);
}
Ok(())
},
SetMute(m) => {
self.muted = m;
Ok(())
},
SetConn(conn, ssrc) => {
self.conn_active = Some(conn);
let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect(
"Too few bytes in self.packet for RTP header.\
(Blame: VOICE_PACKET_MAX?)",
);
rtp.set_ssrc(ssrc);
rtp.set_sequence(random::<u16>().into());
rtp.set_timestamp(random::<u32>().into());
self.deadline = Instant::now();
Ok(())
},
DropConn => {
self.conn_active = None;
Ok(())
},
ReplaceInterconnect(i) => {
self.prevent_events = false;
if let Some(ws) = &self.ws {
conn_failure |= ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err();
}
if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::ReplaceInterconnect(i.clone()))
.is_err();
}
self.interconnect = i;

self.rebuild_tracks()
},
SetConfig(new_config) => {
self.config = new_config.clone();

if self.tracks.capacity() < self.config.preallocated_tracks {
self.tracks
.reserve(self.config.preallocated_tracks - self.tracks.len());
}

if let Some(conn) = &self.conn_active {
conn_failure |= conn
.udp_rx
.send(UdpRxMessage::SetConfig(new_config))
.is_err();
}

Ok(())
},
RebuildEncoder => match new_encoder(self.bitrate) {
Ok(encoder) => {
self.encoder = encoder;
Ok(())
},
Err(e) => {
error!("Failed to rebuild encoder. Resetting bitrate. {:?}", e);
self.bitrate = DEFAULT_BITRATE;
self.encoder = new_encoder(self.bitrate)
.expect("Failed fallback rebuild of OpusEncoder with safe inputs.");
Ok(())
},
},
Ws(new_ws_handle) => {
self.ws = new_ws_handle;
Ok(())
},
Poison => {
should_exit = true;
Ok(())
},
};

if let Err(e) = error {
events_failure |= e.should_trigger_interconnect_rebuild();
conn_failure |= e.should_trigger_connect();
}

(events_failure, conn_failure, should_exit)
}

#[inline]
fn fire_event(&self, event: EventMessage) -> Result<()> {
// As this task is responsible for noticing the potential death of an event context,
Expand Down Expand Up @@ -360,17 +403,13 @@ impl Mixer {
return;
}

// FIXME: make choice of spin-sleep/imprecise sleep optional in next breaking.
self.sleeper
.sleep(self.deadline.saturating_duration_since(Instant::now()));
self.deadline += TIMESTEP_LENGTH;
}

pub fn cycle(&mut self) -> Result<()> {
if self.conn_active.is_none() {
self.march_deadline();
return Ok(());
}

let mut mix_buffer = [0f32; STEREO_FRAME_SIZE];

// Walk over all the audio files, combining into one audio frame according
Expand Down
4 changes: 4 additions & 0 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,17 @@ impl Songbird {
/// associated voice channel, if connected.
///
/// This will _not_ drop the handler, and will preserve it and its settings.
/// If you do not need to reuse event handlers, configuration, or active tracks
/// in the underlying driver *consider calling [`remove`]* to release tasks,
/// threads, and memory.
///
/// This is a wrapper around [getting][`get`] a handler and calling
/// [`leave`] on it.
///
/// [`Call`]: Call
/// [`get`]: Songbird::get
/// [`leave`]: Call::leave
/// [`remove`]: Songbird::remove
#[inline]
pub async fn leave<G: Into<GuildId>>(&self, guild_id: G) -> JoinResult<()> {
self._leave(guild_id.into()).await
Expand Down

0 comments on commit a9b4cb7

Please sign in to comment.