From 86ea7e9d53a2a68f59731c1f71e4bf9783091ca5 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Sat, 28 Nov 2020 21:28:12 +0000 Subject: [PATCH] Cleanup of leaky types Main goal: a lot of nested future/result folding. This mainly modifies error handling for Tracks and TrackHandles to be more consistent, and hides the underlying channel result passing in get_info. Errors returned should be far clearer, and are domain specific rather than falling back to a very opaque use of the underlying channel error. It should be clearer to users why their handle commands failed, or why they can't make a ytdl track loop or similar. Also fixed/cleaned up Songbird::join(_gateway) to return in a single await, sparing the user from the underlying channel details and repeated Errs. I was trying for some time to extend the same graces to `Call`, but could not figure out a sane way to get a 'static version of the first future in the chain (i.e., the gateway send) so that the whole thing could happen after dropping the lock around the Call. I really wanted to fix this to happen as a single folded await too, but I think this might need some crazy hack or redesign. --- examples/twilight/src/main.rs | 9 ++- src/driver/connection/error.rs | 5 ++ src/driver/mod.rs | 39 +++++++++-- src/driver/tasks/mixer.rs | 2 +- src/error.rs | 20 +++++- src/handler.rs | 30 +++++++-- src/input/ffmpeg_src.rs | 6 ++ src/input/ytdl_src.rs | 16 ++++- src/lib.rs | 2 +- src/manager.rs | 36 ++++++---- src/tracks/error.rs | 39 +++++++++++ src/tracks/handle.rs | 82 ++++++++++++++--------- src/tracks/mod.rs | 116 ++++++++++++++------------------- src/tracks/queue.rs | 8 +-- 14 files changed, 276 insertions(+), 134 deletions(-) create mode 100644 src/tracks/error.rs diff --git a/examples/twilight/src/main.rs b/examples/twilight/src/main.rs index eb511d268..665868bcd 100644 --- a/examples/twilight/src/main.rs +++ b/examples/twilight/src/main.rs @@ -127,10 +127,9 @@ async fn join(msg: Message, state: State) -> Result<(), Box format!("Joined <#{}>!", channel_id), - Ok(Err(e)) => format!("Failed to join <#{}>! Why: {:?}", channel_id, e), - _ => format!("Failed to join <#{}>: Gateway error!", channel_id), + let content = match success { + Ok(()) => format!("Joined <#{}>!", channel_id), + Err(e) => format!("Failed to join <#{}>! Why: {:?}", channel_id, e), }; state @@ -237,7 +236,7 @@ async fn pause(msg: Message, state: State) -> Result<(), Box { diff --git a/src/driver/connection/error.rs b/src/driver/connection/error.rs index cb6f8c3e1..c3995e84a 100644 --- a/src/driver/connection/error.rs +++ b/src/driver/connection/error.rs @@ -11,7 +11,11 @@ use xsalsa20poly1305::aead::Error as CryptoError; /// Errors encountered while connecting to a Discord voice server over the driver. #[derive(Debug)] +#[non_exhaustive] pub enum Error { + /// The driver hung up an internal signaller, either due to another connection attempt + /// or a crash. + AttemptDiscarded, /// An error occurred during [en/de]cryption of voice packets or key generation. Crypto(CryptoError), /// Server did not return the expected crypto mode during negotiation. @@ -83,6 +87,7 @@ impl fmt::Display for Error { write!(f, "Failed to connect to Discord RTP server: ")?; use Error::*; match self { + AttemptDiscarded => write!(f, "connection attempt was aborted/discarded."), Crypto(c) => write!(f, "cryptography error {}.", c), CryptoModeInvalid => write!(f, "server changed negotiated encryption mode."), CryptoModeUnavailable => write!(f, "server did not offer chosen encryption mode."), diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 0f08ae22e..08a1a1dc5 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -15,7 +15,7 @@ mod decode_mode; pub(crate) mod tasks; pub use config::Config; -use connection::error::Result; +use connection::error::{Error, Result}; pub use crypto::*; pub use decode_mode::DecodeMode; @@ -30,7 +30,12 @@ use crate::{ EventHandler, }; use audiopus::Bitrate; -use flume::{Receiver, SendError, Sender}; +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use flume::{r#async::RecvFut, SendError, Sender}; use tasks::message::CoreMessage; use tracing::instrument; @@ -80,13 +85,18 @@ impl Driver { } /// Connects to a voice channel using the specified server. + /// + /// This method instantly contacts the driver tasks, and its + /// does not need to be `await`ed to start the actual connection. #[instrument(skip(self))] - pub fn connect(&mut self, info: ConnectionInfo) -> Receiver> { + pub fn connect(&mut self, info: ConnectionInfo) -> Connect { let (tx, rx) = flume::bounded(1); self.raw_connect(info, tx); - rx + Connect { + inner: rx.into_recv_async(), + } } /// Connects to a voice channel using the specified server. @@ -285,3 +295,24 @@ impl Drop for Driver { let _ = self.sender.send(CoreMessage::Poison); } } + +/// Future for a call to [`Driver::connect`]. +/// +/// This future awaits the *result* of a connection; the driver +/// is messaged at the time of the call. +/// +/// [`Driver::connect`]: Driver::connect +pub struct Connect { + inner: RecvFut<'static, Result<()>>, +} + +impl Future for Connect { + type Output = Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.inner).poll(cx) { + Poll::Ready(r) => Poll::Ready(r.map_err(|_| Error::AttemptDiscarded).and_then(|x| x)), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/driver/tasks/mixer.rs b/src/driver/tasks/mixer.rs index 2d27bffcf..ec8a121a8 100644 --- a/src/driver/tasks/mixer.rs +++ b/src/driver/tasks/mixer.rs @@ -322,7 +322,7 @@ impl Mixer { if temp_len > 0 || opus_len.is_some() { track.step_frame(); } else if track.do_loop() { - if let Some(time) = track.seek_time(Default::default()) { + if let Ok(time) = track.seek_time(Default::default()) { // have to reproduce self.fire_event here // to circumvent the borrow checker's lack of knowledge. // diff --git a/src/error.rs b/src/error.rs index 2e02735f7..c8d77f0fe 100644 --- a/src/error.rs +++ b/src/error.rs @@ -14,6 +14,8 @@ use twilight_gateway::shard::CommandError; /// Error returned when a manager or call handler is /// unable to send messages over Discord's gateway. pub enum JoinError { + /// Request to join was dropped, cancelled, or replaced. + Dropped, /// No available gateway connection was provided to send /// voice state update messages. NoSender, @@ -21,6 +23,9 @@ pub enum JoinError { /// /// [`Call`]: crate::Call NoCall, + #[cfg(feature = "driver")] + /// The driver failed to establish a voice connection. + Driver(ConnectionError), #[cfg(feature = "serenity")] /// Serenity-specific WebSocket send error. Serenity(TrySendError), @@ -34,8 +39,11 @@ impl fmt::Display for JoinError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Failed to Join Voice channel: ")?; match self { + JoinError::Dropped => write!(f, "request was cancelled/dropped."), JoinError::NoSender => write!(f, "no gateway destination."), JoinError::NoCall => write!(f, "tried to leave a non-existent call."), + #[cfg(feature = "driver")] + JoinError::Driver(t) => write!(f, "internal driver error {}.", t), #[cfg(feature = "serenity")] JoinError::Serenity(t) => write!(f, "serenity failure {}.", t), #[cfg(feature = "twilight")] @@ -61,9 +69,19 @@ impl From for JoinError { } } +#[cfg(all(feature = "driver", feature = "gateway"))] +impl From for JoinError { + fn from(e: ConnectionError) -> Self { + JoinError::Driver(e) + } +} + #[cfg(feature = "gateway")] /// Convenience type for Discord gateway error handling. pub type JoinResult = Result; #[cfg(feature = "driver")] -pub use crate::driver::connection::error::{Error as ConnectionError, Result as ConnectionResult}; +pub use crate::{ + driver::connection::error::{Error as ConnectionError, Result as ConnectionResult}, + tracks::{TrackError, TrackResult}, +}; diff --git a/src/handler.rs b/src/handler.rs index 641584899..e150272ee 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -9,7 +9,7 @@ use crate::{ info::{ConnectionInfo, ConnectionProgress}, shards::Shard, }; -use flume::{Receiver, Sender}; +use flume::{r#async::RecvFut, Sender}; use serde_json::json; use tracing::instrument; @@ -173,11 +173,21 @@ impl Call { #[cfg(feature = "driver")] /// Connect or switch to the given voice channel by its Id. + /// + /// This function acts as a future in two stages: + /// * The first `await` sends the request over the gateway. + /// * The second `await`s a the driver's connection attempt. + /// To prevent deadlock, any mutexes around this Call + /// *must* be released before this result is queried. + /// + /// When using [`Songbird::join`], this pattern is correctly handled for you. + /// + /// [`Songbird::join`]: crate::Songbird::join #[instrument(skip(self))] pub async fn join( &mut self, channel_id: ChannelId, - ) -> JoinResult>> { + ) -> JoinResult>> { let (tx, rx) = flume::unbounded(); self.connection = Some(( @@ -186,7 +196,7 @@ impl Call { Return::Conn(tx), )); - self.update().await.map(|_| rx) + self.update().await.map(|_| rx.into_recv_async()) } /// Join the selected voice channel, *without* running/starting an RTP @@ -194,11 +204,21 @@ impl Call { /// /// Use this if you require connection info for lavalink, /// some other voice implementation, or don't want to use the driver for a given call. + /// + /// This function acts as a future in two stages: + /// * The first `await` sends the request over the gateway. + /// * The second `await`s voice session data from Discord. + /// To prevent deadlock, any mutexes around this Call + /// *must* be released before this result is queried. + /// + /// When using [`Songbird::join_gateway`], this pattern is correctly handled for you. + /// + /// [`Songbird::join_gateway`]: crate::Songbird::join_gateway #[instrument(skip(self))] pub async fn join_gateway( &mut self, channel_id: ChannelId, - ) -> JoinResult> { + ) -> JoinResult> { let (tx, rx) = flume::unbounded(); self.connection = Some(( @@ -207,7 +227,7 @@ impl Call { Return::Info(tx), )); - self.update().await.map(|_| rx) + self.update().await.map(|_| rx.into_recv_async()) } /// Leaves the current voice channel, disconnecting from it. diff --git a/src/input/ffmpeg_src.rs b/src/input/ffmpeg_src.rs index 150996a22..d094fb8aa 100644 --- a/src/input/ffmpeg_src.rs +++ b/src/input/ffmpeg_src.rs @@ -15,6 +15,12 @@ use tokio::process::Command as TokioCommand; use tracing::debug; /// Opens an audio file through `ffmpeg` and creates an audio source. +/// +/// This source is not seek-compatible. +/// If you need looping or track seeking, then consider using +/// [`Restartable::ffmpeg`]. +/// +/// [`Restartable::ffmpeg`]: crate::input::restartable::Restartable::ffmpeg pub async fn ffmpeg>(path: P) -> Result { _ffmpeg(path.as_ref()).await } diff --git a/src/input/ytdl_src.rs b/src/input/ytdl_src.rs index f03e35662..19e1fcd4d 100644 --- a/src/input/ytdl_src.rs +++ b/src/input/ytdl_src.rs @@ -22,7 +22,13 @@ const YOUTUBE_DL_COMMAND: &str = if cfg!(feature = "youtube-dlc") { /// Creates a streamed audio source with `youtube-dl` and `ffmpeg`. /// -/// Uses `youtube-dlc` if the `youtube-dlc` feature is enabled. +/// This source is not seek-compatible. +/// If you need looping or track seeking, then consider using +/// [`Restartable::ytdl`]. +/// +/// Uses `youtube-dlc` if the `"youtube-dlc"` feature is enabled. +/// +/// [`Restartable::ytdl`]: crate::input::restartable::Restartable::ytdl pub async fn ytdl(uri: &str) -> Result { _ytdl(uri, &[]).await } @@ -110,6 +116,14 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result { /// Creates a streamed audio source from YouTube search results with `youtube-dl(c)`,`ffmpeg`, and `ytsearch`. /// Takes the first video listed from the YouTube search. +/// +/// This source is not seek-compatible. +/// If you need looping or track seeking, then consider using +/// [`Restartable::ytdl_search`]. +/// +/// Uses `youtube-dlc` if the `"youtube-dlc"` feature is enabled. +/// +/// [`Restartable::ytdl_search`]: crate::input::restartable::Restartable::ytdl_search pub async fn ytdl_search(name: &str) -> Result { ytdl(&format!("ytsearch1:{}", name)).await } diff --git a/src/lib.rs b/src/lib.rs index 372c08a85..40cd306bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,7 @@ pub use crate::{ }; #[cfg(feature = "gateway")] -pub use crate::{handler::Call, manager::Songbird}; +pub use crate::{handler::*, manager::*}; #[cfg(feature = "serenity")] pub use crate::serenity::*; diff --git a/src/manager.rs b/src/manager.rs index 959d52a48..ca78a9192 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,5 +1,5 @@ #[cfg(feature = "driver")] -use crate::{driver::Config, error::ConnectionResult}; +use crate::driver::Config; use crate::{ error::{JoinError, JoinResult}, id::{ChannelId, GuildId, UserId}, @@ -9,7 +9,6 @@ use crate::{ }; #[cfg(feature = "serenity")] use async_trait::async_trait; -use flume::Receiver; #[cfg(feature = "serenity")] use futures::channel::mpsc::UnboundedSender as Sender; use parking_lot::RwLock as PRwLock; @@ -114,7 +113,7 @@ impl Songbird { client_data.initialised = true; } - /// Retreives a [`Call`] for the given guild, if one already exists. + /// Retrieves a [`Call`] for the given guild, if one already exists. /// /// [`Call`]: Call pub fn get>(&self, guild_id: G) -> Option>> { @@ -122,7 +121,7 @@ impl Songbird { map_read.get(&guild_id.into()).cloned() } - /// Retreives a [`Call`] for the given guild, creating a new one if + /// Retrieves a [`Call`] for the given guild, creating a new one if /// none is found. /// /// This will not join any calls, or cause connection state to change. @@ -186,11 +185,7 @@ impl Songbird { /// [`Call`]: Call /// [`get`]: Songbird::get #[inline] - pub async fn join( - &self, - guild_id: G, - channel_id: C, - ) -> (Arc>, JoinResult>>) + pub async fn join(&self, guild_id: G, channel_id: C) -> (Arc>, JoinResult<()>) where C: Into, G: Into, @@ -203,14 +198,22 @@ impl Songbird { &self, guild_id: GuildId, channel_id: ChannelId, - ) -> (Arc>, JoinResult>>) { + ) -> (Arc>, JoinResult<()>) { let call = self.get_or_insert(guild_id); - let result = { + let stage_1 = { let mut handler = call.lock().await; handler.join(channel_id).await }; + let result = match stage_1 { + Ok(chan) => chan + .await + .map_err(|_| JoinError::Dropped) + .and_then(|x| x.map_err(JoinError::from)), + Err(e) => Err(e), + }; + (call, result) } @@ -226,7 +229,7 @@ impl Songbird { &self, guild_id: G, channel_id: C, - ) -> (Arc>, JoinResult>) + ) -> (Arc>, JoinResult) where C: Into, G: Into, @@ -238,14 +241,19 @@ impl Songbird { &self, guild_id: GuildId, channel_id: ChannelId, - ) -> (Arc>, JoinResult>) { + ) -> (Arc>, JoinResult) { let call = self.get_or_insert(guild_id); - let result = { + let stage_1 = { let mut handler = call.lock().await; handler.join_gateway(channel_id).await }; + let result = match stage_1 { + Ok(chan) => chan.await.map_err(|_| JoinError::Dropped), + Err(e) => Err(e), + }; + (call, result) } diff --git a/src/tracks/error.rs b/src/tracks/error.rs new file mode 100644 index 000000000..e66863030 --- /dev/null +++ b/src/tracks/error.rs @@ -0,0 +1,39 @@ +use std::{error::Error, fmt}; + +/// Errors associated with control and manipulation of tracks. +/// +/// Unless otherwise stated, these don't invalidate an existing track, +/// but do advise on valid operations and commands. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +pub enum TrackError { + /// The operation failed because the track has ended, has been removed + /// due to call closure, or some error within the driver. + Finished, + /// The supplied event listener can never be fired by a track, and should + /// be attached to the driver instead. + InvalidTrackEvent, + /// The track's underlying [`Input`] doesn't support seeking operations. + /// + /// [`Input`]: crate::input::Input + SeekUnsupported, +} + +impl fmt::Display for TrackError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Failed to operate on track (handle): ")?; + match self { + TrackError::Finished => write!(f, "track ended."), + TrackError::InvalidTrackEvent => + write!(f, "given event listener can't be fired on a track."), + TrackError::SeekUnsupported => write!(f, "track did not support seeking."), + } + } +} + +impl Error for TrackError {} + +/// Alias for most calls to a [`TrackHandle`]. +/// +/// [`TrackHandle`]: super::TrackHandle +pub type TrackResult = Result; diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index da3233906..489eb5777 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -1,10 +1,7 @@ use super::*; use crate::events::{Event, EventData, EventHandler}; use std::time::Duration; -use tokio::sync::{ - mpsc::{error::SendError, UnboundedSender}, - oneshot, -}; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; use uuid::Uuid; #[derive(Clone, Debug)] @@ -36,12 +33,12 @@ impl TrackHandle { } /// Unpauses an audio track. - pub fn play(&self) -> TrackResult { + pub fn play(&self) -> TrackResult<()> { self.send(TrackCommand::Play) } /// Pauses an audio track. - pub fn pause(&self) -> TrackResult { + pub fn pause(&self) -> TrackResult<()> { self.send(TrackCommand::Pause) } @@ -51,12 +48,12 @@ impl TrackHandle { /// a [`TrackEvent::End`] event. /// /// [`TrackEvent::End`]: crate::events::TrackEvent::End - pub fn stop(&self) -> TrackResult { + pub fn stop(&self) -> TrackResult<()> { self.send(TrackCommand::Stop) } /// Sets the volume of an audio track. - pub fn set_volume(&self, volume: f32) -> TrackResult { + pub fn set_volume(&self, volume: f32) -> TrackResult<()> { self.send(TrackCommand::Volume(volume)) } @@ -73,43 +70,43 @@ impl TrackHandle { /// Seeks along the track to the specified position. /// - /// If the underlying [`Input`] does not support this behaviour, - /// then all calls will fail. + /// If the underlying [`Input`] does not support seeking, + /// then all calls will fail with [`TrackError::SeekUnsupported`]. /// /// [`Input`]: crate::input::Input - pub fn seek_time(&self, position: Duration) -> TrackResult { + /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported + pub fn seek_time(&self, position: Duration) -> TrackResult<()> { if self.seekable { self.send(TrackCommand::Seek(position)) } else { - Err(SendError(TrackCommand::Seek(position))) + Err(TrackError::SeekUnsupported) } } /// Attach an event handler to an audio track. These will receive [`EventContext::Track`]. /// - /// Users **must** ensure that no costly work or blocking occurs - /// within the supplied function or closure. *Taking excess time could prevent - /// timely sending of packets, causing audio glitches and delays*. + /// Events which can only be fired by the global context return [`TrackError::InvalidTrackEvent`] /// /// [`Track`]: Track /// [`EventContext::Track`]: crate::events::EventContext::Track - pub fn add_event(&self, event: Event, action: F) -> TrackResult { + /// [`TrackError::InvalidTrackEvent`]: TrackError::InvalidTrackEvent + pub fn add_event(&self, event: Event, action: F) -> TrackResult<()> { let cmd = TrackCommand::AddEvent(EventData::new(event, action)); if event.is_global_only() { - Err(SendError(cmd)) + Err(TrackError::InvalidTrackEvent) } else { self.send(cmd) } } - /// Perform an arbitrary action on a raw [`Track`] object. + /// Perform an arbitrary synchronous action on a raw [`Track`] object. /// /// Users **must** ensure that no costly work or blocking occurs /// within the supplied function or closure. *Taking excess time could prevent /// timely sending of packets, causing audio glitches and delays*. /// /// [`Track`]: Track - pub fn action(&self, action: F) -> TrackResult + pub fn action(&self, action: F) -> TrackResult<()> where F: FnOnce(&mut Track) + Send + Sync + 'static, { @@ -117,38 +114,55 @@ impl TrackHandle { } /// Request playback information and state from the audio context. - /// - /// Crucially, the audio thread will respond *at a later time*: - /// It is up to the user when or how this should be read from the returned channel. - pub fn get_info(&self) -> TrackQueryResult { + pub async fn get_info(&self) -> TrackResult> { let (tx, rx) = oneshot::channel(); - self.send(TrackCommand::Request(tx)).map(move |_| rx) + self.send(TrackCommand::Request(tx))?; + + rx.await.map_err(|_| TrackError::Finished) } /// Set an audio track to loop indefinitely. - pub fn enable_loop(&self) -> TrackResult { + /// + /// If the underlying [`Input`] does not support seeking, + /// then all calls will fail with [`TrackError::SeekUnsupported`]. + /// + /// [`Input`]: crate::input::Input + /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported + pub fn enable_loop(&self) -> TrackResult<()> { if self.seekable { self.send(TrackCommand::Loop(LoopState::Infinite)) } else { - Err(SendError(TrackCommand::Loop(LoopState::Infinite))) + Err(TrackError::SeekUnsupported) } } /// Set an audio track to no longer loop. - pub fn disable_loop(&self) -> TrackResult { + /// + /// If the underlying [`Input`] does not support seeking, + /// then all calls will fail with [`TrackError::SeekUnsupported`]. + /// + /// [`Input`]: crate::input::Input + /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported + pub fn disable_loop(&self) -> TrackResult<()> { if self.seekable { self.send(TrackCommand::Loop(LoopState::Finite(0))) } else { - Err(SendError(TrackCommand::Loop(LoopState::Finite(0)))) + Err(TrackError::SeekUnsupported) } } /// Set an audio track to loop a set number of times. - pub fn loop_for(&self, count: usize) -> TrackResult { + /// + /// If the underlying [`Input`] does not support seeking, + /// then all calls will fail with [`TrackError::SeekUnsupported`]. + /// + /// [`Input`]: crate::input::Input + /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported + pub fn loop_for(&self, count: usize) -> TrackResult<()> { if self.seekable { self.send(TrackCommand::Loop(LoopState::Finite(count))) } else { - Err(SendError(TrackCommand::Loop(LoopState::Finite(count)))) + Err(TrackError::SeekUnsupported) } } @@ -161,7 +175,11 @@ impl TrackHandle { /// Send a raw command to the [`Track`] object. /// /// [`Track`]: Track - pub fn send(&self, cmd: TrackCommand) -> TrackResult { - self.command_channel.send(cmd) + pub fn send(&self, cmd: TrackCommand) -> TrackResult<()> { + // As the send channels are unbounded, we can be reasonably certain + // that send failure == cancellation. + self.command_channel + .send(cmd) + .map_err(|_e| TrackError::Finished) } } diff --git a/src/tracks/mod.rs b/src/tracks/mod.rs index 51a37e965..2f9f99f8c 100644 --- a/src/tracks/mod.rs +++ b/src/tracks/mod.rs @@ -15,24 +15,18 @@ //! [`create_player`]: fn.create_player.html mod command; +mod error; mod handle; mod looping; mod mode; mod queue; mod state; -pub use self::{command::*, handle::*, looping::*, mode::*, queue::*, state::*}; +pub use self::{command::*, error::*, handle::*, looping::*, mode::*, queue::*, state::*}; use crate::{constants::*, driver::tasks::message::*, events::EventStore, input::Input}; use std::time::Duration; -use tokio::sync::{ - mpsc::{ - self, - error::{SendError, TryRecvError}, - UnboundedReceiver, - }, - oneshot::Receiver as OneshotReceiver, -}; +use tokio::sync::mpsc::{self, error::TryRecvError, UnboundedReceiver}; use uuid::Uuid; /// Control object for audio playback. @@ -63,18 +57,18 @@ use uuid::Uuid; /// # }; /// ``` /// -/// [`Driver::play_only`]: ../struct.Driver.html#method.play_only -/// [`Driver::play`]: ../struct.Driver.html#method.play -/// [`TrackHandle`]: struct.TrackHandle.html -/// [`create_player`]: fn.create_player.html +/// [`Driver::play_only`]: crate::driver::Driver::play_only +/// [`Driver::play`]: crate::driver::Driver::play +/// [`TrackHandle`]: TrackHandle +/// [`create_player`]: create_player #[derive(Debug)] pub struct Track { /// Whether or not this sound is currently playing. /// /// Can be controlled with [`play`] or [`pause`] if chaining is desired. /// - /// [`play`]: #method.play - /// [`pause`]: #method.pause + /// [`play`]: Track::play + /// [`pause`]: Track::pause pub(crate) playing: PlayMode, /// The desired volume for playback. @@ -83,7 +77,7 @@ pub struct Track { /// /// Can be controlled with [`volume`] if chaining is desired. /// - /// [`volume`]: #method.volume + /// [`volume`]: Track::volume pub(crate) volume: f32, /// Underlying data access object. @@ -187,7 +181,7 @@ impl Track { /// Sets [`volume`] in a manner that allows method chaining. /// - /// [`volume`]: #structfield.volume + /// [`volume`]: Track::volume pub fn set_volume(&mut self, volume: f32) -> &mut Self { self.volume = volume; @@ -209,12 +203,20 @@ impl Track { self.play_time } - /// Sets [`loops`] in a manner that allows method chaining. + /// Set an audio track to loop a set number of times. /// - /// [`loops`]: #structfield.loops - pub fn set_loops(&mut self, loops: LoopState) -> &mut Self { - self.loops = loops; - self + /// If the underlying [`Input`] does not support seeking, + /// then all calls will fail with [`TrackError::SeekUnsupported`]. + /// + /// [`Input`]: crate::input::Input + /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported + pub fn set_loops(&mut self, loops: LoopState) -> TrackResult<()> { + if self.source.is_seekable() { + self.loops = loops; + Ok(()) + } else { + Err(TrackError::SeekUnsupported) + } } pub(crate) fn do_loop(&mut self) -> bool { @@ -234,11 +236,9 @@ impl Track { self.play_time += TIMESTEP_LENGTH; } - /// Receives and acts upon any commands forwarded by [`TrackHandle`]s. + /// Receives and acts upon any commands forwarded by TrackHandles. /// /// *Used internally*, this should not be exposed to users. - /// - /// [`TrackHandle`]: struct.TrackHandle.html pub(crate) fn process_commands(&mut self, index: usize, ic: &Interconnect) { // Note: disconnection and an empty channel are both valid, // and should allow the audio object to keep running as intended. @@ -280,13 +280,13 @@ impl Track { TrackStateChange::Volume(self.volume), )); }, - Seek(time) => { - self.seek_time(time); - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Position(self.position), - )); - }, + Seek(time) => + if let Ok(new_time) = self.seek_time(time) { + let _ = ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Position(new_time), + )); + }, AddEvent(evt) => { let _ = ic.events.send(EventMessage::AddTrackEvent(index, evt)); }, @@ -300,13 +300,13 @@ impl Track { Request(tx) => { let _ = tx.send(Box::new(self.state())); }, - Loop(loops) => { - self.set_loops(loops); - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Loops(self.loops, true), - )); - }, + Loop(loops) => + if self.set_loops(loops).is_ok() { + let _ = ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Loops(self.loops, true), + )); + }, } }, Err(TryRecvError::Closed) => { @@ -325,7 +325,7 @@ impl Track { /// The primary use-case of this is sending information across /// threads in response to a [`TrackHandle`]. /// - /// [`TrackHandle`]: struct.TrackHandle.html + /// [`TrackHandle`]: TrackHandle pub fn state(&self) -> TrackState { TrackState { playing: self.playing, @@ -338,15 +338,18 @@ impl Track { /// Seek to a specific point in the track. /// - /// Returns `None` if unsupported. - pub fn seek_time(&mut self, pos: Duration) -> Option { - let out = self.source.seek_time(pos); - - if let Some(t) = out { + /// If the underlying [`Input`] does not support seeking, + /// then all calls will fail with [`TrackError::SeekUnsupported`]. + /// + /// [`Input`]: crate::input::Input + /// [`TrackError::SeekUnsupported`]: TrackError::SeekUnsupported + pub fn seek_time(&mut self, pos: Duration) -> TrackResult { + if let Some(t) = self.source.seek_time(pos) { self.position = t; + Ok(t) + } else { + Err(TrackError::SeekUnsupported) } - - out } /// Returns this track's unique identifier. @@ -372,22 +375,3 @@ pub fn create_player(source: Input) -> (Track, TrackHandle) { (player, handle) } - -/// Alias for most result-free calls to a [`TrackHandle`]. -/// -/// Failure indicates that the accessed audio object has been -/// removed or deleted by the audio context. -/// -/// [`TrackHandle`]: TrackHandle -pub type TrackResult = Result<(), SendError>; - -/// Alias for return value from calls to [`TrackHandle::get_info`]. -/// -/// Crucially, the audio thread will respond *at a later time*: -/// It is up to the user when or how this should be read from the returned channel. -/// -/// Failure indicates that the accessed audio object has been -/// removed or deleted by the audio context. -/// -/// [`TrackHandle::get_info`]: TrackHandle::get_info -pub type TrackQueryResult = Result>, SendError>; diff --git a/src/tracks/queue.rs b/src/tracks/queue.rs index 62db4a6ed..e62cd0da1 100644 --- a/src/tracks/queue.rs +++ b/src/tracks/queue.rs @@ -240,7 +240,7 @@ impl TrackQueue { } /// Pause the track at the head of the queue. - pub fn pause(&self) -> TrackResult { + pub fn pause(&self) -> TrackResult<()> { let inner = self.inner.lock(); if let Some(handle) = inner.tracks.front() { @@ -251,7 +251,7 @@ impl TrackQueue { } /// Resume the track at the head of the queue. - pub fn resume(&self) -> TrackResult { + pub fn resume(&self) -> TrackResult<()> { let inner = self.inner.lock(); if let Some(handle) = inner.tracks.front() { @@ -273,7 +273,7 @@ impl TrackQueue { } /// Skip to the next track in the queue, if it exists. - pub fn skip(&self) -> TrackResult { + pub fn skip(&self) -> TrackResult<()> { let inner = self.inner.lock(); inner.stop_current() @@ -295,7 +295,7 @@ impl TrackQueue { impl TrackQueueCore { /// Skip to the next track in the queue, if it exists. - fn stop_current(&self) -> TrackResult { + fn stop_current(&self) -> TrackResult<()> { if let Some(handle) = self.tracks.front() { handle.stop() } else {