Skip to content

Commit

Permalink
Songbird: Tokio 1.0 (serenity-rs#36)
Browse files Browse the repository at this point in the history
Migrates to the new version of tokio, requiring channel and sleep changes in a few locations. Additionally points to the in-tree v0.3 version of twilight.
  • Loading branch information
FelixMcFelix authored Jan 6, 2021
1 parent d42e09f commit f05b741
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 49 deletions.
17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ version = "0.1"
default-features = false
features = ["tokio-runtime"]
optional = true
version = "0.9"
version = "0.11"

[dependencies.audiopus]
optional = true
Expand Down Expand Up @@ -58,13 +58,15 @@ version = "0.8"

[dependencies.serenity]
optional = true
#version = "0.10"
default-features = false
features = ["voice", "gateway"]
git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.serenity-voice-model]
optional = true
#version = "0.10"
git = "https://github.com/serenity-rs/serenity"
branch = "current"

Expand All @@ -78,18 +80,22 @@ version = "0.1"

[dependencies.tokio]
optional = true
version = "0.2"
version = "1.0"
default-features = false

[dependencies.twilight-gateway]
optional = true
version = "0.2"
#version = "0.3"
default-features = false
git = "https://github.com/twilight-rs/twilight"
branch = "v0.3"

[dependencies.twilight-model]
optional = true
version = "0.2"
#version = "0.3"
default-features = false
git = "https://github.com/twilight-rs/twilight"
branch = "v0.3"

[dependencies.typemap_rev]
optional = true
Expand Down Expand Up @@ -135,13 +141,12 @@ driver = [
"serenity-voice-model",
"spin_sleep",
"streamcatcher",
"tokio/blocking",
"tokio/fs",
"tokio/io-util",
"tokio/macros",
"tokio/net",
"tokio/process",
"tokio/rt-core",
"tokio/rt",
"tokio/sync",
"tokio/time",
"typemap_rev",
Expand Down
4 changes: 2 additions & 2 deletions examples/serenity/voice/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
4 changes: 2 additions & 2 deletions examples/serenity/voice_events_queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
9 changes: 5 additions & 4 deletions examples/serenity/voice_receive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ authors = ["my name <my@email.address>"]
edition = "2018"

[dependencies]
env_logger = "~0.6"
log = "~0.4"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = "0.2"

[dependencies.songbird]
path = "../../../"
Expand All @@ -17,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
2 changes: 2 additions & 0 deletions examples/serenity/voice_receive/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ struct General;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

// Configure the client with your Discord bot token in the environment.
let token = env::var("DISCORD_TOKEN")
.expect("Expected a token in the environment");
Expand Down
4 changes: 2 additions & 2 deletions examples/serenity/voice_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
10 changes: 5 additions & 5 deletions examples/twilight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ futures = "0.3"
tracing = "0.1"
tracing-subscriber = "0.2"
serde_json = { version = "1" }
tokio = { features = ["macros", "rt-threaded", "sync"], version = "0.2" }
twilight-gateway = "0.2"
twilight-http = "0.2"
twilight-model = "0.2"
twilight-standby = "0.2"
tokio = { features = ["macros", "rt-multi-thread", "sync"], version = "1" }
twilight-gateway = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
twilight-http = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
twilight-model = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
twilight-standby = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }

[dependencies.songbird]
path = "../.."
Expand Down
8 changes: 5 additions & 3 deletions src/driver/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket};
use error::{Error, Result};
use flume::Sender;
use std::{net::IpAddr, str::FromStr};
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::net::UdpSocket;
use tracing::{debug, info, instrument};
use url::Url;
Expand Down Expand Up @@ -97,7 +97,7 @@ impl Connection {
return Err(Error::CryptoModeUnavailable);
}

let mut udp = UdpSocket::bind("0.0.0.0:0").await?;
let udp = UdpSocket::bind("0.0.0.0:0").await?;
udp.connect((ready.ip, ready.port)).await?;

// Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed.
Expand Down Expand Up @@ -161,7 +161,9 @@ impl Connection {
let (ws_msg_tx, ws_msg_rx) = flume::unbounded();
let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded();
let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded();
let (udp_rx, udp_tx) = udp.split();

let udp_rx = Arc::new(udp);
let udp_tx = Arc::clone(&udp_rx);

let ssrc = ready.ssrc;

Expand Down
8 changes: 4 additions & 4 deletions src/driver/tasks/udp_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use discortp::{
PacketSize,
};
use flume::Receiver;
use std::collections::HashMap;
use tokio::net::udp::RecvHalf;
use std::{collections::HashMap, sync::Arc};
use tokio::net::UdpSocket;
use tracing::{error, info, instrument, warn};
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;

Expand Down Expand Up @@ -236,7 +236,7 @@ struct UdpRx {
config: Config,
packet_buffer: [u8; VOICE_PACKET_MAX],
rx: Receiver<UdpRxMessage>,
udp_socket: RecvHalf,
udp_socket: Arc<UdpSocket>,
}

impl UdpRx {
Expand Down Expand Up @@ -391,7 +391,7 @@ pub(crate) async fn runner(
rx: Receiver<UdpRxMessage>,
cipher: Cipher,
config: Config,
udp_socket: RecvHalf,
udp_socket: Arc<UdpSocket>,
) {
info!("UDP receive handle started.");

Expand Down
9 changes: 5 additions & 4 deletions src/driver/tasks/udp_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use super::message::*;
use crate::constants::*;
use discortp::discord::MutableKeepalivePacket;
use flume::Receiver;
use std::sync::Arc;
use tokio::{
net::udp::SendHalf,
time::{timeout_at, Elapsed, Instant},
net::UdpSocket,
time::{timeout_at, Instant},
};
use tracing::{error, info, instrument, trace};

#[instrument(skip(udp_msg_rx))]
pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, mut udp_tx: SendHalf) {
pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, udp_tx: Arc<UdpSocket>) {
info!("UDP transmit handle started.");

let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()];
Expand All @@ -22,7 +23,7 @@ pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, mut ud
loop {
use UdpTxMessage::*;
match timeout_at(ka_time, udp_msg_rx.recv_async()).await {
Err(Elapsed { .. }) => {
Err(_) => {
trace!("Sending UDP Keepalive.");
if let Err(e) = udp_tx.send(&keepalive_bytes[..]).await {
error!("Fatal UDP keepalive send error: {:?}.", e);
Expand Down
2 changes: 1 addition & 1 deletion src/driver/tasks/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl AuxNetwork {
let mut ws_error = false;
let mut should_reconnect = false;

let hb = time::delay_until(next_heartbeat);
let hb = time::sleep_until(next_heartbeat);

tokio::select! {
_ = hb => {
Expand Down
4 changes: 2 additions & 2 deletions src/tracks/command.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::events::EventData;
use flume::Sender;
use std::time::Duration;
use tokio::sync::oneshot::Sender as OneshotSender;

/// A request from external code using a [`TrackHandle`] to modify
/// or act upon an [`Track`] object.
Expand All @@ -27,7 +27,7 @@ pub enum TrackCommand {
/// Run some closure on this track, with direct access to the core object.
Do(Box<dyn FnOnce(&mut Track) + Send + Sync + 'static>),
/// Request a read-only view of this track's state.
Request(OneshotSender<Box<TrackState>>),
Request(Sender<Box<TrackState>>),
/// Change the loop count/strategy of this track.
Loop(LoopState),
/// Prompts a track's input to become live and usable, if it is not already.
Expand Down
11 changes: 6 additions & 5 deletions src/tracks/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use crate::{
events::{Event, EventData, EventHandler},
input::Metadata,
};
use flume::Sender;
use std::{fmt, sync::Arc, time::Duration};
use tokio::sync::{mpsc::UnboundedSender, oneshot, RwLock};
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
use uuid::Uuid;

Expand All @@ -25,7 +26,7 @@ pub struct TrackHandle {
}

struct InnerHandle {
command_channel: UnboundedSender<TrackCommand>,
command_channel: Sender<TrackCommand>,
seekable: bool,
uuid: Uuid,
metadata: Box<Metadata>,
Expand All @@ -50,7 +51,7 @@ impl TrackHandle {
///
/// [`Input`]: crate::input::Input
pub fn new(
command_channel: UnboundedSender<TrackCommand>,
command_channel: Sender<TrackCommand>,
seekable: bool,
uuid: Uuid,
metadata: Box<Metadata>,
Expand Down Expand Up @@ -159,10 +160,10 @@ impl TrackHandle {

/// Request playback information and state from the audio context.
pub async fn get_info(&self) -> TrackResult<Box<TrackState>> {
let (tx, rx) = oneshot::channel();
let (tx, rx) = flume::bounded(1);
self.send(TrackCommand::Request(tx))?;

rx.await.map_err(|_| TrackError::Finished)
rx.recv_async().await.map_err(|_| TrackError::Finished)
}

/// Set an audio track to loop indefinitely.
Expand Down
14 changes: 5 additions & 9 deletions src/tracks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ mod state;
pub use self::{command::*, error::*, handle::*, looping::*, mode::*, queue::*, state::*};

use crate::{constants::*, driver::tasks::message::*, events::EventStore, input::Input};
use flume::{Receiver, TryRecvError};
use std::time::Duration;
use tokio::sync::mpsc::{self, error::TryRecvError, UnboundedReceiver};
use uuid::Uuid;

/// Control object for audio playback.
Expand Down Expand Up @@ -102,7 +102,7 @@ pub struct Track {
/// Track commands are sent in this manner to ensure that access
/// occurs in a thread-safe manner, without allowing any external
/// code to lock access to audio objects and block packet generation.
pub(crate) commands: UnboundedReceiver<TrackCommand>,
pub(crate) commands: Receiver<TrackCommand>,

/// Handle for safe control of this audio track from other threads.
///
Expand All @@ -124,11 +124,7 @@ impl Track {
/// In general, you should probably use [`create_player`].
///
/// [`create_player`]: fn.create_player.html
pub fn new_raw(
source: Input,
commands: UnboundedReceiver<TrackCommand>,
handle: TrackHandle,
) -> Self {
pub fn new_raw(source: Input, commands: Receiver<TrackCommand>, handle: TrackHandle) -> Self {
let uuid = handle.uuid();

Self {
Expand Down Expand Up @@ -310,7 +306,7 @@ impl Track {
MakePlayable => self.make_playable(),
}
},
Err(TryRecvError::Closed) => {
Err(TryRecvError::Disconnected) => {
// this branch will never be visited.
break;
},
Expand Down Expand Up @@ -389,7 +385,7 @@ pub fn create_player(source: Input) -> (Track, TrackHandle) {
/// [`Track`]: Track
/// [`TrackHandle`]: TrackHandle
pub fn create_player_with_uuid(source: Input, uuid: Uuid) -> (Track, TrackHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = flume::unbounded();
let can_seek = source.is_seekable();
let metadata = source.metadata.clone();
let handle = TrackHandle::new(tx, can_seek, uuid, metadata);
Expand Down

0 comments on commit f05b741

Please sign in to comment.