Skip to content

Commit

Permalink
Make tee work more nicely with non-collections (#13652)
Browse files Browse the repository at this point in the history
# Description

This changes the behavior of `tee` to be more transparent when given a
value that isn't a list or range. Previously, anything that wasn't a
byte stream would be converted to a list stream using the iterator
implementation, which led to some surprising results. Instead, now, if
the value is a string or binary, it will be treated the same way a byte
stream is, and the output of `tee` is a byte stream instead of the
original value. This is done so that we can synchronize with the other
thread on collect, and potentially capture any error produced by the
closure.

For values that can't be converted to streams, the closure is instead
just run on another thread with a clone of the value. Because we can't
wait for the other thread, there is no way to send an error back to the
original thread, so instead it's just written to stderr using
`report_error_new()`.

There are a couple of follow up edge cases I see where byte streams
aren't necessarily treated exactly the same way strings are, but this
should mostly be a good experience.

Fixes #13489.

# User-Facing Changes

Breaking change.

- `tee` now outputs and sends a string/binary stream for string/binary
input.
- `tee` now outputs and sends the original value for any input other
than strings, binary values, lists, and ranges.

# Tests + Formatting

Added for new behavior.

# After Submitting

- [ ] release notes: breaking change, command change
  • Loading branch information
devyn committed Sep 1, 2024
1 parent ee997ef commit 39bda89
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 18 deletions.
80 changes: 62 additions & 18 deletions crates/nu-command/src/filters/tee.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use nu_engine::{command_prelude::*, get_eval_block_with_early_return};
use nu_protocol::{
byte_stream::copy_with_signals, engine::Closure, process::ChildPipe, ByteStream,
ByteStreamSource, OutDest, PipelineMetadata, Signals,
byte_stream::copy_with_signals, engine::Closure, process::ChildPipe, report_error_new,
ByteStream, ByteStreamSource, OutDest, PipelineMetadata, Signals,
};
use std::{
io::{self, Read, Write},
sync::mpsc::{self, Sender},
sync::{
mpsc::{self, Sender},
Arc,
},
thread::{self, JoinHandle},
};

Expand Down Expand Up @@ -61,6 +64,11 @@ use it in your pipeline."#
description: "Print numbers and their sum",
result: None,
},
Example {
example: "10000 | tee { 1..$in | print } | $in * 5",
description: "Do something with a value on another thread, while also passing through the value",
result: Some(Value::test_int(50000)),
}
]
}

Expand All @@ -78,8 +86,10 @@ use it in your pipeline."#
let closure_span = closure.span;
let closure = closure.item;

let engine_state_arc = Arc::new(engine_state.clone());

let mut eval_block = {
let closure_engine_state = engine_state.clone();
let closure_engine_state = engine_state_arc.clone();
let mut closure_stack = stack
.captures_to_stack_preserve_out_dest(closure.captures)
.reset_pipes();
Expand All @@ -97,8 +107,15 @@ use it in your pipeline."#
}
};

// Convert values that can be represented as streams into streams. Streams can pass errors
// through later, so if we treat string/binary/list as a stream instead, it's likely that
// we can get the error back to the original thread.
let span = input.span().unwrap_or(head);
let input = input
.try_into_stream(engine_state)
.unwrap_or_else(|original_input| original_input);

if let PipelineData::ByteStream(stream, metadata) = input {
let span = stream.span();
let type_ = stream.type_();

let info = StreamInfo {
Expand Down Expand Up @@ -228,22 +245,37 @@ use it in your pipeline."#
return stderr_misuse(input.span().unwrap_or(head), head);
}

let span = input.span().unwrap_or(head);
let metadata = input.metadata();
let metadata_clone = metadata.clone();
let signals = engine_state.signals().clone();

Ok(tee(input.into_iter(), move |rx| {
let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone);
eval_block(input)
})
.err_span(call.head)?
.map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
.into_pipeline_data_with_metadata(
span,
engine_state.signals().clone(),
metadata,
))
if matches!(input, PipelineData::ListStream(..)) {
// Only use the iterator implementation on lists / list streams. We want to be able
// to preserve errors as much as possible, and only the stream implementations can
// really do that
let signals = engine_state.signals().clone();

Ok(tee(input.into_iter(), move |rx| {
let input = rx.into_pipeline_data_with_metadata(span, signals, metadata_clone);
eval_block(input)
})
.err_span(call.head)?
.map(move |result| result.unwrap_or_else(|err| Value::error(err, closure_span)))
.into_pipeline_data_with_metadata(
span,
engine_state.signals().clone(),
metadata,
))
} else {
// Otherwise, we can spawn a thread with the input value, but we have nowhere to
// send an error to other than just trying to print it to stderr.
let value = input.into_value(span)?;
let value_clone = value.clone();
tee_once(engine_state_arc, move || {
eval_block(value_clone.into_pipeline_data_with_metadata(metadata_clone))
})
.err_span(call.head)?;
Ok(value.into_pipeline_data_with_metadata(metadata))
}
}
}

