From cab0cc0bece96fa88bde347db2911b8b94daf5c9 Mon Sep 17 00:00:00 2001 From: Markus Mayer Date: Sat, 6 Jul 2024 19:55:58 +0200 Subject: [PATCH] Add theoretical buffering of gzipped files --- Cargo.lock | 33 +++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/main.rs | 41 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 72 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 27218d5..09b512e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -99,6 +99,19 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "async-compression" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd066d0b4ef8ecb03a55319dc13aa6910616d0f44008a045bb1835af830abff5" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -391,6 +404,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossterm" version = "0.27.0" @@ -571,6 +593,16 @@ dependencies = [ "once_cell", ] +[[package]] +name = "flate2" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1344,6 +1376,7 @@ name = "serial-sensors" version = "0.1.0" dependencies = [ "anyhow", + "async-compression", "better-panic", "clap", "color-eyre", diff --git a/Cargo.toml b/Cargo.toml index f6dda4a..cadbbd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ serde_json = "1.0.120" json5 = "0.4.1" ttl-queue = "0.2.0" num-traits = "0.2.19" +async-compression = { version = "0.4.11", features = ["gzip", "tokio"] } [patch.crates-io] # serial-sensors-proto = { git = "https://github.com/sunsided/serial-sensors-proto", features = ["std", "alloc", "unsafe", "quaternion", "micromath"] } diff --git a/src/main.rs b/src/main.rs index 0d46731..638f387 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,8 @@ extern crate core; use std::sync::Arc; use std::time::Duration; +use async_compression::tokio::write::GzipEncoder; +use async_compression::Level; use clap::Parser; use color_eyre::eyre::Result; pub use ratatui::prelude::*; @@ -71,6 +73,12 @@ async fn main() -> Result<()> { Commands::Dump(args) => { // Intercept frames when dumping raw data. let receiver = if let Some(ref path) = args.raw { + let gzip = path + .extension() + .and_then(|ext| ext.to_str()) + .map(|ext| ext == "gz") + .unwrap_or(false); + let file = match File::create(path).await { Ok(file) => file, Err(e) => { @@ -79,7 +87,11 @@ async fn main() -> Result<()> { }; let (tx, raw_rx) = unbounded_channel(); - tokio::spawn(dump_raw(file, receiver, tx)); + if gzip { + tokio::spawn(dump_raw_gzipped(file, receiver, tx)); + } else { + tokio::spawn(dump_raw(file, receiver, tx)); + } raw_rx } else { receiver @@ -101,15 +113,38 @@ async fn dump_raw( mut rx: UnboundedReceiver>, tx: UnboundedSender>, ) -> Result<()> { - let mut buffered_writer = BufWriter::new(file); + let mut writer = BufWriter::new(file); loop { if let Some(data) = rx.recv().await { - buffered_writer.write_all(&data).await?; + writer.write_all(&data).await?; tx.send(data)?; } } } +async fn dump_raw_gzipped( + file: File, + mut rx: UnboundedReceiver>, + tx: UnboundedSender>, +) -> Result<()> { + let buffered_writer = BufWriter::new(file); + let mut writer = GzipEncoder::with_quality(buffered_writer, Level::Default); + loop { + if let Some(data) = rx.recv().await { + if let Err(e) = writer.write_all(&data).await { + writer.flush().await.ok(); + return Err(e.into()); + } + if let Err(e) = tx.send(data) { + writer.flush().await.ok(); + return Err(e.into()); + } + } + } + + // TODO: Add rendezvous on CTRL-C +} + async fn dump_data(_args: Dump, mut rx: UnboundedReceiver) -> Result<()> { loop { if let Some(data) = rx.recv().await {