Skip to content

Commit

Permalink
Gateway: Add connection timeout, add Config to gateway. (#51)
Browse files Browse the repository at this point in the history
This change fixes tasks hanging due to rare cases of messages being lost between full Discord reconnections by placing a configurable timeout on the `ConnectionInfo` responses. This is a companion fix to [serenity#1255](serenity-rs/serenity#1255). To make this doable, `Config`s are now used by all versions of `Songbird`/`Call`, and relevant functions are  added to simplify setup with configuration. These are now non-exhaustive, correcting an earlier oversight. For future extensibility, this PR moves the return type of `join`/`join_gateway` into a custom future (no longer leaking flume's `RecvFut` type).

Additionally, this fixes the Makefile's feature sets for driver/gateway-only compilation.

This is a breaking change in:
* the return types of `join`/`join_gateway`
* moving `crate::driver::Config` -> `crate::Config`,
* `Config` and `JoinError` becoming `#[non_breaking]`.

This was tested via `cargo make ready`, and by testing `examples/serenity/voice_receive` with various timeout settings.
  • Loading branch information
FelixMcFelix committed May 10, 2021
1 parent bc952d0 commit d303e0a
Show file tree
Hide file tree
Showing 18 changed files with 426 additions and 119 deletions.
1 change: 1 addition & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ If the driver feature is enabled, then every `Call` is/has an associated `Driver
src/manager.rs
src/handler.rs
src/serenity.rs
src/join.rs
```

# Driver
Expand Down
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ version = "0.3"
optional = true
version = "0.11"

[dependencies.pin-project]
optional = true
version = "1"

[dependencies.rand]
optional = true
version = "0.8"
Expand Down Expand Up @@ -142,11 +146,13 @@ default = [
gateway = [
"gateway-core",
"tokio/sync",
"tokio/time",
]
gateway-core = [
"dashmap",
"flume",
"parking_lot",
"pin-project",
"spinning_top",
]
driver = [
Expand Down
13 changes: 9 additions & 4 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ command = "cargo"
dependencies = ["format"]

[tasks.build-gateway]
args = ["build", "--features", "serenity-rustls"]
args = ["build", "--no-default-features", "--features", "serenity-rustls"]
command = "cargo"
dependencies = ["format"]

[tasks.build-driver]
args = ["build", "--features", "driver,rustls"]
args = ["build", "--no-default-features", "--features", "driver,rustls"]
command = "cargo"
dependencies = ["format"]

[tasks.build-old-tokio]
command = "cargo"
args = ["build", "--features", "serenity-rustls-tokio-02,driver-tokio-02"]
args = ["build", "--no-default-features", "--features", "serenity-rustls-tokio-02,driver-tokio-02"]
dependencies = ["format"]

[tasks.build-variants]
Expand All @@ -45,7 +45,12 @@ command = "cargo"
args = ["bench", "--features", "internals,full-doc"]

[tasks.doc]
command = "cargo"
args = ["doc", "--features", "full-doc"]

[tasks.doc-open]
command = "cargo"
args = ["doc", "--features", "full-doc", "--open"]

[tasks.ready]
dependencies = ["format", "test", "build-variants", "build-examples", "doc", "clippy"]
dependencies = ["format", "test", "build-variants", "build-examples", "doc", "clippy"]
13 changes: 5 additions & 8 deletions examples/serenity/voice_receive/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use serenity::{
};

use songbird::{
driver::{Config as DriverConfig, DecodeMode},
driver::DecodeMode,
model::payload::{ClientConnect, ClientDisconnect, Speaking},
Config,
CoreEvent,
Event,
EventContext,
EventHandler as VoiceEventHandler,
SerenityInit,
Songbird,
};

struct Handler;
Expand Down Expand Up @@ -167,16 +167,13 @@ async fn main() {
// Here, we need to configure Songbird to decode all incoming voice packets.
// If you want, you can do this on a per-call basis---here, we need it to
// read the audio data that other people are sending us!
let songbird = Songbird::serenity();
songbird.set_config(
DriverConfig::default()
.decode_mode(DecodeMode::Decode)
);
let songbird_config = Config::default()
.decode_mode(DecodeMode::Decode);

let mut client = Client::builder(&token)
.event_handler(Handler)
.framework(framework)
.register_songbird_with(songbird.into())
.register_songbird_from_config(songbird_config)
.await
.expect("Err creating client");

Expand Down
41 changes: 38 additions & 3 deletions src/driver/config.rs → src/config.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use super::{CryptoMode, DecodeMode};
#[cfg(feature = "driver-core")]
use super::driver::{CryptoMode, DecodeMode};

/// Configuration for the inner Driver.
///
#[cfg(feature = "gateway-core")]
use std::time::Duration;

/// Configuration for drivers and calls.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct Config {
#[cfg(feature = "driver-core")]
/// Selected tagging mode for voice packet encryption.
///
/// Defaults to [`CryptoMode::Normal`].
Expand All @@ -14,6 +19,7 @@ pub struct Config {
///
/// [`CryptoMode::Normal`]: CryptoMode::Normal
pub crypto_mode: CryptoMode,
#[cfg(feature = "driver-core")]
/// Configures whether decoding and decryption occur for all received packets.
///
/// If voice receiving voice packets, generally you should choose [`DecodeMode::Decode`].
Expand All @@ -29,6 +35,20 @@ pub struct Config {
/// [`DecodeMode::Pass`]: DecodeMode::Pass
/// [user speaking events]: crate::events::CoreEvent::SpeakingUpdate
pub decode_mode: DecodeMode,
#[cfg(feature = "gateway-core")]
/// Configures the amount of time to wait for Discord to reply with connection information
/// if [`Call::join`]/[`join_gateway`] are used.
///
/// This is a useful fallback in the event that:
/// * the underlying Discord client restarts and loses a join request, or
/// * a channel join fails because the bot is already believed to be there.
///
/// Defaults to 10 seconds. If set to `None`, connections will never time out.
///
/// [`Call::join`]: crate::Call::join
/// [`join_gateway`]: crate::Call::join_gateway
pub gateway_timeout: Option<Duration>,
#[cfg(feature = "driver-core")]
/// Number of concurrently active tracks to allocate memory for.
///
/// This should be set at, or just above, the maximum number of tracks
Expand All @@ -46,13 +66,19 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
#[cfg(feature = "driver-core")]
crypto_mode: CryptoMode::Normal,
#[cfg(feature = "driver-core")]
decode_mode: DecodeMode::Decrypt,
#[cfg(feature = "gateway-core")]
gateway_timeout: Some(Duration::from_secs(10)),
#[cfg(feature = "driver-core")]
preallocated_tracks: 1,
}
}
}

#[cfg(feature = "driver-core")]
impl Config {
/// Sets this `Config`'s chosen cryptographic tagging scheme.
pub fn crypto_mode(mut self, crypto_mode: CryptoMode) -> Self {
Expand All @@ -79,3 +105,12 @@ impl Config {
}
}
}

#[cfg(feature = "gateway-core")]
impl Config {
/// Sets this `Config`'s timeout for joining a voice channel.
pub fn gateway_timeout(mut self, gateway_timeout: Option<Duration>) -> Self {
self.gateway_timeout = gateway_timeout;
self
}
}
11 changes: 8 additions & 3 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
#[cfg(feature = "internals")]
pub mod bench_internals;

mod config;
pub(crate) mod connection;
mod crypto;
mod decode_mode;
pub(crate) mod tasks;

pub use config::Config;
use connection::error::{Error, Result};
pub use crypto::CryptoMode;
pub(crate) use crypto::CryptoState;
Expand All @@ -29,6 +27,7 @@ use crate::{
events::EventData,
input::Input,
tracks::{self, Track, TrackHandle},
Config,
ConnectionInfo,
Event,
EventHandler,
Expand Down Expand Up @@ -212,13 +211,19 @@ impl Driver {
self.send(CoreMessage::SetTrack(None))
}

/// Sets the configuration for this driver.
/// Sets the configuration for this driver (and parent `Call`, if applicable).
#[instrument(skip(self))]
pub fn set_config(&mut self, config: Config) {
self.config = config.clone();
self.send(CoreMessage::SetConfig(config))
}

/// Returns a view of this driver's configuration.
#[instrument(skip(self))]
pub fn config(&self) -> &Config {
&self.config
}

/// Attach a global event handler to an audio context. Global events may receive
/// any [`EventContext`].
///
Expand Down
1 change: 1 addition & 0 deletions src/driver/tasks/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub enum Recipient {
pub type Result<T> = std::result::Result<T, Error>;

#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
Crypto(CryptoError),
/// Received an illegal voice packet on the voice UDP socket.
Expand Down
3 changes: 2 additions & 1 deletion src/driver/tasks/mixer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::{disposal, error::Result, message::*, Config};
use super::{disposal, error::Result, message::*};
use crate::{
constants::*,
tracks::{PlayMode, Track},
Config,
};
use audiopus::{
coder::Encoder as OpusEncoder,
Expand Down
7 changes: 2 additions & 5 deletions src/driver/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@ pub(crate) mod udp_rx;
pub(crate) mod udp_tx;
pub(crate) mod ws;

use super::{
connection::{error::Error as ConnectionError, Connection},
Config,
};
use crate::events::CoreContext;
use super::connection::{error::Error as ConnectionError, Connection};
use crate::{events::CoreContext, Config};
use flume::{Receiver, RecvError, Sender};
use message::*;
#[cfg(not(feature = "tokio-02-marker"))]
Expand Down
7 changes: 2 additions & 5 deletions src/driver/tasks/udp_rx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use super::{
error::{Error, Result},
message::*,
Config,
};
use crate::{
constants::*,
driver::{Config, DecodeMode},
events::CoreContext,
};
use crate::{constants::*, driver::DecodeMode, events::CoreContext};
use audiopus::{
coder::Decoder as OpusDecoder,
error::{Error as OpusError, ErrorCode},
Expand Down
42 changes: 42 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use twilight_gateway::shard::CommandError;

#[cfg(feature = "gateway-core")]
#[derive(Debug)]
#[non_exhaustive]
/// Error returned when a manager or call handler is
/// unable to send messages over Discord's gateway.
pub enum JoinError {
Expand All @@ -23,8 +24,23 @@ pub enum JoinError {
///
/// [`Call`]: crate::Call
NoCall,
/// Connection details were not received from Discord in the
/// time given in [the `Call`'s configuration].
///
/// This can occur if a message is lost by the Discord client
/// between restarts, or if Discord's gateway believes that
/// this bot is still in the channel it attempts to join.
///
/// *Users should `leave` the server on the gateway before
/// re-attempting connection.*
///
/// [the `Call`'s configuration]: crate::Config
TimedOut,
#[cfg(feature = "driver-core")]
/// The driver failed to establish a voice connection.
///
/// *Users should `leave` the server on the gateway before
/// re-attempting connection.*
Driver(ConnectionError),
#[cfg(feature = "serenity")]
/// Serenity-specific WebSocket send error.
Expand All @@ -34,6 +50,31 @@ pub enum JoinError {
Twilight(CommandError),
}

#[cfg(feature = "gateway-core")]
impl JoinError {
/// Indicates whether this failure may have left (or been
/// caused by) Discord's gateway state being in an
/// inconsistent state.
///
/// Failure to `leave` before rejoining may cause further
/// timeouts.
pub fn should_leave_server(&self) -> bool {
matches!(self, JoinError::TimedOut)
}

#[cfg(feature = "driver-core")]
/// Indicates whether this failure can be reattempted via
/// [`Driver::connect`] with retreived connection info.
///
/// Failure to `leave` before rejoining may cause further
/// timeouts.
///
/// [`Driver::connect`]: crate::driver::Driver
pub fn should_reconnect_driver(&self) -> bool {
matches!(self, JoinError::Driver(_))
}
}

#[cfg(feature = "gateway-core")]
impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -42,6 +83,7 @@ impl fmt::Display for JoinError {
JoinError::Dropped => write!(f, "request was cancelled/dropped."),
JoinError::NoSender => write!(f, "no gateway destination."),
JoinError::NoCall => write!(f, "tried to leave a non-existent call."),
JoinError::TimedOut => write!(f, "gateway response from Discord timed out."),
#[cfg(feature = "driver-core")]
JoinError::Driver(t) => write!(f, "internal driver error {}.", t),
#[cfg(feature = "serenity")]
Expand Down
Loading

0 comments on commit d303e0a

Please sign in to comment.