From 7a0242b2c7ab285d7ec2560afca876bbf5da4c95 Mon Sep 17 00:00:00 2001
From: Pavel Tcholakov
Date: Sat, 30 Nov 2024 17:45:02 -0500
Subject: [PATCH] Enable S3 conditional updates with latest object_store

---
 Cargo.lock                                    |  37 ++-
 Cargo.toml                                    |   3 +-
 .../worker_api/partition_processor_manager.rs |   3 +
 crates/worker/Cargo.toml                      |   5 +-
 crates/worker/src/lib.rs                      |  11 +-
 .../src/partition/snapshots/repository.rs     | 256 +++++++++++++++---
 .../worker/src/partition_processor_manager.rs |  34 ++-
 7 files changed, 285 insertions(+), 64 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4665d41e0..8ee896c4e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2090,7 +2090,7 @@ dependencies = [
  "itertools 0.13.0",
  "log",
  "num_cpus",
- "object_store",
+ "object_store 0.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot",
  "paste",
  "pin-project-lite",
@@ -2134,7 +2134,7 @@ dependencies = [
  "instant",
  "libc",
  "num_cpus",
- "object_store",
+ "object_store 0.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "paste",
  "sqlparser",
  "tokio",
@@ -2164,7 +2164,7 @@ dependencies = [
  "futures",
  "hashbrown 0.14.5",
  "log",
- "object_store",
+ "object_store 0.11.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot",
  "rand",
  "tempfile",
@@ -4635,6 +4635,26 @@ name = "object_store"
 version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6eb4c22c6154a1e759d7099f9ffad7cc5ef8245f9efbab4a41b92623079c82f3"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "chrono",
+ "futures",
+ "humantime",
+ "itertools 0.13.0",
+ "parking_lot",
+ "percent-encoding",
+ "snafu 0.8.5",
+ "tokio",
+ "tracing",
+ "url",
+ "walkdir",
+]
+
+[[package]]
+name = "object_store"
+version = "0.11.1"
+source = "git+https://github.com/apache/arrow-rs?rev=c60ce14bfe144058ce77801ab8f8dec814aa8fe9#c60ce14bfe144058ce77801ab8f8dec814aa8fe9"
 dependencies = [
  "async-trait",
  "base64 0.22.0",
@@ -4647,7 +4667,7 @@ dependencies = [
  "md-5",
  "parking_lot",
  "percent-encoding",
- "quick-xml 0.36.2",
+ "quick-xml 0.37.1",
  "rand",
  "reqwest",
  "ring",
@@ -5517,9 +5537,9 @@ dependencies = [
 
 [[package]]
 name = "quick-xml"
-version = "0.36.2"
+version = "0.37.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe"
+checksum = "f22f29bdff3987b4d8632ef95fd6424ec7e4e0a57e2f4fc63e489e75357f6a03"
 dependencies = [
  "memchr",
  "serde",
@@ -6146,7 +6166,7 @@ dependencies = [
  "hyper 1.4.1",
  "hyper-util",
  "metrics",
- "object_store",
+ "object_store 0.11.1 (git+https://github.com/apache/arrow-rs?rev=c60ce14bfe144058ce77801ab8f8dec814aa8fe9)",
  "once_cell",
  "opentelemetry",
  "parking_lot",
@@ -7059,7 +7079,7 @@ dependencies = [
  "humantime",
  "itertools 0.13.0",
  "metrics",
- "object_store",
+ "object_store 0.11.1 (git+https://github.com/apache/arrow-rs?rev=c60ce14bfe144058ce77801ab8f8dec814aa8fe9)",
  "opentelemetry",
  "parking_lot",
  "pin-project",
@@ -7087,6 +7107,7 @@ dependencies = [
  "restate-types",
  "restate-wal-protocol",
  "rstest",
+ "rust-rocksdb",
  "schemars",
  "serde",
  "serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index ebaee2aa1..630d9d1a0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -104,7 +104,6 @@ datafusion = { version = "42.0.0", default-features = false, features = [
   "regex_expressions",
   "unicode_expressions",
 ] }
-object_store = { version = "0.11.1"}
 datafusion-expr = { version = "42.0.0" }
 derive_builder = "0.20.0"
 derive_more = { version = "1", features = ["full"] }
@@ -141,7 +140,7 @@ metrics-exporter-prometheus = { version = "0.15", default-features = false, feat
   "async-runtime",
 ] }
 moka = "0.12.5"
-object_store = { version = "0.11.1", features = ["aws"] }
+object_store = { git = "https://github.com/apache/arrow-rs", rev = "c60ce14bfe144058ce77801ab8f8dec814aa8fe9", features = ["aws"] }
 once_cell = "1.18"
 opentelemetry = { version = "0.24.0" }
 opentelemetry-http = { version = "0.13.0" }
diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs
index 99a229f10..10fec1904 100644
--- a/crates/core/src/worker_api/partition_processor_manager.rs
+++ b/crates/core/src/worker_api/partition_processor_manager.rs
@@ -80,6 +80,8 @@ pub enum SnapshotError {
     /// Partition Processor is not fully caught up.
     #[error("Partition processor state does not permit snapshotting")]
     InvalidState(PartitionId),
+    #[error("Snapshot destination is not configured")]
+    RepositoryNotConfigured(PartitionId),
     /// Database snapshot export error.
     #[error("Snapshot export failed: {1}")]
     SnapshotExport(PartitionId, #[source] anyhow::Error),
@@ -97,6 +99,7 @@ impl SnapshotError {
             SnapshotError::PartitionNotFound(partition_id) => *partition_id,
             SnapshotError::SnapshotInProgress(partition_id) => *partition_id,
             SnapshotError::InvalidState(partition_id) => *partition_id,
+            SnapshotError::RepositoryNotConfigured(partition_id) => *partition_id,
             SnapshotError::SnapshotExport(partition_id, _) => *partition_id,
             SnapshotError::SnapshotIo(partition_id, _) => *partition_id,
             SnapshotError::RepositoryIo(partition_id, _) => *partition_id,
diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml
index 93234dcb6..89c85ee4d 100644
--- a/crates/worker/Cargo.toml
+++ b/crates/worker/Cargo.toml
@@ -85,10 +85,11 @@ restate-service-protocol = { workspace = true, features = ["test-util"] }
 restate-storage-api = { workspace = true, features = ["test-util"] }
 restate-test-util = { workspace = true, features = ["prost"] }
 restate-types = { workspace = true, features = ["test-util"] }
-prost = { workspace = true }
-rstest = { workspace = true }
 googletest = { workspace = true }
+prost = { workspace = true }
+rocksdb = { workspace = true }
+rstest = { workspace = true }
 tempfile = { workspace = true }
 test-log = { workspace = true }
 tracing-subscriber = { workspace = true }
 
diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs
index f7730946f..86f60cb12 100644
--- a/crates/worker/src/lib.rs
+++ b/crates/worker/src/lib.rs
@@ -140,6 +140,15 @@ impl Worker {
         )
         .await?;
 
+        let snapshots_options = &config.worker.snapshots;
+        if snapshots_options.snapshot_interval_num_records.is_some()
+            && snapshots_options.destination.is_none()
+        {
+            return Err(BuildError::SnapshotRepository(anyhow::anyhow!(
+                "Periodic snapshot interval set without a specified snapshot destination"
+            )));
+        }
+
         let partition_processor_manager = PartitionProcessorManager::new(
             health_status,
             updateable_config.clone(),
@@ -147,7 +156,7 @@ impl Worker {
             partition_store_manager.clone(),
             router_builder,
             bifrost,
-            SnapshotRepository::create(config.common.base_dir(), &config.worker.snapshots)
+            SnapshotRepository::create(snapshots_options)
                 .await
                 .map_err(BuildError::SnapshotRepository)?,
         );
diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs
index c3243a7bd..6575fd061 100644
--- a/crates/worker/src/partition/snapshots/repository.rs
+++ b/crates/worker/src/partition/snapshots/repository.rs
@@ -17,7 +17,7 @@ use aws_config::default_provider::credentials::DefaultCredentialsChain;
 use aws_config::BehaviorVersion;
 use aws_credential_types::provider::ProvideCredentials;
 use bytes::BytesMut;
-use object_store::aws::AmazonS3Builder;
+use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
 use object_store::{MultipartUpload, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion};
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
@@ -33,7 +33,7 @@ use restate_types::logs::Lsn;
 /// Provides read and write access to the long-term partition snapshot storage destination.
 ///
 /// The repository wraps access to an object store "bucket" that contains snapshot metadata and data
-/// optimised for efficient retrieval. The bucket layout is split into two top-level prefixes for
+/// optimized for efficient retrieval. The bucket layout is split into two top-level prefixes for
 /// snapshot metadata and data respectively. While full snapshot archives contain all relevant
 /// metadata, this split layout allows for efficient retrieval of only the metadata upfront. It also
 /// enables us to evolve the data storage layout independently in the future.
@@ -81,29 +81,21 @@ pub struct LatestSnapshot {
 
 impl SnapshotRepository {
     pub async fn create(
-        base_dir: PathBuf,
         snapshots_options: &SnapshotsOptions,
-    ) -> anyhow::Result<SnapshotRepository> {
+    ) -> anyhow::Result<Option<SnapshotRepository>> {
         let destination = if let Some(ref destination) = snapshots_options.destination {
-            destination.clone()
+            Url::parse(destination).context("Failed parsing snapshot repository URL")?
         } else {
-            base_dir
-                .join("pp-snapshots")
-                .into_os_string()
-                .into_string()
-                .map(|path| format!("file://{path}"))
-                .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?
+            return Ok(None);
         };
-        let destination =
-            Url::parse(&destination).context("Failed parsing snapshot repository URL")?;
-
-        // AWS-specific ergonomics optimization: without explicit configuration, we set up the AWS
-        // SDK credentials provider so that the conventional environment variables and config
-        // locations just work. This makes object_store behave similarly to the Lambda invoker.
-        let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3"
-            && destination.query().is_none()
-            && snapshots_options.additional_options.is_empty()
-        {
+
+        // Ergonomics and security optimization: we use the AWS SDK configuration and credentials
+        // provider so that the conventional environment variables and config locations just work.
+        // The object_store crate has its own configuration mechanism which doesn't understand many
+        // of the AWS conventions, and which differs quite a lot from the Lambda integration. Going
+        // through the AWS SDK allows us to infer the region and securely obtain session
+        // credentials without hard-coded configuration.
+        let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3" {
             debug!("Using AWS SDK credentials provider");
             let aws_region = aws_config::load_defaults(BehaviorVersion::v2024_03_28())
                 .await
@@ -114,6 +106,7 @@ impl SnapshotRepository {
             let store = AmazonS3Builder::new()
                 .with_url(destination.clone())
                 .with_region(aws_region.to_string())
+                .with_conditional_put(S3ConditionalPut::ETagMatch)
                 .with_credentials(Arc::new(AwsSdkCredentialsProvider {
                     credentials_provider: DefaultCredentialsChain::builder().build().await,
                 }))
                 .build()?;
 
             Arc::new(store)
         } else {
-            debug!("Using object_store credentials configuration");
-            object_store::parse_url_opts(&destination, &snapshots_options.additional_options)?
-                .0
-                .into()
+            // This should only be used for file:// destinations at this point.
+            debug!("Using object_store configuration loading mechanism");
+            object_store::parse_url(&destination)?.0.into()
         };
 
         // prefix must be stripped of any leading slash and, unless zero-length, end in a single "/" character
@@ -134,11 +126,11 @@ impl SnapshotRepository {
             prefix => format!("{}/", prefix.trim_start_matches('/').trim_end_matches('/')),
         };
 
-        Ok(SnapshotRepository {
+        Ok(Some(SnapshotRepository {
             object_store,
             destination,
             prefix,
-        })
+        }))
     }
 
     /// Write a partition snapshot to the snapshot repository.
@@ -244,10 +236,11 @@ impl SnapshotRepository {
                     debug!(
                         repository_latest_lsn = "unknown",
                         new_snapshot_lsn = ?snapshot.min_applied_lsn,
-                        "Failed to parse stored latest snapshot pointer, refusing to update it: {}",
+                        "Failed to parse stored latest snapshot pointer, refusing to overwrite: {}",
                         e
                     )
-                })?;
+                })
+                .map_err(|e| anyhow!("Failed to parse latest snapshot metadata: {}", e))?;
                 Some((latest, version))
             }
             Err(object_store::Error::NotFound { .. }) => {
@@ -278,10 +271,15 @@ impl SnapshotRepository {
             ));
         }
 
+        // The object_store file provider supports create-if-not-exists but not update-version on put
+        let use_conditional_update = !matches!(self.destination.scheme(), "file");
         let latest_json = PutPayload::from(serde_json::to_string_pretty(&latest)?);
         let conditions = maybe_stored
             .map(|(_, version)| PutOptions {
-                mode: PutMode::Update(version),
+                mode: match use_conditional_update {
+                    true => PutMode::Update(version),
+                    false => PutMode::Overwrite,
+                },
                 ..PutOptions::default()
             })
             .unwrap_or_else(|| PutOptions {
@@ -313,14 +311,15 @@ impl SnapshotRepository {
     const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024;
 
     async fn put_snapshot_object(
-        snapshot_path: &Path,
+        file_path: &Path,
         key: &object_store::path::Path,
         object_store: &Arc<dyn ObjectStore>,
     ) -> anyhow::Result<object_store::PutResult> {
-        let mut snapshot = tokio::fs::File::open(snapshot_path).await?;
+        debug!(path = ?file_path, "Putting snapshot object from local file");
+        let mut snapshot = tokio::fs::File::open(file_path).await?;
 
         if snapshot.metadata().await?.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES as u64 {
-            let payload = PutPayload::from(tokio::fs::read(snapshot_path).await?);
+            let payload = PutPayload::from(tokio::fs::read(file_path).await?);
             return object_store.put(key, payload).await.map_err(|e| e.into());
         }
 
@@ -395,3 +394,194 @@ impl object_store::CredentialProvider for AwsSdkCredentialsProvider {
         }))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::time::SystemTime;
+
+    use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion};
+    use restate_types::config::SnapshotsOptions;
+    use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId};
+    use restate_types::logs::{Lsn, SequenceNumber};
+    use tempfile::TempDir;
+    use tokio::io::AsyncWriteExt;
+    use tracing::info;
+    use tracing_subscriber::layer::SubscriberExt;
+    use tracing_subscriber::util::SubscriberInitExt;
+    use tracing_subscriber::{fmt, EnvFilter};
+    use url::Url;
+
+    use super::SnapshotRepository;
+
+    #[tokio::test]
+    async fn test_repository_local() -> anyhow::Result<()> {
+        tracing_subscriber::registry()
+            .with(fmt::layer())
+            .with(EnvFilter::from_default_env())
+            .init();
+
+        let snapshot_source = TempDir::new()?;
+        let source_dir = snapshot_source.path().to_path_buf();
+
+        let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?;
+        data.write_all(b"snapshot-data").await?;
+
+        let snapshot = mock_snapshot_metadata(
"/data.sst".to_owned(), + source_dir.to_string_lossy().to_string(), + ); + + let snapshots_destination = TempDir::new()?; + let opts = SnapshotsOptions { + destination: Some( + Url::from_file_path(snapshots_destination.path()) + .unwrap() + .to_string(), + ), + ..SnapshotsOptions::default() + }; + let repository = SnapshotRepository::create(&opts).await?.unwrap(); + + repository.put(&snapshot, source_dir).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_overwrite_unparsable_latest() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let snapshot_source = TempDir::new()?; + let source_dir = snapshot_source.path().to_path_buf(); + + let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data.write_all(b"snapshot-data").await?; + + let snapshot = mock_snapshot_metadata( + "/data.sst".to_owned(), + source_dir.to_string_lossy().to_string(), + ); + + let snapshots_destination: TempDir = TempDir::new()?; + let destination_dir = snapshots_destination.path().to_owned(); + let opts = SnapshotsOptions { + destination: Some( + Url::from_file_path(snapshots_destination.path()) + .unwrap() + .to_string(), + ), + ..SnapshotsOptions::default() + }; + let repository = SnapshotRepository::create(&opts).await?.unwrap(); + + // Write invalid JSON to latest.json + let latest_path = destination_dir.join(format!("{}/latest.json", PartitionId::MIN)); + tokio::fs::create_dir_all(latest_path.parent().unwrap()).await?; + info!("Creating file: {:?}", latest_path); + let mut latest = tokio::fs::File::create(&latest_path).await?; + latest.write_all(b"not valid json").await?; + + assert!(repository.put(&snapshot, source_dir).await.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn test_update_existing_snapshot_with_newer() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let snapshot_source = TempDir::new()?; + let source_dir = snapshot_source.path().to_path_buf(); + + let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data.write_all(b"snapshot-data").await?; + + let mut snapshot1 = mock_snapshot_metadata( + "/data.sst".to_owned(), + source_dir.to_string_lossy().to_string(), + ); + snapshot1.min_applied_lsn = Lsn::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? 
+                .as_millis() as u64,
+        );
+
+        let snapshots_destination = TempDir::new()?;
+        // #[cfg(not(feature = "s3-integration-test"))]
+        let opts = SnapshotsOptions {
+            destination: Some(
+                Url::from_file_path(snapshots_destination.path())
+                    .unwrap()
+                    .to_string(),
+            ),
+            ..SnapshotsOptions::default()
+        };
+
+        // We can't enable this by default since tests are built with --all-features, but the
+        // following code may be used to test conditional updates against S3:
+        //
+        // #[cfg(feature = "s3-integration-test")]
+        // let opts = SnapshotsOptions {
+        //     destination: Some(format!(
+        //         "s3://{}/integration-test",
+        //         std::env::var("RESTATE_S3_INTEGRATION_TEST_BUCKET_NAME")
+        //             .expect("RESTATE_S3_INTEGRATION_TEST_BUCKET_NAME must be set")
+        //     )),
+        //     ..SnapshotsOptions::default()
+        // };
+
+        let repository = SnapshotRepository::create(&opts).await?.unwrap();
+
+        repository.put(&snapshot1, source_dir.clone()).await?;
+
+        let snapshot_source = TempDir::new()?;
+        let source_dir = snapshot_source.path().to_path_buf();
+
+        let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?;
+        data.write_all(b"snapshot-data").await?;
+
+        let mut snapshot2 = mock_snapshot_metadata(
+            "/data.sst".to_owned(),
+            source_dir.to_string_lossy().to_string(),
+        );
+        snapshot2.min_applied_lsn = snapshot1.min_applied_lsn.next();
+
+        repository.put(&snapshot2, source_dir).await?;
+
+        Ok(())
+    }
+
+    fn mock_snapshot_metadata(file_name: String, directory: String) -> PartitionSnapshotMetadata {
+        PartitionSnapshotMetadata {
+            version: SnapshotFormatVersion::V1,
+            cluster_name: "test-cluster".to_string(),
+            node_name: "node".to_string(),
+            partition_id: PartitionId::MIN,
+            created_at: humantime::Timestamp::from(SystemTime::now()),
+            snapshot_id: SnapshotId::new(),
+            key_range: PartitionKey::MIN..=PartitionKey::MAX,
+            min_applied_lsn: Lsn::new(1),
+            db_comparator_name: "leveldb.BytewiseComparator".to_string(),
+            // this is totally bogus but it doesn't matter since we won't be importing it into RocksDB
+            files: vec![rocksdb::LiveFile {
+                column_family_name: "data-0".to_owned(),
+                name: file_name,
+                directory,
+                size: 0,
+                level: 0,
+                start_key: Some(vec![0]),
+                end_key: Some(vec![0xff, 0xff]),
+                num_entries: 0,
+                num_deletions: 0,
+                smallest_seqno: 0,
+                largest_seqno: 0,
+            }],
+        }
+    }
+}
diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs
index 91b789266..7027b1f9e 100644
--- a/crates/worker/src/partition_processor_manager.rs
+++ b/crates/worker/src/partition_processor_manager.rs
@@ -103,7 +103,7 @@ pub struct PartitionProcessorManager {
 
     pending_snapshots: HashMap<PartitionId, PendingSnapshotTask>,
     snapshot_export_tasks: FuturesUnordered<TaskHandle<SnapshotResultInternal>>,
-    snapshot_repository: SnapshotRepository,
+    snapshot_repository: Option<SnapshotRepository>,
 }
 
 struct PendingSnapshotTask {
@@ -175,7 +175,7 @@ impl PartitionProcessorManager {
         partition_store_manager: PartitionStoreManager,
         router_builder: &mut MessageRouterBuilder,
         bifrost: Bifrost,
-        snapshot_repository: SnapshotRepository,
+        snapshot_repository: Option<SnapshotRepository>,
     ) -> Self {
         let incoming_update_processors = router_builder.subscribe_to_stream(2);
         let incoming_partition_processor_rpc = router_builder.subscribe_to_stream(128);
@@ -702,16 +702,18 @@ impl PartitionProcessorManager {
             }
         };
 
+        let snapshot_repository = self.snapshot_repository.clone();
+        let Some(snapshot_repository) = snapshot_repository else {
+            let _ = sender.send(Err(SnapshotError::RepositoryNotConfigured(partition_id)));
+            return;
+        };
+
         if !processor_state.should_publish_snapshots() {
             let _ =
                sender.send(Err(SnapshotError::InvalidState(partition_id)));
             return;
         }
 
-        self.spawn_create_snapshot_task(
-            partition_id,
-            self.snapshot_repository.clone(),
-            Some(sender),
-        );
+        self.spawn_create_snapshot_task(partition_id, snapshot_repository, Some(sender));
     }
 
     fn on_create_snapshot_task_completed(
@@ -783,7 +785,11 @@ impl PartitionProcessorManager {
             last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID),
             "Requesting partition snapshot",
         );
-        self.spawn_create_snapshot_task(partition_id, self.snapshot_repository.clone(), None);
+        self.spawn_create_snapshot_task(
+            partition_id,
+            self.snapshot_repository.clone().expect("is some"), // validated on startup
+            None,
+        );
     }
 
@@ -924,9 +930,7 @@ enum EventKind {
 
 #[cfg(test)]
 mod tests {
-    use crate::partition::snapshots::SnapshotRepository;
     use crate::partition_processor_manager::PartitionProcessorManager;
-    use crate::BuildError;
     use googletest::IntoTestResult;
     use restate_bifrost::providers::memory_loglet;
    use restate_bifrost::BifrostService;
@@ -934,9 +938,7 @@ mod tests {
     use restate_core::{TaskCenter, TaskKind, TestCoreEnvBuilder};
     use restate_partition_store::PartitionStoreManager;
     use restate_rocksdb::RocksDbManager;
-    use restate_types::config::{
-        CommonOptions, Configuration, RocksDbOptions, SnapshotsOptions, StorageOptions,
-    };
+    use restate_types::config::{CommonOptions, Configuration, RocksDbOptions, StorageOptions};
     use restate_types::health::HealthStatus;
     use restate_types::identifiers::{PartitionId, PartitionKey};
     use restate_types::live::{Constant, Live};
@@ -948,7 +950,6 @@ mod tests {
     use restate_types::protobuf::node::Header;
     use restate_types::{GenerationalNodeId, Version};
     use std::time::Duration;
-    use tempfile::TempDir;
     use test_log::test;
 
     /// This test ensures that the lifecycle of partition processors is properly managed by the
@@ -983,7 +984,6 @@ mod tests {
         )
         .await?;
 
-        let snapshots_options = SnapshotsOptions::default();
         let partition_processor_manager = PartitionProcessorManager::new(
             health_status,
             Live::from_value(Configuration::default()),
@@ -991,9 +991,7 @@ mod tests {
             partition_store_manager,
             &mut env_builder.router_builder,
             bifrost,
-            SnapshotRepository::create(TempDir::new()?.into_path(), &snapshots_options)
-                .await
-                .map_err(BuildError::SnapshotRepository)?,
+            None,
        );
 
         let env = env_builder.build().await;
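
Reviewer note (not part of the patch): the latest.json handling above is a classic
object-store compare-and-swap. Below is a minimal sketch of the pattern this change
enables, assuming the object_store 0.11 `put_opts` API; `cas_update` is a hypothetical
helper for illustration, not code from this change.

    use std::sync::Arc;

    use object_store::{
        path::Path, Error, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion,
    };

    // Hypothetical helper: atomically replace a small pointer object, retrying
    // until our write lands on top of the version we last observed.
    async fn cas_update(
        store: &Arc<dyn ObjectStore>,
        key: &Path,
        new_contents: String,
    ) -> object_store::Result<()> {
        loop {
            // Observe the current object (if any) to obtain a precondition token.
            let mode = match store.get(key).await {
                Ok(existing) => PutMode::Update(UpdateVersion {
                    e_tag: existing.meta.e_tag.clone(),
                    version: existing.meta.version.clone(),
                }),
                Err(Error::NotFound { .. }) => PutMode::Create,
                Err(err) => return Err(err),
            };
            let opts = PutOptions { mode, ..PutOptions::default() };
            match store.put_opts(key, PutPayload::from(new_contents.clone()), opts).await {
                Ok(_) => return Ok(()),
                // Raced by another writer: re-read and try again.
                Err(Error::Precondition { .. }) | Err(Error::AlreadyExists { .. }) => continue,
                Err(err) => return Err(err),
            }
        }
    }

On S3 these semantics only hold when the store is built with
`.with_conditional_put(S3ConditionalPut::ETagMatch)`, as `SnapshotRepository::create`
now does; the `file://` backend supports `PutMode::Create` but not `PutMode::Update`,
which is why `put` falls back to `PutMode::Overwrite` for local destinations.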
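
The disabled S3 integration test can also be approximated locally: the in-memory
backend implements the same conditional-put semantics, so a stale version token must
be rejected with a precondition error. A self-contained sketch under those assumptions
(requires `tokio` with the `rt` and `macros` features; names are illustrative):

    use std::sync::Arc;

    use object_store::{
        memory::InMemory, path::Path, Error, ObjectStore, PutMode, PutOptions, PutPayload,
        UpdateVersion,
    };

    #[tokio::main]
    async fn main() -> object_store::Result<()> {
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let key = Path::from("0/latest.json");

        // Create the pointer object and remember the version we wrote.
        let created = store
            .put_opts(
                &key,
                PutPayload::from(r#"{"lsn":1}"#.to_string()),
                PutOptions { mode: PutMode::Create, ..PutOptions::default() },
            )
            .await?;
        let observed = UpdateVersion {
            e_tag: created.e_tag.clone(),
            version: created.version.clone(),
        };

        // A competing writer bumps the pointer first.
        store
            .put_opts(
                &key,
                PutPayload::from(r#"{"lsn":2}"#.to_string()),
                PutOptions { mode: PutMode::Update(observed.clone()), ..PutOptions::default() },
            )
            .await?;

        // Replaying our now-stale version must fail with a precondition error.
        let stale = store
            .put_opts(
                &key,
                PutPayload::from(r#"{"lsn":1}"#.to_string()),
                PutOptions { mode: PutMode::Update(observed), ..PutOptions::default() },
            )
            .await;
        assert!(matches!(stale, Err(Error::Precondition { .. })));
        Ok(())
    }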