From 02589bf5b8f1b31bcb3151065d30ab5ba61eca25 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 9 Sep 2023 12:47:58 +0530 Subject: [PATCH] fix: simulator `action_status` is not payload --- uplink/src/collector/simulator/data.rs | 38 +++++++------- uplink/src/collector/simulator/mod.rs | 69 ++++++++------------------ 2 files changed, 40 insertions(+), 67 deletions(-) diff --git a/uplink/src/collector/simulator/data.rs b/uplink/src/collector/simulator/data.rs index 12527b54..6691cba8 100644 --- a/uplink/src/collector/simulator/data.rs +++ b/uplink/src/collector/simulator/data.rs @@ -13,6 +13,8 @@ use std::time::Duration; use crate::base::clock; use crate::Payload; +use super::Event; + const RESET_LIMIT: u32 = 1500; #[inline] @@ -60,7 +62,7 @@ pub struct Gps { } impl Gps { - pub async fn simulate(tx: Sender, device: DeviceData) { + pub async fn simulate(tx: Sender, device: DeviceData) { let mut sequence = 0; let mut interval = interval(DataType::Gps.duration()); let path_len = device.path.len() as u32; @@ -74,12 +76,12 @@ impl Gps { trace!("Data Event: {:?}", payload); if let Err(e) = tx - .send_async(Payload { + .send_async(Event::Data(Payload { timestamp: clock() as u64, stream: "gps".to_string(), sequence, payload: json!(payload), - }) + })) .await { error!("{e}"); @@ -194,7 +196,7 @@ pub struct Bms { } impl Bms { - pub async fn simulate(tx: Sender) { + pub async fn simulate(tx: Sender) { let mut sequence = 0; let mut interval = interval(DataType::Bms.duration()); loop { @@ -205,12 +207,12 @@ impl Bms { trace!("Data Event: {:?}", payload); if let Err(e) = tx - .send_async(Payload { + .send_async(Event::Data(Payload { timestamp: clock() as u64, stream: "bms".to_string(), sequence, payload: json!(payload), - }) + })) .await { error!("{e}"); @@ -243,7 +245,7 @@ pub struct Imu { } impl Imu { - pub async fn simulate(tx: Sender) { + pub async fn simulate(tx: Sender) { let mut sequence = 0; let mut interval = interval(DataType::Imu.duration()); loop { @@ -254,12 +256,12 @@ impl Imu { trace!("Data Event: {:?}", payload); if let Err(e) = tx - .send_async(Payload { + .send_async(Event::Data(Payload { timestamp: clock() as u64, stream: "imu".to_string(), sequence, payload: json!(payload), - }) + })) .await { error!("{e}"); @@ -286,7 +288,7 @@ pub struct Motor { } impl Motor { - pub async fn simulate(tx: Sender) { + pub async fn simulate(tx: Sender) { let mut sequence = 0; let mut interval = interval(DataType::Motor.duration()); loop { @@ -297,12 +299,12 @@ impl Motor { trace!("Data Event: {:?}", payload); if let Err(e) = tx - .send_async(Payload { + .send_async(Event::Data(Payload { timestamp: clock() as u64, stream: "motor".to_string(), sequence, payload: json!(payload), - }) + })) .await { error!("{e}"); @@ -335,7 +337,7 @@ pub struct PeripheralState { } impl PeripheralState { - pub async fn simulate(tx: Sender) { + pub async fn simulate(tx: Sender) { let mut sequence = 0; let mut interval = interval(DataType::PeripheralData.duration()); loop { @@ -346,12 +348,12 @@ impl PeripheralState { trace!("Data Event: {:?}", payload); if let Err(e) = tx - .send_async(Payload { + .send_async(Event::Data(Payload { timestamp: clock() as u64, stream: "peripheral_state".to_string(), sequence, payload: json!(payload), - }) + })) .await { error!("{e}"); @@ -381,7 +383,7 @@ pub struct DeviceShadow { } impl DeviceShadow { - pub async fn simulate(tx: Sender) { + pub async fn simulate(tx: Sender) { let mut sequence = 0; let mut interval = interval(DataType::DeviceShadow.duration()); loop { @@ -392,12 +394,12 @@ impl DeviceShadow { trace!("Data Event: {:?}", payload); if let Err(e) = tx - .send_async(Payload { + .send_async(Event::Data(Payload { timestamp: clock() as u64, stream: "device_shadow".to_string(), sequence, payload: json!(payload), - }) + })) .await { error!("{e}"); diff --git a/uplink/src/collector/simulator/mod.rs b/uplink/src/collector/simulator/mod.rs index 10d81ec0..17d91f0e 100644 --- a/uplink/src/collector/simulator/mod.rs +++ b/uplink/src/collector/simulator/mod.rs @@ -1,12 +1,10 @@ use crate::base::bridge::{BridgeTx, Payload}; -use crate::base::{clock, SimulatorConfig}; -use crate::Action; +use crate::base::SimulatorConfig; +use crate::{Action, ActionResponse}; use data::{Bms, DeviceData, DeviceShadow, Gps, Imu, Motor, PeripheralState}; use flume::{bounded, Receiver, Sender}; use log::{error, info}; use rand::Rng; -use serde::Serialize; -use serde_json::json; use thiserror::Error; use tokio::time::interval; use tokio::{select, spawn}; @@ -16,6 +14,11 @@ use std::{fs, io, sync::Arc}; mod data; +pub enum Event { + Data(Payload), + ActionResponse(ActionResponse), +} + #[derive(Error, Debug)] pub enum Error { #[error("Io error {0}")] @@ -26,16 +29,8 @@ pub enum Error { Json(#[from] serde_json::error::Error), } -#[derive(Serialize)] -pub struct ActionResponse { - action_id: String, - state: String, - progress: u8, - errors: Vec, -} - impl ActionResponse { - pub async fn simulate(action: Action, tx: Sender) { + pub async fn simulate(action: Action, tx: Sender) { let action_id = action.action_id; info!("Generating action events for action: {action_id}"); let mut sequence = 0; @@ -43,22 +38,11 @@ impl ActionResponse { // Action response, 10% completion per second for i in 1..10 { - let response = ActionResponse { - action_id: action_id.clone(), - progress: i * 10 + rand::thread_rng().gen_range(0..10), - state: String::from("in_progress"), - errors: vec![], - }; + let progress = i * 10 + rand::thread_rng().gen_range(0..10); sequence += 1; - if let Err(e) = tx - .send_async(Payload { - stream: "action_status".to_string(), - sequence, - payload: json!(response), - timestamp: clock() as u64, - }) - .await - { + let response = ActionResponse::progress(&action_id, "in_progress", progress) + .set_sequence(sequence); + if let Err(e) = tx.send_async(Event::ActionResponse(response)).await { error!("{e}"); break; } @@ -66,22 +50,10 @@ impl ActionResponse { interval.tick().await; } - let response = ActionResponse { - action_id, - progress: 100, - state: String::from("Completed"), - errors: vec![], - }; sequence += 1; - if let Err(e) = tx - .send_async(Payload { - stream: "action_status".to_string(), - sequence, - payload: json!(response), - timestamp: clock() as u64, - }) - .await - { + let response = + ActionResponse::progress(&action_id, "Completed", 100).set_sequence(sequence); + if let Err(e) = tx.send_async(Event::ActionResponse(response)).await { error!("{e}"); } info!("Successfully sent all action responses"); @@ -107,7 +79,7 @@ pub fn new_device_data(path: Arc>) -> DeviceData { DeviceData { path, path_offset: path_index } } -pub fn spawn_data_simulators(device: DeviceData, tx: Sender) { +pub fn spawn_data_simulators(device: DeviceData, tx: Sender) { spawn(Gps::simulate(tx.clone(), device)); spawn(Bms::simulate(tx.clone())); spawn(Imu::simulate(tx.clone())); @@ -134,16 +106,15 @@ pub async fn start( let action = action?; spawn(ActionResponse::simulate(action, tx.clone())); } - p = rx.recv_async() => { - let payload = match p { - Ok(p) => p, + event = rx.recv_async() => { + match event { + Ok(Event::ActionResponse(status)) => bridge_tx.send_action_response(status).await, + Ok(Event::Data(payload)) => bridge_tx.send_payload(payload).await, Err(_) => { error!("All generators have stopped!"); return Ok(()) } }; - - bridge_tx.send_payload(payload).await; } } }