Skip to content

Commit

Permalink
data_transfer: Implement basics data transfer
Browse files Browse the repository at this point in the history
* Implement the necessary to emulate send data between two UWBS.
* Initially, data is copied all at once from controller to controllee.
  Future commits will align with MAC specifications for realistic data
  transfer simulation.
* Updated device sender to accept 'UciPacket', enabling handling of
  both 'ControlPacket' and 'DataPacket'.
* Refactored 'get_session_mut' to 'session_mut'.
* Implemented conversion from 'MacAddress' to 'u64'.
  • Loading branch information
SilverBzH committed Feb 7, 2024
1 parent d947745 commit 336d9ad
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 22 deletions.
44 changes: 34 additions & 10 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ use tokio::sync::mpsc;
use tokio::time;

use super::session::{Session, MAX_SESSION};
use super::UciPacket;

pub const MAX_DEVICE: usize = 4;

const UCI_VERSION: u16 = 0x0002; // Version 2.0
const MAC_VERSION: u16 = 0x3001; // Version 1.3.0
const PHY_VERSION: u16 = 0x3001; // Version 1.3.0
Expand Down Expand Up @@ -78,7 +80,7 @@ pub struct Device {
/// [UCI] 5. UWBS Device State Machine
state: DeviceState,
sessions: HashMap<u32, Session>,
pub tx: mpsc::Sender<ControlPacket>,
pub tx: mpsc::Sender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
config: HashMap<DeviceConfigId, Vec<u8>>,
country_code: [u8; 2],
Expand All @@ -89,7 +91,7 @@ pub struct Device {
impl Device {
pub fn new(
device_handle: usize,
tx: mpsc::Sender<ControlPacket>,
tx: mpsc::Sender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
) -> Self {
let mac_address = {
Expand Down Expand Up @@ -131,16 +133,16 @@ impl Device {
self.set_state(DeviceState::DeviceStateReady);
}

pub fn get_session(&self, session_id: u32) -> Option<&Session> {
pub fn session(&self, session_id: u32) -> Option<&Session> {
self.sessions.get(&session_id)
}

pub fn get_session_mut(&mut self, session_id: u32) -> Option<&mut Session> {
pub fn session_mut(&mut self, session_id: u32) -> Option<&mut Session> {
self.sessions.get_mut(&session_id)
}

pub fn can_start_ranging(&self, peer_session: &Session, session_id: u32) -> bool {
match self.get_session(session_id) {
match self.session(session_id) {
Some(session) => {
session.session_state() == SessionState::SessionStateActive
&& session
Expand All @@ -151,6 +153,28 @@ impl Device {
}
}

pub fn can_start_data_transfer(&self, session_id: u32) -> bool {
match self.session(session_id) {
Some(session) => {
session.session_state() == SessionState::SessionStateActive
&& session.session_type() == SessionType::FiraRangingAndInBandDataSession
&& session.app_config.can_start_data_transfer()
}
None => false,
}
}

pub fn can_receive_data_transfer(&self, session_id: u32) -> bool {
match self.session(session_id) {
Some(session) => {
session.session_state() == SessionState::SessionStateActive
&& session.session_type() == SessionType::FiraRangingAndInBandDataSession
&& session.app_config.can_receive_data_transfer()
}
None => false,
}
}

// The fira norm specify to send a response, then reset, then
// send a notification once the reset is done
fn command_device_reset(&mut self, cmd: DeviceResetCmd) -> DeviceResetRsp {
Expand Down Expand Up @@ -296,7 +320,7 @@ impl Device {
Some(_) => StatusCode::UciStatusSessionDuplicate,
None => {
// Should not fail
self.get_session_mut(session_id).unwrap().init();
self.session_mut(session_id).unwrap().init();
StatusCode::UciStatusOk
}
}
Expand Down Expand Up @@ -371,10 +395,11 @@ impl Device {
}

pub fn data_message_snd(&mut self, data: DataPacket) -> SessionControlNotification {
println!("[{}] data_message_send", self.handle);
match data.specialize() {
DataPacketChild::DataMessageSnd(data_msg_snd) => {
let session_token = data_msg_snd.get_session_handle();
if let Some(session) = self.get_session_mut(session_token) {
if let Some(session) = self.session_mut(session_token) {
session.data_message_snd(data_msg_snd)
} else {
DataTransferStatusNtfBuilder {
Expand Down Expand Up @@ -418,7 +443,6 @@ impl Device {
},
// Handle commands for session management
UciCommandChild::SessionConfigCommand(session_command) => {
// Session commands directly handled at Device level
match session_command.specialize() {
SessionConfigCommandChild::SessionInitCmd(cmd) => {
return self.command_session_init(cmd).into();
Expand Down Expand Up @@ -447,7 +471,7 @@ impl Device {
_ => panic!("Unsupported session command type"),
};

if let Some(session) = self.get_session_mut(session_id) {
if let Some(session) = self.session_mut(session_id) {
// There is a session matching the session_id in the command
// Pass the command through
match session_command.specialize() {
Expand Down Expand Up @@ -498,7 +522,7 @@ impl Device {
}
UciCommandChild::SessionControlCommand(ranging_command) => {
let session_id = ranging_command.get_session_id();
if let Some(session) = self.get_session_mut(session_id) {
if let Some(session) = self.session_mut(session_id) {
// Forward to the proper session
let response = session.ranging_command(ranging_command);
match response.specialize() {
Expand Down
43 changes: 38 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub use mac_address::MacAddress;
use crate::session::RangeDataNtfConfig;

pub type PicaCommandStatus = Result<(), PicaCommandError>;
pub type UciPacket = Vec<u8>;

#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum PicaCommandError {
Expand Down Expand Up @@ -364,7 +365,7 @@ impl Pica {

// Send response packets to the connected UWB host.
Some(packet) = packet_rx.recv() =>
if uci_writer.write(&packet.to_bytes(), &mut pcapng_file).await.is_err() {
if uci_writer.write(&packet, &mut pcapng_file).await.is_err() {
break 'outer
}
}
Expand Down Expand Up @@ -400,9 +401,10 @@ impl Pica {
println!(" session_id={}", session_id);

let device = self.get_device(device_handle).unwrap();
let session = device.get_session(session_id).unwrap();
let session = device.session(session_id).unwrap();

let mut measurements = Vec::new();
let mut peer_device_data_transfer = Vec::new();
session
.get_dst_mac_addresses()
.iter()
Expand Down Expand Up @@ -431,8 +433,33 @@ impl Pica {
assert!(local.0 == remote.0);
measurements.push(make_measurement(mac_address, local, remote));
}
if device.can_start_data_transfer(session_id)
&& peer_device.can_receive_data_transfer(session_id)
{
peer_device_data_transfer.push(peer_device);
}
}
});
// TODO: Data transfer should be limited in size for
// each round of ranging
for peer_device in peer_device_data_transfer.iter() {
peer_device
.tx
.send(
DataMessageRcvBuilder {
application_data: session.data().clone().into(),
data_sequence_number: 0x01,
pbf: PacketBoundaryFlag::Complete,
session_handle: session_id,
source_address: device.mac_address.into(),
status: UciStatusCode::UciStatusOk,
}
.build()
.into(),
)
.await
.unwrap();
}
if session.is_ranging_data_ntf_enabled() != RangeDataNtfConfig::Disable {
device
.tx
Expand All @@ -453,10 +480,16 @@ impl Pica {
.unwrap();

let device = self.get_device_mut(device_handle).unwrap();
let session = device.get_session_mut(session_id).unwrap();
let session = device.session_mut(session_id).unwrap();

session.sequence_number += 1;
}

// TODO: Clean the data only when all the data is transfered
let device = self.get_device_mut(device_handle).unwrap();
let session = device.session_mut(session_id).unwrap();

session.clear_data();
}

async fn uci_data(&mut self, device_handle: usize, data: DataPacket) {
Expand All @@ -482,7 +515,7 @@ impl Pica {
let response: ControlPacket = device.command(cmd).into();
device
.tx
.send(response)
.send(response.to_vec())
.await
.unwrap_or_else(|err| println!("Failed to send UCI command response: {}", err));
}
Expand Down Expand Up @@ -528,7 +561,7 @@ impl Pica {
// corresponding mac_address and session_id.
async fn stop_controlee_ranging(&mut self, mac_address: &MacAddress, session_id: u32) {
if let Some(device) = self.get_device_mut_by_mac(mac_address) {
if let Some(session) = device.get_session_mut(session_id) {
if let Some(session) = device.session_mut(session_id) {
if session.session_state() == SessionState::SessionStateActive {
session.stop_ranging_task();
session.set_state(
Expand Down
25 changes: 25 additions & 0 deletions src/mac_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ impl From<usize> for MacAddress {
}
}

impl From<MacAddress> for u64 {
fn from(mac_address: MacAddress) -> Self {
match mac_address {
MacAddress::Short(addr) => u16::from_le_bytes(addr) as u64,
MacAddress::Extend(addr) => u64::from_le_bytes(addr),
}
}
}

impl TryFrom<String> for MacAddress {
type Error = Error;
fn try_from(mac_address: String) -> std::result::Result<Self, Error> {
Expand Down Expand Up @@ -144,4 +153,20 @@ mod tests {
);
assert_eq!(extend_mac_address.to_string(), extend_mac_address);
}

#[test]
fn test_short_mac_to_u64() {
let short_mac = MacAddress::Short([0x01, 0x02]);
let result: u64 = short_mac.into();
let expected: u64 = 0x0201;
assert_eq!(result, expected);
}

#[test]
fn test_extend_mac_to_u64() {
let extend_mac = MacAddress::Extend([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]);
let result: u64 = extend_mac.into();
let expected: u64 = 0x0807060504030201;
assert_eq!(result, expected);
}
}
36 changes: 29 additions & 7 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::packets::uci::{self, *};
use crate::{MacAddress, PicaCommand};
use bytes::BytesMut;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc;
Expand All @@ -27,6 +28,8 @@ use tokio::time;
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::FromPrimitive;

use super::UciPacket;

pub const MAX_SESSION: usize = 255;
pub const DEFAULT_RANGING_INTERVAL: Duration = time::Duration::from_millis(200);
pub const DEFAULT_SLOT_DURATION: u16 = 2400; // RTSU unit
Expand Down Expand Up @@ -649,6 +652,14 @@ impl AppConfig {
.contains(&peer_config.device_mac_address)
}

pub fn can_start_data_transfer(&self) -> bool {
self.device_role == DeviceRole::Initiator
}

pub fn can_receive_data_transfer(&self) -> bool {
self.device_role == DeviceRole::Responder
}

fn extend(&mut self, configs: &[AppConfigTlv]) -> Vec<AppConfigStatus> {
if !app_config_has_mandatory_parameters(configs) {
// TODO: What shall we do in this situation?
Expand Down Expand Up @@ -717,12 +728,13 @@ pub struct Session {
/// cf. [UCI] 7.2 Table 13: 4 octets unique random number generated by application
id: u32,
device_handle: usize,
data: BytesMut,

session_type: SessionType,
pub sequence_number: u32,
pub app_config: AppConfig,
ranging_task: Option<JoinHandle<()>>,
tx: mpsc::Sender<ControlPacket>,
tx: mpsc::Sender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
}

Expand All @@ -731,13 +743,14 @@ impl Session {
id: u32,
session_type: SessionType,
device_handle: usize,
tx: mpsc::Sender<ControlPacket>,
tx: mpsc::Sender<UciPacket>,
pica_tx: mpsc::Sender<PicaCommand>,
) -> Self {
Self {
state: SessionState::SessionStateDeinit,
id,
device_handle,
data: BytesMut::new(),
session_type,
sequence_number: 0,
app_config: AppConfig::default(),
Expand Down Expand Up @@ -781,6 +794,18 @@ impl Session {
self.app_config.rng_data_ntf
}

pub fn data(&self) -> &BytesMut {
&self.data
}

pub fn clear_data(&mut self) {
self.data.clear()
}

pub fn session_type(&self) -> SessionType {
self.session_type
}

pub fn session_state(&self) -> SessionState {
self.state
}
Expand Down Expand Up @@ -1184,6 +1209,7 @@ impl Session {
}

pub fn data_message_snd(&mut self, data: DataMessageSnd) -> SessionControlNotification {
println!("[{}] data_message_snd", self.device_handle);
let session_token = data.get_session_handle();
let uci_sequence_number = data.get_data_sequence_number() as u8;

Expand All @@ -1200,11 +1226,7 @@ impl Session {

assert_eq!(self.id, session_token);

// TODO: perform actual data transfer across devices
println!(
"Data packet received, payload bytes: {:?}",
data.get_application_data()
);
self.data.extend_from_slice(data.get_application_data());

DataCreditNtfBuilder {
credit_availability: CreditAvailability::CreditAvailable,
Expand Down

0 comments on commit 336d9ad

Please sign in to comment.