diff --git a/Cargo.lock b/Cargo.lock index 4abe12c..527a7a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,12 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "corncobs" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9236877021b66ad90f833d8a73a7acb702b985b64c5986682d9f1f1a184f0fb" + [[package]] name = "futures" version = "0.3.30" @@ -436,6 +442,7 @@ version = "0.1.0" dependencies = [ "anyhow", "array_pool", + "corncobs", "tokio", "tokio-serial", ] diff --git a/Cargo.toml b/Cargo.toml index df916e0..efda8fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,6 @@ publish = false [dependencies] anyhow = "1.0.86" array_pool = "0.1.2" +corncobs = "0.1.3" tokio = { version = "1.38.0", features = ["full"] } tokio-serial = "5.4.4" diff --git a/src/main.rs b/src/main.rs index 08e6429..0261d11 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,17 @@ -use std::ops::Deref; +extern crate core; use std::sync::Arc; use std::time::Duration; -use array_pool::pool::{ArrayPool, BorrowingSlice}; +use array_pool::pool::ArrayPool; +use corncobs::CobsError; use tokio::io::{self, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio_serial::{DataBits, FlowControl, Parity, SerialPortBuilderExt, SerialStream, StopBits}; +use crate::packet::DataSlice; + +mod packet; + #[tokio::main] async fn main() -> anyhow::Result<()> { let port_name = "/dev/ttyACM0"; @@ -22,7 +27,7 @@ async fn main() -> anyhow::Result<()> { .open_native_async() .expect("Failed to open port"); - let (from_device, mut receiver) = unbounded_channel::(); + let (from_device, mut receiver) = unbounded_channel::(); let (command, to_device) = unbounded_channel::(); // Pool for sharing data @@ -36,9 +41,23 @@ async fn main() -> anyhow::Result<()> { // Main loop for printing input from the serial line. loop { - if let Some(data) = receiver.recv().await { - let data = String::from_utf8_lossy(&data).into_owned(); - print!("{}", data); + if let Some(mut data) = receiver.recv().await { + match corncobs::decode_in_place(&mut data) { + Ok(decoded_length) => { + let data = String::from_utf8_lossy(&data[..decoded_length]); + println!("Received data: {:?}", data); + } + Err(e) => { + match e { + CobsError::Truncated => { + // ignored; this is a synchronization issue + } + CobsError::Corrupt => { + // ignored + } + } + } + } } } } @@ -58,7 +77,7 @@ async fn handle_std_input(command: UnboundedSender) { async fn handle_data_recv( mut port: SerialStream, - from_device: UnboundedSender, + from_device: UnboundedSender, mut to_device: UnboundedReceiver, pool: Arc>, ) -> anyhow::Result<()> { @@ -76,7 +95,7 @@ async fn handle_data_recv( if bytes_read > 0 { let mut slice = pool.rent(bytes_read).map_err(|_| anyhow::Error::msg("failed to borrow from pool"))?; slice[..bytes_read].copy_from_slice(&buf[..bytes_read]); - from_device.send(Packet::new(slice, bytes_read))?; + from_device.send(DataSlice::new(slice, bytes_read))?; } } Err(ref e) if e.kind() == io::ErrorKind::TimedOut => (), @@ -85,28 +104,3 @@ async fn handle_data_recv( } } } - -struct Packet { - buffer: BorrowingSlice, - size: usize, -} - -impl Packet { - pub fn new(buffer: BorrowingSlice, size: usize) -> Self { - Self { buffer, size } - } -} - -impl AsRef<[u8]> for Packet { - fn as_ref(&self) -> &[u8] { - &self.buffer[..self.size] - } -} - -impl Deref for Packet { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - self.as_ref() - } -} diff --git a/src/packet.rs b/src/packet.rs new file mode 100644 index 0000000..be18132 --- /dev/null +++ b/src/packet.rs @@ -0,0 +1,52 @@ +use std::ops::{Deref, DerefMut}; + +use array_pool::pool::BorrowingSlice; + +/// A slice of data transmitted over the wire. +pub struct DataSlice { + buffer: BorrowingSlice, + size: usize, +} + +impl DataSlice { + pub fn new(buffer: BorrowingSlice, size: usize) -> Self { + assert!(buffer.len() >= size); + Self { buffer, size } + } + + #[allow(unused)] + pub fn len(&self) -> usize { + self.size + } + + #[allow(unused)] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl AsRef<[u8]> for DataSlice { + fn as_ref(&self) -> &[u8] { + &self.buffer[..self.size] + } +} + +impl AsMut<[u8]> for DataSlice { + fn as_mut(&mut self) -> &mut [u8] { + &mut self.buffer[..self.size] + } +} + +impl Deref for DataSlice { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.as_ref() + } +} + +impl DerefMut for DataSlice { + fn deref_mut(&mut self) -> &mut Self::Target { + self.as_mut() + } +}