From 51b75bf3b7038e0101c05a50e25b7000592f4ff4 Mon Sep 17 00:00:00 2001
From: Nipunn Koorapati
Date: Mon, 18 Nov 2024 06:17:49 -0700
Subject: [PATCH] Split file storage into its own file (#31618)

GitOrigin-RevId: e44530388e9f84ad1db60f7faa526a31ea946581
---
 .../snapshot_import/import_file_storage.rs    | 274 +++++++++++++++++
 crates/application/src/snapshot_import/mod.rs | 287 +-----------------
 .../src/snapshot_import/progress.rs           |  74 +++++
 3 files changed, 356 insertions(+), 279 deletions(-)
 create mode 100644 crates/application/src/snapshot_import/import_file_storage.rs
 create mode 100644 crates/application/src/snapshot_import/progress.rs

diff --git a/crates/application/src/snapshot_import/import_file_storage.rs b/crates/application/src/snapshot_import/import_file_storage.rs
new file mode 100644
index 00000000..37cd5eb5
--- /dev/null
+++ b/crates/application/src/snapshot_import/import_file_storage.rs
@@ -0,0 +1,274 @@
+use std::{
+    collections::BTreeMap,
+    pin::Pin,
+    str::FromStr,
+};
+
+use anyhow::Context;
+use common::{
+    components::ComponentPath,
+    document::{
+        CreationTime,
+        CREATION_TIME_FIELD,
+        ID_FIELD,
+    },
+    ext::{
+        PeekableExt,
+        TryPeekableExt,
+    },
+    pause::PauseClient,
+    runtime::Runtime,
+    types::StorageUuid,
+};
+use database::{
+    Database,
+    ImportFacingModel,
+};
+use errors::ErrorMetadata;
+use file_storage::FileStorage;
+use futures::{
+    stream::{
+        BoxStream,
+        Peekable,
+    },
+    TryStreamExt,
+};
+use headers::{
+    ContentLength,
+    ContentType,
+};
+use keybroker::Identity;
+use model::{
+    file_storage::{
+        FILE_STORAGE_TABLE,
+        FILE_STORAGE_VIRTUAL_TABLE,
+    },
+    snapshot_imports::types::ImportRequestor,
+};
+use thousands::Separable;
+use usage_tracking::{
+    FunctionUsageTracker,
+    StorageUsageTracker,
+    UsageCounter,
+};
+use value::{
+    id_v6::DeveloperDocumentId,
+    sha256::Sha256Digest,
+    val,
+    ConvexObject,
+    ResolvedDocumentId,
+    TabletIdAndTableNumber,
+};
+
+use crate::{
+    export_worker::FileStorageZipMetadata,
+    snapshot_import::{
+        import_error::ImportError,
+        parse::ImportUnit,
+        progress::{
+            add_checkpoint_message,
+            best_effort_update_progress_message,
+        },
+    },
+};
+
+pub async fn import_storage_table<RT: Runtime>(
+    database: &Database<RT>,
+    file_storage: &FileStorage<RT>,
+    identity: &Identity,
+    table_id: TabletIdAndTableNumber,
+    component_path: &ComponentPath,
+    mut objects: Pin<&mut Peekable<BoxStream<'_, anyhow::Result<ImportUnit>>>>,
+    usage: &dyn StorageUsageTracker,
+    import_id: Option<ResolvedDocumentId>,
+    num_to_skip: u64,
+    requestor: ImportRequestor,
+    usage_tracking: &UsageCounter,
+) -> anyhow::Result<()> {
+    let snapshot = database.latest_snapshot()?;
+    let namespace = snapshot
+        .table_mapping()
+        .tablet_namespace(table_id.tablet_id)?;
+    let virtual_table_number = snapshot.table_mapping().tablet_number(table_id.tablet_id)?;
+    let mut lineno = 0;
+    let mut storage_metadata = BTreeMap::new();
+    while let Some(ImportUnit::Object(exported_value)) = objects
+        .as_mut()
+        .try_next_if(|line| matches!(line, ImportUnit::Object(_)))
+        .await?
+ { + lineno += 1; + let metadata: FileStorageZipMetadata = serde_json::from_value(exported_value) + .map_err(|e| ImportError::InvalidConvexValue(lineno, e.into()))?; + let id = DeveloperDocumentId::decode(&metadata.id) + .map_err(|e| ImportError::InvalidConvexValue(lineno, e.into()))?; + anyhow::ensure!( + id.table() == virtual_table_number, + ErrorMetadata::bad_request( + "InvalidId", + format!( + "_storage entry has invalid ID {id} ({:?} != {:?})", + id.table(), + virtual_table_number + ) + ) + ); + let content_length = metadata.size.map(|size| ContentLength(size as u64)); + let content_type = metadata + .content_type + .map(|content_type| anyhow::Ok(ContentType::from_str(&content_type)?)) + .transpose() + .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; + let sha256 = metadata + .sha256 + .map(|sha256| Sha256Digest::from_base64(&sha256)) + .transpose() + .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; + let storage_id = metadata + .internal_id + .map(|storage_id| { + StorageUuid::from_str(&storage_id).context("Couldn't parse storage_id") + }) + .transpose() + .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; + let creation_time = metadata + .creation_time + .map(CreationTime::try_from) + .transpose() + .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; + + storage_metadata.insert( + id, + ( + content_length, + content_type, + sha256, + storage_id, + creation_time, + ), + ); + } + let total_num_files = storage_metadata.len(); + let mut num_files = 0; + while let Some(Ok(ImportUnit::StorageFileChunk(id, _))) = objects.as_mut().peek().await { + let id = *id; + // The or_default means a storage file with a valid id will be imported + // even if it has been explicitly removed from _storage/documents.jsonl, + // to be robust to manual modifications. + let (content_length, content_type, expected_sha256, storage_id, creation_time) = + storage_metadata.remove(&id).unwrap_or_default(); + let file_chunks = objects + .as_mut() + .peeking_take_while(move |unit| match unit { + Ok(ImportUnit::StorageFileChunk(chunk_id, _)) => *chunk_id == id, + Err(_) => true, + Ok(_) => false, + }) + .try_filter_map(|unit| async move { + match unit { + ImportUnit::StorageFileChunk(_, chunk) => Ok(Some(chunk)), + _ => Ok(None), + } + }); + let mut entry = file_storage + .transactional_file_storage + .upload_file(content_length, content_type, file_chunks, expected_sha256) + .await?; + if let Some(storage_id) = storage_id { + entry.storage_id = storage_id; + } + if num_files < num_to_skip { + num_files += 1; + continue; + } + let file_size = entry.size as u64; + database + .execute_with_overloaded_retries( + identity.clone(), + FunctionUsageTracker::new(), + PauseClient::new(), + "snapshot_import_storage_table", + |tx| { + async { + // Assume table numbers of _storage and _file_storage aren't changing with + // this import. 
+                        let table_mapping = tx.table_mapping().clone();
+                        let physical_id = tx
+                            .virtual_system_mapping()
+                            .virtual_id_v6_to_system_resolved_doc_id(
+                                namespace,
+                                &id,
+                                &table_mapping,
+                            )?;
+                        let mut entry_object_map =
+                            BTreeMap::from(ConvexObject::try_from(entry.clone())?);
+                        entry_object_map.insert(ID_FIELD.clone().into(), val!(physical_id));
+                        if let Some(creation_time) = creation_time {
+                            entry_object_map.insert(
+                                CREATION_TIME_FIELD.clone().into(),
+                                val!(f64::from(creation_time)),
+                            );
+                        }
+                        let entry_object = ConvexObject::try_from(entry_object_map)?;
+                        ImportFacingModel::new(tx)
+                            .insert(table_id, &FILE_STORAGE_TABLE, entry_object, &table_mapping)
+                            .await?;
+                        Ok(())
+                    }
+                    .into()
+                },
+            )
+            .await?;
+        let content_type = entry
+            .content_type
+            .as_ref()
+            .map(|ct| ct.parse())
+            .transpose()?;
+        usage.track_storage_call(
+            component_path.clone(),
+            requestor.usage_tag(),
+            entry.storage_id,
+            content_type,
+            entry.sha256,
+        );
+        usage_tracking.track_independent_storage_ingress_size(
+            component_path.clone(),
+            requestor.usage_tag().to_string(),
+            file_size,
+        );
+        num_files += 1;
+        if let Some(import_id) = import_id {
+            best_effort_update_progress_message(
+                database,
+                identity,
+                import_id,
+                format!(
+                    "Importing \"_storage\" ({}/{} files)",
+                    num_files.separate_with_commas(),
+                    total_num_files.separate_with_commas()
+                ),
+                component_path,
+                &FILE_STORAGE_VIRTUAL_TABLE,
+                num_files as i64,
+            )
+            .await;
+        }
+    }
+    if let Some(import_id) = import_id {
+        add_checkpoint_message(
+            database,
+            identity,
+            import_id,
+            format!(
+                "Imported \"_storage\"{} ({} files)",
+                component_path.in_component_str(),
+                num_files.separate_with_commas()
+            ),
+            component_path,
+            &FILE_STORAGE_VIRTUAL_TABLE,
+            num_files as i64,
+        )
+        .await?;
+    }
+    Ok(())
+}
diff --git a/crates/application/src/snapshot_import/mod.rs b/crates/application/src/snapshot_import/mod.rs
index 857b72ff..8ac70b13 100644
--- a/crates/application/src/snapshot_import/mod.rs
+++ b/crates/application/src/snapshot_import/mod.rs
@@ -5,7 +5,6 @@ use std::{
         HashSet,
     },
     pin::Pin,
-    str::FromStr,
     sync::Arc,
     time::Duration,
 };
@@ -24,14 +23,10 @@ use common::{
     document::{
         CreationTime,
         ParsedDocument,
-        CREATION_TIME_FIELD,
         ID_FIELD,
     },
     execution_context::ExecutionId,
-    ext::{
-        PeekableExt,
-        TryPeekableExt,
-    },
+    ext::TryPeekableExt,
     knobs::{
         MAX_IMPORT_AGE,
         TRANSACTION_MAX_NUM_USER_WRITES,
@@ -42,7 +37,6 @@ use common::{
     types::{
         MemberId,
         ObjectKey,
-        StorageUuid,
         TableName,
         UdfIdentifier,
     },
@@ -71,10 +65,6 @@ use futures::{
     StreamExt,
     TryStreamExt,
 };
-use headers::{
-    ContentLength,
-    ContentType,
-};
 use keybroker::Identity;
 use model::{
     deployment_audit_log::{
@@ -113,13 +103,10 @@ use thousands::Separable;
 use usage_tracking::{
     CallType,
     FunctionUsageTracker,
-    StorageUsageTracker,
     UsageCounter,
 };
 use value::{
     id_v6::DeveloperDocumentId,
-    sha256::Sha256Digest,
-    val,
     ConvexObject,
     ConvexValue,
     IdentifierFieldName,
@@ -133,7 +120,6 @@ use value::{
 };
 
 use crate::{
-    export_worker::FileStorageZipMetadata,
     metrics::log_snapshot_import_age,
     snapshot_import::{
         confirmation::info_message_for_import,
@@ -141,11 +127,16 @@ use crate::{
             wrap_import_err,
             ImportError,
         },
+        import_file_storage::import_storage_table,
        parse::{
            parse_objects,
            ImportUnit,
        },
        prepare_component::prepare_component_for_import,
+        progress::{
+            add_checkpoint_message,
+            best_effort_update_progress_message,
+        },
        schema_constraints::{
            schemas_for_import,
            ImportSchemaConstraints,
@@ -157,8 +148,10 @@
 
 mod confirmation;
 mod import_error;
+mod import_file_storage;
 mod parse;
 mod prepare_component;
+mod progress;
 mod schema_constraints;
 mod table_change;
 #[cfg(test)]
@@ -694,69 +687,6 @@ pub async fn clear_tables<RT: Runtime>(
     Ok(documents_deleted)
 }
 
-async fn best_effort_update_progress_message<RT: Runtime>(
-    database: &Database<RT>,
-    identity: &Identity,
-    import_id: ResolvedDocumentId,
-    progress_message: String,
-    component_path: &ComponentPath,
-    display_table_name: &TableName,
-    num_rows_written: i64,
-) {
-    // Ignore errors because it's not worth blocking or retrying if we can't
-    // send a nice progress message on the first try.
-    let _result: anyhow::Result<()> = try {
-        let mut tx = database.begin(identity.clone()).await?;
-        let mut import_model = SnapshotImportModel::new(&mut tx);
-        import_model
-            .update_progress_message(
-                import_id,
-                progress_message,
-                component_path,
-                display_table_name,
-                num_rows_written,
-            )
-            .await?;
-        database
-            .commit_with_write_source(tx, "snapshot_update_progress_msg")
-            .await?;
-    };
-}
-
-async fn add_checkpoint_message<RT: Runtime>(
-    database: &Database<RT>,
-    identity: &Identity,
-    import_id: ResolvedDocumentId,
-    checkpoint_message: String,
-    component_path: &ComponentPath,
-    display_table_name: &TableName,
-    num_rows_written: i64,
-) -> anyhow::Result<()> {
-    database
-        .execute_with_overloaded_retries(
-            identity.clone(),
-            FunctionUsageTracker::new(),
-            PauseClient::new(),
-            "snapshot_import_add_checkpoint_message",
-            |tx| {
-                async {
-                    SnapshotImportModel::new(tx)
-                        .add_checkpoint_message(
-                            import_id,
-                            checkpoint_message.clone(),
-                            component_path,
-                            display_table_name,
-                            num_rows_written,
-                        )
-                        .await
-                }
-                .into()
-            },
-        )
-        .await?;
-    Ok(())
-}
-
 async fn import_objects<RT: Runtime>(
     database: &Database<RT>,
     file_storage: &FileStorage<RT>,
@@ -1033,207 +963,6 @@ async fn import_tables_table<RT: Runtime>(
     Ok(())
 }
 
-async fn import_storage_table<RT: Runtime>(
-    database: &Database<RT>,
-    file_storage: &FileStorage<RT>,
-    identity: &Identity,
-    table_id: TabletIdAndTableNumber,
-    component_path: &ComponentPath,
-    mut objects: Pin<&mut Peekable<BoxStream<'_, anyhow::Result<ImportUnit>>>>,
-    usage: &dyn StorageUsageTracker,
-    import_id: Option<ResolvedDocumentId>,
-    num_to_skip: u64,
-    requestor: ImportRequestor,
-    usage_tracking: &UsageCounter,
-) -> anyhow::Result<()> {
-    let snapshot = database.latest_snapshot()?;
-    let namespace = snapshot
-        .table_mapping()
-        .tablet_namespace(table_id.tablet_id)?;
-    let virtual_table_number = snapshot.table_mapping().tablet_number(table_id.tablet_id)?;
-    let mut lineno = 0;
-    let mut storage_metadata = BTreeMap::new();
-    while let Some(ImportUnit::Object(exported_value)) = objects
-        .as_mut()
-        .try_next_if(|line| matches!(line, ImportUnit::Object(_)))
-        .await?
- { - lineno += 1; - let metadata: FileStorageZipMetadata = serde_json::from_value(exported_value) - .map_err(|e| ImportError::InvalidConvexValue(lineno, e.into()))?; - let id = DeveloperDocumentId::decode(&metadata.id) - .map_err(|e| ImportError::InvalidConvexValue(lineno, e.into()))?; - anyhow::ensure!( - id.table() == virtual_table_number, - ErrorMetadata::bad_request( - "InvalidId", - format!( - "_storage entry has invalid ID {id} ({:?} != {:?})", - id.table(), - virtual_table_number - ) - ) - ); - let content_length = metadata.size.map(|size| ContentLength(size as u64)); - let content_type = metadata - .content_type - .map(|content_type| anyhow::Ok(ContentType::from_str(&content_type)?)) - .transpose() - .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; - let sha256 = metadata - .sha256 - .map(|sha256| Sha256Digest::from_base64(&sha256)) - .transpose() - .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; - let storage_id = metadata - .internal_id - .map(|storage_id| { - StorageUuid::from_str(&storage_id).context("Couldn't parse storage_id") - }) - .transpose() - .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; - let creation_time = metadata - .creation_time - .map(CreationTime::try_from) - .transpose() - .map_err(|e| ImportError::InvalidConvexValue(lineno, e))?; - - storage_metadata.insert( - id, - ( - content_length, - content_type, - sha256, - storage_id, - creation_time, - ), - ); - } - let total_num_files = storage_metadata.len(); - let mut num_files = 0; - while let Some(Ok(ImportUnit::StorageFileChunk(id, _))) = objects.as_mut().peek().await { - let id = *id; - // The or_default means a storage file with a valid id will be imported - // even if it has been explicitly removed from _storage/documents.jsonl, - // to be robust to manual modifications. - let (content_length, content_type, expected_sha256, storage_id, creation_time) = - storage_metadata.remove(&id).unwrap_or_default(); - let file_chunks = objects - .as_mut() - .peeking_take_while(move |unit| match unit { - Ok(ImportUnit::StorageFileChunk(chunk_id, _)) => *chunk_id == id, - Err(_) => true, - Ok(_) => false, - }) - .try_filter_map(|unit| async move { - match unit { - ImportUnit::StorageFileChunk(_, chunk) => Ok(Some(chunk)), - _ => Ok(None), - } - }); - let mut entry = file_storage - .transactional_file_storage - .upload_file(content_length, content_type, file_chunks, expected_sha256) - .await?; - if let Some(storage_id) = storage_id { - entry.storage_id = storage_id; - } - if num_files < num_to_skip { - num_files += 1; - continue; - } - let file_size = entry.size as u64; - database - .execute_with_overloaded_retries( - identity.clone(), - FunctionUsageTracker::new(), - PauseClient::new(), - "snapshot_import_storage_table", - |tx| { - async { - // Assume table numbers of _storage and _file_storage aren't changing with - // this import. 
-                        let table_mapping = tx.table_mapping().clone();
-                        let physical_id = tx
-                            .virtual_system_mapping()
-                            .virtual_id_v6_to_system_resolved_doc_id(
-                                namespace,
-                                &id,
-                                &table_mapping,
-                            )?;
-                        let mut entry_object_map =
-                            BTreeMap::from(ConvexObject::try_from(entry.clone())?);
-                        entry_object_map.insert(ID_FIELD.clone().into(), val!(physical_id));
-                        if let Some(creation_time) = creation_time {
-                            entry_object_map.insert(
-                                CREATION_TIME_FIELD.clone().into(),
-                                val!(f64::from(creation_time)),
-                            );
-                        }
-                        let entry_object = ConvexObject::try_from(entry_object_map)?;
-                        ImportFacingModel::new(tx)
-                            .insert(table_id, &FILE_STORAGE_TABLE, entry_object, &table_mapping)
-                            .await?;
-                        Ok(())
-                    }
-                    .into()
-                },
-            )
-            .await?;
-        let content_type = entry
-            .content_type
-            .as_ref()
-            .map(|ct| ct.parse())
-            .transpose()?;
-        usage.track_storage_call(
-            component_path.clone(),
-            requestor.usage_tag(),
-            entry.storage_id,
-            content_type,
-            entry.sha256,
-        );
-        usage_tracking.track_independent_storage_ingress_size(
-            component_path.clone(),
-            requestor.usage_tag().to_string(),
-            file_size,
-        );
-        num_files += 1;
-        if let Some(import_id) = import_id {
-            best_effort_update_progress_message(
-                database,
-                identity,
-                import_id,
-                format!(
-                    "Importing \"_storage\" ({}/{} files)",
-                    num_files.separate_with_commas(),
-                    total_num_files.separate_with_commas()
-                ),
-                component_path,
-                &FILE_STORAGE_VIRTUAL_TABLE,
-                num_files as i64,
-            )
-            .await;
-        }
-    }
-    if let Some(import_id) = import_id {
-        add_checkpoint_message(
-            database,
-            identity,
-            import_id,
-            format!(
-                "Imported \"_storage\"{} ({} files)",
-                component_path.in_component_str(),
-                num_files.separate_with_commas()
-            ),
-            component_path,
-            &FILE_STORAGE_VIRTUAL_TABLE,
-            num_files as i64,
-        )
-        .await?;
-    }
-    Ok(())
-}
-
 async fn import_single_table<RT: Runtime>(
     database: &Database<RT>,
     file_storage: &FileStorage<RT>,
diff --git a/crates/application/src/snapshot_import/progress.rs b/crates/application/src/snapshot_import/progress.rs
new file mode 100644
index 00000000..9a9ce169
--- /dev/null
+++ b/crates/application/src/snapshot_import/progress.rs
@@ -0,0 +1,74 @@
+use common::{
+    components::ComponentPath,
+    pause::PauseClient,
+    runtime::Runtime,
+    types::TableName,
+};
+use database::Database;
+use keybroker::Identity;
+use model::snapshot_imports::SnapshotImportModel;
+use usage_tracking::FunctionUsageTracker;
+use value::ResolvedDocumentId;
+
+pub async fn best_effort_update_progress_message<RT: Runtime>(
+    database: &Database<RT>,
+    identity: &Identity,
+    import_id: ResolvedDocumentId,
+    progress_message: String,
+    component_path: &ComponentPath,
+    display_table_name: &TableName,
+    num_rows_written: i64,
+) {
+    // Ignore errors because it's not worth blocking or retrying if we can't
+    // send a nice progress message on the first try.
+    let _result: anyhow::Result<()> = try {
+        let mut tx = database.begin(identity.clone()).await?;
+        let mut import_model = SnapshotImportModel::new(&mut tx);
+        import_model
+            .update_progress_message(
+                import_id,
+                progress_message,
+                component_path,
+                display_table_name,
+                num_rows_written,
+            )
+            .await?;
+        database
+            .commit_with_write_source(tx, "snapshot_update_progress_msg")
+            .await?;
+    };
+}
+
+pub async fn add_checkpoint_message<RT: Runtime>(
+    database: &Database<RT>,
+    identity: &Identity,
+    import_id: ResolvedDocumentId,
+    checkpoint_message: String,
+    component_path: &ComponentPath,
+    display_table_name: &TableName,
+    num_rows_written: i64,
+) -> anyhow::Result<()> {
+    database
+        .execute_with_overloaded_retries(
+            identity.clone(),
+            FunctionUsageTracker::new(),
+            PauseClient::new(),
+            "snapshot_import_add_checkpoint_message",
+            |tx| {
+                async {
+                    SnapshotImportModel::new(tx)
+                        .add_checkpoint_message(
+                            import_id,
+                            checkpoint_message.clone(),
+                            component_path,
+                            display_table_name,
+                            num_rows_written,
+                        )
+                        .await
+                }
+                .into()
+            },
+        )
+        .await?;
+    Ok(())
+}
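
Usage sketch (illustrative, not part of the commit): the move keeps import_storage_table's signature intact, so the call site in mod.rs's import_single_table only changes its import path. The snippet below shows the assumed wiring; `unit_stream` is a hypothetical stand-in for the BoxStream<'_, anyhow::Result<ImportUnit>> that parse::parse_objects produces, and the surrounding variables mirror the parameters visible in the diff above.

    // Sketch only -- every name except import_storage_table's own parameters
    // is assumed for illustration.
    use futures::StreamExt;

    // Pin the peekable stream so it can be handed over as
    // Pin<&mut Peekable<BoxStream<'_, anyhow::Result<ImportUnit>>>>.
    let mut objects = Box::pin(unit_stream.peekable());
    import_storage_table(
        &database,
        &file_storage,
        &identity,
        table_id,
        &component_path,
        objects.as_mut(),
        &usage,          // &dyn StorageUsageTracker
        import_id,       // Option<ResolvedDocumentId>; None skips progress/checkpoint messages
        0,               // num_to_skip: files already written by an interrupted earlier attempt
        requestor,
        &usage_tracking,
    )
    .await?;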