Skip to content

Commit

Permalink
fix: handle edge case of inflight packets
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Dec 11, 2023
1 parent 478a2fb commit 7bd9d29
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
47 changes: 41 additions & 6 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{sync::Arc, time::Duration};

use bytes::{Bytes, BytesMut};
use flume::{Receiver, RecvError, Sender};
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use lz4_flex::frame::FrameEncoder;
use rumqttc::*;
use storage::{PersistenceFile, Storage};
Expand Down Expand Up @@ -58,6 +58,8 @@ pub enum Error {
EmptyStorage,
#[error("Permission denied while accessing persistence directory \"{0}\"")]
Persistence(String),
#[error("Mqtt error {0}")]
Mqtt(#[from] rumqttc::mqttbytes::Error),
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -496,6 +498,18 @@ impl<C: MqttClient> Serializer<C> {

let mut buf = BytesMut::new();
file.read(&mut buf)?;

// `unhandled` contains packets that couldn't be taken out of inflight file and pushed onto network
// during last uplink execution. Merge the files into one and handle them here.
let mut unhandled =
PersistenceFile::new(&self.config.persistence_path, "unhandled".to_string())?;
if unhandled.path().is_file() {
let mut new_buf = BytesMut::new();
unhandled.read(&mut new_buf)?;
buf.extend(new_buf);
unhandled.delete()?;
}

let max_packet_size = self.config.mqtt.max_packet_size;
let client = self.client.clone();

Expand Down Expand Up @@ -555,8 +569,6 @@ impl<C: MqttClient> Serializer<C> {
// indefinitely write to disk to not loose data.
let client = match o {
Ok(c) => c,
// TODO: while we have to transition into crash mode, it's better not to write any inflight packets onto disk.
// This can be achieved with the serializer shutdown mode introduced in bytebeamio/uplink#311
Err(MqttError::Send(Request::Publish(publish))) => break Ok(Status::EventLoopCrash(publish, last_publish_stream)),
Err(e) => unreachable!("Unexpected error: {}", e),
};
Expand Down Expand Up @@ -592,10 +604,33 @@ impl<C: MqttClient> Serializer<C> {
&self.storage_handler,
);

info!("Read and published inflight packets; removing file: {}", path.display());
file.delete()?;
match v {
Ok(Status::EventLoopReady) => {
info!("Read and published inflight packets; removing file: {}", path.display());
file.delete()?;

v
Ok(Status::EventLoopReady)
}
// Write packets yet to be handled back into the unhandled file
Ok(Status::EventLoopCrash(publish, stream)) => {
let mut new_buf = BytesMut::new();
publish.write(&mut new_buf)?;
new_buf.extend(buf);

let mut file =
PersistenceFile::new(&self.config.persistence_path, "unhandled".to_string())?;
warn!(
"Couldn't finish handling inflight packets; Writing them back into: {}",
file.path().display()
);
file.write(&mut new_buf)?;

// TODO: while we have to transition into crash mode, it's better not to write any inflight packets into storages.
// This can be achieved with the serializer shutdown mode introduced in bytebeamio/uplink#311
Ok(Status::EventLoopCrash(publish, stream))
}
v => v,
}
}

async fn normal(&mut self) -> Result<Status, Error> {
Expand Down
1 change: 1 addition & 0 deletions vd-lib
Submodule vd-lib added at cdaec8

0 comments on commit 7bd9d29

Please sign in to comment.