Skip to content

Commit

Permalink
fix: miscellaneous fixes (#374)
Browse files Browse the repository at this point in the history
* ignore persisted data if it has wrong device id as well
* enable logging for dependencies
* do not retry downloads indefinitely. stop after 20 retries
* logging related fixes
  • Loading branch information
amokfa authored Dec 16, 2024
1 parent 39737c3 commit b78edca
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 35 deletions.
18 changes: 16 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
# built-in collectors
# tunshell
tokio-compat-02 = "0.2.0"
tunshell-client = { git = "https://github.com/bytebeamio/tunshell.git", branch = "android_patch" }
tunshell-client = { git = "https://github.com/bytebeamio/tunshell.git", branch = "bytebeam-changes" }
# simulator
fake = { version = "2.5.0", features = ["derive"] }
rand = { workspace = true }
# downloader
fs2 = "0.4"
trait_enum = "0.5.0"
replace_with = "0.1.7"
futures-util = { workspace = true }
hex = "0.4"
human_bytes = "0.4"
Expand Down
6 changes: 3 additions & 3 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl ActionsBridge {
fn load_actions_routing_cache(persistence: &Path) -> LimitedArrayMap<String, String> {
let save_file = persistence.join("actions_routing_cache.json");
if std::fs::metadata(&save_file).is_err() {
return LimitedArrayMap::new(64);
return LimitedArrayMap::new(32);
}
let mut result = std::fs::read(&save_file)
.context("")
Expand All @@ -108,9 +108,9 @@ impl ActionsBridge {
if let Err(e) = std::fs::remove_file(&save_file) {
log::warn!("Couldn't remove a file in persistence directory: {e}. Does the uplink process have right permissions?");
}
LimitedArrayMap::new(64)
LimitedArrayMap::new(32)
});
result.map.reserve(64);
result.map.reserve(32);
result
}

Expand Down
16 changes: 10 additions & 6 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use std::{sync::Arc, time::Duration};
use flume::{Receiver, Sender};
use lz4_flex::frame::FrameEncoder;
use pretty_bytes::converter::convert;
use replace_with::replace_with_or_abort;
use rumqttc::*;
use tokio::{select, time::interval};

use crate::config::{Compression, StreamConfig};
use crate::{Config, Package};
pub use metrics::{Metrics, SerializerMetrics, StreamMetrics};
use crate::base::clock;
use crate::base::serializer::storage::{Storage, StorageEnum};
use crate::utils::BTreeCursorMut;

const METRICS_INTERVAL: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -172,7 +174,7 @@ pub struct Serializer<C: MqttClient> {
/// when fetching packets, we sort by this and return the live data that has the most stale data
/// if this isn't done, live data for a high frequency stream can block live data for other streams
live_data_clock: usize,
sorted_storages: BTreeMap<Arc<StreamConfig>, (Box<dyn storage::Storage>, Option<Publish>, usize)>,
sorted_storages: BTreeMap<Arc<StreamConfig>, (StorageEnum, Option<Publish>, usize)>,
ctrl_rx: Receiver<()>,
}

Expand Down Expand Up @@ -210,19 +212,19 @@ impl<C: MqttClient> Serializer<C> {
}
}

