From 3a87e07739ca2ce36c34863c9076b70abfb52112 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Thu, 5 Dec 2024 18:06:20 +0100 Subject: [PATCH] Validate object store uploads --- .../src/partition/snapshots/repository.rs | 192 ++++++++++++------ 1 file changed, 130 insertions(+), 62 deletions(-) diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index f1c3db10e..a386c5410 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -53,7 +53,7 @@ pub struct SnapshotRepository { } #[serde_as] -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct LatestSnapshot { pub version: SnapshotFormatVersion, @@ -80,6 +80,21 @@ pub struct LatestSnapshot { pub path: String, } +impl LatestSnapshot { + pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self { + LatestSnapshot { + version: snapshot.version, + cluster_name: snapshot.cluster_name.clone(), + node_name: snapshot.node_name.clone(), + partition_id: snapshot.partition_id, + snapshot_id: snapshot.snapshot_id, + created_at: snapshot.created_at.clone(), + min_applied_lsn: snapshot.min_applied_lsn, + path, + } + } +} + impl SnapshotRepository { /// Creates an instance of the repository if a snapshots destination is configured. pub async fn create_if_configured( @@ -96,42 +111,7 @@ impl SnapshotRepository { .inspect(|params| info!("Snapshot destination parameters ignored: {params}")); destination.set_query(None); - // We use the AWS SDK configuration and credentials provider so that the conventional AWS - // environment variables and config files work as expected. The object_store crate has its - // own configuration mechanism which doesn't support many of the AWS conventions. This - // differs quite a lot from the Lambda invoker which uses the AWS SDK, and that would be a - // very surprising inconsistency for customers. This mechanism allows us to infer the region - // and securely obtain session credentials without any hard-coded configuration. - let object_store: Arc = if destination.scheme() == "s3" { - debug!("Using AWS SDK credentials provider"); - let aws_region = aws_config::load_defaults(BehaviorVersion::v2024_03_28()) - .await - .region() - .context("Unable to determine AWS region to use with S3")? - .clone(); - - let store = AmazonS3Builder::new() - .with_url(destination.clone()) - .with_region(aws_region.to_string()) - .with_conditional_put(S3ConditionalPut::ETagMatch) - .with_credentials(Arc::new(AwsSdkCredentialsProvider { - credentials_provider: DefaultCredentialsChain::builder().build().await, - })) - .with_retry(object_store::RetryConfig { - max_retries: 8, - retry_timeout: Duration::from_secs(60), - backoff: object_store::BackoffConfig { - init_backoff: Duration::from_millis(100), - max_backoff: Duration::from_secs(5), - base: 2., - }, - }) - .build()?; - - Arc::new(store) - } else { - object_store::parse_url(&destination)?.0.into() - }; + let object_store = create_object_store_client(destination.clone()).await?; // The prefix must be stripped of any leading slash and, unless it is empty, must end in a // single "/" character. @@ -207,13 +187,9 @@ impl SnapshotRepository { ) -> Result<(), PutSnapshotError> { // A unique snapshot path within the partition prefix. We pad the LSN to ensure correct // lexicographic sorting. - let relative_snapshot_path = format!( - "lsn_{lsn:020}-{snapshot_id}", - lsn = snapshot.min_applied_lsn, - snapshot_id = snapshot.snapshot_id - ); + let snapshot_prefix = Self::get_snapshot_prefix(snapshot); let full_snapshot_path = format!( - "{prefix}{partition_id}/{relative_snapshot_path}", + "{prefix}{partition_id}/{snapshot_prefix}", prefix = self.prefix, partition_id = snapshot.partition_id, ); @@ -294,18 +270,8 @@ impl SnapshotRepository { return Ok(()); } - // Construct the new "latest snapshot" pointer - let latest = LatestSnapshot { - version: snapshot.version, - cluster_name: snapshot.cluster_name.clone(), - node_name: snapshot.node_name.clone(), - partition_id: snapshot.partition_id, - snapshot_id: snapshot.snapshot_id, - created_at: snapshot.created_at.clone(), - min_applied_lsn: snapshot.min_applied_lsn, - path: relative_snapshot_path, - }; - let latest_json = PutPayload::from( + let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix); + let latest = PutPayload::from( serde_json::to_string_pretty(&latest) .map_err(|e| PutSnapshotError::from(e, progress.clone()))?, ); @@ -327,7 +293,7 @@ impl SnapshotRepository { // retrying the entire put_snapshot attempt on object_store::Error::Precondition. let put_result = self .object_store - .put_opts(&latest_path, latest_json, conditions) + .put_opts(&latest_path, latest, conditions) .await .map_err(|e| PutSnapshotError::from(e, progress.clone()))?; @@ -340,6 +306,14 @@ impl SnapshotRepository { Ok(()) } + fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String { + format!( + "lsn_{lsn:020}-{snapshot_id}", + lsn = snapshot.min_applied_lsn, + snapshot_id = snapshot.snapshot_id + ) + } + async fn get_latest_snapshot_metadata_for_update( &self, snapshot: &PartitionSnapshotMetadata, @@ -384,6 +358,46 @@ impl SnapshotRepository { } } +async fn create_object_store_client(destination: Url) -> anyhow::Result> { + // We use the AWS SDK configuration and credentials provider so that the conventional AWS + // environment variables and config files work as expected. The object_store crate has its + // own configuration mechanism which doesn't support many of the AWS conventions. This + // differs quite a lot from the Lambda invoker which uses the AWS SDK, and that would be a + // very surprising inconsistency for customers. This mechanism allows us to infer the region + // and securely obtain session credentials without any hard-coded configuration. + let object_store: Arc = if destination.scheme() == "s3" { + debug!("Using AWS SDK credentials provider"); + let aws_region = aws_config::load_defaults(BehaviorVersion::v2024_03_28()) + .await + .region() + .context("Unable to determine AWS region to use with S3")? + .clone(); + + let store = AmazonS3Builder::new() + .with_url(destination) + .with_region(aws_region.to_string()) + .with_conditional_put(S3ConditionalPut::ETagMatch) + .with_credentials(Arc::new(AwsSdkCredentialsProvider { + credentials_provider: DefaultCredentialsChain::builder().build().await, + })) + .with_retry(object_store::RetryConfig { + max_retries: 8, + retry_timeout: Duration::from_secs(60), + backoff: object_store::BackoffConfig { + init_backoff: Duration::from_millis(100), + max_backoff: Duration::from_secs(5), + base: 2., + }, + }) + .build()?; + + Arc::new(store) + } else { + object_store::parse_url(&destination)?.0.into() + }; + Ok(object_store) +} + #[derive(Clone, Debug)] struct SnapshotUploadProgress { pub full_snapshot_path: String, @@ -516,10 +530,9 @@ impl object_store::CredentialProvider for AwsSdkCredentialsProvider { #[cfg(test)] mod tests { - use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion}; - use restate_types::config::SnapshotsOptions; - use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId}; - use restate_types::logs::{Lsn, SequenceNumber}; + use bytes::Bytes; + use object_store::path::Path; + use object_store::ObjectStore; use std::time::SystemTime; use tempfile::TempDir; use tokio::io::AsyncWriteExt; @@ -529,7 +542,11 @@ mod tests { use tracing_subscriber::{fmt, EnvFilter}; use url::Url; - use super::SnapshotRepository; + use super::{LatestSnapshot, SnapshotRepository}; + use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion}; + use restate_types::config::SnapshotsOptions; + use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId}; + use restate_types::logs::{Lsn, SequenceNumber}; #[tokio::test] async fn test_repository_local() -> anyhow::Result<()> { @@ -655,16 +672,52 @@ mod tests { ); let opts = SnapshotsOptions { - destination: Some(destination.to_string()), + destination: Some(destination.clone()), ..SnapshotsOptions::default() }; + eprintln!("Destination: {}", destination); + + let destination = Url::parse(destination.as_str())?; + let path = destination.path().to_string(); + let object_store = super::create_object_store_client(destination).await?; + let repository = SnapshotRepository::create_if_configured(&opts) .await? .unwrap(); repository.put(&snapshot1, source_dir.clone()).await?; + let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1); + let data = object_store + .get(&Path::from(format!( + "{}/{}/{}/data.sst", + path, snapshot1.partition_id, snapshot_prefix, + ))) + .await?; + assert_eq!(data.bytes().await?, Bytes::from_static(b"snapshot-data")); + + let metadata = object_store + .get(&Path::from(format!( + "{}/{}/{}/metadata.json", + path, snapshot1.partition_id, snapshot_prefix, + ))) + .await?; + let metadata: PartitionSnapshotMetadata = serde_json::from_slice(&metadata.bytes().await?)?; + assert_eq!(snapshot1.snapshot_id, metadata.snapshot_id); + + let latest = object_store + .get(&Path::from(format!( + "{}/{}/latest.json", + path, snapshot1.partition_id, + ))) + .await?; + let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?; + assert_eq!( + LatestSnapshot::from_snapshot(&snapshot1, snapshot_prefix), + latest + ); + let snapshot_source = TempDir::new()?; let source_dir = snapshot_source.path().to_path_buf(); @@ -679,6 +732,21 @@ mod tests { repository.put(&snapshot2, source_dir).await?; + let latest = object_store + .get(&Path::from(format!( + "{}/{}/latest.json", + path, snapshot2.partition_id, + ))) + .await?; + let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?; + assert_eq!( + LatestSnapshot::from_snapshot( + &snapshot2, + SnapshotRepository::get_snapshot_prefix(&snapshot2) + ), + latest + ); + Ok(()) }