feat(writer): configurable buffer size of unsized write #2143

Merged 12 commits on May 6, 2023
Changes from 4 commits
20 changes: 19 additions & 1 deletion core/src/services/gcs/backend.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use http::StatusCode;
use log::debug;
use log::{debug, warn};
use reqsign::GoogleCredentialLoader;
use reqsign::GoogleSigner;
use reqsign::GoogleTokenLoad;
@@ -119,6 +119,9 @@ pub struct GcsBuilder {
customed_token_loader: Option<Box<dyn GoogleTokenLoad>>,
predefined_acl: Option<String>,
default_storage_class: Option<String>,

/// The fixed buffer size the writer uses when flushing into the underlying storage.
write_fixed_size: Option<usize>,
}

impl GcsBuilder {
@@ -236,6 +239,20 @@ impl GcsBuilder {
};
self
}

/// The buffer size should be a multiple of 256 KiB (256 x 1024 bytes), unless it's the last chunk that completes the upload.
/// Larger chunk sizes typically make uploads faster, but note that there's a tradeoff between speed and memory usage.
/// It's recommended that you use at least 8 MiB for the chunk size.
/// Reference: [Perform resumable uploads](https://cloud.google.com/storage/docs/performing-resumable-uploads)
pub fn write_fixed_size(&mut self, fixed_buffer_size: usize) -> &mut Self {
// The size must be a non-zero multiple of 256 KiB to satisfy GCS resumable uploads.
if fixed_buffer_size > 0 && fixed_buffer_size % (256 * 1024) == 0 {
self.write_fixed_size = Some(fixed_buffer_size);
} else {
warn!("The buffer size does not meet the requirements of GCS resumable uploads, falling back to the 8 MiB default");
}

self
}
}

impl Debug for GcsBuilder {
@@ -346,6 +363,7 @@ impl Builder for GcsBuilder {
credential_loader: cred_loader,
predefined_acl: self.predefined_acl.clone(),
default_storage_class: self.default_storage_class.clone(),
writer_fixed_size: self.write_fixed_size,
}),
};

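For reference, a minimal usage sketch of the new GCS knob, assuming the builder is exposed as `services::Gcs` and finished through the usual `Operator::new(...)?.finish()` flow; the bucket name and size below are placeholders:

```rust
use opendal::services::Gcs;
use opendal::Operator;
use opendal::Result;

fn main() -> Result<()> {
    // Placeholder bucket; credentials are resolved by the usual loaders.
    let mut builder = Gcs::default();
    builder.bucket("example-bucket");
    // 16 MiB is a multiple of 256 KiB, so the setter accepts it.
    builder.write_fixed_size(16 * 1024 * 1024);

    let _op = Operator::new(builder)?.finish();
    Ok(())
}
```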
2 changes: 2 additions & 0 deletions core/src/services/gcs/core.rs
@@ -55,6 +55,8 @@ pub struct GcsCore {

pub predefined_acl: Option<String>,
pub default_storage_class: Option<String>,

pub writer_fixed_size: Option<usize>,
}

impl Debug for GcsCore {
23 changes: 7 additions & 16 deletions core/src/services/gcs/writer.rs
@@ -35,11 +35,12 @@ pub struct GcsWriter {
location: Option<String>,
written: u64,
buffer: oio::VectorCursor,
buffer_size: usize,
writer_fixed_size: usize,
}

impl GcsWriter {
pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self {
let writer_fixed_size = core.writer_fixed_size.unwrap_or(8 * 1024 * 1024);
GcsWriter {
core,
path: path.to_string(),
@@ -48,17 +49,7 @@ impl GcsWriter {
location: None,
written: 0,
buffer: oio::VectorCursor::new(),
// The chunk size should be a multiple of 256 KiB
// (256 x 1024 bytes), unless it's the last chunk
// that completes the upload.
//
// Larger chunk sizes typically make uploads faster,
// but note that there's a tradeoff between speed and
// memory usage. It's recommended that you use at least
// 8 MiB for the chunk size.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
writer_fixed_size,
}
}

@@ -152,16 +143,16 @@ impl oio::Write for GcsWriter {

self.buffer.push(bs);
// Return directly if the buffer is not full
if self.buffer.len() <= self.buffer_size {
if self.buffer.len() <= self.writer_fixed_size {
return Ok(());
}

let bs = self.buffer.peak_exact(self.buffer_size);
let bs = self.buffer.peak_exact(self.writer_fixed_size);

match self.write_part(location, bs).await {
Ok(_) => {
self.buffer.take(self.buffer_size);
self.written += self.buffer_size as u64;
self.buffer.take(self.writer_fixed_size);
self.written += self.writer_fixed_size as u64;
Ok(())
}
Err(e) => {
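The writer above accumulates incoming chunks and only uploads once the configured size is exceeded, then drains exactly that many bytes. A standalone sketch of the same accumulate-then-flush pattern (types and names here are illustrative, not OpenDAL internals):

```rust
/// A simplified stand-in for the buffered writer: data accumulates in memory
/// and is flushed in `fixed_size` pieces, mirroring the logic in
/// `GcsWriter::write` above.
struct FixedSizeBuffer {
    buf: Vec<u8>,
    fixed_size: usize,
}

impl FixedSizeBuffer {
    fn new(fixed_size: usize) -> Self {
        Self { buf: Vec::new(), fixed_size }
    }

    /// Returns a full chunk to upload once the buffer exceeds `fixed_size`,
    /// otherwise keeps accumulating and returns `None`.
    fn push(&mut self, bs: &[u8]) -> Option<Vec<u8>> {
        self.buf.extend_from_slice(bs);
        if self.buf.len() <= self.fixed_size {
            return None;
        }
        let chunk: Vec<u8> = self.buf.drain(..self.fixed_size).collect();
        Some(chunk)
    }
}

fn main() {
    // 256 KiB chunks for the sketch; the real writer defaults to 8 MiB.
    let mut w = FixedSizeBuffer::new(256 * 1024);
    assert!(w.push(&[0u8; 200 * 1024]).is_none()); // not full yet
    let chunk = w.push(&[0u8; 100 * 1024]).expect("buffer now exceeds 256 KiB");
    assert_eq!(chunk.len(), 256 * 1024);
}
```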
19 changes: 18 additions & 1 deletion core/src/services/oss/backend.rs
@@ -25,7 +25,7 @@ use async_trait::async_trait;
use bytes::Buf;
use http::StatusCode;
use http::Uri;
use log::debug;
use log::{debug, warn};
use reqsign::AliyunConfig;
use reqsign::AliyunLoader;
use reqsign::AliyunOssSigner;
@@ -123,6 +123,8 @@ pub struct OssBuilder {
access_key_secret: Option<String>,

http_client: Option<HttpClient>,
/// The size of each multipart upload part; the valid range is 100 KB to 5 GB.
write_buffer_size: Option<usize>,
}

impl Debug for OssBuilder {
@@ -292,6 +294,18 @@ impl OssBuilder {
}
self
}

/// Set the buffer size of unsized writes; valid values: 100 KB to 5 GB.
/// Reference: [OSS Multipart upload](https://www.alibabacloud.com/help/en/object-storage-service/latest/multipart-upload-6)
pub fn write_buffer_size(&mut self, write_buffer_size: &str) -> &mut Self {
// A value that fails to parse falls through to the warning below.
let write_buffer_size = write_buffer_size.parse::<usize>().unwrap_or_default();
if (100 * 1024..=5 * 1024 * 1024 * 1024).contains(&write_buffer_size) {
self.write_buffer_size = Some(write_buffer_size);
} else {
warn!("The buffer size does not meet the requirements of OSS multipart upload, falling back to the 8 MiB default");
}
self
}
}

impl Builder for OssBuilder {
Expand All @@ -313,6 +327,8 @@ impl Builder for OssBuilder {
.map(|v| builder.server_side_encryption(v));
map.get("server_side_encryption_key_id")
.map(|v| builder.server_side_encryption_key_id(v));
map.get("write_buffer_size")
.map(|v| builder.write_buffer_size(v));
builder
}

@@ -398,6 +414,7 @@ impl Builder for OssBuilder {
client,
server_side_encryption,
server_side_encryption_key_id,
writer_buffer_size: self.write_buffer_size,
}),
})
}
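For reference, a hypothetical OSS setup using the new setter; note that it takes the size as a string, matching the `from_map` wiring above (bucket and endpoint are placeholders):

```rust
use opendal::services::Oss;
use opendal::Operator;
use opendal::Result;

fn main() -> Result<()> {
    let mut builder = Oss::default();
    // Bucket and endpoint are illustrative placeholders.
    builder.bucket("example-bucket");
    builder.endpoint("https://oss-cn-hangzhou.aliyuncs.com");
    // The setter takes the size as a string; 10 MiB is well inside 100 KB..5 GB.
    builder.write_buffer_size("10485760");

    let _op = Operator::new(builder)?.finish();
    Ok(())
}
```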
1 change: 1 addition & 0 deletions core/src/services/oss/core.rs
@@ -65,6 +65,7 @@ pub struct OssCore {
pub client: HttpClient,
pub loader: AliyunLoader,
pub signer: AliyunOssSigner,
pub writer_buffer_size: Option<usize>,
}

impl Debug for OssCore {
9 changes: 2 additions & 7 deletions core/src/services/oss/writer.rs
@@ -42,6 +42,7 @@ pub struct OssWriter {

impl OssWriter {
pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.writer_buffer_size.unwrap_or(8 * 1024 * 1024);
OssWriter {
core,
path: path.to_string(),
@@ -50,13 +51,7 @@
upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
// The part size must be 5 MiB to 5 GiB. There is no minimum
// size limit on the last part of your multipart upload.
//
// We pick the default value as 8 MiB for better throughput.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
buffer_size,
}
}

22 changes: 20 additions & 2 deletions core/src/services/s3/backend.rs
@@ -26,7 +26,7 @@ use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Buf;
use http::StatusCode;
use log::debug;
use log::{debug, warn};
use md5::Digest;
use md5::Md5;
use once_cell::sync::Lazy;
@@ -304,6 +304,10 @@ pub struct S3Builder {

http_client: Option<HttpClient>,
customed_credential_load: Option<Box<dyn AwsCredentialLoad>>,

/// The part size of S3 multipart upload, which should be 5 MiB to 5 GiB.
/// There is no minimum size limit on the last part of a multipart upload.
write_buffer_size: Option<usize>,
}

impl Debug for S3Builder {
@@ -704,6 +708,19 @@ impl S3Builder {

endpoint
}

/// Set the buffer size of unsized writes; valid values: 5 MiB to 5 GiB.
/// Reference: [Amazon S3 multipart upload limits](https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html)
pub fn write_buffer_size(&mut self, write_buffer_size: &str) -> &mut Self {
// A value that fails to parse falls through to the warning below.
let write_buffer_size = write_buffer_size.parse::<usize>().unwrap_or_default();
if (5 * 1024 * 1024..=5 * 1024 * 1024 * 1024).contains(&write_buffer_size) {
self.write_buffer_size = Some(write_buffer_size);
} else {
warn!("The buffer size does not meet the requirements of S3 multipart upload, falling back to the 8 MiB default");
}

self
}
}

impl Builder for S3Builder {
@@ -871,7 +888,7 @@
}

let signer = AwsV4Signer::new("s3", &region);

let write_buffer_size = self.write_buffer_size;
debug!("backend build finished");
Ok(S3Backend {
core: Arc::new(S3Core {
Expand All @@ -887,6 +904,7 @@ impl Builder for S3Builder {
signer,
loader,
client,
write_buffer_size,
}),
})
}
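Likewise, a sketch for S3, assuming the builder is exposed as `services::S3` (bucket and region are placeholders):

```rust
use opendal::services::S3;
use opendal::Operator;
use opendal::Result;

fn main() -> Result<()> {
    let mut builder = S3::default();
    // Bucket and region are illustrative placeholders.
    builder.bucket("example-bucket");
    builder.region("us-east-1");
    // 16 MiB parts; anything in the 5 MiB..5 GiB window is accepted.
    builder.write_buffer_size(&(16 * 1024 * 1024).to_string());

    let _op = Operator::new(builder)?.finish();
    Ok(())
}
```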
1 change: 1 addition & 0 deletions core/src/services/s3/core.rs
@@ -86,6 +86,7 @@ pub struct S3Core {
pub signer: AwsV4Signer,
pub loader: AwsLoader,
pub client: HttpClient,
pub write_buffer_size: Option<usize>,
}

impl Debug for S3Core {
9 changes: 2 additions & 7 deletions core/src/services/s3/writer.rs
@@ -42,6 +42,7 @@ pub struct S3Writer {

impl S3Writer {
pub fn new(core: Arc<S3Core>, path: &str, op: OpWrite) -> Self {
let buffer_size = core.write_buffer_size.unwrap_or(8 * 1024 * 1024);
S3Writer {
core,
path: path.to_string(),
@@ -50,13 +51,7 @@
upload_id: None,
parts: vec![],
buffer: oio::VectorCursor::new(),
// The part size must be 5 MiB to 5 GiB. There is no minimum
// size limit on the last part of your multipart upload.
//
// We pick the default value as 8 MiB for better throughput.
//
// TODO: allow this value to be configured.
buffer_size: 8 * 1024 * 1024,
buffer_size,
}
}

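Across the three services the PR follows the same plumbing: the builder stores an `Option<usize>`, the core struct carries it, and the writer falls back to 8 MiB when it is unset. A condensed, generic sketch of that pattern (names are not the actual OpenDAL types):

```rust
// Generic illustration of the builder -> core -> writer plumbing used in this PR.
struct Builder {
    write_buffer_size: Option<usize>,
}

struct Core {
    write_buffer_size: Option<usize>,
}

struct Writer {
    buffer_size: usize,
}

impl Writer {
    fn new(core: &Core) -> Self {
        // Unset means "keep the previous hard-coded default" of 8 MiB.
        let buffer_size = core.write_buffer_size.unwrap_or(8 * 1024 * 1024);
        Writer { buffer_size }
    }
}

fn main() {
    let builder = Builder { write_buffer_size: None };
    let core = Core { write_buffer_size: builder.write_buffer_size };
    let writer = Writer::new(&core);
    assert_eq!(writer.buffer_size, 8 * 1024 * 1024);
}
```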