From e541d98588620020dfaac7dff39853b12eec8d73 Mon Sep 17 00:00:00 2001 From: Pavel Tcholakov Date: Sun, 1 Dec 2024 10:42:00 +0100 Subject: [PATCH] Reuse allocated upload buffer across files --- crates/worker/src/partition/snapshots/repository.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 13e6e9b1b..68798bf18 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -220,6 +220,7 @@ impl SnapshotRepository { ); let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone()); + let mut buf = BytesMut::new(); for file in &snapshot.files { let filename = file.name.trim_start_matches("/"); let key = object_store::path::Path::from(format!( @@ -232,6 +233,7 @@ impl SnapshotRepository { local_snapshot_path.join(filename).as_path(), &key, &self.object_store, + &mut buf, ) .await .map_err(|e| PutSnapshotError::from(e, &progress))?; @@ -423,6 +425,7 @@ async fn put_snapshot_object( file_path: &Path, key: &object_store::path::Path, object_store: &Arc, + buf: &mut BytesMut, ) -> anyhow::Result { debug!(path = ?file_path, "Putting snapshot object from local file"); let mut snapshot = tokio::fs::File::open(file_path).await?; @@ -435,7 +438,6 @@ async fn put_snapshot_object( debug!("Performing multipart upload for {key}"); let mut upload = object_store.put_multipart(key).await?; - let mut buf = BytesMut::new(); let result: anyhow::Result<_> = async { loop { let mut len = 0; @@ -443,7 +445,7 @@ async fn put_snapshot_object( // Ensure full buffer unless at EOF while buf.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES { - len = snapshot.read_buf(&mut buf).await?; + len = snapshot.read_buf(buf).await?; if len == 0 { break; }