Skip to content

Commit

Permalink
subscriber: update sharded-slab to 0.1, pool hashmap allocations (tok…
Browse files Browse the repository at this point in the history
…io-rs#1062)

## Motivation

hawkw/sharded-slab#45 changes `sharded-slab` so that the per-shard
metadata is allocated only when a new shard is created, rather than all
up front when the slab is created. This fixes the very large amount of
memory allocated by simply creating a new `Registry` without actually
collecting any traces.

## Solution

This branch updates `tracing-subscriber` to depend on `sharded-slab`
0.1.0, which includes the upstream fix.

In addition, this branch the registry from using `sharded_slab::Slab` to
`sharded_slab::Pool`. This allows us to clear hashmap allocations for
extensions in-place, retaining the already allocated maps. This should
improve `new_span` performance a bit.

Fixes tokio-rs#1005
  • Loading branch information
hawkw authored Oct 22, 2020
1 parent eeb6974 commit 1bf9da2
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 86 deletions.
2 changes: 1 addition & 1 deletion tracing-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ tracing-serde = { path = "../tracing-serde", version = "0.2", optional = true }
parking_lot = { version = ">= 0.7, <= 0.11", optional = true }

# registry
sharded-slab = { version = "^0.0.9", optional = true }
sharded-slab = { version = "0.1.0", optional = true }
thread_local = { version = "1.0.1", optional = true }

[dev-dependencies]
Expand Down
87 changes: 81 additions & 6 deletions tracing-subscriber/benches/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ fn bench_new_span(c: &mut Criterion) {
group.bench_with_input(BenchmarkId::new("multithreaded", i), i, |b, &i| {
b.iter_custom(|iters| {
let mut total = Duration::from_secs(0);
let dispatch = mk_dispatch();
for _ in 0..iters {
let bench = MultithreadedBench::new(mk_dispatch());
let bench = MultithreadedBench::new(dispatch.clone());
let elapsed = bench
.thread(move || {
for n in 0..i {
Expand Down Expand Up @@ -95,7 +96,8 @@ fn mk_dispatch() -> tracing::Dispatch {
fn bench_event(c: &mut Criterion) {
bench_thrpt(c, "event", |group, i| {
group.bench_with_input(BenchmarkId::new("root/single_threaded", i), i, |b, &i| {
tracing::dispatch::with_default(&mk_dispatch(), || {
let dispatch = mk_dispatch();
tracing::dispatch::with_default(&dispatch, || {
b.iter(|| {
for n in 0..i {
tracing::info!(n);
Expand All @@ -106,8 +108,9 @@ fn bench_event(c: &mut Criterion) {
group.bench_with_input(BenchmarkId::new("root/multithreaded", i), i, |b, &i| {
b.iter_custom(|iters| {
let mut total = Duration::from_secs(0);
let dispatch = mk_dispatch();
for _ in 0..iters {
let bench = MultithreadedBench::new(mk_dispatch());
let bench = MultithreadedBench::new(dispatch.clone());
let elapsed = bench
.thread(move || {
for n in 0..i {
Expand Down Expand Up @@ -156,8 +159,9 @@ fn bench_event(c: &mut Criterion) {
|b, &i| {
b.iter_custom(|iters| {
let mut total = Duration::from_secs(0);
let dispatch = mk_dispatch();
for _ in 0..iters {
let bench = MultithreadedBench::new(mk_dispatch());
let bench = MultithreadedBench::new(dispatch.clone());
let elapsed = bench
.thread_with_setup(move |start| {
let span = tracing::info_span!("unique_parent", foo = false);
Expand Down Expand Up @@ -203,13 +207,13 @@ fn bench_event(c: &mut Criterion) {
i,
|b, &i| {
b.iter_custom(|iters| {
let dispatch = mk_dispatch();
let mut total = Duration::from_secs(0);
for _ in 0..iters {
let dispatch = mk_dispatch();
let parent = tracing::dispatch::with_default(&dispatch, || {
tracing::info_span!("shared_parent", foo = "hello world")
});
let bench = MultithreadedBench::new(dispatch);
let bench = MultithreadedBench::new(dispatch.clone());
let parent2 = parent.clone();
bench.thread_with_setup(move |start| {
let _guard = parent2.enter();
Expand Down Expand Up @@ -249,6 +253,77 @@ fn bench_event(c: &mut Criterion) {
})
},
);
group.bench_with_input(
BenchmarkId::new("multi-parent/multithreaded", i),
i,
|b, &i| {
b.iter_custom(|iters| {
let dispatch = mk_dispatch();
let mut total = Duration::from_secs(0);
for _ in 0..iters {
let parent = tracing::dispatch::with_default(&dispatch, || {
tracing::info_span!("multiparent", foo = "hello world")
});
let bench = MultithreadedBench::new(dispatch.clone());
let parent2 = parent.clone();
bench.thread_with_setup(move |start| {
let _guard = parent2.enter();
start.wait();
let mut span = tracing::info_span!("parent");
for n in 0..i {
let s = tracing::info_span!(parent: &span, "parent2", n, i);
s.in_scope(|| {
tracing::info!(n);
});
span = s;
}
});
let parent2 = parent.clone();
bench.thread_with_setup(move |start| {
let _guard = parent2.enter();
start.wait();
let mut span = tracing::info_span!("parent");
for n in 0..i {
let s = tracing::info_span!(parent: &span, "parent2", n, i);
s.in_scope(|| {
tracing::info!(n);
});
span = s;
}
});
let parent2 = parent.clone();
bench.thread_with_setup(move |start| {
let _guard = parent2.enter();
start.wait();
let mut span = tracing::info_span!("parent");
for n in 0..i {
let s = tracing::info_span!(parent: &span, "parent2", n, i);
s.in_scope(|| {
tracing::info!(n);
});
span = s;
}
});
let parent2 = parent.clone();
bench.thread_with_setup(move |start| {
let _guard = parent2.enter();
start.wait();
let mut span = tracing::info_span!("parent");
for n in 0..i {
let s = tracing::info_span!(parent: &span, "parent2", n, i);
s.in_scope(|| {
tracing::info!(n);
});
span = s;
}
});
let elapsed = bench.run();
total += elapsed;
}
total
})
},
);
});
}

Expand Down
129 changes: 96 additions & 33 deletions tracing-subscriber/src/registry/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,16 @@ impl<'a> ExtensionsMut<'a> {
/// data that it is interested in recording and emitting.
#[derive(Default)]
pub(crate) struct ExtensionsInner {
// If extensions are never used, no need to carry around an empty HashMap.
// That's 3 words. Instead, this is only 1 word.
map: Option<Box<AnyMap>>,
map: AnyMap,
}

impl ExtensionsInner {
/// Create an empty `Extensions`.
#[inline]
pub(crate) fn new() -> ExtensionsInner {
ExtensionsInner { map: None }
ExtensionsInner {
map: AnyMap::default(),
}
}

/// Insert a type into this `Extensions`.
Expand All @@ -129,7 +129,6 @@ impl ExtensionsInner {
/// be returned.
pub(crate) fn insert<T: Send + Sync + 'static>(&mut self, val: T) -> Option<T> {
self.map
.get_or_insert_with(|| Box::new(HashMap::default()))
.insert(TypeId::of::<T>(), Box::new(val))
.and_then(|boxed| {
#[allow(warnings)]
Expand All @@ -145,60 +144,124 @@ impl ExtensionsInner {
/// Get a reference to a type previously inserted on this `Extensions`.
pub(crate) fn get<T: 'static>(&self) -> Option<&T> {
self.map
.as_ref()
.and_then(|map| map.get(&TypeId::of::<T>()))
.get(&TypeId::of::<T>())
.and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
}

/// Get a mutable reference to a type previously inserted on this `Extensions`.
pub(crate) fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
self.map
.as_mut()
.and_then(|map| map.get_mut(&TypeId::of::<T>()))
.get_mut(&TypeId::of::<T>())
.and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
}

/// Remove a type from this `Extensions`.
///
/// If a extension of this type existed, it will be returned.
pub(crate) fn remove<T: Send + Sync + 'static>(&mut self) -> Option<T> {
self.map
.as_mut()
.and_then(|map| map.remove(&TypeId::of::<T>()))
.and_then(|boxed| {
#[allow(warnings)]
{
(boxed as Box<Any + 'static>)
.downcast()
.ok()
.map(|boxed| *boxed)
}
})
self.map.remove(&TypeId::of::<T>()).and_then(|boxed| {
#[allow(warnings)]
{
(boxed as Box<Any + 'static>)
.downcast()
.ok()
.map(|boxed| *boxed)
}
})
}

/// Clear the `ExtensionsInner` in-place, dropping any elements in the map but
/// retaining allocated capacity.
///
/// This permits the hash map allocation to be pooled by the registry so
/// that future spans will not need to allocate new hashmaps.
pub(crate) fn clear(&mut self) {
self.map.clear();
}
}

impl fmt::Debug for ExtensionsInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Extensions").finish()
f.debug_struct("Extensions")
.field("len", &self.map.len())
.field("capacity", &self.map.capacity())
.finish()
}
}

#[test]
fn test_extensions() {
#[cfg(test)]
mod tests {
use super::*;

#[derive(Debug, PartialEq)]
struct MyType(i32);

let mut extensions = ExtensionsInner::new();
#[test]
fn test_extensions() {
let mut extensions = ExtensionsInner::new();

extensions.insert(5i32);
extensions.insert(MyType(10));

assert_eq!(extensions.get(), Some(&5i32));
assert_eq!(extensions.get_mut(), Some(&mut 5i32));

assert_eq!(extensions.remove::<i32>(), Some(5i32));
assert!(extensions.get::<i32>().is_none());

assert_eq!(extensions.get::<bool>(), None);
assert_eq!(extensions.get(), Some(&MyType(10)));
}

#[test]
fn clear_retains_capacity() {
let mut extensions = ExtensionsInner::new();
extensions.insert(5i32);
extensions.insert(MyType(10));
extensions.insert(true);

assert_eq!(extensions.map.len(), 3);
let prev_capacity = extensions.map.capacity();
extensions.clear();

assert_eq!(
extensions.map.len(),
0,
"after clear(), extensions map should have length 0"
);
assert_eq!(
extensions.map.capacity(),
prev_capacity,
"after clear(), extensions map should retain prior capacity"
);
}

#[test]
fn clear_drops_elements() {
use std::sync::Arc;
struct DropMePlease(Arc<()>);
struct DropMeTooPlease(Arc<()>);

extensions.insert(5i32);
extensions.insert(MyType(10));
let mut extensions = ExtensionsInner::new();
let val1 = DropMePlease(Arc::new(()));
let val2 = DropMeTooPlease(Arc::new(()));

assert_eq!(extensions.get(), Some(&5i32));
assert_eq!(extensions.get_mut(), Some(&mut 5i32));
let val1_dropped = Arc::downgrade(&val1.0);
let val2_dropped = Arc::downgrade(&val2.0);
extensions.insert(val1);
extensions.insert(val2);

assert_eq!(extensions.remove::<i32>(), Some(5i32));
assert!(extensions.get::<i32>().is_none());
assert!(val1_dropped.upgrade().is_some());
assert!(val2_dropped.upgrade().is_some());

assert_eq!(extensions.get::<bool>(), None);
assert_eq!(extensions.get(), Some(&MyType(10)));
extensions.clear();
assert!(
val1_dropped.upgrade().is_none(),
"after clear(), val1 should be dropped"
);
assert!(
val2_dropped.upgrade().is_none(),
"after clear(), val2 should be dropped"
);
}
}
Loading

0 comments on commit 1bf9da2

Please sign in to comment.