Skip to content

Commit

Permalink
Rename the main snapshot import methods (#31580)
Browse files Browse the repository at this point in the history
The old method names had "and" in the title which is probably not a good
sign.

GitOrigin-RevId: 3730ede8131eaee3b96cfc0bc5e6252bce8f591a
  • Loading branch information
nipunn1313 authored and Convex, Inc. committed Nov 14, 2024
1 parent 4b60f88 commit dba7e83
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 34 deletions.
32 changes: 8 additions & 24 deletions crates/application/src/snapshot_import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,13 @@ struct SnapshotImportExecutor<RT: Runtime> {
}

impl<RT: Runtime> SnapshotImportExecutor<RT> {
async fn parse_and_mark_waiting_for_confirmation(
async fn handle_uploaded_state(
&self,
snapshot_import: ParsedDocument<SnapshotImport>,
) -> anyhow::Result<()> {
anyhow::ensure!(snapshot_import.state == ImportState::Uploaded);
tracing::info!("Marking snapshot export as WaitingForConfirmation");
let import_id = snapshot_import.id();
match snapshot_import.state {
ImportState::Uploaded => {
// Can make progress. Continue.
},
ImportState::Completed { .. }
| ImportState::Failed(..)
| ImportState::InProgress { .. }
| ImportState::WaitingForConfirmation { .. } => {
anyhow::bail!("unexpected state {snapshot_import:?}");
},
}
self.fail_if_too_old(&snapshot_import)?;
match self.info_message_for_import(snapshot_import).await {
Ok((info_message, require_manual_confirmation, new_checkpoints)) => {
Expand Down Expand Up @@ -462,22 +453,15 @@ impl<RT: Runtime> SnapshotImportExecutor<RT> {
Ok((message_lines, require_manual_confirmation, new_checkpoints))
}

async fn attempt_perform_import_and_mark_done(
async fn handle_in_progress_state(
&mut self,
snapshot_import: ParsedDocument<SnapshotImport>,
) -> anyhow::Result<()> {
anyhow::ensure!(matches!(
snapshot_import.state,
ImportState::InProgress { .. }
));
let import_id = snapshot_import.id();
match snapshot_import.state {
ImportState::InProgress { .. } => {
// Can make progress. Continue.
},
ImportState::Completed { .. }
| ImportState::Failed(..)
| ImportState::Uploaded
| ImportState::WaitingForConfirmation { .. } => {
anyhow::bail!("unexpected state {snapshot_import:?}");
},
}
match self.attempt_perform_import(snapshot_import).await {
Ok((ts, num_rows_written)) => {
self.database
Expand Down
19 changes: 9 additions & 10 deletions crates/application/src/snapshot_import/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,26 @@ impl SnapshotImportWorker {
let status = log_worker_starting("SnapshotImport");
let mut tx = executor.database.begin(Identity::system()).await?;
let mut import_model = SnapshotImportModel::new(&mut tx);
if let Some(import_uploaded) = import_model.import_in_state(ImportState::Uploaded).await? {
tracing::info!("Marking snapshot export as WaitingForConfirmation");
executor
.parse_and_mark_waiting_for_confirmation(import_uploaded)
.await?;
} else if let Some(import_in_progress) = import_model
let import_uploaded = import_model.import_in_state(ImportState::Uploaded).await?;
let import_in_progress = import_model
.import_in_state(ImportState::InProgress {
progress_message: String::new(),
checkpoint_messages: vec![],
})
.await?
{
.await?;
let token = tx.into_token()?;

if let Some(import_uploaded) = import_uploaded {
executor.handle_uploaded_state(import_uploaded).await?;
} else if let Some(import_in_progress) = import_in_progress {
tracing::info!("Executing in-progress snapshot import");
let timer = snapshot_import_timer();
executor
.attempt_perform_import_and_mark_done(import_in_progress)
.handle_in_progress_state(import_in_progress)
.await?;
timer.finish();
}
drop(status);
let token = tx.into_token()?;
let subscription = executor.database.subscribe(token).await?;
subscription.wait_for_invalidation().await;
Ok(())
Expand Down

0 comments on commit dba7e83

Please sign in to comment.