From 1bf9da2bc7fee836bbb85051d42570ed8b7c5731 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Thu, 22 Oct 2020 15:10:00 -0700
Subject: [PATCH] subscriber: update sharded-slab to 0.1, pool hashmap
 allocations (#1062)

## Motivation

hawkw/sharded-slab#45 changes `sharded-slab` so that the per-shard metadata
is allocated only when a new shard is created, rather than all up front when
the slab is created. This fixes the very large amount of memory allocated by
simply creating a new `Registry` without actually collecting any traces.

## Solution

This branch updates `tracing-subscriber` to depend on `sharded-slab` 0.1.0,
which includes the upstream fix.

In addition, this branch changes the registry from using `sharded_slab::Slab`
to `sharded_slab::Pool`. This allows us to clear the hashmap allocations for
extensions in place, retaining the already-allocated maps. This should
improve `new_span` performance a bit.

Fixes #1005
---
 tracing-subscriber/Cargo.toml                 |   2 +-
 tracing-subscriber/benches/fmt.rs             |  87 ++++++++-
 tracing-subscriber/src/registry/extensions.rs | 129 ++++++++++----
 tracing-subscriber/src/registry/sharded.rs    | 165 +++++++++++++-----
 tracing-subscriber/src/sync.rs                |   5 +
 5 files changed, 302 insertions(+), 86 deletions(-)

diff --git a/tracing-subscriber/Cargo.toml b/tracing-subscriber/Cargo.toml
index e79ca3d17a..e5c93916e2 100644
--- a/tracing-subscriber/Cargo.toml
+++ b/tracing-subscriber/Cargo.toml
@@ -54,7 +54,7 @@ tracing-serde = { path = "../tracing-serde", version = "0.2", optional = true }
 parking_lot = { version = ">= 0.7, <= 0.11", optional = true }
 
 # registry
-sharded-slab = { version = "^0.0.9", optional = true }
+sharded-slab = { version = "0.1.0", optional = true }
 thread_local = { version = "1.0.1", optional = true }
 
 [dev-dependencies]
diff --git a/tracing-subscriber/benches/fmt.rs b/tracing-subscriber/benches/fmt.rs
index f8dd4759f4..390b19a249 100644
--- a/tracing-subscriber/benches/fmt.rs
+++ b/tracing-subscriber/benches/fmt.rs
@@ -41,8 +41,9 @@ fn bench_new_span(c: &mut Criterion) {
         group.bench_with_input(BenchmarkId::new("multithreaded", i), i, |b, &i| {
             b.iter_custom(|iters| {
                 let mut total = Duration::from_secs(0);
+                let dispatch = mk_dispatch();
                 for _ in 0..iters {
-                    let bench = MultithreadedBench::new(mk_dispatch());
+                    let bench = MultithreadedBench::new(dispatch.clone());
                     let elapsed = bench
                         .thread(move || {
                             for n in 0..i {
@@ -95,7 +96,8 @@ fn mk_dispatch() -> tracing::Dispatch {
 fn bench_event(c: &mut Criterion) {
     bench_thrpt(c, "event", |group, i| {
         group.bench_with_input(BenchmarkId::new("root/single_threaded", i), i, |b, &i| {
-            tracing::dispatch::with_default(&mk_dispatch(), || {
+            let dispatch = mk_dispatch();
+            tracing::dispatch::with_default(&dispatch, || {
                 b.iter(|| {
                     for n in 0..i {
                         tracing::info!(n);
@@ -106,8 +108,9 @@ fn bench_event(c: &mut Criterion) {
         group.bench_with_input(BenchmarkId::new("root/multithreaded", i), i, |b, &i| {
             b.iter_custom(|iters| {
                 let mut total = Duration::from_secs(0);
+                let dispatch = mk_dispatch();
                 for _ in 0..iters {
-                    let bench = MultithreadedBench::new(mk_dispatch());
+                    let bench = MultithreadedBench::new(dispatch.clone());
                     let elapsed = bench
                         .thread(move || {
                             for n in 0..i {
@@ -156,8 +159,9 @@ fn bench_event(c: &mut Criterion) {
             |b, &i| {
                 b.iter_custom(|iters| {
                     let mut total = Duration::from_secs(0);
+                    let dispatch = mk_dispatch();
                     for _ in 0..iters {
-                        let bench = MultithreadedBench::new(mk_dispatch());
+                        let bench = MultithreadedBench::new(dispatch.clone());
                         let elapsed = bench
                             .thread_with_setup(move |start| {
                                 let span =
tracing::info_span!("unique_parent", foo = false); @@ -203,13 +207,13 @@ fn bench_event(c: &mut Criterion) { i, |b, &i| { b.iter_custom(|iters| { + let dispatch = mk_dispatch(); let mut total = Duration::from_secs(0); for _ in 0..iters { - let dispatch = mk_dispatch(); let parent = tracing::dispatch::with_default(&dispatch, || { tracing::info_span!("shared_parent", foo = "hello world") }); - let bench = MultithreadedBench::new(dispatch); + let bench = MultithreadedBench::new(dispatch.clone()); let parent2 = parent.clone(); bench.thread_with_setup(move |start| { let _guard = parent2.enter(); @@ -249,6 +253,77 @@ fn bench_event(c: &mut Criterion) { }) }, ); + group.bench_with_input( + BenchmarkId::new("multi-parent/multithreaded", i), + i, + |b, &i| { + b.iter_custom(|iters| { + let dispatch = mk_dispatch(); + let mut total = Duration::from_secs(0); + for _ in 0..iters { + let parent = tracing::dispatch::with_default(&dispatch, || { + tracing::info_span!("multiparent", foo = "hello world") + }); + let bench = MultithreadedBench::new(dispatch.clone()); + let parent2 = parent.clone(); + bench.thread_with_setup(move |start| { + let _guard = parent2.enter(); + start.wait(); + let mut span = tracing::info_span!("parent"); + for n in 0..i { + let s = tracing::info_span!(parent: &span, "parent2", n, i); + s.in_scope(|| { + tracing::info!(n); + }); + span = s; + } + }); + let parent2 = parent.clone(); + bench.thread_with_setup(move |start| { + let _guard = parent2.enter(); + start.wait(); + let mut span = tracing::info_span!("parent"); + for n in 0..i { + let s = tracing::info_span!(parent: &span, "parent2", n, i); + s.in_scope(|| { + tracing::info!(n); + }); + span = s; + } + }); + let parent2 = parent.clone(); + bench.thread_with_setup(move |start| { + let _guard = parent2.enter(); + start.wait(); + let mut span = tracing::info_span!("parent"); + for n in 0..i { + let s = tracing::info_span!(parent: &span, "parent2", n, i); + s.in_scope(|| { + tracing::info!(n); + }); + span = s; + } + }); + let parent2 = parent.clone(); + bench.thread_with_setup(move |start| { + let _guard = parent2.enter(); + start.wait(); + let mut span = tracing::info_span!("parent"); + for n in 0..i { + let s = tracing::info_span!(parent: &span, "parent2", n, i); + s.in_scope(|| { + tracing::info!(n); + }); + span = s; + } + }); + let elapsed = bench.run(); + total += elapsed; + } + total + }) + }, + ); }); } diff --git a/tracing-subscriber/src/registry/extensions.rs b/tracing-subscriber/src/registry/extensions.rs index e3b98363e7..87ec8edee8 100644 --- a/tracing-subscriber/src/registry/extensions.rs +++ b/tracing-subscriber/src/registry/extensions.rs @@ -111,16 +111,16 @@ impl<'a> ExtensionsMut<'a> { /// data that it is interested in recording and emitting. #[derive(Default)] pub(crate) struct ExtensionsInner { - // If extensions are never used, no need to carry around an empty HashMap. - // That's 3 words. Instead, this is only 1 word. - map: Option>, + map: AnyMap, } impl ExtensionsInner { /// Create an empty `Extensions`. #[inline] pub(crate) fn new() -> ExtensionsInner { - ExtensionsInner { map: None } + ExtensionsInner { + map: AnyMap::default(), + } } /// Insert a type into this `Extensions`. @@ -129,7 +129,6 @@ impl ExtensionsInner { /// be returned. 
     pub(crate) fn insert<T: Send + Sync + 'static>(&mut self, val: T) -> Option<T> {
         self.map
-            .get_or_insert_with(|| Box::new(HashMap::default()))
             .insert(TypeId::of::<T>(), Box::new(val))
             .and_then(|boxed| {
                 #[allow(warnings)]
                 {
                     (boxed as Box<dyn Any + 'static>)
                         .downcast()
                         .ok()
                         .map(|boxed| *boxed)
@@ -145,16 +144,14 @@ impl ExtensionsInner {
     /// Get a reference to a type previously inserted on this `Extensions`.
     pub(crate) fn get<T: 'static>(&self) -> Option<&T> {
         self.map
-            .as_ref()
-            .and_then(|map| map.get(&TypeId::of::<T>()))
+            .get(&TypeId::of::<T>())
             .and_then(|boxed| (&**boxed as &(dyn Any + 'static)).downcast_ref())
     }
 
     /// Get a mutable reference to a type previously inserted on this `Extensions`.
     pub(crate) fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
         self.map
-            .as_mut()
-            .and_then(|map| map.get_mut(&TypeId::of::<T>()))
+            .get_mut(&TypeId::of::<T>())
             .and_then(|boxed| (&mut **boxed as &mut (dyn Any + 'static)).downcast_mut())
     }
 
@@ -162,43 +159,109 @@ impl ExtensionsInner {
     ///
     /// If a extension of this type existed, it will be returned.
     pub(crate) fn remove<T: Send + Sync + 'static>(&mut self) -> Option<T> {
-        self.map
-            .as_mut()
-            .and_then(|map| map.remove(&TypeId::of::<T>()))
-            .and_then(|boxed| {
-                #[allow(warnings)]
-                {
-                    (boxed as Box<dyn Any + 'static>)
-                        .downcast()
-                        .ok()
-                        .map(|boxed| *boxed)
-                }
-            })
+        self.map.remove(&TypeId::of::<T>()).and_then(|boxed| {
+            #[allow(warnings)]
+            {
+                (boxed as Box<dyn Any + 'static>)
+                    .downcast()
+                    .ok()
+                    .map(|boxed| *boxed)
+            }
+        })
+    }
+
+    /// Clear the `ExtensionsInner` in-place, dropping any elements in the map but
+    /// retaining allocated capacity.
+    ///
+    /// This permits the hash map allocation to be pooled by the registry so
+    /// that future spans will not need to allocate new hashmaps.
+    pub(crate) fn clear(&mut self) {
+        self.map.clear();
     }
 }
 
 impl fmt::Debug for ExtensionsInner {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Extensions").finish()
+        f.debug_struct("Extensions")
+            .field("len", &self.map.len())
+            .field("capacity", &self.map.capacity())
+            .finish()
     }
 }
 
-#[test]
-fn test_extensions() {
+#[cfg(test)]
+mod tests {
+    use super::*;
+
     #[derive(Debug, PartialEq)]
     struct MyType(i32);
 
-    let mut extensions = ExtensionsInner::new();
+    #[test]
+    fn test_extensions() {
+        let mut extensions = ExtensionsInner::new();
+
+        extensions.insert(5i32);
+        extensions.insert(MyType(10));
+
+        assert_eq!(extensions.get(), Some(&5i32));
+        assert_eq!(extensions.get_mut(), Some(&mut 5i32));
+
+        assert_eq!(extensions.remove::<i32>(), Some(5i32));
+        assert!(extensions.get::<i32>().is_none());
+
+        assert_eq!(extensions.get::<bool>(), None);
+        assert_eq!(extensions.get(), Some(&MyType(10)));
+    }
+
+    #[test]
+    fn clear_retains_capacity() {
+        let mut extensions = ExtensionsInner::new();
+        extensions.insert(5i32);
+        extensions.insert(MyType(10));
+        extensions.insert(true);
+
+        assert_eq!(extensions.map.len(), 3);
+        let prev_capacity = extensions.map.capacity();
+        extensions.clear();
+
+        assert_eq!(
+            extensions.map.len(),
+            0,
+            "after clear(), extensions map should have length 0"
+        );
+        assert_eq!(
+            extensions.map.capacity(),
+            prev_capacity,
+            "after clear(), extensions map should retain prior capacity"
+        );
+    }
+
+    #[test]
+    fn clear_drops_elements() {
+        use std::sync::Arc;
+        struct DropMePlease(Arc<()>);
+        struct DropMeTooPlease(Arc<()>);
 
-    extensions.insert(5i32);
-    extensions.insert(MyType(10));
+        let mut extensions = ExtensionsInner::new();
+        let val1 = DropMePlease(Arc::new(()));
+        let val2 = DropMeTooPlease(Arc::new(()));
 
-    assert_eq!(extensions.get(), Some(&5i32));
-    assert_eq!(extensions.get_mut(), Some(&mut 5i32));
+        let val1_dropped = Arc::downgrade(&val1.0);
+        let val2_dropped = Arc::downgrade(&val2.0);
+
+        extensions.insert(val1);
+        extensions.insert(val2);
 
-    assert_eq!(extensions.remove::<i32>(), Some(5i32));
-    assert!(extensions.get::<i32>().is_none());
+        assert!(val1_dropped.upgrade().is_some());
+        assert!(val2_dropped.upgrade().is_some());
 
-    assert_eq!(extensions.get::<bool>(), None);
-    assert_eq!(extensions.get(), Some(&MyType(10)));
+        extensions.clear();
+        assert!(
+            val1_dropped.upgrade().is_none(),
+            "after clear(), val1 should be dropped"
+        );
+        assert!(
+            val2_dropped.upgrade().is_none(),
+            "after clear(), val2 should be dropped"
+        );
+    }
 }
diff --git a/tracing-subscriber/src/registry/sharded.rs b/tracing-subscriber/src/registry/sharded.rs
index e10e4fb8a1..7ba5e87a90 100644
--- a/tracing-subscriber/src/registry/sharded.rs
+++ b/tracing-subscriber/src/registry/sharded.rs
@@ -1,4 +1,4 @@
-use sharded_slab::{Guard, Slab};
+use sharded_slab::{pool::Ref, Clear, Pool};
 use thread_local::ThreadLocal;
 
 use super::stack::SpanStack;
@@ -47,7 +47,7 @@ use tracing_core::{
 #[cfg_attr(docsrs, doc(cfg(feature = "registry")))]
 #[derive(Debug)]
 pub struct Registry {
-    spans: Slab<DataInner>,
+    spans: Pool<DataInner>,
     current_spans: ThreadLocal<RefCell<SpanStack>>,
 }
@@ -65,14 +65,23 @@ pub struct Registry {
 #[cfg_attr(docsrs, doc(cfg(feature = "registry")))]
 #[derive(Debug)]
 pub struct Data<'a> {
-    inner: Guard<'a, DataInner>,
+    /// Immutable reference to the pooled `DataInner` entry.
+    inner: Ref<'a, DataInner>,
 }
 
+/// Stored data associated with a span.
+///
+/// This type is pooled using `sharded_slab::Pool`; when a span is dropped, the
+/// `DataInner` entry at that span's slab index is cleared in place and reused
+/// by a future span. Thus, the `Default` and `sharded_slab::Clear`
+/// implementations for this type are load-bearing.
 #[derive(Debug)]
 struct DataInner {
     metadata: &'static Metadata<'static>,
     parent: Option<Id>,
     ref_count: AtomicUsize,
+    // The span's `Extensions` typemap. Allocations for the `HashMap` backing
+    // this are pooled and reused in place.
     pub(crate) extensions: RwLock<ExtensionsInner>,
 }
@@ -81,7 +90,7 @@
 impl Default for Registry {
     fn default() -> Self {
         Self {
-            spans: Slab::new(),
+            spans: Pool::new(),
            current_spans: ThreadLocal::new(),
         }
     }
@@ -126,11 +135,7 @@ pub(crate) struct CloseGuard<'a> {
 }
 
 impl Registry {
-    fn insert(&self, s: DataInner) -> Option<usize> {
-        self.spans.insert(s)
-    }
-
-    fn get(&self, id: &Id) -> Option<Guard<'_, DataInner>> {
+    fn get(&self, id: &Id) -> Option<Ref<'_, DataInner>> {
         self.spans.get(id_to_idx(id))
     }
@@ -180,13 +185,20 @@ impl Collect for Registry {
             attrs.parent().map(|id| self.clone_span(id))
         };
 
-        let s = DataInner {
-            metadata: attrs.metadata(),
-            parent,
-            ref_count: AtomicUsize::new(1),
-            extensions: RwLock::new(ExtensionsInner::new()),
-        };
-        let id = self.insert(s).expect("Unable to allocate another span");
+        let id = self
+            .spans
+            // Check out a `DataInner` entry from the pool for the new span. If
+            // there are free entries already allocated in the pool, this will
+            // preferentially reuse one; otherwise, a new `DataInner` is
+            // allocated and added to the pool.
+            .create_with(|data| {
+                data.metadata = attrs.metadata();
+                data.parent = parent;
+                let refs = data.ref_count.get_mut();
+                debug_assert_eq!(*refs, 0);
+                *refs = 1;
+            })
+            .expect("Unable to allocate another span");
         idx_to_id(id)
     }
@@ -230,7 +242,11 @@
         // calls to `try_close`: we have to ensure that all threads have
         // dropped their refs to the span before the span is closed.
let refs = span.ref_count.fetch_add(1, Ordering::Relaxed); - assert!(refs != 0, "tried to clone a span that already closed"); + assert!( + refs != 0, + "tried to clone a span ({:?}) that already closed", + id + ); id.clone() } @@ -282,34 +298,7 @@ impl<'a> LookupSpan<'a> for Registry { } } -// === impl DataInner === - -impl Drop for DataInner { - // A span is not considered closed until all of its children have closed. - // Therefore, each span's `DataInner` holds a "reference" to the parent - // span, keeping the parent span open until all its children have closed. - // When we close a span, we must then decrement the parent's ref count - // (potentially, allowing it to close, if this child is the last reference - // to that span). - fn drop(&mut self) { - // We have to actually unpack the option inside the `get_default` - // closure, since it is a `FnMut`, but testing that there _is_ a value - // here lets us avoid the thread-local access if we don't need the - // dispatcher at all. - if self.parent.is_some() { - // Note that --- because `Layered::try_close` works by calling - // `try_close` on the inner subscriber and using the return value to - // determine whether to call the `Subscriber`'s `on_close` callback --- - // we must call `try_close` on the entire subscriber stack, rather - // than just on the registry. If the registry called `try_close` on - // itself directly, the layers wouldn't see the close notification. - let subscriber = dispatch::get_default(Dispatch::clone); - if let Some(parent) = self.parent.take() { - let _ = subscriber.try_close(parent); - } - } - } -} +// === impl CloseGuard === impl<'a> CloseGuard<'a> { pub(crate) fn is_closing(&mut self) { @@ -336,7 +325,7 @@ impl<'a> Drop for CloseGuard<'a> { // `on_close` call. If the span is closing, it's okay to remove the // span. if c == 1 && self.is_closing { - self.registry.spans.remove(id_to_idx(&self.id)); + self.registry.spans.clear(id_to_idx(&self.id)); } }); } @@ -366,6 +355,90 @@ impl<'a> SpanData<'a> for Data<'a> { } } +// === impl DataInner === + +impl Default for DataInner { + fn default() -> Self { + // Since `DataInner` owns a `&'static Callsite` pointer, we need + // something to use as the initial default value for that callsite. + // Since we can't access a `DataInner` until it has had actual span data + // inserted into it, the null metadata will never actually be accessed. + struct NullCallsite; + impl tracing_core::callsite::Callsite for NullCallsite { + fn set_interest(&self, _: Interest) { + unreachable!( + "/!\\ Tried to register the null callsite /!\\\n \ + This should never have happened and is definitely a bug. \ + A `tracing` bug report would be appreciated." + ) + } + + fn metadata(&self) -> &Metadata<'_> { + unreachable!( + "/!\\ Tried to access the null callsite's metadata /!\\\n \ + This should never have happened and is definitely a bug. \ + A `tracing` bug report would be appreciated." + ) + } + } + + static NULL_CALLSITE: NullCallsite = NullCallsite; + static NULL_METADATA: Metadata<'static> = tracing_core::metadata! { + name: "", + target: "", + level: tracing_core::Level::TRACE, + fields: &[], + callsite: &NULL_CALLSITE, + kind: tracing_core::metadata::Kind::SPAN, + }; + + Self { + metadata: &NULL_METADATA, + parent: None, + ref_count: AtomicUsize::new(0), + extensions: RwLock::new(ExtensionsInner::new()), + } + } +} + +impl Clear for DataInner { + /// Clears the span's data in place, dropping the parent's reference count. 
+    fn clear(&mut self) {
+        // A span is not considered closed until all of its children have closed.
+        // Therefore, each span's `DataInner` holds a "reference" to the parent
+        // span, keeping the parent span open until all its children have closed.
+        // When we close a span, we must then decrement the parent's ref count
+        // (potentially, allowing it to close, if this child is the last reference
+        // to that span).
+        // We have to actually unpack the option inside the `get_default`
+        // closure, since it is a `FnMut`, but testing that there _is_ a value
+        // here lets us avoid the thread-local access if we don't need the
+        // dispatcher at all.
+        if self.parent.is_some() {
+            // Note that --- because `Layered::try_close` works by calling
+            // `try_close` on the inner subscriber and using the return value to
+            // determine whether to call the `Layer`'s `on_close` callback ---
+            // we must call `try_close` on the entire subscriber stack, rather
+            // than just on the registry. If the registry called `try_close` on
+            // itself directly, the layers wouldn't see the close notification.
+            let subscriber = dispatch::get_default(Dispatch::clone);
+            if let Some(parent) = self.parent.take() {
+                let _ = subscriber.try_close(parent);
+            }
+        }
+
+        // Clear (but do not deallocate!) the pooled `HashMap` for the span's extensions.
+        self.extensions
+            .get_mut()
+            .unwrap_or_else(|l| {
+                // This function can be called in a `Drop` impl, such as while
+                // panicking, so ignore lock poisoning.
+                l.into_inner()
+            })
+            .clear();
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::Registry;
diff --git a/tracing-subscriber/src/sync.rs b/tracing-subscriber/src/sync.rs
index e89a8087e8..f09731f50b 100644
--- a/tracing-subscriber/src/sync.rs
+++ b/tracing-subscriber/src/sync.rs
@@ -33,6 +33,11 @@ mod parking_lot_impl {
         }
     }
 
+    #[inline]
+    pub(crate) fn get_mut(&mut self) -> LockResult<&mut T> {
+        Ok(self.inner.get_mut())
+    }
+
     #[inline]
     pub(crate) fn read<'a>(&'a self) -> LockResult<RwLockReadGuard<'a, T>> {
         Ok(self.inner.read())
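
---

Aside (not part of the patch): for readers unfamiliar with the `sharded_slab::Pool` lifecycle that `Registry` now relies on, here is a minimal, self-contained sketch of the same checkout/clear-in-place pattern under sharded-slab 0.1. The `Entry` type is hypothetical, standing in for `DataInner`; `create_with`, `get`, and `clear` are the same pool APIs used in the diff above.

```rust
use sharded_slab::{Clear, Pool};

// Hypothetical pooled entry standing in for the registry's `DataInner`.
// `Pool<T>` requires `T: Default + Clear` so that slots can be initialized
// lazily and reset in place for reuse.
#[derive(Default, Debug)]
struct Entry {
    name: String,
}

impl Clear for Entry {
    fn clear(&mut self) {
        // Drop the contents but keep the `String`'s allocated capacity,
        // just as `DataInner::clear` retains each span's extensions map.
        self.name.clear();
    }
}

fn main() {
    let pool: Pool<Entry> = Pool::new();

    // Check an entry out of the pool, initializing it in place; this mirrors
    // `Registry::new_span` calling `create_with`. A previously cleared slot
    // is reused if one is free; otherwise a new one is allocated.
    let key = pool
        .create_with(|entry| entry.name.push_str("my_span"))
        .expect("unable to allocate another entry");

    assert_eq!(pool.get(key).expect("entry must exist").name, "my_span");

    // Mark the slot cleared and reusable without deallocating it, as
    // `CloseGuard::drop` now does via `spans.clear(...)`; `Entry::clear`
    // runs in place, and a later `create_with` may hand the slot back out.
    assert!(pool.clear(key));
    assert!(pool.get(key).is_none());
}
```

The design point the patch leans on is visible here: dropping a span no longer deallocates its `DataInner` or its extensions hashmap; the slot is reset in place, so the next `new_span` can skip those allocations entirely.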