Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mark parallel actions as failed on restart #349

Merged
merged 2 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum Error {
FailedCancellation,
#[error("Action cancelled by action_id: {0}")]
Cancelled(String),
#[error("Uplink restarted before action response")]
Restart,
}

pub struct ActionsBridge {
Expand Down Expand Up @@ -232,6 +234,14 @@ impl ActionsBridge {
if let Err(e) = self.save_current_action() {
error!("Failed to save current action: {e}");
}

// NOTE: marks parallel actions still in execution as failed
// (serializer will persist on disk even if network is down)
let parallel_actions: Vec<String> = self.parallel_actions.drain().collect();
for action_id in parallel_actions {
self.forward_action_error(&action_id, Error::Restart).await
}

// NOTE: there might be events still waiting for recv on bridge_rx
self.shutdown_handle.send(()).unwrap();

Expand Down
7 changes: 5 additions & 2 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,15 @@ struct StorageHandler {
impl StorageHandler {
fn new(config: Arc<Config>) -> Result<Self, Error> {
let mut map = BTreeMap::new();
for (stream_name, stream_config) in config.streams.iter() {
let mut streams = config.streams.clone();
// NOTE: persist action_status if not configured otherwise
streams.insert("action_status".into(), config.action_status.clone());
for (stream_name, stream_config) in streams {
let mut storage =
Storage::new(&stream_config.topic, stream_config.persistence.max_file_size);
if stream_config.persistence.max_file_count > 0 {
let mut path = config.persistence_path.clone();
path.push(stream_name);
path.push(&stream_name);

std::fs::create_dir_all(&path).map_err(|_| {
Error::Persistence(config.persistence_path.to_string_lossy().to_string())
Expand Down
1 change: 1 addition & 0 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const DEFAULT_CONFIG: &str = r#"
topic = "/tenants/{tenant_id}/devices/{device_id}/action/status"
batch_size = 1
flush_period = 2
persistence = { max_file_count = 1 } # Ensures action responses are not lost on restarts
priority = 255 # highest priority for quick delivery of action status info to platform
[streams.device_shadow]
Expand Down
Loading