Skip to content

Commit

Permalink
Add timestamp to CSV output
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsided committed Jul 6, 2024
1 parent cab0cc0 commit 07c94dc
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 5 deletions.
8 changes: 8 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,12 @@ pub struct Dump {
help = "The file in which to store raw data"
)]
pub raw: Option<PathBuf>,

#[arg(
short,
long,
value_name = "DIRECTORY",
help = "The directory in which to store data"
)]
pub dir: PathBuf,
}
100 changes: 95 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
extern crate core;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use async_compression::tokio::write::GzipEncoder;
use async_compression::Level;
use clap::Parser;
use color_eyre::eyre::Result;
pub use ratatui::prelude::*;
use serial_sensors_proto::versions::Version1DataFrame;
use serial_sensors_proto::{deserialize, DeserializationError};
use serial_sensors_proto::{deserialize, DeserializationError, SensorData, SensorId};
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt, BufWriter};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio_serial::{DataBits, FlowControl, Parity, SerialPortBuilderExt, SerialStream, StopBits};

use crate::app::App;
use crate::cli::{Cli, Commands, Dump};
use crate::cli::{Cli, Commands};
use crate::data_buffer::SensorDataBuffer;
use crate::utils::{initialize_logging, initialize_panic_handler};

Expand Down Expand Up @@ -101,7 +104,7 @@ async fn main() -> Result<()> {
tokio::spawn(decoder(receiver, frames_tx));

// Process frames.
dump_data(args, frames_rx).await?;
dump_data(args.dir, frames_rx).await?;
}
}

Expand Down Expand Up @@ -145,14 +148,101 @@ async fn dump_raw_gzipped(
// TODO: Add rendezvous on CTRL-C
}

async fn dump_data(_args: Dump, mut rx: UnboundedReceiver<Version1DataFrame>) -> Result<()> {
async fn dump_data(directory: PathBuf, mut rx: UnboundedReceiver<Version1DataFrame>) -> Result<()> {
let mut files: HashMap<SensorId, BufWriter<File>> = HashMap::new();

loop {
let now = SystemTime::now();
let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");

if let Some(data) = rx.recv().await {
println!("Data received: {:?}", data);
let target = SensorId::from(&data);

match files.entry(target.clone()) {
Entry::Occupied(mut entry) => {
let data = create_data_row(since_the_epoch, target, &data);
entry.get_mut().write_all(&data).await?;
}
Entry::Vacant(entry) => {
let file_name = format!(
"{}-{}x{}.csv",
target.tag(),
target.num_components().unwrap_or(0),
target.value_type() as u8
);
println!("New sensor; creating new file: {file_name}");
let path = directory.join(file_name);
let file = match File::create(path).await {
Ok(file) => file,
Err(e) => {
return Err(e.into());
}
};

let writer = entry.insert(BufWriter::new(file));

// Create header row.
let header = create_header_row(&data);
writer.write_all(&header).await?;

// Create first data row.
let data = create_data_row(since_the_epoch, target, &data);
writer.write_all(&data).await?;

writer.flush().await?;
}
};
}
}
}

fn create_header_row(data: &Version1DataFrame) -> Vec<u8> {
let mut row = String::from("host_time,sensor_tag,num_components,value_type");
match data.value {
SensorData::SystemClockFrequency(_) => {}
SensorData::AccelerometerI16(_) => {}
SensorData::MagnetometerI16(_) => {}
SensorData::TemperatureI16(_) => {}
SensorData::GyroscopeI16(_) => {}
SensorData::HeadingI16(_) => {}
SensorData::EulerAnglesF32(_) => {}
SensorData::OrientationQuaternionF32(_) => {}
SensorData::LinearRanges(_) => {}
SensorData::Identification(_) => {}
}
row.push('\n');
row.as_bytes().into()
}

fn create_data_row(
since_the_epoch: Duration,
target: SensorId,
data: &Version1DataFrame,
) -> Vec<u8> {
let mut row = format!(
"{},{:02X},{},{:02X}",
since_the_epoch.as_secs_f64(),
target.tag(),
target.num_components().unwrap_or(0),
target.value_type() as u8
);
match data.value {
SensorData::SystemClockFrequency(_) => {}
SensorData::AccelerometerI16(_) => {}
SensorData::MagnetometerI16(_) => {}
SensorData::TemperatureI16(_) => {}
SensorData::GyroscopeI16(_) => {}
SensorData::HeadingI16(_) => {}
SensorData::EulerAnglesF32(_) => {}
SensorData::OrientationQuaternionF32(_) => {}
SensorData::LinearRanges(_) => {}
SensorData::Identification(_) => {}
}
row.push('\n');
row.as_bytes().into()
}

async fn decoder(
mut receiver: UnboundedReceiver<Vec<u8>>,
sender: UnboundedSender<Version1DataFrame>,
Expand Down

0 comments on commit 07c94dc

Please sign in to comment.