Skip to content

Commit

Permalink
Driver: Split receive into its own feature (#141)
Browse files Browse the repository at this point in the history
Adds the "receive" feature, which is disabled by default. When this is disabled, the UDP receive task is not compiled and not run, and as an optimisation the UDP receive buffer size is set to 0. All related events are also removed.

This also removes the UDP Tx task, and moves packet and keepalive sends back into the mixer thread. This allows us to entirely remove channels and various allocations between the mixer and an async task created only for sending data (i.e., fewer memcopies).

If "receive" is enabled, UDP sends are now non-blocking due to technical constraints -- failure to send is non-fatal, but *will* drop affected packets. Given that blocking on a UDP send indicates that the OS cannot clear send buffers fast enough, this should alleviate OS load.

Closes #131.
  • Loading branch information
FelixMcFelix committed Nov 20, 2023
1 parent c1d93f7 commit 2277595
Show file tree
Hide file tree
Showing 27 changed files with 294 additions and 201 deletions.
5 changes: 2 additions & 3 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ Audio processing remains synchronous for the following reasons:
Songbird subdivides voice connection handling into several long- and short-lived tasks.

* **Core**: Handles and directs commands received from the driver. Responsible for connection/reconnection, and creates network tasks.
* **Mixer**: Combines audio sources together, Opus encodes the result, and encrypts the built packets every 20ms. Responsible for handling track commands/state. ***Synchronous***.
* **Mixer**: Combines audio sources together, Opus encodes the result, and encrypts the built packets every 20ms. Responsible for handling track commands/state, and transmitting completed voice packets and keepalive messages. ***Synchronous***.
* **Thread Pool**: A dynamically sized thread-pool for I/O tasks. Creates lazy tracks using `Compose` if sync creation is needed, otherwise spawns a tokio task. Seek operations always go to the thread pool. ***Synchronous***.
* **Disposer**: Used by mixer thread to dispose of data with potentially long/blocking `Drop` implementations (i.e., audio sources). ***Synchronous***.
* **Events**: Stores and runs event handlers, tracks event timing, and handles
* **Websocket**: *Network task.* Sends speaking status updates and keepalives to Discord, and receives client (dis)connect events.
* **UDP Tx**: *Network task.* Responsible for transmitting completed voice packets.
* **UDP Rx**: *Network task.* Decrypts/decodes received voice packets and statistics information.
* **UDP Rx**: *Optional network task.* Decrypts/decodes received voice packets and statistics information.

*Note: all tasks are able to message the permanent tasks via a block of interconnecting channels.*

Expand Down
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ audiopus = { optional = true, version = "0.3.0-rc.0" }
byteorder = { optional = true, version = "1" }
dashmap = { optional = true, version = "5" }
derivative = "2"
discortp = { features = ["discord-full"], optional = true, version = "0.5" }
discortp = { default-features = false, features = ["discord", "pnet", "rtp"], optional = true, version = "0.5" }
flume = { optional = true, version = "0.10" }
futures = "0.3"
once_cell = { optional = true, version = "1" }
Expand All @@ -32,8 +32,9 @@ rubato = { optional = true, version = "0.12" }
rusty_pool = { optional = true, version = "0.7" }
serde = { version = "1", features = ["derive"] }
serde-aux = { default-features = false, optional = true, version = "3"}
simd-json = { features = ["serde_impl"], optional = true, version = "0.6.0" }
serde_json = "1"
simd-json = { features = ["serde_impl"], optional = true, version = "0.6.0" }
socket2 = { optional = true, version = "0.4" }
streamcatcher = { optional = true, version = "1" }
tokio = { default-features = false, optional = true, version = "1.0" }
tokio-tungstenite = { optional = true, version = "0.17" }
Expand Down Expand Up @@ -110,6 +111,7 @@ driver = [
"dep:rusty_pool",
"dep:serde-aux",
"dep:serenity-voice-model",
"dep:socket2",
"dep:streamcatcher",
"dep:symphonia",
"dep:symphonia-core",
Expand Down Expand Up @@ -145,9 +147,10 @@ twilight = ["dep:twilight-gateway","dep:twilight-model"]

# Behaviour altering features.
builtin-queue = []
receive = ["discortp?/demux", "discortp?/rtcp"]

# Used for docgen/testing/benchmarking.
full-doc = ["default", "twilight", "builtin-queue"]
full-doc = ["default", "twilight", "builtin-queue", "receive"]
internals = []

[[bench]]
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ The library offers:
* A standalone driver for voice calls, via the `"driver"` feature. If you can create
a `ConnectionInfo` using any other gateway, or language for your bot, then you
can run the songbird voice driver.
* And, by default, a fully featured voice system featuring events, queues, RT(C)P packet
handling, seeking on compatible streams, shared multithreaded audio stream caches,
* Voice receive and RT(C)P packet handling via the `"receive"` feature.
* SIMD-accelerated JSON decoding via the `"simd-json"` feature.
* And, by default, a fully featured voice system featuring events, queues,
seeking on compatible streams, shared multithreaded audio stream caches,
and direct Opus data passthrough from DCA files.
* To be able to use `simd-json` from serenity, you will need to enable the `simdjson`
feature on both songbird and serenity.

## Intents
Songbird's gateway functionality requires you to specify the `GUILD_VOICE_STATES` intent.
Expand Down
16 changes: 8 additions & 8 deletions benches/mixing-task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use songbird::{
tracks,
Config,
};
use std::io::Cursor;
use std::{io::Cursor, net::UdpSocket};
use tokio::runtime::{Handle, Runtime};
use xsalsa20poly1305::{aead::NewAead, XSalsa20Poly1305 as Cipher, KEY_SIZE};

Expand All @@ -41,14 +41,12 @@ fn dummied_mixer(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
let (mix_tx, mix_rx) = flume::unbounded();
let (core_tx, core_rx) = flume::unbounded();
let (event_tx, event_rx) = flume::unbounded();

let (udp_sender_tx, udp_sender_rx) = flume::unbounded();
let (udp_receiver_tx, udp_receiver_rx) = flume::unbounded();

let ic = Interconnect {
Expand All @@ -61,18 +59,23 @@ fn dummied_mixer(

let mut out = Mixer::new(mix_rx, handle, ic, config);

let udp_tx = UdpSocket::bind("0.0.0.0:0").expect("Failed to create send port.");
udp_tx
.connect("127.0.0.1:5316")
.expect("Failed to connect to local dest port.");

let fake_conn = MixerConnection {
cipher: Cipher::new_from_slice(&vec![0u8; KEY_SIZE]).unwrap(),
crypto_state: CryptoState::Normal,
udp_rx: udp_receiver_tx,
udp_tx: udp_sender_tx,
udp_tx,
};

out.conn_active = Some(fake_conn);

out.skip_sleep = true;

(out, (core_rx, event_rx, udp_receiver_rx, udp_sender_rx))
(out, (core_rx, event_rx, udp_receiver_rx))
}

fn mixer_float(
Expand All @@ -85,7 +88,6 @@ fn mixer_float(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
let mut out = dummied_mixer(handle, softclip);
Expand Down Expand Up @@ -115,7 +117,6 @@ fn mixer_float_drop(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
let mut out = dummied_mixer(handle, true);
Expand Down Expand Up @@ -143,7 +144,6 @@ fn mixer_opus(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
// should add a single opus-based track.
Expand Down
3 changes: 3 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ members = [
"serenity/voice_receive",
"twilight",
]

[profile.release]
debug = true
1 change: 1 addition & 0 deletions examples/serenity/voice_receive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ tracing-subscriber = "0.2"
tracing-futures = "0.2"

[dependencies.songbird]
features = ["receive"]
path = "../../../"

[dependencies.serenity]
Expand Down
Binary file modified images/arch.afdesign
Binary file not shown.
Binary file modified images/driver.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 0 additions & 14 deletions images/driver.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified images/gateway.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
22 changes: 17 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[cfg(feature = "receive")]
use crate::driver::DecodeMode;
#[cfg(feature = "driver")]
use crate::{
driver::{retry::Retry, CryptoMode, DecodeMode, MixMode},
driver::{retry::Retry, CryptoMode, MixMode},
input::codecs::*,
};

Expand Down Expand Up @@ -29,7 +31,8 @@ pub struct Config {
///
/// [`CryptoMode::Normal`]: CryptoMode::Normal
pub crypto_mode: CryptoMode,
#[cfg(feature = "driver")]

#[cfg(all(feature = "driver", feature = "receive"))]
/// Configures whether decoding and decryption occur for all received packets.
///
/// If voice receiving voice packets, generally you should choose [`DecodeMode::Decode`].
Expand All @@ -45,6 +48,7 @@ pub struct Config {
/// [`DecodeMode::Pass`]: DecodeMode::Pass
/// [user speaking events]: crate::events::CoreEvent::SpeakingUpdate
pub decode_mode: DecodeMode,

#[cfg(feature = "gateway")]
/// Configures the amount of time to wait for Discord to reply with connection information
/// if [`Call::join`]/[`join_gateway`] are used.
Expand All @@ -58,14 +62,16 @@ pub struct Config {
/// [`Call::join`]: crate::Call::join
/// [`join_gateway`]: crate::Call::join_gateway
pub gateway_timeout: Option<Duration>,

#[cfg(feature = "driver")]
/// Configures the maximum amount of time to wait for an attempted voice
/// connection to Discord.
/// Configures whether the driver will mix and output stereo or mono Opus data
/// over a voice channel.
///
/// Defaults to [`Stereo`].
///
/// [`Stereo`]: MixMode::Stereo
pub mix_mode: MixMode,

#[cfg(feature = "driver")]
/// Number of concurrently active tracks to allocate memory for.
///
Expand All @@ -79,6 +85,7 @@ pub struct Config {
/// Changes to this field in a running driver will only ever increase
/// the capacity of the track store.
pub preallocated_tracks: usize,

#[cfg(feature = "driver")]
/// Connection retry logic for the [`Driver`].
///
Expand All @@ -87,6 +94,7 @@ pub struct Config {
///
/// [`Driver`]: crate::driver::Driver
pub driver_retry: Retry,

#[cfg(feature = "driver")]
/// Configures whether or not each mixed audio packet is [soft-clipped] into the
/// [-1, 1] audio range.
Expand All @@ -101,12 +109,14 @@ pub struct Config {
///
/// [soft-clipped]: https://opus-codec.org/docs/opus_api-1.3.1/group__opus__decoder.html#gaff99598b352e8939dded08d96e125e0b
pub use_softclip: bool,

#[cfg(feature = "driver")]
/// Configures the maximum amount of time to wait for an attempted voice
/// connection to Discord.
///
/// Defaults to 10 seconds. If set to `None`, connections will never time out.
pub driver_timeout: Option<Duration>,

#[cfg(feature = "driver")]
#[derivative(Debug = "ignore")]
/// Registry of the inner codecs supported by the driver, adding audiopus-based
Expand All @@ -116,6 +126,7 @@ pub struct Config {
///
/// [`CODEC_REGISTRY`]: static@CODEC_REGISTRY
pub codec_registry: &'static CodecRegistry,

#[cfg(feature = "driver")]
#[derivative(Debug = "ignore")]
/// Registry of the muxers and container formats supported by the driver.
Expand All @@ -142,7 +153,7 @@ impl Default for Config {
Self {
#[cfg(feature = "driver")]
crypto_mode: CryptoMode::Normal,
#[cfg(feature = "driver")]
#[cfg(all(feature = "driver", feature = "receive"))]
decode_mode: DecodeMode::Decrypt,
#[cfg(feature = "gateway")]
gateway_timeout: Some(Duration::from_secs(10)),
Expand Down Expand Up @@ -179,6 +190,7 @@ impl Config {
self
}

#[cfg(feature = "receive")]
/// Sets this `Config`'s received packet decryption/decoding behaviour.
#[must_use]
pub fn decode_mode(mut self, decode_mode: DecodeMode) -> Self {
Expand Down
41 changes: 34 additions & 7 deletions src/driver/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
pub mod error;

#[cfg(feature = "receive")]
use super::tasks::udp_rx;
use super::{
tasks::{message::*, udp_rx, udp_tx, ws as ws_task},
tasks::{message::*, ws as ws_task},
Config,
CryptoMode,
};
Expand All @@ -18,7 +20,8 @@ use crate::{
use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket};
use error::{Error, Result};
use flume::Sender;
use std::{net::IpAddr, str::FromStr, sync::Arc};
use socket2::Socket;
use std::{net::IpAddr, str::FromStr};
use tokio::{net::UdpSocket, spawn, time::timeout};
use tracing::{debug, info, instrument};
use url::Url;
Expand Down Expand Up @@ -103,6 +106,16 @@ impl Connection {
}

let udp = UdpSocket::bind("0.0.0.0:0").await?;

// Optimisation for non-receive case: set rx buffer size to zero.
let udp = if cfg!(feature = "receive") {
udp
} else {
let socket = Socket::from(udp.into_std()?);
socket.set_recv_buffer_size(0)?;
UdpSocket::from_std(socket.into())?
};

udp.connect((ready.ip, ready.port)).await?;

// Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed.
Expand Down Expand Up @@ -164,22 +177,36 @@ impl Connection {
info!("WS heartbeat duration {}ms.", hello.heartbeat_interval,);

let (ws_msg_tx, ws_msg_rx) = flume::unbounded();
let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded();
#[cfg(feature = "receive")]
let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded();

// NOTE: This causes the UDP Socket on "receive" to be non-blocking,
// and the standard to be blocking. A UDP send should only WouldBlock if
// you're sending more data than the OS can handle (not likely, and
// at that point you should scale horizontally).
//
// If this is a problem for anyone, we can make non-blocking sends
// queue up a delayed send up to a limit.
#[cfg(feature = "receive")]
let (udp_rx, udp_tx) = {
let udp_rx = Arc::new(udp);
let udp_tx = Arc::clone(&udp_rx);
let udp_tx = udp.into_std()?;
let udp_rx = UdpSocket::from_std(udp_tx.try_clone()?)?;
(udp_rx, udp_tx)
};
#[cfg(not(feature = "receive"))]
let udp_tx = udp.into_std()?;

let ssrc = ready.ssrc;

let mix_conn = MixerConnection {
#[cfg(feature = "receive")]
cipher: cipher.clone(),
#[cfg(not(feature = "receive"))]
cipher,
crypto_state: config.crypto_mode.into(),
#[cfg(feature = "receive")]
udp_rx: udp_receiver_msg_tx,
udp_tx: udp_sender_msg_tx,
udp_tx,
};

interconnect
Expand All @@ -200,14 +227,14 @@ impl Connection {
info.clone(),
));

#[cfg(feature = "receive")]
spawn(udp_rx::runner(
interconnect.clone(),
udp_receiver_msg_rx,
cipher,
config.clone(),
udp_rx,
));
spawn(udp_tx::runner(udp_sender_msg_rx, ssrc, udp_tx));

Ok(Connection {
info,
Expand Down
Loading

0 comments on commit 2277595

Please sign in to comment.