Skip to content

Commit

Permalink
Driver, Input: Performance & Benchmarks (#27)
Browse files Browse the repository at this point in the history
* Driver Benchmarks

Benchmarks driver use cases for single packet send,
multiple packet send, float vs opus, and the cost of
head-of-queue track removal.

Mix costs for large packet counts are also included.

This is a prelude to the optimisations discussed in
#21.

* Typo in benchmark

* Place Opus packet directly into packet buffer

Cleans up some other logic surrounding this, too. Gets a 16.9% perf improvement on opus packet passthrough (sub 5us here).

* Better track removal

In theory this should be faster, but it aint. Keeping in case
reducing struct sizes down the line magically makes this
faster.

* Reduce size of Input, TrackHandle

Metadata is now boxed away. Similarly, TrackHandles are neatly Arc'd to reduce their size to pointer length (and mitigate the impact of copies if we add in more fields).
  • Loading branch information
FelixMcFelix authored Dec 26, 2020
1 parent 2fc88a6 commit 504b8df
Show file tree
Hide file tree
Showing 23 changed files with 462 additions and 145 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
env:
RUSTDOCFLAGS: -D broken_intra_doc_links
run: |
cargo doc --no-deps --all-features
cargo doc --no-deps --features default,twilight-rustls,builtin-queue,stock-zlib
- name: Prepare docs
shell: bash -e -O extglob {0}
Expand Down
14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,18 @@ serenity-deps = ["async-trait"]
youtube-dlc = []
builtin-queue = []

internals = []

[[bench]]
name = "base-mixing"
path = "benches/base-mixing.rs"
harness = false

[[bench]]
name = "mixing"
path = "benches/mixing.rs"
name = "mixing-task"
path = "benches/mixing-task.rs"
required-features = ["internals"]
harness = false

[package.metadata.docs.rs]
all-features = true
features = ["default", "twilight-rustls", "builtin-queue", "stock-zlib"]
File renamed without changes.
237 changes: 237 additions & 0 deletions benches/mixing-task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
use criterion::{
black_box,
criterion_group,
criterion_main,
BatchSize,
Bencher,
BenchmarkId,
Criterion,
};
use flume::{Receiver, Sender, TryRecvError};
use songbird::{
constants::*,
driver::bench_internals::{mixer::Mixer, task_message::*, CryptoState},
input::{cached::Compressed, Input},
tracks,
Bitrate,
};
use tokio::runtime::{Handle, Runtime};
use xsalsa20poly1305::{aead::NewAead, XSalsa20Poly1305 as Cipher, KEY_SIZE};

// create a dummied task + interconnect.
// measure perf at varying numbers of sources (binary 1--64) without passthrough support.

fn dummied_mixer(
handle: Handle,
) -> (
Mixer,
(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
let (mix_tx, mix_rx) = flume::unbounded();
let (core_tx, core_rx) = flume::unbounded();
let (event_tx, event_rx) = flume::unbounded();

let (udp_sender_tx, udp_sender_rx) = flume::unbounded();
let (udp_receiver_tx, udp_receiver_rx) = flume::unbounded();

let ic = Interconnect {
core: core_tx,
events: event_tx,
mixer: mix_tx,
};

let mut out = Mixer::new(mix_rx, handle, ic, Default::default());

let fake_conn = MixerConnection {
cipher: Cipher::new_varkey(&vec![0u8; KEY_SIZE]).unwrap(),
crypto_state: CryptoState::Normal,
udp_rx: udp_receiver_tx,
udp_tx: udp_sender_tx,
};

out.conn_active = Some(fake_conn);

out.skip_sleep = true;

(out, (core_rx, event_rx, udp_receiver_rx, udp_sender_rx))
}

fn mixer_float(
num_tracks: usize,
handle: Handle,
) -> (
Mixer,
(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
let mut out = dummied_mixer(handle);

let floats = utils::make_sine(10 * STEREO_FRAME_SIZE, true);

let mut tracks = vec![];
for i in 0..num_tracks {
let input = Input::float_pcm(true, floats.clone().into());
tracks.push(tracks::create_player(input).0.into());
}

out.0.tracks = tracks;

out
}

fn mixer_float_drop(
num_tracks: usize,
handle: Handle,
) -> (
Mixer,
(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
let mut out = dummied_mixer(handle);

let mut tracks = vec![];
for i in 0..num_tracks {
let floats = utils::make_sine((i / 5) * STEREO_FRAME_SIZE, true);
let input = Input::float_pcm(true, floats.clone().into());
tracks.push(tracks::create_player(input).0.into());
}

out.0.tracks = tracks;

out
}

fn mixer_opus(
handle: Handle,
) -> (
Mixer,
(
Receiver<CoreMessage>,
Receiver<EventMessage>,
Receiver<UdpRxMessage>,
Receiver<UdpTxMessage>,
),
) {
// should add a single opus-based track.
// make this fully loaded to prevent any perf cost there.
let mut out = dummied_mixer(handle);

let floats = utils::make_sine(6 * STEREO_FRAME_SIZE, true);

let mut tracks = vec![];

let mut src = Compressed::new(
Input::float_pcm(true, floats.clone().into()),
Bitrate::BitsPerSecond(128_000),
)
.expect("These parameters are well-defined.");
src.raw.load_all();

tracks.push(tracks::create_player(src.into()).0.into());

out.0.tracks = tracks;

out
}

fn no_passthrough(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

let mut group = c.benchmark_group("Float Input (No Passthrough)");

for shift in 0..=6 {
let track_count = 1 << shift;

group.bench_with_input(
BenchmarkId::new("Single Packet", track_count),
&track_count,
|b, i| {
b.iter_batched_ref(
|| black_box(mixer_float(*i, rt.handle().clone())),
|input| {
black_box(input.0.cycle());
},
BatchSize::SmallInput,
)
},
);
group.bench_with_input(
BenchmarkId::new("n=5 Packets", track_count),
&track_count,
|b, i| {
b.iter_batched_ref(
|| black_box(mixer_float(*i, rt.handle().clone())),
|input| {
for i in 0..5 {
black_box(input.0.cycle());
}
},
BatchSize::SmallInput,
)
},
);
}

group.finish();
}

fn passthrough(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

let mut group = c.benchmark_group("Opus Input (Passthrough)");

group.bench_function("Single Packet", |b| {
b.iter_batched_ref(
|| black_box(mixer_opus(rt.handle().clone())),
|input| {
black_box(input.0.cycle());
},
BatchSize::SmallInput,
)
});
group.bench_function("n=5 Packets", |b| {
b.iter_batched_ref(
|| black_box(mixer_opus(rt.handle().clone())),
|input| {
for i in 0..5 {
black_box(input.0.cycle());
}
},
BatchSize::SmallInput,
)
});

group.finish();
}

fn culling(c: &mut Criterion) {
let rt = Runtime::new().unwrap();

c.bench_function("Worst-case Track Culling (15 tracks, 5 pkts)", |b| {
b.iter_batched_ref(
|| black_box(mixer_float_drop(15, rt.handle().clone())),
|input| {
for i in 0..5 {
black_box(input.0.cycle());
}
},
BatchSize::SmallInput,
)
});
}

criterion_group!(benches, no_passthrough, passthrough, culling);
criterion_main!(benches);
8 changes: 8 additions & 0 deletions src/driver/bench_internals.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
//! Various driver internals which need to be exported for benchmarking.
//!
//! Included if using the `"internals"` feature flag.
//! You should not and/or cannot use these as part of a normal application.

pub use super::tasks::{message as task_message, mixer};

pub use super::crypto::CryptoState;
4 changes: 3 additions & 1 deletion src/driver/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ impl CryptoMode {
}
}

#[allow(missing_docs)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[non_exhaustive]
pub(crate) enum CryptoState {
pub enum CryptoState {
Normal,
Suffix,
Lite(Wrapping<u32>),
Expand Down Expand Up @@ -217,6 +218,7 @@ impl CryptoState {
endpoint
}

/// Returns the underlying (stateless) type of the active crypto mode.
pub fn kind(&self) -> CryptoMode {
CryptoMode::from(*self)
}
Expand Down
6 changes: 5 additions & 1 deletion src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
//! generation from being slowed down past its deadline, or from affecting other
//! asynchronous tasks your bot must handle.

#[cfg(feature = "internals")]
pub mod bench_internals;

mod config;
pub(crate) mod connection;
mod crypto;
Expand All @@ -16,7 +19,8 @@ pub(crate) mod tasks;

pub use config::Config;
use connection::error::{Error, Result};
pub use crypto::*;
pub use crypto::CryptoMode;
pub(crate) use crypto::CryptoState;
pub use decode_mode::DecodeMode;

#[cfg(feature = "builtin-queue")]
Expand Down
6 changes: 3 additions & 3 deletions src/driver/tasks/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver<EventMe
Ok(RemoveTrack(i)) => {
info!("Event state for track {} of {} removed.", i, events.len());

events.remove(i);
states.remove(i);
handles.remove(i);
events.swap_remove(i);
states.swap_remove(i);
handles.swap_remove(i);
},
Ok(RemoveAllTracks) => {
info!("Event state for all tracks removed.");
Expand Down
2 changes: 2 additions & 0 deletions src/driver/tasks/message/core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(missing_docs)]

use crate::{
driver::{connection::error::Error, Config},
events::EventData,
Expand Down
4 changes: 3 additions & 1 deletion src/driver/tasks/message/events.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![allow(missing_docs)]

use crate::{
events::{CoreContext, EventData, EventStore},
tracks::{LoopState, PlayMode, TrackHandle, TrackState},
};
use std::time::Duration;

pub(crate) enum EventMessage {
pub enum EventMessage {
// Event related.
// Track events should fire off the back of state changes.
AddGlobalEvent(EventData),
Expand Down
6 changes: 4 additions & 2 deletions src/driver/tasks/message/mixer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(missing_docs)]

use super::{Interconnect, UdpRxMessage, UdpTxMessage, WsMessage};

use crate::{
Expand All @@ -8,7 +10,7 @@ use crate::{
use flume::Sender;
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;

pub(crate) struct MixerConnection {
pub struct MixerConnection {
pub cipher: Cipher,
pub crypto_state: CryptoState,
pub udp_rx: Sender<UdpRxMessage>,
Expand All @@ -22,7 +24,7 @@ impl Drop for MixerConnection {
}
}

pub(crate) enum MixerMessage {
pub enum MixerMessage {
AddTrack(Track),
SetTrack(Option<Track>),

Expand Down
6 changes: 4 additions & 2 deletions src/driver/tasks/message/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
#![allow(missing_docs)]

mod core;
mod events;
mod mixer;
mod udp_rx;
mod udp_tx;
mod ws;

pub(crate) use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*};
pub use self::{core::*, events::*, mixer::*, udp_rx::*, udp_tx::*, ws::*};

use flume::Sender;
use tracing::info;

#[derive(Clone, Debug)]
pub(crate) struct Interconnect {
pub struct Interconnect {
pub core: Sender<CoreMessage>,
pub events: Sender<EventMessage>,
pub mixer: Sender<MixerMessage>,
Expand Down
Loading

0 comments on commit 504b8df

Please sign in to comment.