Skip to content

Commit

Permalink
Enable S3 conditional updates with latest object_store
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Nov 30, 2024
1 parent bfc4130 commit 7a0242b
Show file tree
Hide file tree
Showing 7 changed files with 285 additions and 64 deletions.
37 changes: 29 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ datafusion = { version = "42.0.0", default-features = false, features = [
"regex_expressions",
"unicode_expressions",
] }
object_store = { version = "0.11.1"}
datafusion-expr = { version = "42.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
Expand Down Expand Up @@ -141,7 +140,7 @@ metrics-exporter-prometheus = { version = "0.15", default-features = false, feat
"async-runtime",
] }
moka = "0.12.5"
object_store = { version = "0.11.1", features = ["aws"] }
object_store = { git = "https://github.com/apache/arrow-rs", rev = "c60ce14bfe144058ce77801ab8f8dec814aa8fe9", features = ["aws"] }
once_cell = "1.18"
opentelemetry = { version = "0.24.0" }
opentelemetry-http = { version = "0.13.0" }
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/worker_api/partition_processor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub enum SnapshotError {
/// Partition Processor is not fully caught up.
#[error("Partition processor state does not permit snapshotting")]
InvalidState(PartitionId),
#[error("Snapshot destination is not configured")]
RepositoryNotConfigured(PartitionId),
/// Database snapshot export error.
#[error("Snapshot export failed: {1}")]
SnapshotExport(PartitionId, #[source] anyhow::Error),
Expand All @@ -97,6 +99,7 @@ impl SnapshotError {
SnapshotError::PartitionNotFound(partition_id) => *partition_id,
SnapshotError::SnapshotInProgress(partition_id) => *partition_id,
SnapshotError::InvalidState(partition_id) => *partition_id,
SnapshotError::RepositoryNotConfigured(partition_id) => *partition_id,
SnapshotError::SnapshotExport(partition_id, _) => *partition_id,
SnapshotError::SnapshotIo(partition_id, _) => *partition_id,
SnapshotError::RepositoryIo(partition_id, _) => *partition_id,
Expand Down
5 changes: 3 additions & 2 deletions crates/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ restate-service-protocol = { workspace = true, features = ["test-util"] }
restate-storage-api = { workspace = true, features = ["test-util"] }
restate-test-util = { workspace = true, features = ["prost"] }
restate-types = { workspace = true, features = ["test-util"] }
prost = { workspace = true }

rstest = { workspace = true }
googletest = { workspace = true }
prost = { workspace = true }
rocksdb = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
test-log = { workspace = true }
tracing-subscriber = { workspace = true }
11 changes: 10 additions & 1 deletion crates/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,23 @@ impl Worker {
)
.await?;

let snapshots_options = &config.worker.snapshots;
if snapshots_options.snapshot_interval_num_records.is_some()
&& snapshots_options.destination.is_none()
{
return Err(BuildError::SnapshotRepository(anyhow::anyhow!(
"Periodic snapshot interval set without a specified snapshot destination"
)));
}

let partition_processor_manager = PartitionProcessorManager::new(
health_status,
updateable_config.clone(),
metadata_store_client,
partition_store_manager.clone(),
router_builder,
bifrost,
SnapshotRepository::create(config.common.base_dir(), &config.worker.snapshots)
SnapshotRepository::create(snapshots_options)
.await
.map_err(BuildError::SnapshotRepository)?,
);
Expand Down
Loading

0 comments on commit 7a0242b

Please sign in to comment.