Skip to content

Commit

Permalink
Reuse allocated upload buffer across files
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 1, 2024
1 parent 7788fb2 commit e541d98
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ impl SnapshotRepository {
);

let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone());
let mut buf = BytesMut::new();
for file in &snapshot.files {
let filename = file.name.trim_start_matches("/");
let key = object_store::path::Path::from(format!(
Expand All @@ -232,6 +233,7 @@ impl SnapshotRepository {
local_snapshot_path.join(filename).as_path(),
&key,
&self.object_store,
&mut buf,
)
.await
.map_err(|e| PutSnapshotError::from(e, &progress))?;
Expand Down Expand Up @@ -423,6 +425,7 @@ async fn put_snapshot_object(
file_path: &Path,
key: &object_store::path::Path,
object_store: &Arc<dyn ObjectStore>,
buf: &mut BytesMut,
) -> anyhow::Result<object_store::PutResult> {
debug!(path = ?file_path, "Putting snapshot object from local file");
let mut snapshot = tokio::fs::File::open(file_path).await?;
Expand All @@ -435,15 +438,14 @@ async fn put_snapshot_object(
debug!("Performing multipart upload for {key}");
let mut upload = object_store.put_multipart(key).await?;

let mut buf = BytesMut::new();
let result: anyhow::Result<_> = async {
loop {
let mut len = 0;
buf.reserve(MULTIPART_UPLOAD_CHUNK_SIZE_BYTES);

// Ensure full buffer unless at EOF
while buf.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES {
len = snapshot.read_buf(&mut buf).await?;
len = snapshot.read_buf(buf).await?;
if len == 0 {
break;
}
Expand Down

0 comments on commit e541d98

Please sign in to comment.