fn create_storage_for_stream(&self, config: &StreamConfig) -> Box<dyn storage::Storage> {
fn create_storage_for_stream(&self, config: &StreamConfig) -> StorageEnum {
if config.persistence.max_file_count == 0 {
Box::new(storage::InMemoryStorage::new(config.name.as_str(), config.persistence.max_file_size, self.config.mqtt.max_packet_size))
StorageEnum::InMemory(storage::InMemoryStorage::new(config.name.as_str(), config.persistence.max_file_size, self.config.mqtt.max_packet_size))
} else {
match storage::DirectoryStorage::new(
self.config.persistence_path.join(config.name.as_str()),
config.persistence.max_file_size, config.persistence.max_file_count,
self.config.mqtt.max_packet_size,
) {
Ok(s) => Box::new(s),
Ok(s) => StorageEnum::Directory(s),
Err(e) => {
log::error!("Failed to initialize disk backed storage for {} : {e}, falling back to in memory persistence", config.name);
Box::new(storage::InMemoryStorage::new(config.name.as_str(), config.persistence.max_file_size, self.config.mqtt.max_packet_size))
StorageEnum::InMemory(storage::InMemoryStorage::new(config.name.as_str(), config.persistence.max_file_size, self.config.mqtt.max_packet_size))
}
}
}
Expand Down Expand Up @@ -250,7 +252,9 @@ impl<C: MqttClient> Serializer<C> {
}
Err(storage::StorageReadError::FileSystemError(e)) => {
log::error!("Encountered file system error when reading packet for stream({}): {e}, falling back to in memory persistence", storage.name());
*storage = storage.to_in_memory()
replace_with_or_abort(storage, |s| {
s.to_in_memory()
});
}
Err(storage::StorageReadError::InvalidPacket(e)) => {
log::error!("Found invalid packet when reading from storage for stream({}): {e}", storage.name());
Expand Down
65 changes: 57 additions & 8 deletions uplink/src/base/serializer/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub trait Storage: Send {

/// Consume and convert into an in memory storage
/// Used in case of file system errors
fn to_in_memory(&self) -> Box<dyn Storage>;
fn to_in_memory(self) -> StorageEnum;

fn metrics(&self) -> StorageMetrics;
}
Expand All @@ -38,6 +38,11 @@ pub struct StorageMetrics {
pub lost_files: u32,
}

pub enum StorageEnum {
InMemory(InMemoryStorage),
Directory(DirectoryStorage),
}

////////////////////////////////////////////////////////////////////////////////////////////////////

/// Incoming data is written to `write_buffer`
Expand Down Expand Up @@ -120,9 +125,7 @@ impl Storage for InMemoryStorage {
Ok(())
}

fn to_in_memory(&self) -> Box<dyn Storage> {
Box::new(self.clone())
}
fn to_in_memory(self) -> StorageEnum { StorageEnum::InMemory(self) }

fn metrics(&self) -> StorageMetrics {
let result = StorageMetrics {
Expand Down Expand Up @@ -341,11 +344,11 @@ impl Storage for DirectoryStorage {
return result;
}

fn to_in_memory(&self) -> Box<dyn Storage> {
Box::new(InMemoryStorage {
fn to_in_memory(self) -> StorageEnum {
StorageEnum::InMemory(InMemoryStorage {
name: self.name().to_owned(),
read_buffer: self.read_buffer.clone(),
write_buffer: self.write_buffer.clone(),
read_buffer: self.read_buffer,
write_buffer: self.write_buffer,
buf_size: self.max_file_size,
max_packet_size: self.max_packet_size,
lost_files: self.lost_files,
Expand Down Expand Up @@ -595,4 +598,50 @@ pub mod test {
payload: Bytes::from(i.to_string()),
}
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////

impl Storage for StorageEnum {
fn name(&self) -> &str {
match self {
StorageEnum::InMemory(a) => a.name(),
StorageEnum::Directory(a) => a.name(),
}
}

fn read_packet(&mut self) -> Result<Publish, StorageReadError> {
match self {
StorageEnum::InMemory(a) => a.read_packet(),
StorageEnum::Directory(a) => a.read_packet(),
}
}

fn write_packet(&mut self, packet: Publish) -> Result<(), StorageWriteError> {
match self {
StorageEnum::InMemory(a) => a.write_packet(packet),
StorageEnum::Directory(a) => a.write_packet(packet),
}
}

fn flush(&mut self) -> Result<(), StorageFlushError> {
match self {
StorageEnum::InMemory(a) => a.flush(),
StorageEnum::Directory(a) => a.flush(),
}
}

fn to_in_memory(self) -> StorageEnum {
match self {
StorageEnum::InMemory(a) => a.to_in_memory(),
StorageEnum::Directory(a) => a.to_in_memory(),
}
}

fn metrics(&self) -> StorageMetrics {
match self {
StorageEnum::InMemory(a) => a.metrics(),
StorageEnum::Directory(a) => a.metrics(),
}
}
}
16 changes: 9 additions & 7 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl FileDownloader {
Ok(s) => s,
Err(Error::NoSave) => return,
Err(e) => {
warn!("Couldn't reload current_download: {e}");
warn!("Couldn't reload current_download: {e:?}");
return;
}
};
Expand Down Expand Up @@ -267,7 +267,7 @@ impl FileDownloader {

Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => {
if let Err(e) = state.save(&self.config) {
error!("Error saving current_download: {e}");
error!("Error saving current_download: {e:?}");
}

return DownloadResult::Suspended;
Expand All @@ -291,16 +291,18 @@ impl FileDownloader {

// A download must be retried with Range header when HTTP/reqwest errors are faced
async fn continuous_retry(&self, state: &mut DownloadState) -> Result<(), Error> {
'outer: loop {
'outer: for idx in 1..=20 {
log::info!("download attempt {idx}");
let mut req = self.client.get(&state.current.meta.url);
if let Some(range) = state.retry_range() {
warn!("Retrying download; Continuing to download file from: {range}");
req = req.header("Range", range);
}
let mut stream = match req.send().await {
Ok(s) => s.error_for_status()?.bytes_stream(),
let mut stream = match req.send().await.context("network issue")
.and_then(|s| s.error_for_status().context("request failed") ) {
Ok(s) => s.bytes_stream(),
Err(e) => {
error!("Download failed: {e}");
error!("Download failed: {e:?}");
// Retry after wait
sleep(Duration::from_secs(1)).await;
continue 'outer;
Expand All @@ -324,7 +326,7 @@ impl FileDownloader {
ActionResponse::progress(&self.action_id, "Download Failed", 0)
.add_error(e.to_string());
self.bridge_tx.send_action_response(status).await;
error!("Download failed: {e}");
error!("Download failed: {e:?}");
// Retry after wait
sleep(Duration::from_secs(1)).await;
continue 'outer;
Expand Down
6 changes: 5 additions & 1 deletion uplink/src/collector/journalctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl LogLevel {
}
}

fn default_tag() -> String {
"system".to_owned()
}

#[derive(Deserialize)]
struct JournaldEntry {
#[serde(rename = "PRIORITY")]
Expand All @@ -63,7 +67,7 @@ struct JournaldEntry {
#[serde(rename = "__REALTIME_TIMESTAMP")]
log_timestamp: String,

#[serde(rename = "SYSLOG_IDENTIFIER")]
#[serde(rename = "SYSLOG_IDENTIFIER", default = "default_tag")]
tag: String,

#[serde(rename = "MESSAGE")]
Expand Down
8 changes: 3 additions & 5 deletions uplink/src/collector/tunshell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,9 @@ impl TunshellClient {
let session = self.clone();
//TODO(RT): Findout why this is spawned. We want to send other action's with shell?
tokio::spawn(async move {
if let Err(e) = session.session(&action).await {
error!("{e}");
let status = ActionResponse::failure(&action.action_id, e.to_string());
session.bridge.send_action_response(status).await;
}
let _ = session.session(&action).await;
let status = ActionResponse::success(&action.action_id);
session.bridge.send_action_response(status).await;
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Uplink {
let mqtt_client = mqtt.client();
let ctrl_mqtt = mqtt.ctrl_tx();

let tenant_filter = format!("/tenants/{}/", device_config.project_id);
let tenant_filter = format!("/tenants/{}/devices/{}", device_config.project_id, device_config.device_id);
let (serializer_shutdown_tx, serializer_shutdown_rx) = flume::bounded(1);

let (ctrl_tx, ctrl_rx) = bounded(1);
Expand Down
2 changes: 1 addition & 1 deletion uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl CommandLine {
let levels =
match self.modules.clone().into_iter().reduce(|e, acc| format!("{e}={level},{acc}")) {
Some(f) => format!("{f}={level}"),
_ => format!("uplink={level},storage={level}"),
_ => format!("{level}"),
};

let builder = tracing_subscriber::fmt()
Expand Down

0 comments on commit b78edca

Please sign in to comment.