Skip to content

Commit

Permalink
refactor: reorganize binary helpers and mod config (#324)
Browse files Browse the repository at this point in the history
* refactor: reorganize binary helpers and mod config

* style: fix ambiguous use statements

* refactor: rm `ReadFileError`

* refactor: `use PathBuf`
  • Loading branch information
Devdutt Shenoi authored Feb 14, 2024
1 parent 364fe4c commit 09b3c9f
Show file tree
Hide file tree
Showing 14 changed files with 549 additions and 594 deletions.
4 changes: 2 additions & 2 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration}

use super::streams::Streams;
use super::{ActionBridgeShutdown, Package, StreamMetrics};
use crate::base::ActionRoute;
use crate::config::ActionRoute;
use crate::{Action, ActionResponse, Config};

const TUNSHELL_ACTION: &str = "launch_shell";
Expand Down Expand Up @@ -464,7 +464,7 @@ mod tests {
use tokio::{runtime::Runtime, select};

use crate::{
base::{ActionRoute, StreamConfig, StreamMetricsConfig},
config::{ActionRoute, StreamConfig, StreamMetricsConfig},
Action, ActionResponse, Config,
};

Expand Down
3 changes: 1 addition & 2 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ pub use actions_lane::{CtrlTx as ActionsLaneCtrlTx, StatusTx};
use data_lane::DataBridge;
pub use data_lane::{CtrlTx as DataLaneCtrlTx, DataTx};

use super::StreamConfig;
use crate::base::ActionRoute;
use crate::config::{ActionRoute, StreamConfig};
use crate::{Action, ActionResponse, Config};
pub use metrics::StreamMetrics;

Expand Down
3 changes: 1 addition & 2 deletions uplink/src/base/bridge/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use flume::{SendError, Sender};
use log::{debug, trace};
use serde::Serialize;

use crate::base::StreamConfig;

use crate::config::StreamConfig;
use super::{Package, Point, StreamMetrics};

/// Signals status of stream buffer
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::{error, info, trace};

use super::stream::{self, StreamStatus};
use super::{Point, StreamMetrics};
use crate::base::StreamConfig;
use crate::config::StreamConfig;
use crate::{Config, Package, Stream};

use super::delaymap::DelayMap;
Expand Down
278 changes: 2 additions & 276 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,8 @@
use std::cmp::Ordering;
use std::env::current_dir;
use std::path::PathBuf;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, fmt::Debug};
use std::fmt::Debug;
use std::time::{SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use tokio::join;

#[cfg(target_os = "linux")]
use crate::collector::journalctl::JournalCtlConfig;
#[cfg(target_os = "android")]
use crate::collector::logcat::LogcatConfig;

use self::bridge::stream::MAX_BATCH_SIZE;
use self::bridge::{ActionsLaneCtrlTx, DataLaneCtrlTx};
use self::mqtt::CtrlTx as MqttCtrlTx;
use self::serializer::CtrlTx as SerializerCtrlTx;
Expand All @@ -24,273 +13,10 @@ pub mod monitor;
pub mod mqtt;
pub mod serializer;

pub const DEFAULT_TIMEOUT: u64 = 60;

#[inline]
fn default_timeout() -> Duration {
Duration::from_secs(DEFAULT_TIMEOUT)
}

#[inline]
fn max_batch_size() -> usize {
MAX_BATCH_SIZE
}

fn default_file_size() -> usize {
10485760 // 10MB
}

fn default_persistence_path() -> PathBuf {
let mut path = current_dir().expect("Couldn't figure out current directory");
path.push(".persistence");
path
}

fn default_download_path() -> PathBuf {
let mut path = current_dir().expect("Couldn't figure out current directory");
path.push(".downloads");
path
}

// Automatically assigns port 5050 for default main app, if left unconfigured
fn default_tcpapps() -> HashMap<String, AppConfig> {
let mut apps = HashMap::new();
apps.insert("main".to_string(), AppConfig { port: 5050, actions: vec![] });

apps
}

pub fn clock() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
}

