diff --git a/src/dumping.rs b/src/dumping.rs new file mode 100644 index 0000000..a33060f --- /dev/null +++ b/src/dumping.rs @@ -0,0 +1,315 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::path::PathBuf; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use async_compression::tokio::write::GzipEncoder; +use async_compression::Level; +use serial_sensors_proto::types::LinearRangeInfo; +use serial_sensors_proto::versions::Version1DataFrame; +use serial_sensors_proto::{ + DataFrame, IdentifierCode, ScalarData, SensorData, SensorId, ValueType, Vector3Data, + Vector4Data, +}; +use tokio::fs::File; +use tokio::io::{AsyncWriteExt, BufWriter}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +pub async fn dump_raw( + file: File, + mut rx: UnboundedReceiver>, + tx: UnboundedSender>, +) -> color_eyre::Result<()> { + let mut writer = BufWriter::new(file); + loop { + if let Some(data) = rx.recv().await { + writer.write_all(&data).await?; + tx.send(data)?; + } + } +} + +pub async fn dump_raw_gzipped( + file: File, + mut rx: UnboundedReceiver>, + tx: UnboundedSender>, +) -> color_eyre::Result<()> { + let buffered_writer = BufWriter::new(file); + let mut writer = GzipEncoder::with_quality(buffered_writer, Level::Default); + loop { + if let Some(data) = rx.recv().await { + if let Err(e) = writer.write_all(&data).await { + writer.flush().await.ok(); + return Err(e.into()); + } + if let Err(e) = tx.send(data) { + writer.flush().await.ok(); + return Err(e.into()); + } + } + } + + // TODO: Add rendezvous on CTRL-C +} + +pub async fn dump_data( + directory: PathBuf, + mut rx: UnboundedReceiver, +) -> color_eyre::Result<()> { + let mut files: HashMap> = HashMap::new(); + let mut ranges: HashMap = HashMap::new(); + + loop { + let now = SystemTime::now(); + let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); + + if let Some(data) = rx.recv().await { + println!("Data received: {:?}", data); + let target = SensorId::from(&data); + let sdt = map_data(&data.value); + + let ranges = if let SensorData::LinearRanges(ref info) = data.value { + ranges.insert(data.target(), info.clone()); + ranges.get(&data.target()) + } else { + ranges.get(&target.clone()) + }; + + let data_row = match create_data_row(since_the_epoch, &target, &data, ranges) { + None => continue, + Some(data) => data, + }; + + match files.entry(target.clone()) { + Entry::Occupied(mut entry) => { + entry.get_mut().write_all(&data_row).await?; + entry.get_mut().flush().await?; + } + Entry::Vacant(entry) => { + let file_name = format!( + "{}-{}-{}-x{}.csv", + target.tag(), + sdt.0, + value_type_code(target.value_type()), + target.num_components().unwrap_or(0) + ); + println!("New sensor; creating new file: {file_name}"); + let path = directory.join(file_name); + let file = match File::create(path).await { + Ok(file) => file, + Err(e) => { + return Err(e.into()); + } + }; + + // Create header row. + if let Some(header) = create_header_row(&data) { + let writer = entry.insert(BufWriter::new(file)); + writer.write_all(&header).await?; + writer.write_all(&data_row).await?; + writer.flush().await?; + } + } + }; + } + } +} + +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] +struct SensorDataType(&'static str); + +fn map_data(data: &SensorData) -> SensorDataType { + match data { + SensorData::SystemClockFrequency(_) => SensorDataType("clock"), + SensorData::AccelerometerI16(_) => SensorDataType("acc"), + SensorData::MagnetometerI16(_) => SensorDataType("mag"), + SensorData::TemperatureI16(_) => SensorDataType("temp"), + SensorData::GyroscopeI16(_) => SensorDataType("gyro"), + SensorData::HeadingI16(_) => SensorDataType("heading"), + SensorData::EulerAnglesF32(_) => SensorDataType("euler"), + SensorData::OrientationQuaternionF32(_) => SensorDataType("quat"), + SensorData::LinearRanges(_) => SensorDataType("lranges"), + SensorData::Identification(_) => SensorDataType("ident"), + } +} + +fn create_header_row(data: &Version1DataFrame) -> Option> { + let mut row = String::from("host_time,device_time,sensor_tag,num_components,value_type"); + match data.value { + SensorData::SystemClockFrequency(_) => row.push_str(",freq"), + SensorData::AccelerometerI16(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), + SensorData::MagnetometerI16(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), + SensorData::TemperatureI16(_) => row.push_str(",temp,converted_temp"), + SensorData::GyroscopeI16(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), + SensorData::HeadingI16(_) => row.push_str(",heading,converted_heading"), + SensorData::EulerAnglesF32(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), + SensorData::OrientationQuaternionF32(_) => row.push_str(",a,b,c,d,converted_a,converted_b,converted_c,converted_d"), + SensorData::LinearRanges(_) => row.push_str(",resolution_bits,scale_op,scale,scale_raw,scale_decimals,offset,offset_raw,offset_decimals"), + SensorData::Identification(_) => row.push_str(",code,value"), + } + row.push('\n'); + Some(row.as_bytes().into()) +} + +fn create_data_row( + since_the_epoch: Duration, + target: &SensorId, + data: &Version1DataFrame, + ranges: Option<&LinearRangeInfo>, +) -> Option> { + let device_time = decode_device_time(data); + let mut row = format!( + "{},{},{:02X},{},{},", + since_the_epoch.as_secs_f64(), + device_time, + target.tag(), + target.num_components().unwrap_or(0), + value_type_code(target.value_type()) + ); + match data.value { + SensorData::SystemClockFrequency(data) => row.push_str(&format!("{}", data.value)), + SensorData::AccelerometerI16(vec) => { + row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); + csv_convert_push_vec3(&mut row, &vec, &ranges) + } + SensorData::MagnetometerI16(vec) => { + row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); + csv_convert_push_vec3(&mut row, &vec, &ranges) + } + SensorData::TemperatureI16(temp) => { + row.push_str(&format!("{}", temp.value)); + csv_convert_push_scalar(&mut row, &temp, &ranges) + } + SensorData::GyroscopeI16(vec) => { + row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); + csv_convert_push_vec3(&mut row, &vec, &ranges) + } + SensorData::HeadingI16(heading) => { + row.push_str(&format!("{}", heading.value)); + csv_convert_push_scalar(&mut row, &heading, &ranges) + } + SensorData::EulerAnglesF32(vec) => { + row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); + csv_convert_push_vec3(&mut row, &vec, &ranges) + } + SensorData::OrientationQuaternionF32(vec) => { + row.push_str(&format!("{},{},{},{}", vec.a, vec.b, vec.c, vec.d)); + csv_convert_push_vec4(&mut row, &vec, &ranges) + } + SensorData::LinearRanges(ref lr) => row.push_str(&format!( + "{},{:02X},{},{},{},{},{},{}", + lr.resolution_bits, + lr.scale_op, + lr.scale as f32 * 10.0_f32.powi(-(lr.scale_decimals as i32)), + lr.scale, + lr.scale_decimals, + lr.offset as f32 * 10.0_f32.powi(-(lr.offset_decimals as i32)), + lr.offset, + lr.offset_decimals + )), + SensorData::Identification(ref ident) => row.push_str(&format!( + "{},{}", + ident_code(ident.code), + std::str::from_utf8(&ident.value).unwrap_or("").trim() + )), + } + row.push('\n'); + Some(row.as_bytes().into()) +} + +fn decode_device_time(data: &Version1DataFrame) -> f32 { + if data.system_secs != u32::MAX { + data.system_secs as f32 + + if data.system_millis != u16::MAX { + data.system_millis as f32 / 1_000.0 + } else { + 0.0 + } + + if data.system_nanos != u16::MAX { + data.system_nanos as f32 / 1_000_000.0 + } else { + 0.0 + } + } else { + 0.0 + } +} + +fn csv_convert_push_scalar( + string: &mut String, + vec: &ScalarData, + ri: &Option<&LinearRangeInfo>, +) { + if let Some(ri) = ri { + let x = ri.convert(vec.value as f32); + string.push_str(&format!(",{}", x)) + } else { + string.push(',') + } +} + +fn csv_convert_push_vec3( + string: &mut String, + vec: &Vector3Data, + ri: &Option<&LinearRangeInfo>, +) where + T: Into + Copy, +{ + if let Some(ri) = ri { + let x = ri.convert(vec.x.into()); + let y = ri.convert(vec.y.into()); + let z = ri.convert(vec.z.into()); + string.push_str(&format!(",{},{},{}", x, y, z)) + } else { + string.push_str(",,,") + } +} + +fn csv_convert_push_vec4( + string: &mut String, + vec: &Vector4Data, + ri: &Option<&LinearRangeInfo>, +) where + T: Into + Copy, +{ + if let Some(ri) = ri { + let a = ri.convert(vec.a.into()); + let b = ri.convert(vec.b.into()); + let c = ri.convert(vec.c.into()); + let d = ri.convert(vec.d.into()); + string.push_str(&format!(",{},{},{},{}", a, b, c, d)) + } else { + string.push_str(",,,,") + } +} + +fn ident_code(code: IdentifierCode) -> &'static str { + match code { + IdentifierCode::Generic => "generic", + IdentifierCode::Maker => "maker", + IdentifierCode::Product => "product", + IdentifierCode::Revision => "revision", + } +} + +fn value_type_code(vt: ValueType) -> &'static str { + match vt { + ValueType::UInt8 => "u8", + ValueType::SInt8 => "i8", + ValueType::UInt16 => "u16", + ValueType::SInt16 => "i16", + ValueType::UInt32 => "u32", + ValueType::SInt32 => "i32", + ValueType::UInt64 => "u64", + ValueType::SInt64 => "i64", + ValueType::UInt128 => "u128", + ValueType::SInt128 => "i128", + ValueType::Float32 => "f32", + ValueType::Float64 => "f64", + ValueType::Q8_8 => "Q8_8", + ValueType::Q16_16 => "Q16_16", + ValueType::Q32_32 => "Q32_32", + ValueType::LinearRange => "lrange", + ValueType::Identifier => "ident", + } +} diff --git a/src/main.rs b/src/main.rs index 4545e29..7b6c233 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,31 +1,22 @@ extern crate core; -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; -use async_compression::tokio::write::GzipEncoder; -use async_compression::Level; use clap::Parser; use color_eyre::eyre::Result; -use num_traits::real::Real; pub use ratatui::prelude::*; -use serial_sensors_proto::types::LinearRangeInfo; use serial_sensors_proto::versions::Version1DataFrame; -use serial_sensors_proto::{ - deserialize, DataFrame, DeserializationError, IdentifierCode, ScalarData, SensorData, SensorId, - ValueType, Vector3Data, Vector4Data, -}; +use serial_sensors_proto::{deserialize, DeserializationError}; use tokio::fs::File; -use tokio::io::{self, AsyncReadExt, AsyncWriteExt, BufWriter}; +use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_serial::{DataBits, FlowControl, Parity, SerialPortBuilderExt, SerialStream, StopBits}; use crate::app::App; use crate::cli::{Cli, Commands}; use crate::data_buffer::SensorDataBuffer; +use crate::dumping::{dump_data, dump_raw, dump_raw_gzipped}; use crate::utils::{initialize_logging, initialize_panic_handler}; mod action; @@ -34,6 +25,7 @@ mod cli; mod components; mod config; mod data_buffer; +mod dumping; mod fps_counter; mod tui; mod utils; @@ -116,223 +108,6 @@ async fn main() -> Result<()> { Ok(()) } -async fn dump_raw( - file: File, - mut rx: UnboundedReceiver>, - tx: UnboundedSender>, -) -> Result<()> { - let mut writer = BufWriter::new(file); - loop { - if let Some(data) = rx.recv().await { - writer.write_all(&data).await?; - tx.send(data)?; - } - } -} - -async fn dump_raw_gzipped( - file: File, - mut rx: UnboundedReceiver>, - tx: UnboundedSender>, -) -> Result<()> { - let buffered_writer = BufWriter::new(file); - let mut writer = GzipEncoder::with_quality(buffered_writer, Level::Default); - loop { - if let Some(data) = rx.recv().await { - if let Err(e) = writer.write_all(&data).await { - writer.flush().await.ok(); - return Err(e.into()); - } - if let Err(e) = tx.send(data) { - writer.flush().await.ok(); - return Err(e.into()); - } - } - } - - // TODO: Add rendezvous on CTRL-C -} - -async fn dump_data(directory: PathBuf, mut rx: UnboundedReceiver) -> Result<()> { - let mut files: HashMap> = HashMap::new(); - let mut ranges: HashMap = HashMap::new(); - - loop { - let now = SystemTime::now(); - let since_the_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards"); - - if let Some(data) = rx.recv().await { - println!("Data received: {:?}", data); - let target = SensorId::from(&data); - let sdt = map_data(&data.value); - - let ranges = if let SensorData::LinearRanges(ref info) = data.value { - ranges.insert(data.target(), info.clone()); - ranges.get(&data.target()) - } else { - ranges.get(&target.clone()) - }; - - let data_row = match create_data_row(since_the_epoch, &target, &data, ranges) { - None => continue, - Some(data) => data, - }; - - match files.entry(target.clone()) { - Entry::Occupied(mut entry) => { - entry.get_mut().write_all(&data_row).await?; - entry.get_mut().flush().await?; - } - Entry::Vacant(entry) => { - let file_name = format!( - "{}-{}-{}-x{}.csv", - target.tag(), - sdt.0, - value_type_code(target.value_type()), - target.num_components().unwrap_or(0) - ); - println!("New sensor; creating new file: {file_name}"); - let path = directory.join(file_name); - let file = match File::create(path).await { - Ok(file) => file, - Err(e) => { - return Err(e.into()); - } - }; - - // Create header row. - if let Some(header) = create_header_row(&data) { - let writer = entry.insert(BufWriter::new(file)); - writer.write_all(&header).await?; - writer.write_all(&data_row).await?; - writer.flush().await?; - } - } - }; - } - } -} - -#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] -struct SensorDataType(&'static str); - -fn map_data(data: &SensorData) -> SensorDataType { - match data { - SensorData::SystemClockFrequency(_) => SensorDataType("clock"), - SensorData::AccelerometerI16(_) => SensorDataType("acc"), - SensorData::MagnetometerI16(_) => SensorDataType("mag"), - SensorData::TemperatureI16(_) => SensorDataType("temp"), - SensorData::GyroscopeI16(_) => SensorDataType("gyro"), - SensorData::HeadingI16(_) => SensorDataType("heading"), - SensorData::EulerAnglesF32(_) => SensorDataType("euler"), - SensorData::OrientationQuaternionF32(_) => SensorDataType("quat"), - SensorData::LinearRanges(_) => SensorDataType("lranges"), - SensorData::Identification(_) => SensorDataType("ident"), - } -} - -fn create_header_row(data: &Version1DataFrame) -> Option> { - let mut row = String::from("host_time,device_time,sensor_tag,num_components,value_type"); - match data.value { - SensorData::SystemClockFrequency(_) => row.push_str(",freq"), - SensorData::AccelerometerI16(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), - SensorData::MagnetometerI16(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), - SensorData::TemperatureI16(_) => row.push_str(",temp,converted_temp"), - SensorData::GyroscopeI16(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), - SensorData::HeadingI16(_) => row.push_str(",heading,converted_heading"), - SensorData::EulerAnglesF32(_) => row.push_str(",x,y,z,converted_x,converted_y,converted_z"), - SensorData::OrientationQuaternionF32(_) => row.push_str(",a,b,c,d,converted_a,converted_b,converted_c,converted_d"), - SensorData::LinearRanges(_) => row.push_str(",resolution_bits,scale_op,scale,scale_raw,scale_decimals,offset,offset_raw,offset_decimals"), - SensorData::Identification(_) => row.push_str(",code,value"), - } - row.push('\n'); - Some(row.as_bytes().into()) -} - -fn create_data_row( - since_the_epoch: Duration, - target: &SensorId, - data: &Version1DataFrame, - ranges: Option<&LinearRangeInfo>, -) -> Option> { - let device_time = decode_device_time(data); - let mut row = format!( - "{},{},{:02X},{},{},", - since_the_epoch.as_secs_f64(), - device_time, - target.tag(), - target.num_components().unwrap_or(0), - value_type_code(target.value_type()) - ); - match data.value { - SensorData::SystemClockFrequency(data) => row.push_str(&format!("{}", data.value)), - SensorData::AccelerometerI16(vec) => { - row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); - csv_convert_push_vec3(&mut row, &vec, &ranges) - } - SensorData::MagnetometerI16(vec) => { - row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); - csv_convert_push_vec3(&mut row, &vec, &ranges) - } - SensorData::TemperatureI16(temp) => { - row.push_str(&format!("{}", temp.value)); - csv_convert_push_scalar(&mut row, &temp, &ranges) - } - SensorData::GyroscopeI16(vec) => { - row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); - csv_convert_push_vec3(&mut row, &vec, &ranges) - } - SensorData::HeadingI16(heading) => { - row.push_str(&format!("{}", heading.value)); - csv_convert_push_scalar(&mut row, &heading, &ranges) - } - SensorData::EulerAnglesF32(vec) => { - row.push_str(&format!("{},{},{}", vec.x, vec.y, vec.z)); - csv_convert_push_vec3(&mut row, &vec, &ranges) - } - SensorData::OrientationQuaternionF32(vec) => { - row.push_str(&format!("{},{},{},{}", vec.a, vec.b, vec.c, vec.d)); - csv_convert_push_vec4(&mut row, &vec, &ranges) - } - SensorData::LinearRanges(ref lr) => row.push_str(&format!( - "{},{:02X},{},{},{},{},{},{}", - lr.resolution_bits, - lr.scale_op, - lr.scale as f32 * 10.0.powi(-(lr.scale_decimals as i32)), - lr.scale, - lr.scale_decimals, - lr.offset as f32 * 10.0.powi(-(lr.offset_decimals as i32)), - lr.offset, - lr.offset_decimals - )), - SensorData::Identification(ref ident) => row.push_str(&format!( - "{},{}", - ident_code(ident.code), - std::str::from_utf8(&ident.value).unwrap_or("").trim() - )), - } - row.push('\n'); - Some(row.as_bytes().into()) -} - -fn decode_device_time(data: &Version1DataFrame) -> f32 { - if data.system_secs != u32::MAX { - data.system_secs as f32 - + if data.system_millis != u16::MAX { - data.system_millis as f32 / 1_000.0 - } else { - 0.0 - } - + if data.system_nanos != u16::MAX { - data.system_nanos as f32 / 1_000_000.0 - } else { - 0.0 - } - } else { - 0.0 - } -} - async fn decoder( mut receiver: UnboundedReceiver>, sender: UnboundedSender, @@ -423,82 +198,3 @@ impl Drop for RecvObserver { println!("Receive loop finished"); } } - -fn csv_convert_push_scalar( - string: &mut String, - vec: &ScalarData, - ri: &Option<&LinearRangeInfo>, -) { - if let Some(ri) = ri { - let x = ri.convert(vec.value as f32); - string.push_str(&format!(",{}", x)) - } else { - string.push(',') - } -} - -fn csv_convert_push_vec3( - string: &mut String, - vec: &Vector3Data, - ri: &Option<&LinearRangeInfo>, -) where - T: Into + Copy, -{ - if let Some(ri) = ri { - let x = ri.convert(vec.x.into()); - let y = ri.convert(vec.y.into()); - let z = ri.convert(vec.z.into()); - string.push_str(&format!(",{},{},{}", x, y, z)) - } else { - string.push_str(",,,") - } -} - -fn csv_convert_push_vec4( - string: &mut String, - vec: &Vector4Data, - ri: &Option<&LinearRangeInfo>, -) where - T: Into + Copy, -{ - if let Some(ri) = ri { - let a = ri.convert(vec.a.into()); - let b = ri.convert(vec.b.into()); - let c = ri.convert(vec.c.into()); - let d = ri.convert(vec.d.into()); - string.push_str(&format!(",{},{},{},{}", a, b, c, d)) - } else { - string.push_str(",,,,") - } -} - -fn ident_code(code: IdentifierCode) -> &'static str { - match code { - IdentifierCode::Generic => "generic", - IdentifierCode::Maker => "maker", - IdentifierCode::Product => "product", - IdentifierCode::Revision => "revision", - } -} - -fn value_type_code(vt: ValueType) -> &'static str { - match vt { - ValueType::UInt8 => "u8", - ValueType::SInt8 => "i8", - ValueType::UInt16 => "u16", - ValueType::SInt16 => "i16", - ValueType::UInt32 => "u32", - ValueType::SInt32 => "i32", - ValueType::UInt64 => "u64", - ValueType::SInt64 => "i64", - ValueType::UInt128 => "u128", - ValueType::SInt128 => "i128", - ValueType::Float32 => "f32", - ValueType::Float64 => "f64", - ValueType::Q8_8 => "Q8_8", - ValueType::Q16_16 => "Q16_16", - ValueType::Q32_32 => "Q32_32", - ValueType::LinearRange => "lrange", - ValueType::Identifier => "ident", - } -}