Expand Down Expand Up @@ -314,6 +346,18 @@ where
}))
}

/// Single-value variant of "tee": runs `on_thread` once on a freshly spawned thread named `tee`.
///
/// There is no stream to carry a failure back to the caller, so an `Err` returned by
/// `on_thread` is passed to `report_error_new()` together with `engine_state`.
///
/// Returns the [`JoinHandle`] of the spawned thread, or the [`std::io::Error`] produced when the
/// thread could not be spawned.
fn tee_once(
    engine_state: Arc<EngineState>,
    on_thread: impl FnOnce() -> Result<(), ShellError> + Send + 'static,
) -> Result<JoinHandle<()>, std::io::Error> {
    // The worker owns both the callback and the shared engine state for its whole lifetime.
    let worker = move || match on_thread() {
        Ok(()) => {}
        Err(err) => {
            report_error_new(&engine_state, &err);
        }
    };

    thread::Builder::new()
        .name(String::from("tee"))
        .spawn(worker)
}

fn stderr_misuse<T>(span: Span, head: Span) -> Result<T, ShellError> {
Err(ShellError::UnsupportedInput {
msg: "--stderr can only be used on external commands".into(),
Expand Down
20 changes: 20 additions & 0 deletions crates/nu-command/tests/commands/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,23 @@ fn tee_save_stderr_to_file() {
assert_eq!("teststring\n", file_contents(dirs.test().join("copy.txt")));
})
}

/// A string piped through `tee` should reach both the closure (printed to stderr here) and the
/// rest of the pipeline (printed to stdout here).
#[test]
fn tee_single_value_streamable() {
    let outcome = nu!("'Hello, world!' | tee { print -e } | print");
    assert!(outcome.status.success());
    // The pass-through side of the pipeline.
    assert_eq!("Hello, world!", outcome.out);
    // The closure side of the pipeline.
    // FIXME: there is no trailing newline here because the string is converted to a stream for
    // now. Most likely the printer should check whether a string stream ends with a newline and
    // add one unless no_newline is true.
    assert_eq!("Hello, world!", outcome.err);
}

/// A non-streamable value (an integer) piped through `tee` should reach both the closure
/// (printed to stderr here) and the rest of the pipeline (printed to stdout here).
#[test]
fn tee_single_value_non_streamable() {
    // There is no synchronization point for non-streamable values, so the script sleeps to give
    // the closure's thread time to print before the process exits.
    let outcome = nu!("500 | tee { print -e } | print; sleep 1sec");
    assert!(outcome.status.success());
    // The pass-through side of the pipeline.
    assert_eq!("500", outcome.out);
    // The closure side of the pipeline.
    assert_eq!("500\n", outcome.err);
}
34 changes: 34 additions & 0 deletions crates/nu-protocol/src/pipeline/pipeline_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,40 @@ impl PipelineData {
}
}

/// Turns data that can be represented as a stream into the matching stream variant.
///
/// * [`PipelineData::ListStream`] and [`PipelineData::ByteStream`] are returned as-is.
/// * `Value::List` and `Value::Range` become a [`PipelineData::ListStream`].
/// * `Value::String` and `Value::Binary` become a [`PipelineData::ByteStream`].
///
/// Newly created streams keep the input's metadata, use the input's span (or
/// `Span::unknown()` when it has none), and receive a clone of the engine's signals.
///
/// # Errors
///
/// Any other input is handed back unchanged inside `Err`, so the caller still owns the
/// original pipeline data.
pub fn try_into_stream(self, engine_state: &EngineState) -> Result<PipelineData, PipelineData> {
    let span = self.span().unwrap_or(Span::unknown());
    // Signals are only cloned for the arms that actually build a new stream.
    let signals = || engine_state.signals().clone();

    match self {
        PipelineData::Value(Value::String { val, .. }, metadata) => {
            let bytes = ByteStream::read_string(val, span, signals());
            Ok(PipelineData::ByteStream(bytes, metadata))
        }
        PipelineData::Value(Value::Binary { val, .. }, metadata) => {
            let bytes = ByteStream::read_binary(val, span, signals());
            Ok(PipelineData::ByteStream(bytes, metadata))
        }
        PipelineData::Value(Value::List { .. } | Value::Range { .. }, ref metadata) => {
            // `into_iter()` consumes `self`, so copy the metadata out before calling it.
            let metadata = metadata.clone();
            let items = ListStream::new(self.into_iter(), span, signals());
            Ok(PipelineData::ListStream(items, metadata))
        }
        stream @ (PipelineData::ListStream(..) | PipelineData::ByteStream(..)) => Ok(stream),
        other => Err(other),
    }
}

/// Writes all values or redirects all output to the current [`OutDest`]s in `stack`.
///
/// For [`OutDest::Pipe`] and [`OutDest::Capture`], this will return the `PipelineData` as is
Expand Down

0 comments on commit 39bda89

Please sign in to comment.