Skip to content

Commit

Permalink
fix: reset all jobs when starting up
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Jun 5, 2024
1 parent feba99a commit 20a38d1
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions modules/importer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ pub mod report;
pub mod sbom;

use crate::{
model::{Importer, ImporterConfiguration},
model::{Importer, ImporterConfiguration, State},
server::report::{Report, ScannerError},
service::ImporterService,
};
use std::path::PathBuf;
use std::time::Duration;
use std::{path::PathBuf, time::Duration};
use time::OffsetDateTime;
use tokio::time::MissedTickBehavior;
use tracing::instrument;
Expand Down Expand Up @@ -48,6 +47,7 @@ impl From<Report> for RunOutput {
}
}

/// Single node, single process importer processor.
struct Server {
db: Database,
storage: DispatchBackend,
Expand All @@ -59,6 +59,8 @@ impl Server {
async fn run(&self) -> anyhow::Result<()> {
let service = ImporterService::new(self.db.clone());

self.reset_all_jobs(&service).await?;

let mut interval = tokio::time::interval(Duration::from_secs(1));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

Expand Down Expand Up @@ -155,6 +157,37 @@ impl Server {

Ok(Some(result))
}

/// Reset all jobs back into non-running state.
///
/// This is intended when the application starts up, to reset stale job states. Making it
/// possible to re-run them when they are due.
///
/// **NOTE:** we can only do this as we're intended to be a single-process worker.
async fn reset_all_jobs(&self, service: &ImporterService) -> anyhow::Result<()> {
for importer in service.list().await? {
if importer.data.state == State::Running {
log::info!(
"Cleaning up stale importer job during startup: {} (since: {})",
importer.name,
importer.data.last_change
);
service
.update_finish(
&importer.name,
None,
// either use the last run, or fall back to the last time the state changed
importer.data.last_run.unwrap_or(importer.data.last_change),
Some("Import cancelled".into()),
None,
None,
)
.await?;
}
}

Ok(())
}
}

/// check if we need to run or skip the importer
Expand Down

0 comments on commit 20a38d1

Please sign in to comment.