From 07c94dc3ea9020fa7cd0f7e45f99dae40e68c9f0 Mon Sep 17 00:00:00 2001 From: Markus Mayer Date: Sat, 6 Jul 2024 20:28:31 +0200 Subject: [PATCH] Add timestamp to CSV output --- src/cli.rs | 8 +++++ src/main.rs | 100 +++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 103 insertions(+), 5 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 2529a7b..e58b7aa 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -61,4 +61,12 @@ pub struct Dump { help = "The file in which to store raw data" )] pub raw: Option, + + #[arg( + short, + long, + value_name = "DIRECTORY", + help = "The directory in which to store data" + )] + pub dir: PathBuf, } diff --git a/src/main.rs b/src/main.rs index 638f387..84ce7bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,10 @@ extern crate core; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use async_compression::tokio::write::GzipEncoder; use async_compression::Level; @@ -9,14 +12,14 @@ use clap::Parser; use color_eyre::eyre::Result; pub use ratatui::prelude::*; use serial_sensors_proto::versions::Version1DataFrame; -use serial_sensors_proto::{deserialize, DeserializationError}; +use serial_sensors_proto::{deserialize, DeserializationError, SensorData, SensorId}; use tokio::fs::File; use tokio::io::{self, AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_serial::{DataBits, FlowControl, Parity, SerialPortBuilderExt, SerialStream, StopBits}; use crate::app::App; -use crate::cli::{Cli, Commands, Dump}; +use crate::cli::{Cli, Commands}; use crate::data_buffer::SensorDataBuffer; use crate::utils::{initialize_logging, initialize_panic_handler}; @@ -101,7 +104,7 @@ async fn main() -> Result<()> { tokio::spawn(decoder(receiver, frames_tx)); // Process frames. - dump_data(args, frames_rx).await?; + dump_data(args.dir, frames_rx).await?; } } @@ -145,14 +148,101 @@ async fn dump_raw_gzipped( // TODO: Add rendezvous on CTRL-C } -async fn dump_data(_args: Dump, mut rx: UnboundedReceiver) -> Result<()> { +async fn dump_data(directory: PathBuf, mut rx: UnboundedReceiver) -> Result<()> { + let mut files: HashMap> = HashMap::new(); + loop { + let now = SystemTime::now(); + let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + if let Some(data) = rx.recv().await { println!("Data received: {:?}", data); + let target = SensorId::from(&data); + + match files.entry(target.clone()) { + Entry::Occupied(mut entry) => { + let data = create_data_row(since_the_epoch, target, &data); + entry.get_mut().write_all(&data).await?; + } + Entry::Vacant(entry) => { + let file_name = format!( + "{}-{}x{}.csv", + target.tag(), + target.num_components().unwrap_or(0), + target.value_type() as u8 + ); + println!("New sensor; creating new file: {file_name}"); + let path = directory.join(file_name); + let file = match File::create(path).await { + Ok(file) => file, + Err(e) => { + return Err(e.into()); + } + }; + + let writer = entry.insert(BufWriter::new(file)); + + // Create header row. + let header = create_header_row(&data); + writer.write_all(&header).await?; + + // Create first data row. + let data = create_data_row(since_the_epoch, target, &data); + writer.write_all(&data).await?; + + writer.flush().await?; + } + }; } } } +fn create_header_row(data: &Version1DataFrame) -> Vec { + let mut row = String::from("host_time,sensor_tag,num_components,value_type"); + match data.value { + SensorData::SystemClockFrequency(_) => {} + SensorData::AccelerometerI16(_) => {} + SensorData::MagnetometerI16(_) => {} + SensorData::TemperatureI16(_) => {} + SensorData::GyroscopeI16(_) => {} + SensorData::HeadingI16(_) => {} + SensorData::EulerAnglesF32(_) => {} + SensorData::OrientationQuaternionF32(_) => {} + SensorData::LinearRanges(_) => {} + SensorData::Identification(_) => {} + } + row.push('\n'); + row.as_bytes().into() +} + +fn create_data_row( + since_the_epoch: Duration, + target: SensorId, + data: &Version1DataFrame, +) -> Vec { + let mut row = format!( + "{},{:02X},{},{:02X}", + since_the_epoch.as_secs_f64(), + target.tag(), + target.num_components().unwrap_or(0), + target.value_type() as u8 + ); + match data.value { + SensorData::SystemClockFrequency(_) => {} + SensorData::AccelerometerI16(_) => {} + SensorData::MagnetometerI16(_) => {} + SensorData::TemperatureI16(_) => {} + SensorData::GyroscopeI16(_) => {} + SensorData::HeadingI16(_) => {} + SensorData::EulerAnglesF32(_) => {} + SensorData::OrientationQuaternionF32(_) => {} + SensorData::LinearRanges(_) => {} + SensorData::Identification(_) => {} + } + row.push('\n'); + row.as_bytes().into() +} + async fn decoder( mut receiver: UnboundedReceiver>, sender: UnboundedSender,