From 336d9ad59c10d3abc3815ab6157cbbc3ade74786 Mon Sep 17 00:00:00 2001 From: Charlie Boutier Date: Tue, 6 Feb 2024 23:38:31 +0000 Subject: [PATCH] data_transfer: Implement basics data transfer * Implement the necessary to emulate send data between two UWBS. * Initially, data is copied all at once from controller to controllee. Future commits will align with MAC specifications for realistic data transfer simulation. * Updated device sender to accept 'UciPacket', enabling handling of both 'ControlPacket' and 'DataPacket'. * Refactored 'get_session_mut' to 'session_mut'. * Implemented conversion from 'MacAddress' to 'u64'. --- src/device.rs | 44 ++++++++++++++++++++++++++++++++++---------- src/lib.rs | 43 ++++++++++++++++++++++++++++++++++++++----- src/mac_address.rs | 25 +++++++++++++++++++++++++ src/session.rs | 36 +++++++++++++++++++++++++++++------- 4 files changed, 126 insertions(+), 22 deletions(-) diff --git a/src/device.rs b/src/device.rs index a1612bc..445649c 100644 --- a/src/device.rs +++ b/src/device.rs @@ -25,8 +25,10 @@ use tokio::sync::mpsc; use tokio::time; use super::session::{Session, MAX_SESSION}; +use super::UciPacket; pub const MAX_DEVICE: usize = 4; + const UCI_VERSION: u16 = 0x0002; // Version 2.0 const MAC_VERSION: u16 = 0x3001; // Version 1.3.0 const PHY_VERSION: u16 = 0x3001; // Version 1.3.0 @@ -78,7 +80,7 @@ pub struct Device { /// [UCI] 5. UWBS Device State Machine state: DeviceState, sessions: HashMap, - pub tx: mpsc::Sender, + pub tx: mpsc::Sender, pica_tx: mpsc::Sender, config: HashMap>, country_code: [u8; 2], @@ -89,7 +91,7 @@ pub struct Device { impl Device { pub fn new( device_handle: usize, - tx: mpsc::Sender, + tx: mpsc::Sender, pica_tx: mpsc::Sender, ) -> Self { let mac_address = { @@ -131,16 +133,16 @@ impl Device { self.set_state(DeviceState::DeviceStateReady); } - pub fn get_session(&self, session_id: u32) -> Option<&Session> { + pub fn session(&self, session_id: u32) -> Option<&Session> { self.sessions.get(&session_id) } - pub fn get_session_mut(&mut self, session_id: u32) -> Option<&mut Session> { + pub fn session_mut(&mut self, session_id: u32) -> Option<&mut Session> { self.sessions.get_mut(&session_id) } pub fn can_start_ranging(&self, peer_session: &Session, session_id: u32) -> bool { - match self.get_session(session_id) { + match self.session(session_id) { Some(session) => { session.session_state() == SessionState::SessionStateActive && session @@ -151,6 +153,28 @@ impl Device { } } + pub fn can_start_data_transfer(&self, session_id: u32) -> bool { + match self.session(session_id) { + Some(session) => { + session.session_state() == SessionState::SessionStateActive + && session.session_type() == SessionType::FiraRangingAndInBandDataSession + && session.app_config.can_start_data_transfer() + } + None => false, + } + } + + pub fn can_receive_data_transfer(&self, session_id: u32) -> bool { + match self.session(session_id) { + Some(session) => { + session.session_state() == SessionState::SessionStateActive + && session.session_type() == SessionType::FiraRangingAndInBandDataSession + && session.app_config.can_receive_data_transfer() + } + None => false, + } + } + // The fira norm specify to send a response, then reset, then // send a notification once the reset is done fn command_device_reset(&mut self, cmd: DeviceResetCmd) -> DeviceResetRsp { @@ -296,7 +320,7 @@ impl Device { Some(_) => StatusCode::UciStatusSessionDuplicate, None => { // Should not fail - self.get_session_mut(session_id).unwrap().init(); + self.session_mut(session_id).unwrap().init(); StatusCode::UciStatusOk } } @@ -371,10 +395,11 @@ impl Device { } pub fn data_message_snd(&mut self, data: DataPacket) -> SessionControlNotification { + println!("[{}] data_message_send", self.handle); match data.specialize() { DataPacketChild::DataMessageSnd(data_msg_snd) => { let session_token = data_msg_snd.get_session_handle(); - if let Some(session) = self.get_session_mut(session_token) { + if let Some(session) = self.session_mut(session_token) { session.data_message_snd(data_msg_snd) } else { DataTransferStatusNtfBuilder { @@ -418,7 +443,6 @@ impl Device { }, // Handle commands for session management UciCommandChild::SessionConfigCommand(session_command) => { - // Session commands directly handled at Device level match session_command.specialize() { SessionConfigCommandChild::SessionInitCmd(cmd) => { return self.command_session_init(cmd).into(); @@ -447,7 +471,7 @@ impl Device { _ => panic!("Unsupported session command type"), }; - if let Some(session) = self.get_session_mut(session_id) { + if let Some(session) = self.session_mut(session_id) { // There is a session matching the session_id in the command // Pass the command through match session_command.specialize() { @@ -498,7 +522,7 @@ impl Device { } UciCommandChild::SessionControlCommand(ranging_command) => { let session_id = ranging_command.get_session_id(); - if let Some(session) = self.get_session_mut(session_id) { + if let Some(session) = self.session_mut(session_id) { // Forward to the proper session let response = session.ranging_command(ranging_command); match response.specialize() { diff --git a/src/lib.rs b/src/lib.rs index 3aaa9cb..d53ffda 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,6 +45,7 @@ pub use mac_address::MacAddress; use crate::session::RangeDataNtfConfig; pub type PicaCommandStatus = Result<(), PicaCommandError>; +pub type UciPacket = Vec; #[derive(Error, Debug, Clone, PartialEq, Eq)] pub enum PicaCommandError { @@ -364,7 +365,7 @@ impl Pica { // Send response packets to the connected UWB host. Some(packet) = packet_rx.recv() => - if uci_writer.write(&packet.to_bytes(), &mut pcapng_file).await.is_err() { + if uci_writer.write(&packet, &mut pcapng_file).await.is_err() { break 'outer } } @@ -400,9 +401,10 @@ impl Pica { println!(" session_id={}", session_id); let device = self.get_device(device_handle).unwrap(); - let session = device.get_session(session_id).unwrap(); + let session = device.session(session_id).unwrap(); let mut measurements = Vec::new(); + let mut peer_device_data_transfer = Vec::new(); session .get_dst_mac_addresses() .iter() @@ -431,8 +433,33 @@ impl Pica { assert!(local.0 == remote.0); measurements.push(make_measurement(mac_address, local, remote)); } + if device.can_start_data_transfer(session_id) + && peer_device.can_receive_data_transfer(session_id) + { + peer_device_data_transfer.push(peer_device); + } } }); + // TODO: Data transfer should be limited in size for + // each round of ranging + for peer_device in peer_device_data_transfer.iter() { + peer_device + .tx + .send( + DataMessageRcvBuilder { + application_data: session.data().clone().into(), + data_sequence_number: 0x01, + pbf: PacketBoundaryFlag::Complete, + session_handle: session_id, + source_address: device.mac_address.into(), + status: UciStatusCode::UciStatusOk, + } + .build() + .into(), + ) + .await + .unwrap(); + } if session.is_ranging_data_ntf_enabled() != RangeDataNtfConfig::Disable { device .tx @@ -453,10 +480,16 @@ impl Pica { .unwrap(); let device = self.get_device_mut(device_handle).unwrap(); - let session = device.get_session_mut(session_id).unwrap(); + let session = device.session_mut(session_id).unwrap(); session.sequence_number += 1; } + + // TODO: Clean the data only when all the data is transfered + let device = self.get_device_mut(device_handle).unwrap(); + let session = device.session_mut(session_id).unwrap(); + + session.clear_data(); } async fn uci_data(&mut self, device_handle: usize, data: DataPacket) { @@ -482,7 +515,7 @@ impl Pica { let response: ControlPacket = device.command(cmd).into(); device .tx - .send(response) + .send(response.to_vec()) .await .unwrap_or_else(|err| println!("Failed to send UCI command response: {}", err)); } @@ -528,7 +561,7 @@ impl Pica { // corresponding mac_address and session_id. async fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) { if let Some(device) = self.get_device_mut_by_mac(mac_address) { - if let Some(session) = device.get_session_mut(session_id) { + if let Some(session) = device.session_mut(session_id) { if session.session_state() == SessionState::SessionStateActive { session.stop_ranging_task(); session.set_state( diff --git a/src/mac_address.rs b/src/mac_address.rs index 320631a..115c64e 100644 --- a/src/mac_address.rs +++ b/src/mac_address.rs @@ -49,6 +49,15 @@ impl From for MacAddress { } } +impl From for u64 { + fn from(mac_address: MacAddress) -> Self { + match mac_address { + MacAddress::Short(addr) => u16::from_le_bytes(addr) as u64, + MacAddress::Extend(addr) => u64::from_le_bytes(addr), + } + } +} + impl TryFrom for MacAddress { type Error = Error; fn try_from(mac_address: String) -> std::result::Result { @@ -144,4 +153,20 @@ mod tests { ); assert_eq!(extend_mac_address.to_string(), extend_mac_address); } + + #[test] + fn test_short_mac_to_u64() { + let short_mac = MacAddress::Short([0x01, 0x02]); + let result: u64 = short_mac.into(); + let expected: u64 = 0x0201; + assert_eq!(result, expected); + } + + #[test] + fn test_extend_mac_to_u64() { + let extend_mac = MacAddress::Extend([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]); + let result: u64 = extend_mac.into(); + let expected: u64 = 0x0807060504030201; + assert_eq!(result, expected); + } } diff --git a/src/session.rs b/src/session.rs index cafcb20..29f91a6 100644 --- a/src/session.rs +++ b/src/session.rs @@ -18,6 +18,7 @@ use crate::packets::uci::{self, *}; use crate::{MacAddress, PicaCommand}; +use bytes::BytesMut; use std::collections::HashMap; use std::time::Duration; use tokio::sync::mpsc; @@ -27,6 +28,8 @@ use tokio::time; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::FromPrimitive; +use super::UciPacket; + pub const MAX_SESSION: usize = 255; pub const DEFAULT_RANGING_INTERVAL: Duration = time::Duration::from_millis(200); pub const DEFAULT_SLOT_DURATION: u16 = 2400; // RTSU unit @@ -649,6 +652,14 @@ impl AppConfig { .contains(&peer_config.device_mac_address) } + pub fn can_start_data_transfer(&self) -> bool { + self.device_role == DeviceRole::Initiator + } + + pub fn can_receive_data_transfer(&self) -> bool { + self.device_role == DeviceRole::Responder + } + fn extend(&mut self, configs: &[AppConfigTlv]) -> Vec { if !app_config_has_mandatory_parameters(configs) { // TODO: What shall we do in this situation? @@ -717,12 +728,13 @@ pub struct Session { /// cf. [UCI] 7.2 Table 13: 4 octets unique random number generated by application id: u32, device_handle: usize, + data: BytesMut, session_type: SessionType, pub sequence_number: u32, pub app_config: AppConfig, ranging_task: Option>, - tx: mpsc::Sender, + tx: mpsc::Sender, pica_tx: mpsc::Sender, } @@ -731,13 +743,14 @@ impl Session { id: u32, session_type: SessionType, device_handle: usize, - tx: mpsc::Sender, + tx: mpsc::Sender, pica_tx: mpsc::Sender, ) -> Self { Self { state: SessionState::SessionStateDeinit, id, device_handle, + data: BytesMut::new(), session_type, sequence_number: 0, app_config: AppConfig::default(), @@ -781,6 +794,18 @@ impl Session { self.app_config.rng_data_ntf } + pub fn data(&self) -> &BytesMut { + &self.data + } + + pub fn clear_data(&mut self) { + self.data.clear() + } + + pub fn session_type(&self) -> SessionType { + self.session_type + } + pub fn session_state(&self) -> SessionState { self.state } @@ -1184,6 +1209,7 @@ impl Session { } pub fn data_message_snd(&mut self, data: DataMessageSnd) -> SessionControlNotification { + println!("[{}] data_message_snd", self.device_handle); let session_token = data.get_session_handle(); let uci_sequence_number = data.get_data_sequence_number() as u8; @@ -1200,11 +1226,7 @@ impl Session { assert_eq!(self.id, session_token); - // TODO: perform actual data transfer across devices - println!( - "Data packet received, payload bytes: {:?}", - data.get_application_data() - ); + self.data.extend_from_slice(data.get_application_data()); DataCreditNtfBuilder { credit_availability: CreditAvailability::CreditAvailable,