Skip to content

Commit

Permalink
Handle Voice close codes, prevent Songbird spinning WS threads (#1068)
Browse files Browse the repository at this point in the history
Voice `CloseCode`s now map to a type rather than a collection of constants. Correct close code handling in this way terminates the websocket task when there is no likelihood of resuming, which was causing leftover tasks to spin at the `tokio::select` in some circumstances (i.e., ::leave, which keeps the `Driver` alive).
  • Loading branch information
FelixMcFelix authored Nov 8, 2020
1 parent 38a55da commit 26c9c91
Showing 1 changed file with 33 additions and 6 deletions.
39 changes: 33 additions & 6 deletions src/driver/tasks/ws.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use super::{error::Result, message::*};
use super::message::*;
use crate::{
events::CoreContext,
model::{
payload::{Heartbeat, Speaking},
CloseCode as VoiceCloseCode,
Event as GatewayEvent,
FromPrimitive,
SpeakingState,
},
ws::{Error as WsError, ReceiverExt, SenderExt, WsStream},
};
use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use flume::Receiver;
use rand::random;
use std::time::Duration;
Expand Down Expand Up @@ -52,14 +55,15 @@ impl AuxNetwork {

loop {
let mut ws_error = false;
let mut should_reconnect = false;

let hb = time::delay_until(next_heartbeat);

tokio::select! {
_ = hb => {
ws_error = match self.send_heartbeat().await {
Err(e) => {
error!("Heartbeat send failure {:?}.", e);
should_reconnect = ws_error_is_not_final(e);
true
},
_ => false,
Expand All @@ -73,7 +77,7 @@ impl AuxNetwork {
false
},
Err(e) => {
error!("Error processing ws {:?}.", e);
should_reconnect = ws_error_is_not_final(e);
true
},
Ok(Some(msg)) => {
Expand Down Expand Up @@ -113,7 +117,7 @@ impl AuxNetwork {

ws_error |= match ssu_status {
Err(e) => {
error!("Issue sending speaking update {:?}.", e);
should_reconnect = ws_error_is_not_final(e);
true
},
_ => false,
Expand All @@ -128,8 +132,13 @@ impl AuxNetwork {
}

if ws_error {
let _ = interconnect.core.send(CoreMessage::Reconnect);
self.dont_send = true;

if should_reconnect {
let _ = interconnect.core.send(CoreMessage::Reconnect);
} else {
break;
}
}
}
}
Expand All @@ -138,7 +147,7 @@ impl AuxNetwork {
Instant::now() + self.heartbeat_interval
}

async fn send_heartbeat(&mut self) -> Result<()> {
async fn send_heartbeat(&mut self) -> Result<(), WsError> {
let nonce = random::<u64>();
self.last_heartbeat_nonce = Some(nonce);

Expand Down Expand Up @@ -203,3 +212,21 @@ pub(crate) async fn runner(
aux.run(&mut interconnect).await;
info!("WS thread finished.");
}

fn ws_error_is_not_final(err: WsError) -> bool {
match err {
WsError::WsClosed(Some(frame)) => match frame.code {
CloseCode::Library(l) =>
if let Some(code) = VoiceCloseCode::from_u16(l) {
code.should_resume()
} else {
true
},
_ => true,
},
e => {
error!("Error sending/receiving ws {:?}.", e);
true
},
}
}

0 comments on commit 26c9c91

Please sign in to comment.