Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup of leaky types, nested results #20

Merged
merged 2 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/twilight/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,9 @@ async fn join(msg: Message, state: State) -> Result<(), Box<dyn Error + Send + S

let (_handle, success) = state.songbird.join(guild_id, channel_id).await;

let content = match success?.recv_async().await {
Ok(Ok(())) => format!("Joined <#{}>!", channel_id),
Ok(Err(e)) => format!("Failed to join <#{}>! Why: {:?}", channel_id, e),
_ => format!("Failed to join <#{}>: Gateway error!", channel_id),
let content = match success {
Ok(()) => format!("Joined <#{}>!", channel_id),
Err(e) => format!("Failed to join <#{}>! Why: {:?}", channel_id, e),
};

state
Expand Down Expand Up @@ -237,7 +236,7 @@ async fn pause(msg: Message, state: State) -> Result<(), Box<dyn Error + Send +
let store = state.trackdata.read().await;

let content = if let Some(handle) = store.get(&guild_id) {
let info = handle.get_info()?.await?;
let info = handle.get_info().await?;

let paused = match info.playing {
PlayMode::Play => {
Expand Down
5 changes: 5 additions & 0 deletions src/driver/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ use xsalsa20poly1305::aead::Error as CryptoError;

/// Errors encountered while connecting to a Discord voice server over the driver.
#[derive(Debug)]
#[non_exhaustive]
pub enum Error {
/// The driver hung up an internal signaller, either due to another connection attempt
/// or a crash.
AttemptDiscarded,
/// An error occurred during [en/de]cryption of voice packets or key generation.
Crypto(CryptoError),
/// Server did not return the expected crypto mode during negotiation.
Expand Down Expand Up @@ -83,6 +87,7 @@ impl fmt::Display for Error {
write!(f, "Failed to connect to Discord RTP server: ")?;
use Error::*;
match self {
AttemptDiscarded => write!(f, "connection attempt was aborted/discarded."),
Crypto(c) => write!(f, "cryptography error {}.", c),
CryptoModeInvalid => write!(f, "server changed negotiated encryption mode."),
CryptoModeUnavailable => write!(f, "server did not offer chosen encryption mode."),
Expand Down
39 changes: 35 additions & 4 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod decode_mode;
pub(crate) mod tasks;

pub use config::Config;
use connection::error::Result;
use connection::error::{Error, Result};
pub use crypto::*;
pub use decode_mode::DecodeMode;

Expand All @@ -30,7 +30,12 @@ use crate::{
EventHandler,
};
use audiopus::Bitrate;
use flume::{Receiver, SendError, Sender};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use flume::{r#async::RecvFut, SendError, Sender};
use tasks::message::CoreMessage;
use tracing::instrument;

Expand Down Expand Up @@ -80,13 +85,18 @@ impl Driver {
}

/// Connects to a voice channel using the specified server.
///
/// This method instantly contacts the driver tasks, and its
/// does not need to be `await`ed to start the actual connection.
#[instrument(skip(self))]
pub fn connect(&mut self, info: ConnectionInfo) -> Receiver<Result<()>> {
pub fn connect(&mut self, info: ConnectionInfo) -> Connect {
let (tx, rx) = flume::bounded(1);

self.raw_connect(info, tx);

rx
Connect {
inner: rx.into_recv_async(),
}
}

/// Connects to a voice channel using the specified server.
Expand Down Expand Up @@ -285,3 +295,24 @@ impl Drop for Driver {
let _ = self.sender.send(CoreMessage::Poison);
}
}

/// Future for a call to [`Driver::connect`].
///
/// This future awaits the *result* of a connection; the driver
/// is messaged at the time of the call.
///
/// [`Driver::connect`]: Driver::connect
pub struct Connect {
inner: RecvFut<'static, Result<()>>,
}

impl Future for Connect {
type Output = Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.inner).poll(cx) {
Poll::Ready(r) => Poll::Ready(r.map_err(|_| Error::AttemptDiscarded).and_then(|x| x)),
Poll::Pending => Poll::Pending,
}
}
}
2 changes: 1 addition & 1 deletion src/driver/tasks/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl Mixer {
if temp_len > 0 || opus_len.is_some() {
track.step_frame();
} else if track.do_loop() {
if let Some(time) = track.seek_time(Default::default()) {
if let Ok(time) = track.seek_time(Default::default()) {
// have to reproduce self.fire_event here
// to circumvent the borrow checker's lack of knowledge.
//
Expand Down
20 changes: 19 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ use twilight_gateway::shard::CommandError;
/// Error returned when a manager or call handler is
/// unable to send messages over Discord's gateway.
pub enum JoinError {
/// Request to join was dropped, cancelled, or replaced.
Dropped,
/// No available gateway connection was provided to send
/// voice state update messages.
NoSender,
/// Tried to leave a [`Call`] which was not found.
///
/// [`Call`]: crate::Call
NoCall,
#[cfg(feature = "driver")]
/// The driver failed to establish a voice connection.
Driver(ConnectionError),
#[cfg(feature = "serenity")]
/// Serenity-specific WebSocket send error.
Serenity(TrySendError<InterMessage>),
Expand All @@ -34,8 +39,11 @@ impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Failed to Join Voice channel: ")?;
match self {
JoinError::Dropped => write!(f, "request was cancelled/dropped."),
JoinError::NoSender => write!(f, "no gateway destination."),
JoinError::NoCall => write!(f, "tried to leave a non-existent call."),
#[cfg(feature = "driver")]
JoinError::Driver(t) => write!(f, "internal driver error {}.", t),
#[cfg(feature = "serenity")]
JoinError::Serenity(t) => write!(f, "serenity failure {}.", t),
#[cfg(feature = "twilight")]
Expand All @@ -61,9 +69,19 @@ impl From<CommandError> for JoinError {
}
}

#[cfg(all(feature = "driver", feature = "gateway"))]
impl From<ConnectionError> for JoinError {
fn from(e: ConnectionError) -> Self {
JoinError::Driver(e)
}
}

#[cfg(feature = "gateway")]
/// Convenience type for Discord gateway error handling.
pub type JoinResult<T> = Result<T, JoinError>;

#[cfg(feature = "driver")]
pub use crate::driver::connection::error::{Error as ConnectionError, Result as ConnectionResult};
pub use crate::{
driver::connection::error::{Error as ConnectionError, Result as ConnectionResult},
tracks::{TrackError, TrackResult},
};
30 changes: 25 additions & 5 deletions src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
info::{ConnectionInfo, ConnectionProgress},
shards::Shard,
};
use flume::{Receiver, Sender};
use flume::{r#async::RecvFut, Sender};
use serde_json::json;
use tracing::instrument;

Expand Down Expand Up @@ -173,11 +173,21 @@ impl Call {

#[cfg(feature = "driver")]
/// Connect or switch to the given voice channel by its Id.
///
/// This function acts as a future in two stages:
/// * The first `await` sends the request over the gateway.
/// * The second `await`s a the driver's connection attempt.
/// To prevent deadlock, any mutexes around this Call
/// *must* be released before this result is queried.
///
/// When using [`Songbird::join`], this pattern is correctly handled for you.
///
/// [`Songbird::join`]: crate::Songbird::join
#[instrument(skip(self))]
pub async fn join(
&mut self,
channel_id: ChannelId,
) -> JoinResult<Receiver<ConnectionResult<()>>> {
) -> JoinResult<RecvFut<'static, ConnectionResult<()>>> {
let (tx, rx) = flume::unbounded();

self.connection = Some((
Expand All @@ -186,19 +196,29 @@ impl Call {
Return::Conn(tx),
));

self.update().await.map(|_| rx)
self.update().await.map(|_| rx.into_recv_async())
}

/// Join the selected voice channel, *without* running/starting an RTP
/// session or running the driver.
///
/// Use this if you require connection info for lavalink,
/// some other voice implementation, or don't want to use the driver for a given call.
///
/// This function acts as a future in two stages:
/// * The first `await` sends the request over the gateway.
/// * The second `await`s voice session data from Discord.
/// To prevent deadlock, any mutexes around this Call
/// *must* be released before this result is queried.
///
/// When using [`Songbird::join_gateway`], this pattern is correctly handled for you.
///
/// [`Songbird::join_gateway`]: crate::Songbird::join_gateway
#[instrument(skip(self))]
pub async fn join_gateway(
&mut self,
channel_id: ChannelId,
) -> JoinResult<Receiver<ConnectionInfo>> {
) -> JoinResult<RecvFut<'static, ConnectionInfo>> {
let (tx, rx) = flume::unbounded();

self.connection = Some((
Expand All @@ -207,7 +227,7 @@ impl Call {
Return::Info(tx),
));

self.update().await.map(|_| rx)
self.update().await.map(|_| rx.into_recv_async())
}

/// Leaves the current voice channel, disconnecting from it.
Expand Down
6 changes: 6 additions & 0 deletions src/input/ffmpeg_src.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ use tokio::process::Command as TokioCommand;
use tracing::debug;

/// Opens an audio file through `ffmpeg` and creates an audio source.
///
/// This source is not seek-compatible.
/// If you need looping or track seeking, then consider using
/// [`Restartable::ffmpeg`].
///
/// [`Restartable::ffmpeg`]: crate::input::restartable::Restartable::ffmpeg
pub async fn ffmpeg<P: AsRef<OsStr>>(path: P) -> Result<Input> {
_ffmpeg(path.as_ref()).await
}
Expand Down
16 changes: 15 additions & 1 deletion src/input/ytdl_src.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ const YOUTUBE_DL_COMMAND: &str = if cfg!(feature = "youtube-dlc") {

/// Creates a streamed audio source with `youtube-dl` and `ffmpeg`.
///
/// Uses `youtube-dlc` if the `youtube-dlc` feature is enabled.
/// This source is not seek-compatible.
/// If you need looping or track seeking, then consider using
/// [`Restartable::ytdl`].
///
/// Uses `youtube-dlc` if the `"youtube-dlc"` feature is enabled.
///
/// [`Restartable::ytdl`]: crate::input::restartable::Restartable::ytdl
pub async fn ytdl(uri: &str) -> Result<Input> {
_ytdl(uri, &[]).await
}
Expand Down Expand Up @@ -110,6 +116,14 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result<Input> {

/// Creates a streamed audio source from YouTube search results with `youtube-dl(c)`,`ffmpeg`, and `ytsearch`.
/// Takes the first video listed from the YouTube search.
///
/// This source is not seek-compatible.
/// If you need looping or track seeking, then consider using
/// [`Restartable::ytdl_search`].
///
/// Uses `youtube-dlc` if the `"youtube-dlc"` feature is enabled.
///
/// [`Restartable::ytdl_search`]: crate::input::restartable::Restartable::ytdl_search
pub async fn ytdl_search(name: &str) -> Result<Input> {
ytdl(&format!("ytsearch1:{}", name)).await
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub use crate::{
};

#[cfg(feature = "gateway")]
pub use crate::{handler::Call, manager::Songbird};
pub use crate::{handler::*, manager::*};

#[cfg(feature = "serenity")]
pub use crate::serenity::*;
Expand Down
36 changes: 22 additions & 14 deletions src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "driver")]
use crate::{driver::Config, error::ConnectionResult};
use crate::driver::Config;
use crate::{
error::{JoinError, JoinResult},
id::{ChannelId, GuildId, UserId},
Expand All @@ -9,7 +9,6 @@ use crate::{
};
#[cfg(feature = "serenity")]
use async_trait::async_trait;
use flume::Receiver;
#[cfg(feature = "serenity")]
use futures::channel::mpsc::UnboundedSender as Sender;
use parking_lot::RwLock as PRwLock;
Expand Down Expand Up @@ -114,15 +113,15 @@ impl Songbird {
client_data.initialised = true;
}

/// Retreives a [`Call`] for the given guild, if one already exists.
/// Retrieves a [`Call`] for the given guild, if one already exists.
///
/// [`Call`]: Call
pub fn get<G: Into<GuildId>>(&self, guild_id: G) -> Option<Arc<Mutex<Call>>> {
let map_read = self.calls.read();
map_read.get(&guild_id.into()).cloned()
}

/// Retreives a [`Call`] for the given guild, creating a new one if
/// Retrieves a [`Call`] for the given guild, creating a new one if
/// none is found.
///
/// This will not join any calls, or cause connection state to change.
Expand Down Expand Up @@ -186,11 +185,7 @@ impl Songbird {
/// [`Call`]: Call
/// [`get`]: Songbird::get
#[inline]
pub async fn join<C, G>(
&self,
guild_id: G,
channel_id: C,
) -> (Arc<Mutex<Call>>, JoinResult<Receiver<ConnectionResult<()>>>)
pub async fn join<C, G>(&self, guild_id: G, channel_id: C) -> (Arc<Mutex<Call>>, JoinResult<()>)
where
C: Into<ChannelId>,
G: Into<GuildId>,
Expand All @@ -203,14 +198,22 @@ impl Songbird {
&self,
guild_id: GuildId,
channel_id: ChannelId,
) -> (Arc<Mutex<Call>>, JoinResult<Receiver<ConnectionResult<()>>>) {
) -> (Arc<Mutex<Call>>, JoinResult<()>) {
let call = self.get_or_insert(guild_id);

let result = {
let stage_1 = {
let mut handler = call.lock().await;
handler.join(channel_id).await
};

let result = match stage_1 {
Ok(chan) => chan
.await
.map_err(|_| JoinError::Dropped)
.and_then(|x| x.map_err(JoinError::from)),
Err(e) => Err(e),
};

(call, result)
}

Expand All @@ -226,7 +229,7 @@ impl Songbird {
&self,
guild_id: G,
channel_id: C,
) -> (Arc<Mutex<Call>>, JoinResult<Receiver<ConnectionInfo>>)
) -> (Arc<Mutex<Call>>, JoinResult<ConnectionInfo>)
where
C: Into<ChannelId>,
G: Into<GuildId>,
Expand All @@ -238,14 +241,19 @@ impl Songbird {
&self,
guild_id: GuildId,
channel_id: ChannelId,
) -> (Arc<Mutex<Call>>, JoinResult<Receiver<ConnectionInfo>>) {
) -> (Arc<Mutex<Call>>, JoinResult<ConnectionInfo>) {
let call = self.get_or_insert(guild_id);

let result = {
let stage_1 = {
let mut handler = call.lock().await;
handler.join_gateway(channel_id).await
};

let result = match stage_1 {
Ok(chan) => chan.await.map_err(|_| JoinError::Dropped),
Err(e) => Err(e),
};

(call, result)
}

Expand Down
Loading