This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Speed up logging once again (#9981)
* Update `tracing`-related dependencies

* Enable `parking_lot` feature in `tracing-subscriber`

* Add an asynchronous stderr logger (see the sketch after this list)

* Make clippy happy

* Add an integration test for the logger

* Refactor `test_logger_filters`'s subprocess machinery into a separate function

* Use a child process instead of hooking into stderr for the test

* Add a doc comment for `MakeStderrWriter`

* Move the initialization into the `MakeStderrWriter`'s constructor

* Add an extra test case to trigger the logger's emergency flush mechanism

* Use the buffer's mutex for asynchronous flushes

* Remove vestigial `nix` dependency from one of the previous commits
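The new `client/tracing/src/logging/stderr_writer.rs` file that backs these changes is not rendered on this page, so the following is only a rough, hypothetical sketch of the approach the commit messages describe: formatted log data is buffered behind a `parking_lot` mutex, a background thread drains the buffer to stderr, and an oversized buffer triggers a synchronous "emergency" flush under that same mutex. The names `AsyncStderrWriter`, `Shared`, and `EMERGENCY_FLUSH_THRESHOLD` are illustrative only, and the `MakeWriter` impl assumes the `tracing-subscriber` 0.2 API used by this commit — this is not the actual implementation.

```rust
use std::{
	io::{self, Write},
	sync::Arc,
	time::Duration,
};

use parking_lot::{Condvar, Mutex};
use tracing_subscriber::fmt::MakeWriter;

/// Illustrative asynchronous stderr writer: logging threads append to an
/// in-memory buffer and a background thread writes it out, so callers do not
/// block on the stderr file descriptor.
#[derive(Clone)]
pub struct AsyncStderrWriter {
	shared: Arc<Shared>,
}

struct Shared {
	buffer: Mutex<Vec<u8>>,
	wake_up: Condvar,
}

/// If the buffer grows past this size the calling thread flushes it itself
/// (an "emergency" flush) instead of waiting for the background thread.
/// The threshold is an arbitrary value for this sketch.
const EMERGENCY_FLUSH_THRESHOLD: usize = 16 * 1024 * 1024;

impl Default for AsyncStderrWriter {
	fn default() -> Self {
		let shared =
			Arc::new(Shared { buffer: Mutex::new(Vec::new()), wake_up: Condvar::new() });
		let background = shared.clone();

		// Background thread: wait until there is data (or a timeout elapses),
		// swap the buffer out and write its contents to stderr.
		std::thread::spawn(move || loop {
			let chunk = {
				let mut buffer = background.buffer.lock();
				if buffer.is_empty() {
					let _ = background.wake_up.wait_for(&mut buffer, Duration::from_millis(50));
				}
				std::mem::take(&mut *buffer)
			};
			if !chunk.is_empty() {
				let _ = io::stderr().write_all(&chunk);
			}
		});

		Self { shared }
	}
}

impl Write for AsyncStderrWriter {
	fn write(&mut self, data: &[u8]) -> io::Result<usize> {
		let mut buffer = self.shared.buffer.lock();
		buffer.extend_from_slice(data);
		if buffer.len() >= EMERGENCY_FLUSH_THRESHOLD {
			// Too much pending data: flush on the caller's thread while still
			// holding the buffer's mutex so that lines are not reordered.
			let chunk = std::mem::take(&mut *buffer);
			let _ = io::stderr().write_all(&chunk);
		} else {
			self.shared.wake_up.notify_one();
		}
		Ok(data.len())
	}

	fn flush(&mut self) -> io::Result<()> {
		Ok(())
	}
}

impl MakeWriter for AsyncStderrWriter {
	type Writer = Self;

	fn make_writer(&self) -> Self::Writer {
		self.clone()
	}
}
```

A writer of this shape would be handed to the subscriber builder via `with_writer(...)`, which is exactly what the `mod.rs` diff below does with the real `MakeStderrWriter`.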
koute authored Oct 21, 2021
1 parent 485e592 commit 632b323
Showing 6 changed files with 373 additions and 21 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion client/tracing/Cargo.toml
@@ -17,6 +17,7 @@ ansi_term = "0.12.1"
atty = "0.2.13"
chrono = "0.4.19"
lazy_static = "1.4.0"
libc = "0.2.95"
log = { version = "0.4.8" }
once_cell = "1.8.0"
parking_lot = "0.11.1"
@@ -26,7 +27,7 @@ serde = "1.0.126"
thiserror = "1.0.21"
tracing = "0.1.29"
tracing-log = "0.1.2"
tracing-subscriber = "0.2.19"
tracing-subscriber = { version = "0.2.25", features = ["parking_lot"] }
sp-tracing = { version = "4.0.0-dev", path = "../../primitives/tracing" }
sp-rpc = { version = "4.0.0-dev", path = "../../primitives/rpc" }
sp-runtime = { version = "4.0.0-dev", path = "../../primitives/runtime" }
2 changes: 1 addition & 1 deletion client/tracing/src/logging/directives.rs
@@ -109,5 +109,5 @@ pub(crate) fn set_reload_handle(handle: Handle<EnvFilter, SCSubscriber>) {
type SCSubscriber<
N = tracing_fmt::format::DefaultFields,
E = crate::logging::EventFormat,
W = fn() -> std::io::Stderr,
W = crate::logging::DefaultLogger,
> = layer::Layered<tracing_fmt::Layer<Registry, N, E, W>, Registry>;
147 changes: 134 additions & 13 deletions client/tracing/src/logging/mod.rs
Expand Up @@ -26,6 +26,9 @@ mod directives;
mod event_format;
mod fast_local_time;
mod layers;
mod stderr_writer;

pub(crate) type DefaultLogger = stderr_writer::MakeStderrWriter;

pub use directives::*;
pub use sc_tracing_proc_macro::*;
@@ -47,6 +50,8 @@ pub use event_format::*;
pub use fast_local_time::FastLocalTime;
pub use layers::*;

use stderr_writer::MakeStderrWriter;

/// Logging Result typedef.
pub type Result<T> = std::result::Result<T, Error>;

@@ -91,7 +96,7 @@ fn prepare_subscriber<N, E, F, W>(
profiling_targets: Option<&str>,
force_colors: Option<bool>,
builder_hook: impl Fn(
SubscriberBuilder<format::DefaultFields, EventFormat, EnvFilter, fn() -> std::io::Stderr>,
SubscriberBuilder<format::DefaultFields, EventFormat, EnvFilter, DefaultLogger>,
) -> SubscriberBuilder<N, E, F, W>,
) -> Result<impl Subscriber + for<'a> LookupSpan<'a>>
where
@@ -172,7 +177,7 @@ where

let builder = builder.with_span_events(format::FmtSpan::NONE);

let builder = builder.with_writer(std::io::stderr as _);
let builder = builder.with_writer(MakeStderrWriter::default());

let builder = builder.event_format(event_format);

@@ -282,7 +287,16 @@ impl LoggerBuilder {
mod tests {
use super::*;
use crate as sc_tracing;
use std::{env, process::Command};
use log::info;
use std::{
collections::BTreeMap,
env,
process::Command,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
use tracing::{metadata::Kind, subscriber::Interest, Callsite, Level, Metadata};

const EXPECTED_LOG_MESSAGE: &'static str = "yeah logging works as expected";
@@ -292,9 +306,28 @@ mod tests {
let _ = LoggerBuilder::new(directives).init().unwrap();
}

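// Runs `test_body` in a freshly spawned child process: the parent re-executes the
// current test binary with `RUN_FORKED_TEST=1` set and captures its output, while
// the child (which sees that variable) just runs the closure and returns `None`.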
fn run_test_in_another_process(
test_name: &str,
test_body: impl FnOnce(),
) -> Option<std::process::Output> {
if env::var("RUN_FORKED_TEST").is_ok() {
test_body();
None
} else {
let output = Command::new(env::current_exe().unwrap())
.arg(test_name)
.env("RUN_FORKED_TEST", "1")
.output()
.unwrap();

assert!(output.status.success());
Some(output)
}
}

#[test]
fn test_logger_filters() {
if env::var("RUN_TEST_LOGGER_FILTERS").is_ok() {
run_test_in_another_process("test_logger_filters", || {
let test_directives =
"afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error";
init_logger(&test_directives);
@@ -331,15 +364,7 @@ mod tests {
assert!(test_filter("telemetry", Level::TRACE));
assert!(test_filter("something-with-dash", Level::ERROR));
});
} else {
let status = Command::new(env::current_exe().unwrap())
.arg("test_logger_filters")
.env("RUN_TEST_LOGGER_FILTERS", "1")
.output()
.unwrap()
.status;
assert!(status.success());
}
});
}

/// This test ensures that using dash (`-`) in the target name in logs and directives actually
@@ -474,4 +499,100 @@ mod tests {
assert_eq!("MAX_LOG_LEVEL=Trace", run_test(None, Some("test=info".into())));
}
}

// This creates a bunch of threads and makes sure they start executing
// a given callback almost exactly at the same time.
fn run_on_many_threads(thread_count: usize, callback: impl Fn(usize) + 'static + Send + Clone) {
let started_count = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(AtomicBool::new(false));
let threads: Vec<_> = (0..thread_count)
.map(|nth_thread| {
let started_count = started_count.clone();
let barrier = barrier.clone();
let callback = callback.clone();

std::thread::spawn(move || {
started_count.fetch_add(1, Ordering::SeqCst);
while !barrier.load(Ordering::SeqCst) {
std::thread::yield_now();
}

callback(nth_thread);
})
})
.collect();

while started_count.load(Ordering::SeqCst) != thread_count {
std::thread::yield_now();
}
barrier.store(true, Ordering::SeqCst);

for thread in threads {
if let Err(error) = thread.join() {
println!("error: failed to join thread: {:?}", error);
unsafe { libc::abort() }
}
}
}

#[test]
fn parallel_logs_from_multiple_threads_are_properly_gathered() {
const THREAD_COUNT: usize = 128;
const LOGS_PER_THREAD: usize = 1024;

let output = run_test_in_another_process(
"parallel_logs_from_multiple_threads_are_properly_gathered",
|| {
let builder = LoggerBuilder::new("");
builder.init().unwrap();

run_on_many_threads(THREAD_COUNT, |nth_thread| {
for _ in 0..LOGS_PER_THREAD {
info!("Thread <<{}>>", nth_thread);
}
});
},
);

if let Some(output) = output {
let stderr = String::from_utf8(output.stderr).unwrap();
let mut count_per_thread = BTreeMap::new();
for line in stderr.split("\n") {
if let Some(index_s) = line.find("Thread <<") {
let index_s = index_s + "Thread <<".len();
let index_e = line.find(">>").unwrap();
let nth_thread: usize = line[index_s..index_e].parse().unwrap();
*count_per_thread.entry(nth_thread).or_insert(0) += 1;
}
}

assert_eq!(count_per_thread.len(), THREAD_COUNT);
for (_, count) in count_per_thread {
assert_eq!(count, LOGS_PER_THREAD);
}
}
}

#[test]
fn huge_single_line_log_is_properly_printed_out() {
let mut line = String::new();
line.push_str("$$START$$");
for n in 0..16 * 1024 * 1024 {
let ch = b'a' + (n as u8 % (b'z' - b'a'));
line.push(char::from(ch));
}
line.push_str("$$END$$");

let output =
run_test_in_another_process("huge_single_line_log_is_properly_printed_out", || {
let builder = LoggerBuilder::new("");
builder.init().unwrap();
info!("{}", line);
});

if let Some(output) = output {
let stderr = String::from_utf8(output.stderr).unwrap();
assert!(stderr.contains(&line));
}
}
}
