Skip to content

Commit

Permalink
Add theoretical buffering of gzipped files
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsided committed Jul 6, 2024
1 parent bdd9ea1 commit cab0cc0
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ serde_json = "1.0.120"
json5 = "0.4.1"
ttl-queue = "0.2.0"
num-traits = "0.2.19"
async-compression = { version = "0.4.11", features = ["gzip", "tokio"] }

[patch.crates-io]
# serial-sensors-proto = { git = "https://github.com/sunsided/serial-sensors-proto", features = ["std", "alloc", "unsafe", "quaternion", "micromath"] }
Expand Down
41 changes: 38 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ extern crate core;
use std::sync::Arc;
use std::time::Duration;

use async_compression::tokio::write::GzipEncoder;
use async_compression::Level;
use clap::Parser;
use color_eyre::eyre::Result;
pub use ratatui::prelude::*;
Expand Down Expand Up @@ -71,6 +73,12 @@ async fn main() -> Result<()> {
Commands::Dump(args) => {
// Intercept frames when dumping raw data.
let receiver = if let Some(ref path) = args.raw {
let gzip = path
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext == "gz")
.unwrap_or(false);

let file = match File::create(path).await {
Ok(file) => file,
Err(e) => {
Expand All @@ -79,7 +87,11 @@ async fn main() -> Result<()> {
};

let (tx, raw_rx) = unbounded_channel();
tokio::spawn(dump_raw(file, receiver, tx));
if gzip {
tokio::spawn(dump_raw_gzipped(file, receiver, tx));
} else {
tokio::spawn(dump_raw(file, receiver, tx));
}
raw_rx
} else {
receiver
Expand All @@ -101,15 +113,38 @@ async fn dump_raw(
mut rx: UnboundedReceiver<Vec<u8>>,
tx: UnboundedSender<Vec<u8>>,
) -> Result<()> {
let mut buffered_writer = BufWriter::new(file);
let mut writer = BufWriter::new(file);
loop {
if let Some(data) = rx.recv().await {
buffered_writer.write_all(&data).await?;
writer.write_all(&data).await?;
tx.send(data)?;
}
}
}

async fn dump_raw_gzipped(
file: File,
mut rx: UnboundedReceiver<Vec<u8>>,
tx: UnboundedSender<Vec<u8>>,
) -> Result<()> {
let buffered_writer = BufWriter::new(file);
let mut writer = GzipEncoder::with_quality(buffered_writer, Level::Default);
loop {
if let Some(data) = rx.recv().await {
if let Err(e) = writer.write_all(&data).await {
writer.flush().await.ok();
return Err(e.into());
}
if let Err(e) = tx.send(data) {
writer.flush().await.ok();
return Err(e.into());
}
}
}

// TODO: Add rendezvous on CTRL-C
}

async fn dump_data(_args: Dump, mut rx: UnboundedReceiver<Version1DataFrame>) -> Result<()> {
loop {
if let Some(data) = rx.recv().await {
Expand Down

0 comments on commit cab0cc0

Please sign in to comment.