#[derive(Debug, Clone, Copy, Deserialize, Serialize, Default, PartialEq, Eq, PartialOrd)]
pub enum Compression {
#[default]
Disabled,
Lz4,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
pub struct StreamConfig {
pub topic: String,
#[serde(default = "max_batch_size", alias = "buf_size")]
pub batch_size: usize,
#[serde(default = "default_timeout")]
#[serde_as(as = "DurationSeconds<u64>")]
/// Duration(in seconds) that bridge collector waits from
/// receiving first element, before the stream gets flushed.
pub flush_period: Duration,
#[serde(default)]
pub compression: Compression,
#[serde(default)]
pub persistence: Persistence,
#[serde(default)]
pub priority: u8,
}

impl Default for StreamConfig {
fn default() -> Self {
Self {
topic: "".to_string(),
batch_size: MAX_BATCH_SIZE,
flush_period: default_timeout(),
compression: Compression::Disabled,
persistence: Persistence::default(),
priority: 0,
}
}
}

impl Ord for StreamConfig {
fn cmp(&self, other: &Self) -> Ordering {
match (self.priority.cmp(&other.priority), self.topic.cmp(&other.topic)) {
(Ordering::Equal, o) => o,
(o, _) => o.reverse(),
}
}
}

impl PartialOrd for StreamConfig {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[derive(Debug, Clone, Deserialize, PartialEq, Eq, PartialOrd)]
pub struct Persistence {
#[serde(default = "default_file_size")]
pub max_file_size: usize,
#[serde(default)]
pub max_file_count: usize,
}

impl Default for Persistence {
fn default() -> Self {
Persistence { max_file_size: default_file_size(), max_file_count: 0 }
}
}

#[derive(Debug, Clone, Deserialize)]
pub struct Authentication {
pub ca_certificate: String,
pub device_certificate: String,
pub device_private_key: String,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct Stats {
pub enabled: bool,
pub process_names: Vec<String>,
pub update_period: u64,
pub stream_size: Option<usize>,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct SimulatorConfig {
/// path to directory containing files with gps paths to be used in simulation
pub gps_paths: String,
/// actions that are to be routed to simulator
pub actions: Vec<ActionRoute>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct DownloaderConfig {
#[serde(default = "default_download_path")]
pub path: PathBuf,
pub actions: Vec<ActionRoute>,
}

impl Default for DownloaderConfig {
fn default() -> Self {
Self { path: default_download_path(), actions: vec![] }
}
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct InstallerConfig {
pub path: String,
pub actions: Vec<ActionRoute>,
pub uplink_port: u16,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct StreamMetricsConfig {
pub enabled: bool,
pub bridge_topic: String,
pub serializer_topic: String,
pub blacklist: Vec<String>,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: Duration,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct SerializerMetricsConfig {
pub enabled: bool,
pub topic: String,
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: Duration,
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct MqttMetricsConfig {
pub enabled: bool,
pub topic: String,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct AppConfig {
pub port: u16,
#[serde(default)]
pub actions: Vec<ActionRoute>,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct ConsoleConfig {
pub enabled: bool,
pub port: u16,
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct MqttConfig {
pub max_packet_size: usize,
pub max_inflight: u16,
pub keep_alive: u64,
pub network_timeout: u64,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ActionRoute {
pub name: String,
#[serde(default = "default_timeout")]
#[serde_as(as = "DurationSeconds<u64>")]
pub timeout: Duration,
}

impl From<&ActionRoute> for ActionRoute {
fn from(value: &ActionRoute) -> Self {
value.clone()
}
}

#[derive(Clone, Debug, Deserialize)]
pub struct DeviceShadowConfig {
pub interval: u64,
}

impl Default for DeviceShadowConfig {
fn default() -> Self {
Self { interval: DEFAULT_TIMEOUT }
}
}

#[derive(Debug, Clone, Deserialize, Default)]
pub struct Config {
pub project_id: String,
pub device_id: String,
pub broker: String,
pub port: u16,
#[serde(default)]
pub console: ConsoleConfig,
pub authentication: Option<Authentication>,
#[serde(default = "default_tcpapps")]
pub tcpapps: HashMap<String, AppConfig>,
pub mqtt: MqttConfig,
#[serde(default)]
pub processes: Vec<ActionRoute>,
#[serde(default)]
pub script_runner: Vec<ActionRoute>,
#[serde(skip)]
pub actions_subscription: String,
pub streams: HashMap<String, StreamConfig>,
#[serde(default = "default_persistence_path")]
pub persistence_path: PathBuf,
pub action_status: StreamConfig,
pub stream_metrics: StreamMetricsConfig,
pub serializer_metrics: SerializerMetricsConfig,
pub mqtt_metrics: MqttMetricsConfig,
#[serde(default)]
pub downloader: DownloaderConfig,
pub system_stats: Stats,
pub simulator: Option<SimulatorConfig>,
#[serde(default)]
pub ota_installer: InstallerConfig,
#[serde(default)]
pub device_shadow: DeviceShadowConfig,
#[serde(default)]
pub action_redirections: HashMap<String, String>,
#[serde(default)]
pub ignore_actions_if_no_clients: bool,
#[cfg(target_os = "linux")]
pub logging: Option<JournalCtlConfig>,
#[cfg(target_os = "android")]
pub logging: Option<LogcatConfig>,
}

/// Send control messages to the various components in uplink. Currently this is
/// used only to trigger uplink shutdown. Shutdown signals are sent to all
/// components simultaneously with a join.
Expand Down
6 changes: 2 additions & 4 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ use storage::Storage;
use thiserror::Error;
use tokio::{select, time::interval};

use crate::base::Compression;
use crate::config::{default_file_size, Compression, StreamConfig};
use crate::{Config, Package};
pub use metrics::{Metrics, SerializerMetrics, StreamMetrics};

use super::{default_file_size, StreamConfig};

const METRICS_INTERVAL: Duration = Duration::from_secs(10);

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -877,7 +875,7 @@ mod test {

use super::*;
use crate::base::bridge::stream::Stream;
use crate::base::MqttConfig;
use crate::config::MqttConfig;
use crate::Payload;

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/collector/device_shadow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use log::{error, trace};
use serde::Serialize;

use crate::base::DeviceShadowConfig;
use crate::config::DeviceShadowConfig;
use crate::base::{bridge::BridgeTx, clock};
use crate::Payload;

Expand Down
10 changes: 4 additions & 6 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ use std::{
};
use std::{io::Write, path::PathBuf};

use crate::base::bridge::BridgeTx;
use crate::base::DownloaderConfig;
use crate::{Action, ActionResponse, Config};
use crate::{base::bridge::BridgeTx, config::DownloaderConfig, Action, ActionResponse, Config};

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -442,9 +440,9 @@ mod test {
use std::{collections::HashMap, time::Duration};

use super::*;
use crate::base::{
bridge::{DataTx, StatusTx},
ActionRoute, DownloaderConfig, MqttConfig,
use crate::{
base::bridge::{DataTx, StatusTx},
config::{ActionRoute, DownloaderConfig, MqttConfig},
};

const DOWNLOAD_DIR: &str = "/tmp/uplink_test";
Expand Down
Loading

0 comments on commit 09b3c9f

Please sign in to comment.