Skip to content

Commit

Permalink
Add buffering to stdout when it's not a terminal
Browse files Browse the repository at this point in the history
This is based on the work of sharkdp#736 by @sourlemon207.

I've added the suggestion I recommended on that PR.
  • Loading branch information
tmccombs committed Nov 15, 2021
1 parent fe99270 commit 48810a0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/exec/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub fn job(
// Obtain the next result from the receiver, else if the channel
// has closed, exit from the loop
let value: PathBuf = match lock.recv() {
Ok(WorkerResult::Entry(val)) => val,
Ok(WorkerResult::Entry(path)) => path,
Ok(WorkerResult::Error(err)) => {
if show_filesystem_errors {
print_error(err.to_string());
Expand All @@ -53,7 +53,7 @@ pub fn batch(
limit: usize,
) -> ExitCode {
let paths = rx.iter().filter_map(|value| match value {
WorkerResult::Entry(val) => Some(val),
WorkerResult::Entry(path) => Some(path),
WorkerResult::Error(err) => {
if show_filesystem_errors {
print_error(err.to_string());
Expand Down
28 changes: 14 additions & 14 deletions src/output.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::borrow::Cow;
use std::io::{self, StdoutLock, Write};
use std::io::{self, Write};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use lscolors::{Indicator, LsColors, Style};

Expand All @@ -16,11 +15,11 @@ fn replace_path_separator(path: &str, new_path_separator: &str) -> String {
}

// TODO: this function is performance critical and can probably be optimized
pub fn print_entry(
stdout: &mut StdoutLock,
pub fn print_entry<W: Write>(
stdout: &mut W,
entry: &Path,
config: &Config,
wants_to_quit: &Arc<AtomicBool>,
wants_to_quit: &AtomicBool,
) {
let path = if entry.is_absolute() {
entry
Expand All @@ -46,12 +45,12 @@ pub fn print_entry(
}

// TODO: this function is performance critical and can probably be optimized
fn print_entry_colorized(
stdout: &mut StdoutLock,
fn print_entry_colorized<W: Write>(
stdout: &mut W,
path: &Path,
config: &Config,
ls_colors: &LsColors,
wants_to_quit: &Arc<AtomicBool>,
wants_to_quit: &AtomicBool,
) -> io::Result<()> {
// Split the path between the parent and the last component
let mut offset = 0;
Expand Down Expand Up @@ -94,15 +93,16 @@ fn print_entry_colorized(
}

if wants_to_quit.load(Ordering::Relaxed) {
stdout.flush()?;
ExitCode::KilledBySigint.exit();
}

Ok(())
}

// TODO: this function is performance critical and can probably be optimized
fn print_entry_uncolorized_base(
stdout: &mut StdoutLock,
fn print_entry_uncolorized_base<W: Write>(
stdout: &mut W,
path: &Path,
config: &Config,
) -> io::Result<()> {
Expand All @@ -116,17 +116,17 @@ fn print_entry_uncolorized_base(
}

#[cfg(not(unix))]
fn print_entry_uncolorized(
stdout: &mut StdoutLock,
fn print_entry_uncolorized<W: Write>(
stdout: &mut W,
path: &Path,
config: &Config,
) -> io::Result<()> {
print_entry_uncolorized_base(stdout, path, config)
}

#[cfg(unix)]
fn print_entry_uncolorized(
stdout: &mut StdoutLock,
fn print_entry_uncolorized<W: Write>(
stdout: &mut W,
path: &Path,
config: &Config,
) -> io::Result<()> {
Expand Down
41 changes: 26 additions & 15 deletions src/walk.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::borrow::Cow;
use std::ffi::OsStr;
use std::fs::{FileType, Metadata};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender as Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time;
use std::{borrow::Cow, io::Write};

use anyhow::{anyhow, Result};
use ignore::overrides::OverrideBuilder;
Expand All @@ -22,6 +22,12 @@ use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::filesystem;
use crate::output;

pub const WORKER_CHANNEL_DEFAULT_BOUND: usize = 1000;

pub fn make_worker_channel() -> (Sender<WorkerResult>, Receiver<WorkerResult>) {
sync_channel(WORKER_CHANNEL_DEFAULT_BOUND)
}

/// The receiver thread can either be buffering results or directly streaming to the console.
enum ReceiverMode {
/// Receiver is still buffering in order to sort the results, if the search finishes fast
Expand Down Expand Up @@ -53,7 +59,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc<Regex>, config: Arc<Config>) -> R
let first_path_buf = path_iter
.next()
.expect("Error: Path vector can not be empty");
let (tx, rx) = channel();
let (tx, rx) = make_worker_channel();

let mut override_builder = OverrideBuilder::new(first_path_buf.as_path());

Expand Down Expand Up @@ -220,54 +226,60 @@ fn spawn_receiver(
} else {
let start = time::Instant::now();

let mut buffer = vec![];

// Start in buffering mode
let mut mode = ReceiverMode::Buffering;

// Maximum time to wait before we start streaming to the console.
let max_buffer_time = config.max_buffer_time.unwrap_or(DEFAULT_MAX_BUFFER_TIME);

let stdout = io::stdout();
let mut stdout = stdout.lock();
let stdout = stdout.lock();
let mut stdout = io::BufWriter::new(stdout);

let mut num_results = 0;

let is_interactive = config.interactive_terminal;
let mut buffer = Vec::with_capacity(MAX_BUFFER_LENGTH);
for worker_result in rx {
match worker_result {
WorkerResult::Entry(value) => {
WorkerResult::Entry(path) => {
if config.quiet {
return ExitCode::HasResults(true);
}

match mode {
ReceiverMode::Buffering => {
buffer.push(value);
buffer.push(path);

// Have we reached the maximum buffer size or maximum buffering time?
if buffer.len() > MAX_BUFFER_LENGTH
|| start.elapsed() > max_buffer_time
{
// Flush the buffer
for v in &buffer {
for path in &buffer {
output::print_entry(
&mut stdout,
v,
path,
&config,
&wants_to_quit,
);
}
buffer.clear();

if is_interactive && stdout.flush().is_err() {
// Probably a broken pipe. Exit gracefully.
return ExitCode::GeneralError;
}
// Start streaming
mode = ReceiverMode::Streaming;
}
}
ReceiverMode::Streaming => {
output::print_entry(&mut stdout, &value, &config, &wants_to_quit);
output::print_entry(&mut stdout, &path, &config, &wants_to_quit);
if is_interactive && stdout.flush().is_err() {
// Probably a broken pipe. Exit gracefully.
return ExitCode::GeneralError;
}
}
}

num_results += 1;
if let Some(max_results) = config.max_results {
if num_results >= max_results {
Expand All @@ -282,7 +294,6 @@ fn spawn_receiver(
}
}
}

// If we have finished fast enough (faster than max_buffer_time), we haven't streamed
// anything to the console, yet. In this case, sort the results and print them:
buffer.sort();
Expand Down

0 comments on commit 48810a0

Please sign in to comment.