From 43b2b3fbaac2b248031505ab3fd60393687a2ac0 Mon Sep 17 00:00:00 2001 From: Ulrich Hornung Date: Tue, 27 Feb 2024 23:12:04 +0100 Subject: [PATCH 1/4] handle SIGUSR1 directly. not just every 1sec --- src/uu/dd/src/dd.rs | 61 +++++++++++++---- src/uu/dd/src/progress.rs | 141 ++++++++++++++++++++++---------------- 2 files changed, 128 insertions(+), 74 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index b1c6b563017..fb91f67552c 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -21,6 +21,7 @@ use nix::fcntl::FcntlArg::F_SETFL; #[cfg(any(target_os = "linux", target_os = "android"))] use nix::fcntl::OFlag; use parseargs::Parser; +use progress::ProgUpdateType; use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat}; use uucore::io::OwnedFileDescriptorOrHandle; @@ -39,10 +40,8 @@ use std::os::unix::{ #[cfg(windows)] use std::os::windows::{fs::MetadataExt, io::AsHandle}; use std::path::Path; -use std::sync::{ - atomic::{AtomicBool, Ordering::Relaxed}, - mpsc, Arc, -}; +use std::sync::atomic::AtomicU8; +use std::sync::{atomic::Ordering::Relaxed, mpsc, Arc}; use std::thread; use std::time::{Duration, Instant}; @@ -94,29 +93,41 @@ struct Settings { /// the first caller each interval will yield true. /// /// When all instances are dropped the background thread will exit on the next interval. 
-#[derive(Debug, Clone)] pub struct Alarm { interval: Duration, - trigger: Arc, + trigger: Arc, } +const TRIGGER_NONE: u8 = 0; +const TRIGGER_TIMER: u8 = 1; +const TRIGGER_SIGNAL: u8 = 2; + impl Alarm { pub fn with_interval(interval: Duration) -> Self { - let trigger = Arc::new(AtomicBool::default()); + let trigger = Arc::new(AtomicU8::default()); let weak_trigger = Arc::downgrade(&trigger); thread::spawn(move || { while let Some(trigger) = weak_trigger.upgrade() { thread::sleep(interval); - trigger.store(true, Relaxed); + trigger.store(TRIGGER_TIMER, Relaxed); } }); Self { interval, trigger } } - pub fn is_triggered(&self) -> bool { - self.trigger.swap(false, Relaxed) + pub fn manual_trigger_fn(&self) -> Box { + let weak_trigger = Arc::downgrade(&self.trigger); + Box::new(move || { + if let Some(trigger) = weak_trigger.upgrade() { + trigger.store(TRIGGER_SIGNAL, Relaxed); + } + }) + } + + pub fn get_trigger(&self) -> u8 { + self.trigger.swap(TRIGGER_NONE, Relaxed) } pub fn get_interval(&self) -> Duration { @@ -1018,6 +1029,18 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> { // This avoids the need to query the OS monotonic clock for every block. let alarm = Alarm::with_interval(Duration::from_secs(1)); + // The signal handler spawns an own thread that waits for signals. + // When the signal is received, it calls a handler function. + // We inject a handler function that manually triggers the alarm. + #[cfg(target_os = "linux")] + let signal_handler = progress::SignalHandler::install_signal_handler(alarm.manual_trigger_fn()); + #[cfg(target_os = "linux")] + if let Err(e) = &signal_handler { + if Some(StatusLevel::None) != i.settings.status { + eprintln!("Internal dd Warning: Unable to register signal handler \n\t{e}"); + } + } + // Index in the input file where we are reading bytes and in // the output file where we are writing bytes. // @@ -1086,11 +1109,20 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> { // error. 
rstat += rstat_update; wstat += wstat_update; - if alarm.is_triggered() { - let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false); - prog_tx.send(prog_update).unwrap_or(()); + match alarm.get_trigger() { + TRIGGER_NONE => {} + t @ TRIGGER_TIMER | t @ TRIGGER_SIGNAL => { + let tp = match t { + TRIGGER_TIMER => ProgUpdateType::Periodic, + _ => ProgUpdateType::Signal, + }; + let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), tp); + prog_tx.send(prog_update).unwrap_or(()); + } + _ => {} } } + finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate) } @@ -1118,12 +1150,13 @@ fn finalize( // Print the final read/write statistics. let wstat = wstat + wstat_update; - let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true); + let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), ProgUpdateType::Final); prog_tx.send(prog_update).unwrap_or(()); // Wait for the output thread to finish output_thread .join() .expect("Failed to join with the output thread."); + Ok(()) } diff --git a/src/uu/dd/src/progress.rs b/src/uu/dd/src/progress.rs index 238b2ab39a2..062fbbe8df6 100644 --- a/src/uu/dd/src/progress.rs +++ b/src/uu/dd/src/progress.rs @@ -11,8 +11,12 @@ //! updater that runs in its own thread. use std::io::Write; use std::sync::mpsc; +#[cfg(target_os = "linux")] +use std::thread::JoinHandle; use std::time::Duration; +#[cfg(target_os = "linux")] +use signal_hook::iterator::Handle; use uucore::{ error::UResult, format::num_format::{FloatVariant, Formatter}, @@ -20,18 +24,12 @@ use uucore::{ use crate::numbers::{to_magnitude_and_suffix, SuffixType}; -// On Linux, we register a signal handler that prints progress updates. 
-#[cfg(target_os = "linux")] -use signal_hook::consts::signal; -#[cfg(target_os = "linux")] -use std::{ - env, - error::Error, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; +#[derive(PartialEq, Eq)] +pub(crate) enum ProgUpdateType { + Periodic, + Signal, + Final, +} /// Summary statistics for read and write progress of dd for a given duration. pub(crate) struct ProgUpdate { @@ -53,7 +51,7 @@ pub(crate) struct ProgUpdate { /// The status of the write. /// /// True if the write is completed, false if still in-progress. - pub(crate) complete: bool, + pub(crate) update_type: ProgUpdateType, } impl ProgUpdate { @@ -62,13 +60,13 @@ impl ProgUpdate { read_stat: ReadStat, write_stat: WriteStat, duration: Duration, - complete: bool, + update_type: ProgUpdateType, ) -> Self { Self { read_stat, write_stat, duration, - complete, + update_type, } } @@ -433,7 +431,7 @@ pub(crate) fn gen_prog_updater( let mut progress_printed = false; while let Ok(update) = rx.recv() { // Print the final read/write statistics. 
- if update.complete { + if update.update_type == ProgUpdateType::Final { update.print_final_stats(print_level, progress_printed); return; } @@ -445,6 +443,48 @@ pub(crate) fn gen_prog_updater( } } +#[cfg(target_os = "linux")] +pub(crate) struct SignalHandler { + handle: Handle, + thread: Option>, +} + +#[cfg(target_os = "linux")] +impl SignalHandler { + pub(crate) fn install_signal_handler( + f: Box, + ) -> Result { + use signal_hook::consts::signal::*; + use signal_hook::iterator::Signals; + + let mut signals = Signals::new([SIGUSR1])?; + let handle = signals.handle(); + let thread = std::thread::spawn(move || { + for signal in &mut signals { + match signal { + SIGUSR1 => (*f)(), + _ => unreachable!(), + } + } + }); + + Ok(Self { + handle, + thread: Some(thread), + }) + } +} + +#[cfg(target_os = "linux")] +impl Drop for SignalHandler { + fn drop(&mut self) { + self.handle.close(); + if let Some(thread) = std::mem::take(&mut self.thread) { + thread.join().unwrap(); + } + } +} + /// Return a closure that can be used in its own thread to print progress info. /// /// This function returns a closure that receives [`ProgUpdate`] @@ -459,50 +499,31 @@ pub(crate) fn gen_prog_updater( rx: mpsc::Receiver, print_level: Option, ) -> impl Fn() { - // TODO: SIGINFO: Trigger progress line reprint. BSD-style Linux only. 
- const SIGUSR1_USIZE: usize = signal::SIGUSR1 as usize; - fn posixly_correct() -> bool { - env::var("POSIXLY_CORRECT").is_ok() - } - fn register_linux_signal_handler(sigval: Arc) -> Result<(), Box> { - if !posixly_correct() { - signal_hook::flag::register_usize(signal::SIGUSR1, sigval, SIGUSR1_USIZE)?; - } - - Ok(()) - } // -------------------------------------------------------------- move || { - let sigval = Arc::new(AtomicUsize::new(0)); - - register_linux_signal_handler(sigval.clone()).unwrap_or_else(|e| { - if Some(StatusLevel::None) != print_level { - eprintln!("Internal dd Warning: Unable to register signal handler \n\t{e}"); - } - }); - // Holds the state of whether we have printed the current progress. // This is needed so that we know whether or not to print a newline // character before outputting non-progress data. let mut progress_printed = false; while let Ok(update) = rx.recv() { - // Print the final read/write statistics. - if update.complete { - update.print_final_stats(print_level, progress_printed); - return; - } - // (Re)print status line if progress is requested. - if Some(StatusLevel::Progress) == print_level && !update.complete { - update.reprint_prog_line(); - progress_printed = true; - } - // Handle signals and set the signal to un-seen. - // This will print a maximum of 1 time per second, even though it - // should be printing on every SIGUSR1. - if let SIGUSR1_USIZE = sigval.swap(0, Ordering::Relaxed) { - update.print_transfer_stats(progress_printed); - // Reset the progress printed, since print_transfer_stats always prints a newline. - progress_printed = false; + match update.update_type { + ProgUpdateType::Final => { + // Print the final read/write statistics. + update.print_final_stats(print_level, progress_printed); + return; + } + ProgUpdateType::Periodic => { + // (Re)print status line if progress is requested. 
+ if Some(StatusLevel::Progress) == print_level { + update.reprint_prog_line(); + progress_printed = true; + } + } + ProgUpdateType::Signal => { + update.print_transfer_stats(progress_printed); + // Reset the progress printed, since print_transfer_stats always prints a newline. + progress_printed = false; + } } } } @@ -524,7 +545,7 @@ mod tests { ..Default::default() }, duration: Duration::new(1, 0), // one second - complete: false, + update_type: super::ProgUpdateType::Periodic, } } @@ -533,7 +554,7 @@ mod tests { read_stat: ReadStat::default(), write_stat: WriteStat::default(), duration, - complete: false, + update_type: super::ProgUpdateType::Periodic, } } @@ -558,12 +579,12 @@ mod tests { let read_stat = ReadStat::new(1, 2, 3, 4); let write_stat = WriteStat::new(4, 5, 6); let duration = Duration::new(789, 0); - let complete = false; + let update_type = super::ProgUpdateType::Periodic; let prog_update = ProgUpdate { read_stat, write_stat, duration, - complete, + update_type, }; let mut cursor = Cursor::new(vec![]); @@ -580,7 +601,7 @@ mod tests { read_stat: ReadStat::default(), write_stat: WriteStat::default(), duration: Duration::new(1, 0), // one second - complete: false, + update_type: super::ProgUpdateType::Periodic, }; let mut cursor = Cursor::new(vec![]); @@ -636,7 +657,7 @@ mod tests { read_stat: ReadStat::default(), write_stat: WriteStat::default(), duration: Duration::new(1, 0), // one second - complete: false, + update_type: super::ProgUpdateType::Periodic, }; let mut cursor = Cursor::new(vec![]); prog_update @@ -657,7 +678,7 @@ mod tests { read_stat: ReadStat::default(), write_stat: WriteStat::default(), duration: Duration::new(1, 0), // one second - complete: false, + update_type: super::ProgUpdateType::Periodic, }; let mut cursor = Cursor::new(vec![]); let rewrite = true; From a626899416f8cddbd9f83af103e63cca394f881d Mon Sep 17 00:00:00 2001 From: Ulrich Hornung Date: Wed, 28 Feb 2024 22:17:50 +0100 Subject: [PATCH 2/4] reduce cognitive complexity 
by splitting away part of dd_copy --- src/uu/dd/src/dd.rs | 44 +++++++++++++++++++++++---------------- src/uu/dd/src/progress.rs | 1 + 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index fb91f67552c..c3fca2fa1c0 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -86,8 +86,10 @@ struct Settings { /// A timer which triggers on a given interval /// -/// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`] -/// will return true once per the given [`Duration`]. +/// After being constructed with [`Alarm::with_interval`], [`Alarm::get_trigger`] +/// will return [`TRIGGER_TIMER`] once per the given [`Duration`]. +/// Alarm can be manually triggered with closure returned by [`Alarm::manual_trigger_fn`]. +/// [`Alarm::get_trigger`] will return [`TRIGGER_SIGNAL`] in this case. /// /// Can be cloned, but the trigger status is shared across all instances so only /// the first caller each interval will yield true. @@ -933,6 +935,27 @@ impl<'a> BlockWriter<'a> { } } +fn flush_caches_full_length(i: &Input, o: &Output) -> std::io::Result<()> { + // TODO Better error handling for overflowing `len`. + if i.settings.iflags.nocache { + let offset = 0; + #[allow(clippy::useless_conversion)] + let len = i.src.len()?.try_into().unwrap(); + i.discard_cache(offset, len); + } + // Similarly, discard the system cache for the output file. + // + // TODO Better error handling for overflowing `len`. + if i.settings.oflags.nocache { + let offset = 0; + #[allow(clippy::useless_conversion)] + let len = o.dst.len()?.try_into().unwrap(); + o.discard_cache(offset, len); + } + + Ok(()) +} + /// Copy the given input data to this output, consuming both. /// /// This method contains the main loop for the `dd` program. 
Bytes @@ -992,22 +1015,7 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> { // requests that we inform the system that we no longer // need the contents of the input file in a system cache. // - // TODO Better error handling for overflowing `len`. - if i.settings.iflags.nocache { - let offset = 0; - #[allow(clippy::useless_conversion)] - let len = i.src.len()?.try_into().unwrap(); - i.discard_cache(offset, len); - } - // Similarly, discard the system cache for the output file. - // - // TODO Better error handling for overflowing `len`. - if i.settings.oflags.nocache { - let offset = 0; - #[allow(clippy::useless_conversion)] - let len = o.dst.len()?.try_into().unwrap(); - o.discard_cache(offset, len); - } + flush_caches_full_length(&i, &o)?; return finalize( BlockWriter::Unbuffered(o), rstat, diff --git a/src/uu/dd/src/progress.rs b/src/uu/dd/src/progress.rs index 062fbbe8df6..268b3d5f4ea 100644 --- a/src/uu/dd/src/progress.rs +++ b/src/uu/dd/src/progress.rs @@ -443,6 +443,7 @@ pub(crate) fn gen_prog_updater( } } +/// signal handler listens for SIGUSR1 signal and runs provided closure. #[cfg(target_os = "linux")] pub(crate) struct SignalHandler { handle: Handle, From a35dafcd308d90cb27f0e8d9493de57e38bffe16 Mon Sep 17 00:00:00 2001 From: Ulrich Hornung Date: Sat, 9 Mar 2024 21:38:16 +0100 Subject: [PATCH 3/4] consider "fullblock" cmd line arg also for block writes --- src/uu/dd/src/dd.rs | 48 +++++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index c3fca2fa1c0..7ab55c65a6b 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -87,9 +87,9 @@ struct Settings { /// A timer which triggers on a given interval /// /// After being constructed with [`Alarm::with_interval`], [`Alarm::get_trigger`] -/// will return [`TRIGGER_TIMER`] once per the given [`Duration`]. +/// will return [`ALARM_TRIGGER_TIMER`] once per the given [`Duration`]. 
/// Alarm can be manually triggered with closure returned by [`Alarm::manual_trigger_fn`]. -/// [`Alarm::get_trigger`] will return [`TRIGGER_SIGNAL`] in this case. +/// [`Alarm::get_trigger`] will return [`ALARM_TRIGGER_SIGNAL`] in this case. /// /// Can be cloned, but the trigger status is shared across all instances so only /// the first caller each interval will yield true. @@ -100,9 +100,9 @@ pub struct Alarm { trigger: Arc, } -const TRIGGER_NONE: u8 = 0; -const TRIGGER_TIMER: u8 = 1; -const TRIGGER_SIGNAL: u8 = 2; +pub const ALARM_TRIGGER_NONE: u8 = 0; +pub const ALARM_TRIGGER_TIMER: u8 = 1; +pub const ALARM_TRIGGER_SIGNAL: u8 = 2; impl Alarm { pub fn with_interval(interval: Duration) -> Self { @@ -112,7 +112,7 @@ impl Alarm { thread::spawn(move || { while let Some(trigger) = weak_trigger.upgrade() { thread::sleep(interval); - trigger.store(TRIGGER_TIMER, Relaxed); + trigger.store(ALARM_TRIGGER_TIMER, Relaxed); } }); @@ -123,13 +123,13 @@ impl Alarm { let weak_trigger = Arc::downgrade(&self.trigger); Box::new(move || { if let Some(trigger) = weak_trigger.upgrade() { - trigger.store(TRIGGER_SIGNAL, Relaxed); + trigger.store(ALARM_TRIGGER_SIGNAL, Relaxed); } }) } pub fn get_trigger(&self) -> u8 { - self.trigger.swap(TRIGGER_NONE, Relaxed) + self.trigger.swap(ALARM_TRIGGER_NONE, Relaxed) } pub fn get_interval(&self) -> Duration { @@ -831,6 +831,30 @@ impl<'a> Output<'a> { } } + /// writes a block of data. optionally retries when first try didn't complete + /// + /// this is needed by gnu-test: tests/dd/stats.sh + /// the write can be interrupted by a system signal. + /// e.g. SIGUSR1 which is sent to report status + /// without retry, the data might not be fully written to destination. 
+ fn write_block(&mut self, chunk: &[u8]) -> io::Result { + let full_len = chunk.len(); + let mut base_idx = 0; + loop { + match self.dst.write(&chunk[base_idx..]) { + Ok(wlen) => { + base_idx += wlen; + // take iflags.fullblock as oflags shall not have this option + if (base_idx >= full_len) || !self.settings.iflags.fullblock { + return Ok(base_idx); + } + } + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + } + } + /// Write the given bytes one block at a time. /// /// This may write partial blocks (for example, if the underlying @@ -844,7 +868,7 @@ impl<'a> Output<'a> { let mut bytes_total = 0; for chunk in buf.chunks(self.settings.obs) { - let wlen = self.dst.write(chunk)?; + let wlen = self.write_block(chunk)?; if wlen < self.settings.obs { writes_partial += 1; } else { @@ -1118,10 +1142,10 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> { rstat += rstat_update; wstat += wstat_update; match alarm.get_trigger() { - TRIGGER_NONE => {} - t @ TRIGGER_TIMER | t @ TRIGGER_SIGNAL => { + ALARM_TRIGGER_NONE => {} + t @ ALARM_TRIGGER_TIMER | t @ ALARM_TRIGGER_SIGNAL => { let tp = match t { - TRIGGER_TIMER => ProgUpdateType::Periodic, + ALARM_TRIGGER_TIMER => ProgUpdateType::Periodic, _ => ProgUpdateType::Signal, }; let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), tp); From 174e9a0af98dd55d0582d983cfc947fca41b7ca2 Mon Sep 17 00:00:00 2001 From: Ulrich Hornung Date: Sat, 23 Mar 2024 16:51:12 +0100 Subject: [PATCH 4/4] add documentation --- src/uu/dd/src/dd.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index 7ab55c65a6b..24fab1e2fb3 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -105,6 +105,7 @@ pub const ALARM_TRIGGER_TIMER: u8 = 1; pub const ALARM_TRIGGER_SIGNAL: u8 = 2; impl Alarm { + /// use to construct alarm timer with duration pub fn with_interval(interval: Duration) -> Self { let trigger = 
Arc::new(AtomicU8::default()); @@ -119,6 +120,11 @@ impl Alarm { Self { interval, trigger } } + /// Returns a closure that allows manually triggering the alarm + /// + /// This is useful for cases where more than one alarm event source exists. + /// In case of `dd` there is the SIGUSR1/SIGINFO case where we want to + /// trigger a manual progress report. pub fn manual_trigger_fn(&self) -> Box { let weak_trigger = Arc::downgrade(&self.trigger); Box::new(move || { @@ -128,10 +134,17 @@ impl Alarm { }) } + /// Use this function to poll for any pending alarm event + /// + /// Returns `ALARM_TRIGGER_NONE` for no pending event. + /// Returns `ALARM_TRIGGER_TIMER` if the event was triggered by timer + /// Returns `ALARM_TRIGGER_SIGNAL` if the event was triggered manually + /// by the closure returned from `manual_trigger_fn` pub fn get_trigger(&self) -> u8 { self.trigger.swap(ALARM_TRIGGER_NONE, Relaxed) } + // Getter function for the configured interval duration pub fn get_interval(&self) -> Duration { self.interval } @@ -959,6 +972,8 @@ impl<'a> BlockWriter<'a> { } } +/// depending on the command line arguments, this function +/// informs the OS to flush/discard the caches for input and/or output file. fn flush_caches_full_length(i: &Input, o: &Output) -> std::io::Result<()> { // TODO Better error handling for overflowing `len`. if i.settings.iflags.nocache {