diff --git a/crates/application/src/snapshot_import/mod.rs b/crates/application/src/snapshot_import/mod.rs index da4f7b29..62615767 100644 --- a/crates/application/src/snapshot_import/mod.rs +++ b/crates/application/src/snapshot_import/mod.rs @@ -197,22 +197,13 @@ struct SnapshotImportExecutor { } impl SnapshotImportExecutor { - async fn parse_and_mark_waiting_for_confirmation( + async fn handle_uploaded_state( &self, snapshot_import: ParsedDocument, ) -> anyhow::Result<()> { + anyhow::ensure!(snapshot_import.state == ImportState::Uploaded); + tracing::info!("Marking snapshot export as WaitingForConfirmation"); let import_id = snapshot_import.id(); - match snapshot_import.state { - ImportState::Uploaded => { - // Can make progress. Continue. - }, - ImportState::Completed { .. } - | ImportState::Failed(..) - | ImportState::InProgress { .. } - | ImportState::WaitingForConfirmation { .. } => { - anyhow::bail!("unexpected state {snapshot_import:?}"); - }, - } self.fail_if_too_old(&snapshot_import)?; match self.info_message_for_import(snapshot_import).await { Ok((info_message, require_manual_confirmation, new_checkpoints)) => { @@ -462,22 +453,15 @@ impl SnapshotImportExecutor { Ok((message_lines, require_manual_confirmation, new_checkpoints)) } - async fn attempt_perform_import_and_mark_done( + async fn handle_in_progress_state( &mut self, snapshot_import: ParsedDocument, ) -> anyhow::Result<()> { + anyhow::ensure!(matches!( + snapshot_import.state, + ImportState::InProgress { .. } + )); let import_id = snapshot_import.id(); - match snapshot_import.state { - ImportState::InProgress { .. } => { - // Can make progress. Continue. - }, - ImportState::Completed { .. } - | ImportState::Failed(..) - | ImportState::Uploaded - | ImportState::WaitingForConfirmation { .. } => { - anyhow::bail!("unexpected state {snapshot_import:?}"); - }, - } match self.attempt_perform_import(snapshot_import).await { Ok((ts, num_rows_written)) => { self.database diff --git a/crates/application/src/snapshot_import/worker.rs b/crates/application/src/snapshot_import/worker.rs index c3a26bd3..5118cd7d 100644 --- a/crates/application/src/snapshot_import/worker.rs +++ b/crates/application/src/snapshot_import/worker.rs @@ -73,27 +73,26 @@ impl SnapshotImportWorker { let status = log_worker_starting("SnapshotImport"); let mut tx = executor.database.begin(Identity::system()).await?; let mut import_model = SnapshotImportModel::new(&mut tx); - if let Some(import_uploaded) = import_model.import_in_state(ImportState::Uploaded).await? { - tracing::info!("Marking snapshot export as WaitingForConfirmation"); - executor - .parse_and_mark_waiting_for_confirmation(import_uploaded) - .await?; - } else if let Some(import_in_progress) = import_model + let import_uploaded = import_model.import_in_state(ImportState::Uploaded).await?; + let import_in_progress = import_model .import_in_state(ImportState::InProgress { progress_message: String::new(), checkpoint_messages: vec![], }) - .await? - { + .await?; + let token = tx.into_token()?; + + if let Some(import_uploaded) = import_uploaded { + executor.handle_uploaded_state(import_uploaded).await?; + } else if let Some(import_in_progress) = import_in_progress { tracing::info!("Executing in-progress snapshot import"); let timer = snapshot_import_timer(); executor - .attempt_perform_import_and_mark_done(import_in_progress) + .handle_in_progress_state(import_in_progress) .await?; timer.finish(); } drop(status); - let token = tx.into_token()?; let subscription = executor.database.subscribe(token).await?; subscription.wait_for_invalidation().await; Ok(())