From 48810a0e00fe5646891181c203c84ba62115bba3 Mon Sep 17 00:00:00 2001 From: Thayne McCombs Date: Mon, 15 Nov 2021 01:02:20 -0700 Subject: [PATCH] Add buffering to stdout when it's not a terminal This is based on the work of #736 by @sourlemon207. I've added the suggestion I recommended on that PR. --- src/exec/job.rs | 4 ++-- src/output.rs | 28 ++++++++++++++-------------- src/walk.rs | 41 ++++++++++++++++++++++++++--------------- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/exec/job.rs b/src/exec/job.rs index aa8164cd4..85b30f1b6 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -26,7 +26,7 @@ pub fn job( // Obtain the next result from the receiver, else if the channel // has closed, exit from the loop let value: PathBuf = match lock.recv() { - Ok(WorkerResult::Entry(val)) => val, + Ok(WorkerResult::Entry(path)) => path, Ok(WorkerResult::Error(err)) => { if show_filesystem_errors { print_error(err.to_string()); @@ -53,7 +53,7 @@ pub fn batch( limit: usize, ) -> ExitCode { let paths = rx.iter().filter_map(|value| match value { - WorkerResult::Entry(val) => Some(val), + WorkerResult::Entry(path) => Some(path), WorkerResult::Error(err) => { if show_filesystem_errors { print_error(err.to_string()); diff --git a/src/output.rs b/src/output.rs index 194cb2adc..aa1b99a9c 100644 --- a/src/output.rs +++ b/src/output.rs @@ -1,8 +1,7 @@ use std::borrow::Cow; -use std::io::{self, StdoutLock, Write}; +use std::io::{self, Write}; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use lscolors::{Indicator, LsColors, Style}; @@ -16,11 +15,11 @@ fn replace_path_separator(path: &str, new_path_separator: &str) -> String { } // TODO: this function is performance critical and can probably be optimized -pub fn print_entry( - stdout: &mut StdoutLock, +pub fn print_entry( + stdout: &mut W, entry: &Path, config: &Config, - wants_to_quit: &Arc, + wants_to_quit: &AtomicBool, ) { let path = if entry.is_absolute() { entry @@ -46,12 +45,12 @@ pub fn print_entry( } // TODO: this function is performance critical and can probably be optimized -fn print_entry_colorized( - stdout: &mut StdoutLock, +fn print_entry_colorized( + stdout: &mut W, path: &Path, config: &Config, ls_colors: &LsColors, - wants_to_quit: &Arc, + wants_to_quit: &AtomicBool, ) -> io::Result<()> { // Split the path between the parent and the last component let mut offset = 0; @@ -94,6 +93,7 @@ fn print_entry_colorized( } if wants_to_quit.load(Ordering::Relaxed) { + stdout.flush()?; ExitCode::KilledBySigint.exit(); } @@ -101,8 +101,8 @@ fn print_entry_colorized( } // TODO: this function is performance critical and can probably be optimized -fn print_entry_uncolorized_base( - stdout: &mut StdoutLock, +fn print_entry_uncolorized_base( + stdout: &mut W, path: &Path, config: &Config, ) -> io::Result<()> { @@ -116,8 +116,8 @@ fn print_entry_uncolorized_base( } #[cfg(not(unix))] -fn print_entry_uncolorized( - stdout: &mut StdoutLock, +fn print_entry_uncolorized( + stdout: &mut W, path: &Path, config: &Config, ) -> io::Result<()> { @@ -125,8 +125,8 @@ fn print_entry_uncolorized( } #[cfg(unix)] -fn print_entry_uncolorized( - stdout: &mut StdoutLock, +fn print_entry_uncolorized( + stdout: &mut W, path: &Path, config: &Config, ) -> io::Result<()> { diff --git a/src/walk.rs b/src/walk.rs index ed3bf9f5e..61e38e4d9 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -1,13 +1,13 @@ -use std::borrow::Cow; use std::ffi::OsStr; use std::fs::{FileType, Metadata}; use std::io; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender as Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time; +use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; use ignore::overrides::OverrideBuilder; @@ -22,6 +22,12 @@ use crate::exit_codes::{merge_exitcodes, ExitCode}; use crate::filesystem; use crate::output; +pub const WORKER_CHANNEL_DEFAULT_BOUND: usize = 1000; + +pub fn make_worker_channel() -> (Sender, Receiver) { + sync_channel(WORKER_CHANNEL_DEFAULT_BOUND) +} + /// The receiver thread can either be buffering results or directly streaming to the console. enum ReceiverMode { /// Receiver is still buffering in order to sort the results, if the search finishes fast @@ -53,7 +59,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> R let first_path_buf = path_iter .next() .expect("Error: Path vector can not be empty"); - let (tx, rx) = channel(); + let (tx, rx) = make_worker_channel(); let mut override_builder = OverrideBuilder::new(first_path_buf.as_path()); @@ -220,8 +226,6 @@ fn spawn_receiver( } else { let start = time::Instant::now(); - let mut buffer = vec![]; - // Start in buffering mode let mut mode = ReceiverMode::Buffering; @@ -229,45 +233,53 @@ fn spawn_receiver( let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME); let stdout = io::stdout(); - let mut stdout = stdout.lock(); + let stdout = stdout.lock(); + let mut stdout = io::BufWriter::new(stdout); let mut num_results = 0; - + let is_interactive = config.interactive_terminal; + let mut buffer = Vec::with_capacity(MAX_BUFFER_LENGTH); for worker_result in rx { match worker_result { - WorkerResult::Entry(value) => { + WorkerResult::Entry(path) => { if config.quiet { return ExitCode::HasResults(true); } match mode { ReceiverMode::Buffering => { - buffer.push(value); + buffer.push(path); // Have we reached the maximum buffer size or maximum buffering time? if buffer.len() > MAX_BUFFER_LENGTH || start.elapsed() > max_buffer_time { // Flush the buffer - for v in &buffer { + for path in &buffer { output::print_entry( &mut stdout, - v, + path, &config, &wants_to_quit, ); } buffer.clear(); - + if is_interactive && stdout.flush().is_err() { + // Probably a broken pipe. Exit gracefully. + return ExitCode::GeneralError; + } // Start streaming mode = ReceiverMode::Streaming; } } ReceiverMode::Streaming => { - output::print_entry(&mut stdout, &value, &config, &wants_to_quit); + output::print_entry(&mut stdout, &path, &config, &wants_to_quit); + if is_interactive && stdout.flush().is_err() { + // Probably a broken pipe. Exit gracefully. + return ExitCode::GeneralError; + } } } - num_results += 1; if let Some(max_results) = config.max_results { if num_results >= max_results { @@ -282,7 +294,6 @@ fn spawn_receiver( } } } - // If we have finished fast enough (faster than max_buffer_time), we haven't streamed // anything to the console, yet. In this case, sort the results and print them: buffer.sort();